Skip to content

Commit

Permalink
[#1228] send query response back to concierge; use single ask for liv…
Browse files Browse the repository at this point in the history
…e commands

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 3, 2021
1 parent d28ad36 commit d70fd64
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,15 @@ private void onQueryCommandResponse(final ThingQueryCommandResponse<?> response)
logger.debug("Got <{}>, valid=<{}>", response, validResponse);
if (validResponse) {
responseReceived = true;
}
if (messageSender != null) {
messageSender.forward(response, getContext());
checkCompletion();
if (messageSender != null) {
messageSender.forward(response, getContext());
checkCompletion();
} else {
logger.error("Got response without receiving command");
stopSelf("Message sender not found");
}
} else {
logger.error("Got response without receiving command");
stopSelf("Message sender not found");
acknowledgementReceiver.forward(response, getContext());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -73,26 +75,9 @@ protected CompletionStage<R> askAndBuildJsonView(
.thenApply(response -> filterJsonView(response, enforcer));
}

@SuppressWarnings("unchecked")
protected CompletionStage<R> askAndBuildJsonView(
final ActorRef actorToAsk,
final C commandWithReadSubjects,
final Object publish,
final Enforcer enforcer,
final Scheduler scheduler,
final Executor executor) {

return ask(actorToAsk, commandWithReadSubjects, publish, "before building JsonView", scheduler, executor)
.thenApply(response -> filterJsonView((R) response.setDittoHeaders(
response.getDittoHeaders()
.toBuilder()
.authorizationContext(commandWithReadSubjects.getDittoHeaders().getAuthorizationContext())
.build()
), enforcer));
}

/**
* Asks the given {@code actorToAsk} for a response by telling {@code commandWithReadSubjects}.
* This method uses {@link AskWithRetry}.
*
* @param actorToAsk the actor that should be asked.
* @param commandWithReadSubjects the command that is used to ask.
Expand All @@ -109,48 +94,68 @@ protected CompletionStage<R> ask(
final Scheduler scheduler,
final Executor executor) {

final var publish = wrapBeforeAsk(commandWithReadSubjects);
return ask(actorToAsk, commandWithReadSubjects, publish, hint, scheduler, executor);
final BiFunction<ActorRef, Object, CompletionStage<R>> askWithRetry =
(toAsk, message) -> AskWithRetry.askWithRetry(toAsk, message, getAskWithRetryConfig(), scheduler,
executor, getResponseCaster(commandWithReadSubjects, hint)
);

return ask(actorToAsk, commandWithReadSubjects, askWithRetry);
}

@SuppressWarnings("unchecked") // We can ignore this warning since it is tested that response class is assignable
/**
* Asks the given {@code actorToAsk} for a response by telling {@code commandWithReadSubjects}.
*
* @param actorToAsk the actor that should be asked.
* @param commandWithReadSubjects the command that is used to ask.
* @param askStrategy a function which does the actual ask, e.g. with timeout or with retry.
* @return A completion stage which either completes with a filtered response of type {@link R} or fails with a
* {@link DittoRuntimeException}.
*/
protected CompletionStage<R> ask(
final ActorRef actorToAsk,
final C signal,
final Object publish,
final String hint,
final Scheduler scheduler,
final Executor executor) {
final C commandWithReadSubjects,
final BiFunction<ActorRef, Object, CompletionStage<R>> askStrategy) {

return AskWithRetry.askWithRetry(actorToAsk, publish,
getAskWithRetryConfig(), scheduler, executor,
response -> {
if (responseClass.isAssignableFrom(response.getClass())) {
return (R) response;
} else if (response instanceof ErrorResponse) {
throw ((ErrorResponse<?>) response).getDittoRuntimeException();
} else if (response instanceof AskException) {
throw handleAskTimeoutForCommand(signal, (Throwable) response);
} else if (response instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(signal, (Throwable) response);
} else {
throw reportErrorOrResponse(hint, response, null);
}
}
).exceptionally(throwable -> {
return askStrategy.apply(actorToAsk, wrapBeforeAsk(commandWithReadSubjects))
.exceptionally(throwable -> {
final DittoRuntimeException dre = DittoRuntimeException.asDittoRuntimeException(throwable, cause ->
AskException.newBuilder()
.dittoHeaders(signal.getDittoHeaders())
.dittoHeaders(commandWithReadSubjects.getDittoHeaders())
.cause(cause)
.build());
if (dre instanceof AskException) {
throw handleAskTimeoutForCommand(signal, throwable);
throw handleAskTimeoutForCommand(commandWithReadSubjects, throwable);
} else {
throw dre;
}
});
}

/**
* Returns a mapping function, which casts an Object response to the command response class.
*
* @param commandWithReadSubjects the original command.
* @param hint used for logging purposes.
* @return the mapping function.
*/
@SuppressWarnings("unchecked")
protected Function<Object, R> getResponseCaster(final C commandWithReadSubjects, final String hint) {

return response -> {
if (responseClass.isAssignableFrom(response.getClass())) {
return (R) response;
} else if (response instanceof ErrorResponse) {
throw ((ErrorResponse<?>) response).getDittoRuntimeException();
} else if (response instanceof AskException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
} else if (response instanceof AskTimeoutException) {
throw handleAskTimeoutForCommand(commandWithReadSubjects, (Throwable) response);
} else {
throw reportErrorOrResponse(hint, response, null);
}
};
}

/**
* Allows to wrap an command into something different before
* {@link #ask(ActorRef,Signal, String, Scheduler, Executor) asking}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

import static org.eclipse.ditto.policies.api.Permission.WRITE;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -63,13 +65,17 @@
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.japi.Pair;
import akka.pattern.Patterns;

/**
* Enforces live commands (including message commands) and live events.
*/
public final class LiveSignalEnforcement extends AbstractEnforcementWithAsk<SignalWithEntityId<?>,
ThingQueryCommandResponse<?>> {

private static final Duration MIN_LIVE_TIMEOUT = Duration.ofSeconds(1L);
private static final Duration DEFAULT_LIVE_TIMEOUT = Duration.ofSeconds(60L);

private static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(ThingCommand::getEntityId, ThingCommand::getDittoHeaders);
private static final AckExtractor<ThingEvent<?>> THING_EVENT_ACK_EXTRACTOR =
Expand Down Expand Up @@ -313,16 +319,8 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
log(withReadSubjects).info("Live Command was authorized: <{}>", withReadSubjects);
final boolean isThingQueryCommandRequiringResponse =
liveSignal instanceof ThingQueryCommand && liveSignal.getDittoHeaders().isResponseRequired();
final boolean hasCustomAckRequests = hasCustomAcknowledgementRequests(withReadSubjects);
if (isThingQueryCommandRequiringResponse && !hasCustomAckRequests) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonView((ThingCommand<?>) newSignal, THING_COMMAND_ACK_EXTRACTOR,
liveSignalPub.command(), enforcer)
);
} else if (isThingQueryCommandRequiringResponse) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonViewWithAckForwarding((ThingCommand<?>) newSignal,
THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command(), enforcer));
if (isThingQueryCommandRequiringResponse) {
return publishLiveQueryCommandAndBuildJsonView(withReadSubjects, enforcer);
} else {
return publishLiveSignal(withReadSubjects, THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command());
}
Expand All @@ -335,11 +333,11 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
}
}

private static boolean hasCustomAcknowledgementRequests(final Signal<?> signal) {
return !signal.getDittoHeaders()
.getAcknowledgementRequests()
.stream()
.allMatch(request -> DittoAcknowledgementLabel.LIVE_RESPONSE.equals(request.getLabel()));
private CompletionStage<Contextual<WithDittoHeaders>> publishLiveQueryCommandAndBuildJsonView(
final ThingCommand<?> withReadSubjects, final Enforcer enforcer) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonViewWithAckForwarding((ThingCommand<?>) newSignal,
liveSignalPub.command(), enforcer));
}

/**
Expand Down Expand Up @@ -439,30 +437,26 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH

private Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final ThingCommand<?> signal,
final AckExtractor<ThingCommand<?>> ackExtractor,
final DistributedPub<ThingCommand<?>> pub,
final Enforcer enforcer) {

final var publish = pub.wrapForPublicationWithAcks(signal, ackExtractor);
final var castSignal = (SignalWithEntityId<?>) signal;
final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender());
final var liveResponseForwarder = actorRefFactory.actorOf(props);
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
askAndBuildJsonView(liveResponseForwarder, castSignal, publish, enforcer, context.getScheduler(),
context.getExecutor()));
}

private <T extends Signal<?>> Contextual<WithDittoHeaders> askAndBuildJsonView(
final T signal,
final AckExtractor<T> ackExtractor,
final DistributedPub<T> pub,
final Enforcer enforcer) {

final var publish = pub.wrapForPublicationWithAcks(signal, ackExtractor);
final var castSignal = (SignalWithEntityId<?>) signal;
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
askAndBuildJsonView(pub.getPublisher(), castSignal, publish, enforcer, context.getScheduler(),
context.getExecutor()));
final var startTime = Instant.now();
return withMessageToReceiverViaAskFuture(signal, sender(), () -> {
final BiFunction<ActorRef, Object, CompletionStage<ThingQueryCommandResponse<?>>> askStrategy =
(toAsk, message) -> {
// TODO: move timeout adjustment to thing command enforcement
final var timeout = getAdjustedTimeout(signal, startTime);
final var signalWithAdjustedTimeout = adjustTimeout(signal, timeout);
final var publish =
pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout, THING_COMMAND_ACK_EXTRACTOR);
return Patterns.ask(toAsk, publish, timeout)
.thenApply(getResponseCaster(signal, "before building JsonView"));
};
return ask(liveResponseForwarder, signal, askStrategy)
.thenApply(response -> filterJsonView(response, enforcer));
});
}

private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal) {
Expand Down Expand Up @@ -551,4 +545,18 @@ private CompletionStage<String> findUniqueCorrelationId(final String startingId,
});
}

private static Duration getAdjustedTimeout(final Signal<?> signal, final Instant startTime) {
final var baseTimeout = signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
final var adjustedTimeout = baseTimeout.minus(Duration.between(startTime, Instant.now()));
return adjustedTimeout.minus(MIN_LIVE_TIMEOUT).isNegative() ? MIN_LIVE_TIMEOUT : adjustedTimeout;
}

private static ThingCommand<?> adjustTimeout(final ThingCommand<?> command, final Duration adjustedTimeout) {
return command.setDittoHeaders(
command.getDittoHeaders()
.toBuilder()
.timeout(adjustedTimeout)
.build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,8 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc

final var context = getContext();
final var proxyActor = settings.getProxyActor();
final Consumer<ActorRef> forwardAck = acknowledgementForwarder -> {
if (responseOrAck instanceof ThingQueryCommandResponse && isLiveResponse(responseOrAck)) {
// forward live command responses to concierge to filter response
proxyActor.tell(responseOrAck, getSender());
} else {
acknowledgementForwarder.forward(responseOrAck, context);
}
};
final Consumer<ActorRef> forwardAck =
acknowledgementForwarder -> acknowledgementForwarder.forward(responseOrAck, context);

final Runnable forwardToConcierge = () -> {
final var forwarderActorClassName = AcknowledgementForwarderActor.class.getSimpleName();
Expand Down

0 comments on commit d70fd64

Please sign in to comment.