Skip to content

Commit

Permalink
[#1228] add initial implementation of smart channel selection.
Browse files Browse the repository at this point in the history
Still TODO:
- unit test
- code deduplication
- twin fallback

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 5, 2021
1 parent d70fd64 commit d65131f
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public S condition(final String condition) {
}

@Override
public S liveChannelCondition(final String liveChannelCondition) {
public S liveChannelCondition(@Nullable final String liveChannelCondition) {
putCharSequence(DittoHeaderDefinition.LIVE_CHANNEL_CONDITION, liveChannelCondition);
return myself;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ B acknowledgementRequest(AcknowledgementRequest acknowledgementRequest,
* @return this builder for method chaining.
* @since 2.2.0
*/
B liveChannelCondition(String liveChannelCondition);
B liveChannelCondition(@Nullable String liveChannelCondition);

/**
* Puts an arbitrary header with the specified {@code name} and String {@code value} to this builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@
public final class LiveSignalEnforcement extends AbstractEnforcementWithAsk<SignalWithEntityId<?>,
ThingQueryCommandResponse<?>> {

private static final Duration MIN_LIVE_TIMEOUT = Duration.ofSeconds(1L);
private static final Duration DEFAULT_LIVE_TIMEOUT = Duration.ofSeconds(60L);
// TODO: configure
static final Duration MIN_LIVE_TIMEOUT = Duration.ofSeconds(1L);
static final Duration DEFAULT_LIVE_TIMEOUT = Duration.ofSeconds(60L);

private static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(ThingCommand::getEntityId, ThingCommand::getDittoHeaders);
private static final AckExtractor<ThingEvent<?>> THING_EVENT_ACK_EXTRACTOR =
AckExtractor.of(ThingEvent::getEntityId, ThingEvent::getDittoHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
package org.eclipse.ditto.concierge.service.enforcement;

import static java.util.Objects.requireNonNull;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.DEFAULT_LIVE_TIMEOUT;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.MIN_LIVE_TIMEOUT;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.THING_COMMAND_ACK_EXTRACTOR;
import static org.eclipse.ditto.concierge.service.enforcement.LiveSignalEnforcement.addEffectedReadSubjectsToThingLiveSignal;
import static org.eclipse.ditto.policies.api.Permission.MIN_REQUIRED_POLICY_PERMISSIONS;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
Expand All @@ -38,9 +45,11 @@
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.namespaces.NamespaceBlockedException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandToExceptionRegistry;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.concierge.api.ConciergeMessagingConstants;
import org.eclipse.ditto.concierge.service.actors.LiveResponseAndAcknowledgementForwarder;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.PolicyIdReferencePlaceholderResolver;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.ReferencePlaceholder;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
Expand All @@ -53,6 +62,7 @@
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementContext;
import org.eclipse.ditto.internal.utils.cacheloaders.PolicyEnforcer;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pubsub.LiveSignalPub;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -82,6 +92,7 @@
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.rql.model.ParserException;
import org.eclipse.ditto.rql.model.predicates.ast.RootNode;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
Expand Down Expand Up @@ -110,7 +121,9 @@
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;

/**
* Authorize {@code ThingCommand}.
Expand Down Expand Up @@ -140,13 +153,17 @@ public final class ThingCommandEnforcement
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final PreEnforcer preEnforcer;
private final PolicyIdReferencePlaceholderResolver policyIdReferencePlaceholderResolver;
private final LiveSignalPub liveSignalPub;
private final ActorRefFactory actorRefFactory;

private ThingCommandEnforcement(final Contextual<ThingCommand<?>> data,
final ActorRef thingsShardRegion,
final ActorRef policiesShardRegion,
final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
final PreEnforcer preEnforcer) {
final PreEnforcer preEnforcer,
final LiveSignalPub liveSignalPub,
final ActorRefFactory actorRefFactory) {

super(data, ThingQueryCommandResponse.class);
this.thingsShardRegion = requireNonNull(thingsShardRegion);
Expand All @@ -155,11 +172,13 @@ private ThingCommandEnforcement(final Contextual<ThingCommand<?>> data,
this.thingIdCache = requireNonNull(thingIdCache);
this.policyEnforcerCache = requireNonNull(policyEnforcerCache);
this.preEnforcer = preEnforcer;
this.actorRefFactory = actorRefFactory;
thingEnforcerRetriever = PolicyEnforcerRetrieverFactory.create(thingIdCache, policyEnforcerCache);
policyEnforcerRetriever = new EnforcerRetriever<>(IdentityCache.INSTANCE, policyEnforcerCache);
policyIdReferencePlaceholderResolver =
PolicyIdReferencePlaceholderResolver.of(conciergeForwarder(), getAskWithRetryConfig(),
context.getScheduler(), context.getExecutor());
this.liveSignalPub = liveSignalPub;
}

@Override
Expand Down Expand Up @@ -281,24 +300,67 @@ private Contextual<WithDittoHeaders> enforceThingCommandByPolicyEnforcer(
final Contextual<WithDittoHeaders> result;
if (commandWithReadSubjects instanceof ThingQueryCommand) {
final ThingQueryCommand<?> thingQueryCommand = (ThingQueryCommand<?>) commandWithReadSubjects;
final Instant startTime = Instant.now();
if (!isResponseRequired(thingQueryCommand)) {
// drop query command with response-required=false
result = withMessageToReceiver(null, ActorRef.noSender());
} else if (thingQueryCommand instanceof RetrieveThing && shouldRetrievePolicyWithThing(thingQueryCommand)) {
final var retrieveThing = (RetrieveThing) thingQueryCommand;
result = withMessageToReceiverViaAskFuture(retrieveThing, sender(),
() -> retrieveThingAndPolicy(retrieveThing, policyId, enforcer));
result = withMessageToReceiverViaAskFuture(retrieveThing, sender(), () ->
retrieveThingAndPolicy(retrieveThing, policyId, enforcer).thenCompose(response ->
doSmartChannelSelection(retrieveThing, response, startTime, enforcer))
);
} else {
result = withMessageToReceiverViaAskFuture(thingQueryCommand, sender(),
() -> askAndBuildJsonView(thingsShardRegion, thingQueryCommand, enforcer,
context.getScheduler(), context.getExecutor()));
result = withMessageToReceiverViaAskFuture(thingQueryCommand, sender(), () ->
askAndBuildJsonView(thingsShardRegion, thingQueryCommand, enforcer,
context.getScheduler(), context.getExecutor()).thenCompose(response ->
doSmartChannelSelection(thingQueryCommand, response, startTime, enforcer))
);
}
} else {
result = forwardToThingsShardRegion(commandWithReadSubjects);
}
return result;
}

private CompletionStage<ThingQueryCommandResponse<?>> doSmartChannelSelection(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> twinResponse, final Instant startTime, final Enforcer enforcer) {

if (command.getDittoHeaders().getLiveChannelCondition().isEmpty() ||
!twinResponse.getDittoHeaders().didLiveChannelConditionMatch()) {
return CompletableFuture.completedStage(twinResponse);
}

final ThingQueryCommand<?> liveCommand = toLiveCommand(command, enforcer);

final var pub = liveSignalPub.command();
// TODO: twin fallback
final var props =
LiveResponseAndAcknowledgementForwarder.props(liveCommand, pub.getPublisher(), sender());
final var liveResponseForwarder = actorRefFactory.actorOf(props);
final BiFunction<ActorRef, Object, CompletionStage<ThingQueryCommandResponse<?>>> askStrategy =
(toAsk, message) -> {
// TODO: consolidate with live signal enforcement
final var timeout = getAdjustedLiveTimeout(liveCommand, startTime);
final var signalWithAdjustedTimeout =
adjustTimeoutAndSetReadSubjects(liveCommand, timeout);
final var publish =
pub.wrapForPublicationWithAcks(signalWithAdjustedTimeout, THING_COMMAND_ACK_EXTRACTOR);
return Patterns.ask(toAsk, publish, timeout)
.thenApply(getResponseCaster(liveCommand, "before building JsonView"));
};
return ask(liveResponseForwarder, liveCommand, askStrategy)
.thenApply(response -> filterJsonView(response, enforcer));
}

private ThingQueryCommand<?> toLiveCommand(final ThingQueryCommand<?> command, final Enforcer enforcer) {
final ThingQueryCommand<?> withReadSubjects = addEffectedReadSubjectsToThingLiveSignal(command, enforcer);
return withReadSubjects.setDittoHeaders(withReadSubjects.getDittoHeaders().toBuilder()
.liveChannelCondition(null)
.channel(TopicPath.Channel.LIVE.getName())
.build());
}

/**
* Retrieve a thing and its policy and combine them into a response.
*
Expand Down Expand Up @@ -1115,6 +1177,22 @@ private static Optional<Policy> getDefaultPolicy(final AuthorizationContext auth
.build());
}

private static Duration getAdjustedLiveTimeout(final Signal<?> signal, final Instant startTime) {
final var baseTimeout = signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
final var adjustedTimeout = baseTimeout.minus(Duration.between(startTime, Instant.now()));
return adjustedTimeout.minus(MIN_LIVE_TIMEOUT).isNegative() ? MIN_LIVE_TIMEOUT : adjustedTimeout;
}

private static ThingCommand<?> adjustTimeoutAndSetReadSubjects(final ThingCommand<?> command,
final Duration adjustedTimeout) {
return command.setDittoHeaders(
command.getDittoHeaders()
.toBuilder()
.timeout(adjustedTimeout)
.build()
);
}

/**
* A pair of {@code CreateThing} command with {@code Enforcer}.
*/
Expand All @@ -1140,6 +1218,8 @@ public static final class Provider implements EnforcementProvider<ThingCommand<?
private final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache;
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final PreEnforcer preEnforcer;
private final LiveSignalPub liveSignalPub;
private final ActorRefFactory actorRefFactory;

/**
* Constructor.
Expand All @@ -1149,18 +1229,25 @@ public static final class Provider implements EnforcementProvider<ThingCommand<?
* @param thingIdCache the thing-id-cache.
* @param policyEnforcerCache the policy-enforcer cache.
* @param preEnforcer pre-enforcer function to block undesirable messages to policies shard region.
* @param liveSignalPub publisher of live signals.
* @param actorRefFactory factory with which to create actors.
*/
@SuppressWarnings("NullableProblems")
public Provider(final ActorRef thingsShardRegion,
final ActorRef policiesShardRegion,
final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
@Nullable final PreEnforcer preEnforcer) {

this.thingsShardRegion = requireNonNull(thingsShardRegion);
this.policiesShardRegion = requireNonNull(policiesShardRegion);
this.thingIdCache = requireNonNull(thingIdCache);
this.policyEnforcerCache = requireNonNull(policyEnforcerCache);
@Nullable final PreEnforcer preEnforcer,
final LiveSignalPub liveSignalPub,
final ActorRefFactory actorRefFactory) {

this.thingsShardRegion = checkNotNull(thingsShardRegion, "thingShardRegion");
this.policiesShardRegion = checkNotNull(policiesShardRegion, "policiesShardRegion");
this.thingIdCache = checkNotNull(thingIdCache, "thingIdCache");
this.policyEnforcerCache = checkNotNull(policyEnforcerCache, "policyEnforcerCache");
this.preEnforcer = Objects.requireNonNullElseGet(preEnforcer, () -> CompletableFuture::completedFuture);
this.liveSignalPub = checkNotNull(liveSignalPub, "liveSignalPub");
this.actorRefFactory = actorRefFactory;
}

@Override
Expand Down Expand Up @@ -1189,7 +1276,9 @@ public AbstractEnforcement<ThingCommand<?>> createEnforcement(final Contextual<T
policiesShardRegion,
thingIdCache,
policyEnforcerCache,
preEnforcer);
preEnforcer,
liveSignalPub,
actorRefFactory);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public ActorRef startEnforcerActor(final ActorContext context,

final Set<EnforcementProvider<?>> enforcementProviders = new HashSet<>();
enforcementProviders.add(new ThingCommandEnforcement.Provider(thingsShardRegionProxy,
policiesShardRegionProxy, thingIdCache, projectedEnforcerCache, preEnforcer));
policiesShardRegionProxy, thingIdCache, projectedEnforcerCache, preEnforcer, liveSignalPub,
actorSystem));
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegionProxy, policyEnforcerCache));
enforcementProviders.add(new LiveSignalEnforcement.Provider(thingIdCache,
projectedEnforcerCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.mockito.Mockito;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;
Expand Down Expand Up @@ -177,8 +178,10 @@ public ActorRef build() {
CaffeineCache.of(Caffeine.newBuilder(), thingEnforcementIdCacheLoader);

final Set<EnforcementProvider<?>> enforcementProviders = new HashSet<>();
final LiveSignalPub liveSignalPub = Mockito.mock(LiveSignalPub.class);
enforcementProviders.add(new ThingCommandEnforcement.Provider(thingsShardRegion,
policiesShardRegion, thingIdCache, projectedEnforcerCache, preEnforcer));
policiesShardRegion, thingIdCache, projectedEnforcerCache, preEnforcer, liveSignalPub,
system));
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegion, policyEnforcerCache));
enforcementProviders.add(new LiveSignalEnforcement.Provider(thingIdCache,
projectedEnforcerCache,
Expand Down

0 comments on commit d65131f

Please sign in to comment.