Skip to content

Commit

Permalink
[#1228] fix various issues in live channel condition evaluation
Browse files Browse the repository at this point in the history
- StreamingSessionActor did not send live thing query responses to sender
- live channel condition was not an allowed HTTP query parameter
- gateway timeout did not add a buffer for the extra roundtrip for smart-channel commands
- response  headers were not set consistently in concierge

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 8, 2021
1 parent 41cf635 commit df8f6cd
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -87,9 +86,9 @@ public static Props props(final Command<?> command,
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(CommandResponse.class, this::onCommandResponse)
.match(Acknowledgement.class, this::onAcknowledgement)
.match(Acknowledgements.class, this::onAcknowledgements)
.match(CommandResponse.class, this::onCommandResponse)
.match(ReceiveTimeout.class, this::stopSelf)
.matchAny(this::sendMessage)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
Expand Down Expand Up @@ -73,11 +73,10 @@
public final class LiveSignalEnforcement extends AbstractEnforcementWithAsk<SignalWithEntityId<?>,
ThingQueryCommandResponse<?>> {

// TODO: configure
static final Duration MIN_LIVE_TIMEOUT = Duration.ofSeconds(1L);
static final Duration DEFAULT_LIVE_TIMEOUT = Duration.ofSeconds(60L);
private static final Duration MIN_LIVE_TIMEOUT = Duration.ofSeconds(1L);
private static final Duration DEFAULT_LIVE_TIMEOUT = Duration.ofSeconds(60L);

static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
private static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(ThingCommand::getEntityId, ThingCommand::getDittoHeaders);
private static final AckExtractor<ThingEvent<?>> THING_EVENT_ACK_EXTRACTOR =
AckExtractor.of(ThingEvent::getEntityId, ThingEvent::getDittoHeaders);
Expand Down Expand Up @@ -354,7 +353,8 @@ static <T extends Signal<T>> T addEffectedReadSubjectsToThingLiveSignal(final Si

final var resourceKey = ResourceKey.newInstance(ThingConstants.ENTITY_TYPE, signal.getResourcePath());
final var effectedSubjects = enforcer.getSubjectsWithPermission(resourceKey, Permission.READ);
final var newHeaders = DittoHeaders.newBuilder(signal.getDittoHeaders())
final var newHeaders = signal.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(effectedSubjects.getGranted())
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();
Expand Down Expand Up @@ -398,7 +398,8 @@ private CompletionStage<Contextual<WithDittoHeaders>> publishMessageCommand(fina
final ResourceKey resourceKey =
ResourceKey.newInstance(MessageCommand.RESOURCE_TYPE, command.getResourcePath());
final EffectedSubjects effectedSubjects = enforcer.getSubjectsWithPermission(resourceKey, Permission.READ);
final DittoHeaders headersWithReadSubjects = DittoHeaders.newBuilder(command.getDittoHeaders())
final var headersWithReadSubjects = command.getDittoHeaders()
.toBuilder()
.readGrantedSubjects(effectedSubjects.getGranted())
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();
Expand Down Expand Up @@ -436,30 +437,6 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
obj -> pub.wrapForPublicationWithAcks((S) obj, ackExtractor)));
}

private Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final ThingCommand<?> signal,
final DistributedPub<ThingCommand<?>> pub,
final Enforcer enforcer) {

final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender());
final var liveResponseForwarder = actorRefFactory.actorOf(props);
final var startTime = Instant.now();
return withMessageToReceiverViaAskFuture(signal, sender(), () -> {
final BiFunction<ActorRef, Object, CompletionStage<ThingQueryCommandResponse<?>>> askStrategy =
(toAsk, message) -> {
// TODO: move timeout adjustment to thing command enforcement
final var timeout = getAdjustedTimeout(signal, startTime);
final var signalWithAdjustedTimeout = adjustTimeout(signal, timeout);
final var publish =
pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout, THING_COMMAND_ACK_EXTRACTOR);
return Patterns.ask(toAsk, publish, timeout)
.thenApply(getResponseCaster(signal, "before building JsonView"));
};
return ask(liveResponseForwarder, signal, askStrategy)
.thenApply(response -> filterJsonView(replaceAuthContext(response, signal), enforcer));
});
}

private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal) {
final CompletionStage<Signal<?>> result;
final var dittoHeaders = signal.getDittoHeaders();
Expand All @@ -476,6 +453,41 @@ private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal)
return result;
}

private Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final ThingCommand<?> signal,
final DistributedPub<ThingCommand<?>> pub,
final Enforcer enforcer) {

final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender()
);
final var liveResponseForwarder = actorRefFactory.actorOf(props);
final var startTime = Instant.now();
final var responseCaster = getResponseCaster(signal, "before building JsonView")
.<CompletionStage<ThingQueryCommandResponse<?>>>andThen(CompletableFuture::completedStage);
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
adjustTimeoutAndFilterLiveQueryResponse(this, signal, startTime, pub, liveResponseForwarder, enforcer,
responseCaster));
}

static CompletionStage<ThingQueryCommandResponse<?>> adjustTimeoutAndFilterLiveQueryResponse(
final AbstractEnforcementWithAsk<? super ThingCommand<?>, ThingQueryCommandResponse<?>> enforcement,
final ThingCommand<?> command,
final Instant startTime,
final DistributedPub<ThingCommand<?>> pub,
final ActorRef liveResponseForwarder,
final Enforcer enforcer,
final Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> responseCaster) {

final var timeout = getAdjustedTimeout(command, startTime);
final var signalWithAdjustedTimeout = adjustTimeout(command, timeout);
final var publish = pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout,
LiveSignalEnforcement.THING_COMMAND_ACK_EXTRACTOR);
return Patterns.ask(liveResponseForwarder, publish, timeout)
.exceptionally(e -> e)
.thenCompose(responseCaster)
.thenApply(response -> enforcement.filterJsonView(replaceAuthContext(response, command), enforcer));
}

private static boolean isAuthorized(final MessageCommand<?, ?> command, final Enforcer enforcer) {
return enforcer.hasUnrestrictedPermissions(extractMessageResourceKey(command),
command.getDittoHeaders().getAuthorizationContext(), WRITE);
Expand Down Expand Up @@ -546,15 +558,19 @@ private CompletionStage<String> findUniqueCorrelationId(final String startingId,
});
}

static Duration getLiveSignalTimeout(final Signal<?> signal) {
return signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
}

private static Duration getAdjustedTimeout(final Signal<?> signal, final Instant startTime) {
final var baseTimeout = signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
final var baseTimeout = getLiveSignalTimeout(signal);
final var adjustedTimeout = baseTimeout.minus(Duration.between(startTime, Instant.now()));
return adjustedTimeout.minus(MIN_LIVE_TIMEOUT).isNegative() ? MIN_LIVE_TIMEOUT : adjustedTimeout;
}

private static ThingCommand<?> adjustTimeout(final ThingCommand<?> command, final Duration adjustedTimeout) {
return command.setDittoHeaders(
command.getDittoHeaders()
private static ThingCommand<?> adjustTimeout(final ThingCommand<?> signal, final Duration adjustedTimeout) {
return signal.setDittoHeaders(
signal.getDittoHeaders()
.toBuilder()
.timeout(adjustedTimeout)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@

import static java.util.Objects.requireNonNull;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.DEFAULT_LIVE_TIMEOUT;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.MIN_LIVE_TIMEOUT;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.THING_COMMAND_ACK_EXTRACTOR;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.addEffectedReadSubjectsToThingLiveSignal;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.replaceAuthContext;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.adjustTimeoutAndFilterLiveQueryResponse;
import static org.eclipse.ditto.policies.api.Permission.MIN_REQUIRED_POLICY_PERMISSIONS;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -31,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
Expand All @@ -39,15 +36,21 @@
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.LiveChannelTimeoutStrategy;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.namespaces.NamespaceBlockedException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandToExceptionRegistry;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.concierge.api.ConciergeMessagingConstants;
import org.eclipse.ditto.concierge.service.actors.LiveResponseAndAcknowledgementForwarder;
Expand Down Expand Up @@ -124,7 +127,6 @@
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;

/**
* Authorize {@code ThingCommand}.
Expand Down Expand Up @@ -325,36 +327,76 @@ private Contextual<WithDittoHeaders> enforceThingCommandByPolicyEnforcer(
}

private CompletionStage<ThingQueryCommandResponse<?>> doSmartChannelSelection(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> twinResponse, final Instant startTime, final Enforcer enforcer) {
final ThingQueryCommandResponse<?> response, final Instant startTime, final Enforcer enforcer) {

final var twinResponse = setTwinChannel(response);
if (command.getDittoHeaders().getLiveChannelCondition().isEmpty() ||
!twinResponse.getDittoHeaders().didLiveChannelConditionMatch()) {
return CompletableFuture.completedStage(twinResponse);
}

final ThingQueryCommand<?> liveCommand = toLiveCommand(command, enforcer);

final var pub = liveSignalPub.command();
// TODO: twin fallback
final var props =
LiveResponseAndAcknowledgementForwarder.props(liveCommand, pub.getPublisher(), sender());
final var props = LiveResponseAndAcknowledgementForwarder.props(liveCommand, pub.getPublisher(), sender());
final var liveResponseForwarder = actorRefFactory.actorOf(props);
final BiFunction<ActorRef, Object, CompletionStage<ThingQueryCommandResponse<?>>> askStrategy =
(toAsk, message) -> {
// TODO: consolidate with live signal enforcement
final var timeout = getAdjustedLiveTimeout(liveCommand, startTime);
final var signalWithAdjustedTimeout =
adjustTimeoutAndSetReadSubjects(liveCommand, timeout);
final var publish =
pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout, THING_COMMAND_ACK_EXTRACTOR);
return Patterns.ask(toAsk, publish, timeout)
.thenApply(getResponseCaster(liveCommand, "before building JsonView"));
};
return ask(liveResponseForwarder, liveCommand, askStrategy)
.thenApply(response -> filterJsonView(replaceAuthContext(response, command), enforcer));
}

private ThingQueryCommand<?> toLiveCommand(final ThingQueryCommand<?> command, final Enforcer enforcer) {
return adjustTimeoutAndFilterLiveQueryResponse(this, liveCommand, startTime, pub, liveResponseForwarder,
enforcer, getFallbackResponseCaster(liveCommand, twinResponse));
}

private Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> getFallbackResponseCaster(
final ThingQueryCommand<?> liveCommand, final ThingQueryCommandResponse<?> twinResponse) {

return response -> {
if (response instanceof ThingQueryCommandResponse) {
return CompletableFuture.completedStage(
setAdditionalHeaders((ThingQueryCommandResponse<?>) response));
} else if (response instanceof ErrorResponse) {
return CompletableFuture.failedStage(
setAdditionalHeaders(((ErrorResponse<?>) response).getDittoRuntimeException()));
} else if (response instanceof AskException || response instanceof AskTimeoutException) {
return applyTimeoutStrategy(liveCommand, twinResponse);
} else {
final var errorToReport = reportErrorOrResponse(
"before building JsonView for live response via smart channel selection",
response, null);
return CompletableFuture.failedStage(errorToReport);
}
};
}

private static <T extends DittoHeadersSettable<T>> T setAdditionalHeaders(final DittoHeadersSettable<T> settable) {
// TODO: ensure pre-enforcer headers in responses
return settable.setDittoHeaders(settable.getDittoHeaders()
.toBuilder()
.putHeaders(getAdditionalLiveResponseHeaders(settable.getDittoHeaders()))
.build());
}

private static CompletionStage<ThingQueryCommandResponse<?>> applyTimeoutStrategy(
final ThingCommand<?> command,
final ThingQueryCommandResponse<?> twinResponse) {

if (isTwinFallbackEnabled(twinResponse)) {
return CompletableFuture.completedStage(twinResponse);
} else {
final var timeout = LiveSignalEnforcement.getLiveSignalTimeout(command);
final var timeoutException = GatewayCommandTimeoutException.newBuilder(timeout)
.dittoHeaders(twinResponse.getDittoHeaders()
.toBuilder()
.putHeaders(getAdditionalLiveResponseHeaders(twinResponse.getDittoHeaders()))
.build())
.build();
return CompletableFuture.failedStage(timeoutException);
}
}

private static boolean isTwinFallbackEnabled(final Signal<?> signal) {
final var liveChannelFallbackStrategy =
signal.getDittoHeaders().getLiveChannelTimeoutStrategy().orElse(LiveChannelTimeoutStrategy.FAIL);
return LiveChannelTimeoutStrategy.USE_TWIN == liveChannelFallbackStrategy;
}

private static ThingQueryCommand<?> toLiveCommand(final ThingQueryCommand<?> command, final Enforcer enforcer) {
final ThingQueryCommand<?> withReadSubjects = addEffectedReadSubjectsToThingLiveSignal(command, enforcer);
return withReadSubjects.setDittoHeaders(withReadSubjects.getDittoHeaders().toBuilder()
.liveChannelCondition(null)
Expand Down Expand Up @@ -1178,20 +1220,24 @@ private static Optional<Policy> getDefaultPolicy(final AuthorizationContext auth
.build());
}

private static Duration getAdjustedLiveTimeout(final Signal<?> signal, final Instant startTime) {
final var baseTimeout = signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
final var adjustedTimeout = baseTimeout.minus(Duration.between(startTime, Instant.now()));
return adjustedTimeout.minus(MIN_LIVE_TIMEOUT).isNegative() ? MIN_LIVE_TIMEOUT : adjustedTimeout;
private static ThingQueryCommandResponse<?> setTwinChannel(final ThingQueryCommandResponse<?> response) {
return response.setDittoHeaders(response.getDittoHeaders()
.toBuilder()
.channel(TopicPath.Channel.TWIN.getName())
.putHeaders(getAdditionalLiveResponseHeaders(response.getDittoHeaders()))
.build());
}

private static ThingCommand<?> adjustTimeoutAndSetReadSubjects(final ThingCommand<?> command,
final Duration adjustedTimeout) {
return command.setDittoHeaders(
command.getDittoHeaders()
.toBuilder()
.timeout(adjustedTimeout)
.build()
);
private static DittoHeaders getAdditionalLiveResponseHeaders(final DittoHeaders responseHeaders) {
final var liveChannelConditionMatched = responseHeaders.getOrDefault(
DittoHeaderDefinition.LIVE_CHANNEL_CONDITION_MATCHED.getKey(), Boolean.TRUE.toString());
return DittoHeaders.newBuilder()
.putHeader(DittoHeaderDefinition.LIVE_CHANNEL_CONDITION_MATCHED.getKey(), liveChannelConditionMatched)
// TODO: ensure pre-enforcer headers in responses
.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(),
responseHeaders.getAuthorizationContext().getFirstAuthorizationSubject().toString())
.responseRequired(false)
.build();
}

/**
Expand Down

0 comments on commit df8f6cd

Please sign in to comment.