Skip to content

Commit

Permalink
[#1228] Apply live channel fallback strategy to live commands.
Browse files Browse the repository at this point in the history
Akka HTTP timeout is turned off because we manage timeout ourselves
via HttpRequestActor and AcknowledgementAggregatorActor.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 13, 2021
1 parent 4dd7a41 commit 75d98da
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Class<SignalWithEntityId<?>> getCommandClass() {

@Override
public boolean isApplicable(final SignalWithEntityId<?> signal) {
return SignalInformationPoint.isChannelLive(signal);
return SignalInformationPoint.isChannelLive(signal) && !SignalInformationPoint.isChannelSmart(signal);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ private ThingCommandEnforcement(final Contextual<ThingCommand<?>> data,
super(data, ThingQueryCommandResponse.class);
this.thingsShardRegion = requireNonNull(thingsShardRegion);
this.policiesShardRegion = requireNonNull(policiesShardRegion);

this.thingIdCache = requireNonNull(thingIdCache);
this.policyEnforcerCache = requireNonNull(policyEnforcerCache);
this.preEnforcer = preEnforcer;
Expand Down Expand Up @@ -309,13 +308,14 @@ private Contextual<WithDittoHeaders> enforceThingCommandByPolicyEnforcer(
// drop query command with response-required=false
result = withMessageToReceiver(null, ActorRef.noSender());
} else if (thingQueryCommand instanceof RetrieveThing && shouldRetrievePolicyWithThing(thingQueryCommand)) {
final var retrieveThing = (RetrieveThing) thingQueryCommand;
final var retrieveThing = (RetrieveThing) ensureTwinChannel(thingQueryCommand);
result = withMessageToReceiverViaAskFuture(retrieveThing, sender(), () ->
retrieveThingAndPolicy(retrieveThing, policyId, enforcer).thenCompose(response ->
doSmartChannelSelection(retrieveThing, response, startTime, enforcer))
doSmartChannelSelection(thingQueryCommand, response, startTime, enforcer))
);
} else {
result = withMessageToReceiverViaAskFuture(thingQueryCommand, sender(), () ->
final var twinCommand = ensureTwinChannel(thingQueryCommand);
result = withMessageToReceiverViaAskFuture(twinCommand, sender(), () ->
askAndBuildJsonView(thingsShardRegion, thingQueryCommand, enforcer,
context.getScheduler(), context.getExecutor()).thenCompose(response ->
doSmartChannelSelection(thingQueryCommand, response, startTime, enforcer))
Expand All @@ -333,8 +333,7 @@ private CompletionStage<ThingQueryCommandResponse<?>> doSmartChannelSelection(fi
final ThingQueryCommandResponse<?> twinResponseWithTwinChannel = setTwinChannel(response);
final ThingQueryCommandResponse<?> twinResponse = CommandHeaderRestoration.restoreCommandConnectivityHeaders(
twinResponseWithTwinChannel, command.getDittoHeaders());
if (command.getDittoHeaders().getLiveChannelCondition().isEmpty() ||
!twinResponse.getDittoHeaders().didLiveChannelConditionMatch()) {
if (!shouldAttemptLiveChannel(command, twinResponse)) {
return CompletableFuture.completedStage(twinResponse);
}

Expand Down Expand Up @@ -368,6 +367,23 @@ private Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> getFallb
};
}

private static boolean shouldAttemptLiveChannel(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> twinResponse) {
return isLiveChannelConditionMatched(command, twinResponse) || isLiveQueryCommandWithTimeoutStrategy(command);
}

private static boolean isLiveChannelConditionMatched(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> twinResponse) {
return command.getDittoHeaders().getLiveChannelCondition().isPresent() &&
twinResponse.getDittoHeaders().didLiveChannelConditionMatch();
}

static boolean isLiveQueryCommandWithTimeoutStrategy(final Signal<?> command) {
return command instanceof ThingQueryCommand &&
command.getDittoHeaders().getLiveChannelTimeoutStrategy().isPresent() &&
SignalInformationPoint.isChannelLive(command);
}

private static <T extends DittoHeadersSettable<T>> T setAdditionalHeaders(final DittoHeadersSettable<T> settable,
final DittoHeaders commandHeaders) {
// TODO: ensure pre-enforcer headers in responses
Expand Down Expand Up @@ -1242,18 +1258,29 @@ private static ThingQueryCommandResponse<?> setTwinChannel(final ThingQueryComma
.build());
}

private static ThingQueryCommand<?> ensureTwinChannel(final ThingQueryCommand<?> command) {
if (SignalInformationPoint.isChannelLive(command)) {
return command.setDittoHeaders(command.getDittoHeaders()
.toBuilder()
.channel(TopicPath.Channel.TWIN.getName())
.build());
} else {
return command;
}
}

private static DittoHeaders getAdditionalLiveResponseHeaders(final DittoHeaders responseHeaders) {
// TODO: ensure pre-enforcer headers in responses
final var liveChannelConditionMatched = responseHeaders.getOrDefault(
DittoHeaderDefinition.LIVE_CHANNEL_CONDITION_MATCHED.getKey(), Boolean.TRUE.toString());
return DittoHeaders.newBuilder()
final var dittoHeadersBuilder = DittoHeaders.newBuilder()
.putHeader(DittoHeaderDefinition.LIVE_CHANNEL_CONDITION_MATCHED.getKey(), liveChannelConditionMatched)
// TODO: ensure pre-enforcer headers in responses
.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(),
responseHeaders.getAuthorizationContext().getFirstAuthorizationSubject()
.map(AuthorizationSubject::toString)
.orElseThrow())
.responseRequired(false)
.build();
.responseRequired(false);
responseHeaders.getAuthorizationContext().getFirstAuthorizationSubject()
.map(AuthorizationSubject::toString)
.ifPresent(firstSubject ->
dittoHeadersBuilder.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(), firstSubject));
return dittoHeadersBuilder.build();
}

/**
Expand Down Expand Up @@ -1324,7 +1351,7 @@ public boolean isApplicable(final ThingCommand<?> command) {

// live commands are not applicable for thing command enforcement
// because they should never be forwarded to things shard region
return !SignalInformationPoint.isChannelLive(command);
return !SignalInformationPoint.isChannelLive(command) || SignalInformationPoint.isChannelSmart(command);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,7 @@ private void startAckregatorAndForwardSignal(final EntityId entityId,
final Signal<?> signal,
@Nullable final ActorRef sender) {

ackregatorStarter.doStart(entityId,
signal,
ackregatorStarter.doStart(entityId, signal, null,
responseSignal -> {

// potentially publish response/aggregated acks to reply target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.internal.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.protocol.TopicPath;
Expand Down Expand Up @@ -173,7 +174,7 @@ private boolean isLiveCommandExpectingResponse(final Signal<?> signal) {
final var headers = signal.getDittoHeaders();
return signal instanceof Command &&
headers.isResponseRequired() &&
(TopicPath.Channel.LIVE.getName().equals(headers.getChannel().orElse("")));
SignalInformationPoint.isChannelLive(signal);
}

private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAck) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.messages.model.signals.commands.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.things.model.signals.commands.acks.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
Expand All @@ -91,6 +89,7 @@
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import akka.util.ByteString;
import scala.Option;
import scala.util.Either;

/**
Expand Down Expand Up @@ -220,10 +219,12 @@ private void handleCommand(final Command<?> command) {
logger.setCorrelationId(command);
receivedCommand = command;
setDefaultTimeoutExceptionSupplier(command);
final var timeoutOverride = getReceiveTimeout(command, commandConfig);
ackregatorStarter.start(command,
timeoutOverride,
this::onAggregatedResponseOrError,
this::handleCommandWithAckregator,
this::handleCommandWithoutAckregator
command1 -> handleCommandWithoutAckregator(command1, timeoutOverride)
);
final var responseBehavior = ReceiveBuilder.create()
.match(Acknowledgements.class, this::completeAcknowledgements)
Expand Down Expand Up @@ -261,9 +262,9 @@ private Void handleCommandWithAckregator(final Signal<?> command, final ActorRef
return null;
}

private Void handleCommandWithoutAckregator(final Signal<?> command) {
private Void handleCommandWithoutAckregator(final Signal<?> command, final Duration timeoutOverride) {
if (isDevOpsCommand(command) || !shallAcceptImmediately(command)) {
handleCommandWithResponse(command, getResponseAwaitingBehavior());
handleCommandWithResponse(command, getResponseAwaitingBehavior(), timeoutOverride);
setDefaultTimeoutExceptionSupplier(command);
} else {
handleCommandAndAcceptImmediately(command);
Expand Down Expand Up @@ -322,20 +323,16 @@ protected WhoamiResponse createWhoamiResponse(final Whoami request) {
return WhoamiResponse.of(userInformation, dittoHeaders);
}

private void handleCommandWithResponse(final Signal<?> command, final Receive awaitCommandResponseBehavior) {
private void handleCommandWithResponse(final Signal<?> command, final Receive awaitCommandResponseBehavior,
final Duration timeoutOverride) {
logger.debug("Got <{}>. Telling the target actor about it.", command);
proxyActor.tell(command, getSelf());

final ActorContext context = getContext();
if (!isDevOpsCommand(command)) {
final var dittoHeaders = command.getDittoHeaders();

// DevOpsCommands do have their own timeout mechanism, don't reply with a command timeout for the user
// for DevOps commands, so only set the receiveTimeout for non-DevOps commands:
context.setReceiveTimeout(
// TODO consolidate with ackgregator
getReceiveTimeout(command, commandConfig.getDefaultTimeout(), commandConfig.getMaxTimeout())
);
context.setReceiveTimeout(timeoutOverride);
}

// After a Command was received, this Actor can only receive the correlating CommandResponse:
Expand All @@ -353,7 +350,8 @@ private Receive getResponseAwaitingBehavior() {
// If an actor downstream replies with an HTTP response, simply forward it.
.match(HttpResponse.class, this::completeWithResult)
.match(MessageCommandResponse.class,
messageCommandResponse -> completeWithResult(handleMessageResponseMessage(messageCommandResponse)))
messageCommandResponse -> completeWithResult(
handleMessageResponseMessage(messageCommandResponse)))
.match(CommandResponse.class, WithEntity.class::isInstance, commandResponse -> {
logger.withCorrelationId(commandResponse).debug("Got <{}> message.", commandResponse.getType());
handleCommandResponseWithEntity(commandResponse);
Expand Down Expand Up @@ -440,8 +438,8 @@ private HttpResponse handleMessageResponseMessage(final MessageCommandResponse<?
final Optional<akka.http.scaladsl.model.ContentType> optionalContentType = message.getContentType()
.map(ContentType$.MODULE$::parse)
.filter(Either::isRight)
.map(Either::right)
.map(Either.RightProjection::get);
.map(Either::toOption)
.map(Option::get);

final boolean isBinary = optionalContentType
.map(akka.http.scaladsl.model.ContentType::value)
Expand Down Expand Up @@ -718,21 +716,17 @@ private void handleJsonValueSourceRef(final JsonValueSourceRef jsonValueSourceRe
completeWithResult(httpResponse);
}

// TODO consolidate with ackregator
private static Duration getReceiveTimeout(final Signal<?> originatingSignal, final Duration defaultTimeout,
final Duration maxTimeout) {
private static Duration getReceiveTimeout(final Signal<?> originatingSignal, final CommandConfig commandConfig) {

final var defaultTimeout = commandConfig.getDefaultTimeout();
final var maxTimeout = commandConfig.getMaxTimeout();
final var headers = originatingSignal.getDittoHeaders();
final var candidateTimeout = headers.getTimeout()
.filter(timeout -> timeout.minus(maxTimeout).isNegative())
.orElse(defaultTimeout);

// TODO consolidate condition; configure budget
if ((headers.getLiveChannelCondition().isPresent() ||
TopicPath.Channel.LIVE.getName().equals(headers.getChannel().orElse("")) &&
headers.getLiveChannelTimeoutStrategy().isPresent()) &&
originatingSignal instanceof ThingQueryCommand) {
return candidateTimeout.plus(Duration.ofSeconds(10));
if (SignalInformationPoint.isChannelSmart(originatingSignal)) {
return candidateTimeout.plus(commandConfig.getSmartChannelBuffer());
} else {
return candidateTimeout;
}
Expand Down

0 comments on commit 75d98da

Please sign in to comment.