Skip to content

Commit

Permalink
[#1228] Converge ResponseReceiver and Sender paths for live and smart…
Browse files Browse the repository at this point in the history
…-channel commands; fix receive timeout of AbstractHttpRequestActor.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 16, 2021
1 parent 48efa0a commit dd30dba
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

Expand Down Expand Up @@ -45,6 +47,18 @@ public interface EnforcementConfig {
*/
boolean isDispatchLiveResponsesGlobally();

/**
* Check if global dispatch of a signal should be supported.
*
* @param signal the signal.
* @return whether global dispatch support is needed.
*/
default boolean shouldDispatchGlobally(final Signal<?> signal) {
return isDispatchLiveResponsesGlobally() &&
SignalInformationPoint.isCommand(signal) &&
signal.getDittoHeaders().isResponseRequired();
}

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code EnforcementConfig}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,6 @@ protected ThingQueryCommandResponse<?> filterJsonView(final ThingQueryCommandRes
*/
public static final class Provider implements EnforcementProvider<SignalWithEntityId<?>> {

/*
* Defined as constant because it is crucial to use the same cache for
* multiple instances of LiveSignalEnforcement to ensure that
* correlating live commands with live responses works as expected.
* Technically this constant could be defined in LiveSignalEnforcement
* as well.
*/
private static final ResponseReceiverCache RESPONSE_RECEIVER_CACHE = ResponseReceiverCache.newInstance();

private final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache;
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final LiveSignalPub liveSignalPub;
Expand Down Expand Up @@ -184,7 +175,7 @@ public LiveSignalEnforcement createEnforcement(final Contextual<SignalWithEntity
thingIdCache,
policyEnforcerCache,
actorRefFactory,
RESPONSE_RECEIVER_CACHE,
ResponseReceiverCache.getDefaultInstance(),
liveSignalPub,
enforcementConfig);
}
Expand Down Expand Up @@ -261,8 +252,8 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse
private CompletionStage<Contextual<WithDittoHeaders>> returnCommandResponseContextual(
final CommandResponse<?> liveResponse,
final CharSequence correlationId,
final Enforcer enforcer
) {
final Enforcer enforcer) {

return responseReceiverCache.get(correlationId)
.thenApply(responseReceiverEntry -> {
final Contextual<WithDittoHeaders> commandResponseContextual;
Expand Down Expand Up @@ -335,9 +326,17 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St

private CompletionStage<Contextual<WithDittoHeaders>> publishLiveQueryCommandAndBuildJsonView(
final ThingCommand<?> withReadSubjects, final Enforcer enforcer) {
return addToResponseReceiver(withReadSubjects).thenApply(newSignal ->
askAndBuildJsonViewWithAckForwarding((ThingCommand<?>) newSignal,
liveSignalPub.command(), enforcer));
if (enforcementConfig.shouldDispatchGlobally(withReadSubjects)) {
return responseReceiverCache.insertResponseReceiverConflictFree(
withReadSubjects,
this::createReceiverActor,
(command, receiver) -> askAndBuildJsonViewWithReceiverActor(command, receiver, enforcer)
);
} else {
final var receiver = createReceiverActor(withReadSubjects);
final var result = askAndBuildJsonViewWithReceiverActor(withReadSubjects, receiver, enforcer);
return CompletableFuture.completedStage(result);
}
}

/**
Expand Down Expand Up @@ -422,7 +421,6 @@ private MessageSendNotAllowedException rejectMessageCommand(final MessageCommand
return error;
}

@SuppressWarnings("unchecked")
private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoHeaders>> publishLiveSignal(
final S signal,
final AckExtractor<S> ackExtractor,
Expand All @@ -431,40 +429,40 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
// using pub/sub to publish the command to any interested parties (e.g. a Websocket):
log(signal).debug("Publish message to pub-sub: <{}>", signal);

return addToResponseReceiver(signal)
.thenApply(newSignal -> withMessageToReceiver(newSignal,
pub.getPublisher(),
obj -> pub.wrapForPublicationWithAcks((S) obj, ackExtractor)));
}

private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal) {
final CompletionStage<Signal<?>> result;
final var dittoHeaders = signal.getDittoHeaders();
if (enforcementConfig.isDispatchLiveResponsesGlobally() &&
SignalInformationPoint.isCommand(signal) &&
dittoHeaders.isResponseRequired()) {

result = insertResponseReceiverConflictFree((Command<?>) signal,
Pair.create(sender(), dittoHeaders.getAuthorizationContext()));
if (enforcementConfig.shouldDispatchGlobally(signal)) {
return responseReceiverCache.insertResponseReceiverConflictFree(signal,
newSignal -> sender(),
(newSignal, receiver) -> publishSignal(newSignal, ackExtractor, pub));
} else {
result = CompletableFuture.completedStage(signal);
return CompletableFuture.completedStage(publishSignal(signal, ackExtractor, pub));
}
}

return result;
@SuppressWarnings("unchecked")
private <T extends Signal<?>, S extends T> Contextual<WithDittoHeaders> publishSignal(final T signal,
final AckExtractor<S> ackExtractor, final DistributedPub<T> pub) {
return withMessageToReceiver(signal, pub.getPublisher(),
obj -> pub.wrapForPublicationWithAcks((S) obj, ackExtractor));
}

private Contextual<WithDittoHeaders> askAndBuildJsonViewWithAckForwarding(
final ThingCommand<?> signal,
final DistributedPub<ThingCommand<?>> pub,
private ActorRef createReceiverActor(final Command<?> signal) {
final var pub = liveSignalPub.command();
final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender());
return actorRefFactory.actorOf(props);
}

private Contextual<WithDittoHeaders> askAndBuildJsonViewWithReceiverActor(
final Command<?> command,
final ActorRef receiver,
final Enforcer enforcer) {

final var props = LiveResponseAndAcknowledgementForwarder.props(signal, pub.getPublisher(), sender());
final var liveResponseForwarder = actorRefFactory.actorOf(props);
final var thingCommand = (ThingCommand<?>) command;
final var startTime = Instant.now();
final var responseCaster = getResponseCaster(signal, "before building JsonView")
final var pub = liveSignalPub.command();
final var responseCaster = getResponseCaster(thingCommand, "before building JsonView")
.<CompletionStage<ThingQueryCommandResponse<?>>>andThen(CompletableFuture::completedStage);
return withMessageToReceiverViaAskFuture(signal, sender(), () ->
adjustTimeoutAndFilterLiveQueryResponse(this, signal, startTime, pub, liveResponseForwarder, enforcer,
return withMessageToReceiverViaAskFuture(thingCommand, sender(), () ->
adjustTimeoutAndFilterLiveQueryResponse(this, thingCommand, startTime, pub, receiver, enforcer,
responseCaster));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import javax.annotation.Nullable;
Expand All @@ -28,7 +32,9 @@
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CaffeineCache;

Expand All @@ -40,9 +46,9 @@

/**
* A cache of response receivers and their associated correlation ID.
*
* <p>
* Each cache entry gets evicted after becoming expired.
*
* <p>
* To put a response receiver to this cache a {@link Command} has to be provided as key.
* The command is necessary because of its headers.
* The command headers are required to contain the mandatory correlation ID.
Expand All @@ -54,6 +60,7 @@
final class ResponseReceiverCache {

private static final Duration DEFAULT_ENTRY_EXPIRY = Duration.ofMinutes(2L);
private static final ResponseReceiverCache DEFAULT_INSTANCE = newInstance();

private final Duration fallBackEntryExpiry;
private final Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> cache;
Expand All @@ -65,6 +72,15 @@ private ResponseReceiverCache(final Duration fallBackEntryExpiry,
this.cache = cache;
}

/**
* Returns a static default instance of {@code ResponseReceiverCache} with a hard-coded fall-back entry expiry.
*
* @return the instance.
*/
static ResponseReceiverCache getDefaultInstance() {
return DEFAULT_INSTANCE;
}

/**
* Returns a new instance of {@code ResponseReceiverCache} with a hard-coded fall-back entry expiry.
*
Expand Down Expand Up @@ -97,13 +113,13 @@ private static Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> cre
}

/**
* Puts the specified response receiver for the correlation ID of the command header's correlation ID.
* Puts the specified response receiver for the correlation ID of the signal's correlation ID.
*
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if the headers of {@code command} do not contain a correlation ID.
* @throws IllegalArgumentException if the headers of {@code signal} do not contain a correlation ID.
*/
public void putCommand(final Command<?> command, final Pair<ActorRef, AuthorizationContext> responseReceiver) {
cache.put(getCorrelationIdKeyForInsertion(checkNotNull(command, "command")),
public void putCommand(final Signal<?> signal, final Pair<ActorRef, AuthorizationContext> responseReceiver) {
cache.put(getCorrelationIdKeyForInsertion(checkNotNull(signal, "command")),
checkNotNull(responseReceiver, "responseReceiver"));
}

Expand Down Expand Up @@ -140,6 +156,70 @@ public CompletableFuture<Optional<Pair<ActorRef, AuthorizationContext>>> get(fin
return cache.get(CorrelationIdKey.forCacheRetrieval(correlationIdString));
}

/**
* Insert a response receiver for a live or message command.
*
* @param command the command.
* @param receiverCreator creator of the receiver actor.
* @param responseHandler handler of the response.
* @param <T> type of results of the response handler.
* @return the result of the response handler.
*/
public <S extends Signal<?>, T> CompletionStage<T> insertResponseReceiverConflictFree(final S command,
final Function<S, ActorRef> receiverCreator,
final BiFunction<S, ActorRef, T> responseHandler) {

return insertResponseReceiverConflictFreeWithFuture(command, receiverCreator,
responseHandler.andThen(CompletableFuture::completedStage));
}

/**
* Insert a response receiver for a live or message command.
*
* @param command the command.
* @param receiverCreator creator of the receiver actor.
* @param responseHandler handler of the response.
* @param <T> type of results of the response handler.
* @return the result of the response handler.
*/
public <S extends Signal<?>, T> CompletionStage<T> insertResponseReceiverConflictFreeWithFuture(final S command,
final Function<S, ActorRef> receiverCreator,
final BiFunction<S, ActorRef, CompletionStage<T>> responseHandler) {

return setUniqueCorrelationIdForGlobalDispatching(command, false)
.thenCompose(commandWithUniqueCorrelationId -> {
final ActorRef receiver = receiverCreator.apply(commandWithUniqueCorrelationId);
// TODO change pair simply to receiver
final var responseReceiver =
Pair.create(receiver, command.getDittoHeaders().getAuthorizationContext());
putCommand(commandWithUniqueCorrelationId, responseReceiver);
return responseHandler.apply(commandWithUniqueCorrelationId, receiver);
});
}

@SuppressWarnings("unchecked")
private <S extends Signal<?>> CompletionStage<S> setUniqueCorrelationIdForGlobalDispatching(
final S signal, final boolean refreshCorrelationId) {

final String correlationId;
if (refreshCorrelationId) {
correlationId = UUID.randomUUID().toString();
} else {
correlationId = SignalInformationPoint.getCorrelationId(signal)
.orElseGet(() -> UUID.randomUUID().toString());
}

return get(correlationId).thenCompose(entry -> {
if (entry.isPresent()) {
return setUniqueCorrelationIdForGlobalDispatching(signal, true);
}
final S result = (S) signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.correlationId(correlationId)
.build());
return CompletableFuture.completedStage(result);
});
}

@Immutable
private static final class CorrelationIdKey {

Expand Down

0 comments on commit dd30dba

Please sign in to comment.