Skip to content

Commit

Permalink
Try alternative message path for filtering live query response.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 14, 2021
1 parent 83be45a commit 19aaa47
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ protected CompletionStage<R> askAndBuildJsonView(
.thenApply(response -> filterJsonView(response, enforcer));
}

protected CompletionStage<R> askAndBuildJsonView(
final ActorRef actorToAsk,
final C commandWithReadSubjects,
final Object publish,
final Enforcer enforcer,
final Scheduler scheduler,
final Executor executor) {

return ask(actorToAsk, commandWithReadSubjects, publish, "before building JsonView", scheduler, executor)
.thenApply(response -> filterJsonView(response, enforcer));
}

/**
* Asks the given {@code actorToAsk} for a response by telling {@code commandWithReadSubjects}.
*
Expand All @@ -84,37 +96,49 @@ protected CompletionStage<R> askAndBuildJsonView(
* @return A completion stage which either completes with a filtered response of type {@link R} or fails with a
* {@link DittoRuntimeException}.
*/
@SuppressWarnings("unchecked") // We can ignore this warning since it is tested that response class is assignable
protected CompletionStage<R> ask(
final ActorRef actorToAsk,
final C commandWithReadSubjects,
final String hint,
final Scheduler scheduler,
final Executor executor) {

return AskWithRetry.askWithRetry(actorToAsk, wrapBeforeAsk(commandWithReadSubjects),
final var publish = wrapBeforeAsk(commandWithReadSubjects);
return ask(actorToAsk, commandWithReadSubjects, publish, hint, scheduler, executor);
}

@SuppressWarnings("unchecked") // We can ignore this warning since it is tested that response class is assignable
protected CompletionStage<R> ask(
final ActorRef actorToAsk,
final C signal,
final Object publish,
final String hint,
final Scheduler scheduler,
final Executor executor) {

return AskWithRetry.askWithRetry(actorToAsk, publish,
getAskWithRetryConfig(), scheduler, executor,
response -> {
if (responseClass.isAssignableFrom(response.getClass())) {
return (R) response;
} else if (response instanceof ErrorResponse) {
throw ((ErrorResponse<?>) response).getDittoRuntimeException();
} else if (response instanceof AskException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
throw handleAskTimeoutForCommand(signal, (Throwable) response);
} else if (response instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
throw handleAskTimeoutForCommand(signal, (Throwable) response);
} else {
throw reportErrorOrResponse(hint, response, null);
}
}
).exceptionally(throwable -> {
final DittoRuntimeException dre = DittoRuntimeException.asDittoRuntimeException(throwable, cause ->
AskException.newBuilder()
.dittoHeaders(commandWithReadSubjects.getDittoHeaders())
.dittoHeaders(signal.getDittoHeaders())
.cause(cause)
.build());
if (dre instanceof AskException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, throwable);
throw handleAskTimeoutForCommand(signal, throwable);
} else {
throw dre;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -52,6 +53,8 @@
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

Expand All @@ -61,7 +64,8 @@
/**
* Enforces live commands (including message commands) and live events.
*/
public final class LiveSignalEnforcement extends AbstractEnforcement<SignalWithEntityId<?>> {
public final class LiveSignalEnforcement extends AbstractEnforcementWithAsk<SignalWithEntityId<?>,
ThingQueryCommandResponse<?>> {

private static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(ThingCommand::getEntityId, ThingCommand::getDittoHeaders);
Expand All @@ -78,13 +82,32 @@ private LiveSignalEnforcement(final Contextual<SignalWithEntityId<?>> context,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
final LiveSignalPub liveSignalPub) {

super(context);
super(context, ThingQueryCommandResponse.class);
requireNonNull(thingIdCache);
requireNonNull(policyEnforcerCache);
enforcerRetriever = PolicyEnforcerRetrieverFactory.create(thingIdCache, policyEnforcerCache);
this.liveSignalPub = liveSignalPub;
}

@Override
protected DittoRuntimeException handleAskTimeoutForCommand(final SignalWithEntityId<?> signal,
final Throwable askTimeout) {
log().info("Live command timed out. Response may be sent by another channel: <{}>", signal);
return ThingUnavailableException.newBuilder(ThingId.of(signal.getEntityId()))
.dittoHeaders(signal.getDittoHeaders())
.build();
}

@Override
protected ThingQueryCommandResponse<?> filterJsonView(final ThingQueryCommandResponse<?> commandResponse,
final Enforcer enforcer) {
try {
return ThingCommandEnforcement.buildJsonViewForThingQueryCommandResponse(commandResponse, enforcer);
} catch (final RuntimeException e) {
throw reportError("Error after building JsonView", e);
}
}

/**
* {@link EnforcementProvider} for {@link LiveSignalEnforcement}.
*/
Expand Down Expand Up @@ -250,12 +273,18 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
case LIVE_EVENTS:
return enforceLiveEvent(liveSignal, enforcer);
case LIVE_COMMANDS:

ThingCommandEnforcement.authorizeByPolicyOrThrow(enforcer, (ThingCommand<?>) liveSignal);
final ThingCommand<?> withReadSubjects =
addEffectedReadSubjectsToThingLiveSignal((ThingCommand<?>) liveSignal, enforcer);
log(withReadSubjects).info("Live Command was authorized: <{}>", withReadSubjects);
return publishLiveSignal(withReadSubjects, THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command());
if (liveSignal instanceof ThingQueryCommand && liveSignal.getDittoHeaders().isResponseRequired()) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonView((ThingCommand<?>) newSignal, THING_COMMAND_ACK_EXTRACTOR,
liveSignalPub.command(), enforcer)
);
} else {
return publishLiveSignal(withReadSubjects, THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command());
}
default:
log(liveSignal).warning("Ignoring unsupported command signal: <{}>", liveSignal);
throw UnknownCommandException.newBuilder(liveSignal.getName())
Expand Down Expand Up @@ -367,6 +396,19 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
);
}

private <T extends Signal<?>> Contextual<WithDittoHeaders> askAndBuildJsonView(
final T signal,
final AckExtractor<T> ackExtractor,
final DistributedPub<T> pub,
final Enforcer enforcer) {

final var publish = pub.wrapForPublicationWithAcks(signal, ackExtractor);
final var castSignal = (SignalWithEntityId<?>) signal;
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
askAndBuildJsonView(pub.getPublisher(), castSignal, publish, enforcer, context.getScheduler(),
context.getExecutor()));
}

private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal) {
final var cacheOptional = context.getResponseReceivers();
if (cacheOptional.isPresent() && signal instanceof Command && signal.getDittoHeaders().isResponseRequired()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,9 @@ private void sendBackResponses(final OutboundSignal.MultiMapped multiMapped, @Nu
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(multiMapped.getSource());
if (!nonAcknowledgementsResponses.isEmpty() && sender != null) {
nonAcknowledgementsResponses.forEach(response -> {
if (response instanceof ThingQueryCommandResponse && isLiveResponse(response)) {
l.debug("LiveQueryCommandResponse created from HTTP response. " +
"Sending response <{}> to concierge for filtering", response);

proxyActor.tell(response, sender);
} else {
l.debug("CommandResponse created from HTTP response. Replying to <{}>: <{}>", sender,
response);
sender.tell(response, getSelf());
}
l.debug("CommandResponse created from HTTP response. Replying to <{}>: <{}>", sender,
response);
sender.tell(response, getSelf());
});
} else if (nonAcknowledgementsResponses.isEmpty()) {
l.debug("No CommandResponse created from HTTP response.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
Expand Down Expand Up @@ -141,15 +142,16 @@ private boolean isSourceDeclaredOrTargetIssuedAck(final AcknowledgementLabel lab

private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final EntityId entityId) {
final Collection<AcknowledgementRequest> ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();
if (ackRequests.isEmpty()) {
return signal;
}
final Predicate<AcknowledgementLabel> isSourceDeclaredAck = settings.getSourceDeclaredAcks()::contains;
final Set<AcknowledgementLabel> targetIssuedAcks = settings.getTargetIssuedAcks();
final boolean hasSourceDeclaredAcks = ackRequests.stream()
.map(AcknowledgementRequest::getLabel)
.anyMatch(isSourceDeclaredAck);
if (hasSourceDeclaredAcks) {
final boolean liveCommandExpectingResponse = isLiveCommandExpectingResponse(signal);
if (!liveCommandExpectingResponse && ackRequests.isEmpty()) {
return signal;
}
final Set<AcknowledgementLabel> targetIssuedAcks = settings.getTargetIssuedAcks();
if (hasSourceDeclaredAcks || liveCommandExpectingResponse) {
// start ackregator for source declared acks
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
entityId,
Expand All @@ -167,6 +169,13 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
}
}

private boolean isLiveCommandExpectingResponse(final Signal<?> signal) {
final var headers = signal.getDittoHeaders();
return signal instanceof Command &&
headers.isResponseRequired() &&
(TopicPath.Channel.LIVE.getName().equals(headers.getChannel().orElse("")));
}

private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAck) {
if (Acknowledgement.TYPE.equals(responseOrAck.getType())) {
final var acknowledgement = (Acknowledgement) responseOrAck;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public void testLiveCommandHttpPushCreatesLiveCommandResponseFromProtocolMessage
publisherActor.tell(signalToMultiMapped(command, target, testKit.getRef()), testKit.getRef());

// Assert
final var responseSignal = proxyActorTestProbe.expectMsgClass(Signal.class);
final var responseSignal = testKit.expectMsgClass(Signal.class);
assertThat(responseSignal).isInstanceOfSatisfying(RetrieveThingResponse.class, retrieveThingResponse -> {
assertThat((CharSequence) retrieveThingResponse.getEntityId()).isEqualTo(thingId);
assertThat(retrieveThingResponse.getHttpStatus()).isEqualTo(retrieveThingMockResponse.getHttpStatus());
Expand Down

0 comments on commit 19aaa47

Please sign in to comment.