Skip to content

Commit

Permalink
[#1228] remove unused second component of response receiver cache; tu…
Browse files Browse the repository at this point in the history
…rn the cache into an actor system extension to avoid cross-contamination between unit tests; remove workaround in SmartChannelSelectionWithResponseReceiverTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 20, 2021
1 parent faef3d8 commit fc6f0a2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
Expand Down Expand Up @@ -64,7 +61,7 @@

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.japi.Pair;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;

/**
Expand Down Expand Up @@ -132,30 +129,32 @@ public static final class Provider implements EnforcementProvider<SignalWithEnti
private final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache;
private final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache;
private final LiveSignalPub liveSignalPub;
private final ActorRefFactory actorRefFactory;
private final ActorSystem actorSystem;
private final EnforcementConfig enforcementConfig;
private final ResponseReceiverCache responseReceiverCache;

/**
* Constructs a {@code Provider}.
*
* @param thingIdCache the thing-id-cache.
* @param policyEnforcerCache the policy-enforcer cache.
* @param actorRefFactory the actor ref factory to create new actors with.
* @param actorSystem the actor ref factory to create new actors with.
* @param liveSignalPub distributed-pub access for live signal publication.
* @param enforcementConfig configuration properties for enforcement.
* @throws NullPointerException if any argument but is {@code null}.
*/
public Provider(final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
final ActorRefFactory actorRefFactory,
final ActorSystem actorSystem,
final LiveSignalPub liveSignalPub,
final EnforcementConfig enforcementConfig) {

this.thingIdCache = ConditionChecker.checkNotNull(thingIdCache, "thingIdCache");
this.policyEnforcerCache = ConditionChecker.checkNotNull(policyEnforcerCache, "policyEnforcerCache");
this.liveSignalPub = ConditionChecker.checkNotNull(liveSignalPub, "liveSignalPub");
this.actorRefFactory = ConditionChecker.checkNotNull(actorRefFactory, "actorRefFactory");
this.actorSystem = ConditionChecker.checkNotNull(actorSystem, "actorRefFactory");
this.enforcementConfig = ConditionChecker.checkNotNull(enforcementConfig, "enforcementConfig");
responseReceiverCache = ResponseReceiverCache.lookup(actorSystem);
}

@Override
Expand All @@ -174,8 +173,8 @@ public LiveSignalEnforcement createEnforcement(final Contextual<SignalWithEntity
return new LiveSignalEnforcement(context,
thingIdCache,
policyEnforcerCache,
actorRefFactory,
ResponseReceiverCache.getDefaultInstance(),
actorSystem,
responseReceiverCache,
liveSignalPub,
enforcementConfig);
}
Expand Down Expand Up @@ -205,7 +204,7 @@ private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(final SignalWith
// claim messages require no enforcement, publish them right away:
result = publishMessageCommand((MessageCommand<?, ?>) liveSignal, enforcer);
} else if (SignalInformationPoint.isCommandResponse(liveSignal)) {
result = enforceLiveCommandResponse((CommandResponse<?>) liveSignal, correlationIdOpt.get(), enforcer);
result = enforceLiveCommandResponse((CommandResponse<?>) liveSignal, correlationIdOpt.get());
} else {
final var streamingType = StreamingType.fromSignal(liveSignal);
if (streamingType.isPresent()) {
Expand All @@ -232,12 +231,11 @@ private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(final SignalWith

private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(
final CommandResponse<?> liveResponse,
final CharSequence correlationId,
final Enforcer enforcer
final CharSequence correlationId
) {
final CompletionStage<Contextual<WithDittoHeaders>> result;
if (enforcementConfig.isDispatchLiveResponsesGlobally()) {
result = returnCommandResponseContextual(liveResponse, correlationId, enforcer);
result = returnCommandResponseContextual(liveResponse, correlationId);
} else {
log().info("Got live response when global dispatching is inactive: <{}> with correlation ID <{}>",
liveResponse.getType(),
Expand All @@ -251,50 +249,23 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse

private CompletionStage<Contextual<WithDittoHeaders>> returnCommandResponseContextual(
final CommandResponse<?> liveResponse,
final CharSequence correlationId,
final Enforcer enforcer) {
final CharSequence correlationId) {

return responseReceiverCache.get(correlationId)
.thenApply(responseReceiverEntry -> {
final Contextual<WithDittoHeaders> commandResponseContextual;
if (responseReceiverEntry.isPresent()) {
final var responseReceiver = responseReceiverEntry.get();
final CommandResponse<?> response;
if (liveResponse instanceof ThingQueryCommandResponse) {
final var liveResponseWithRequesterAuthCtx =
injectRequestersAuthContext((ThingQueryCommandResponse<?>) liveResponse,
responseReceiver.second());

response = ThingCommandEnforcement.buildJsonViewForThingQueryCommandResponse(
liveResponseWithRequesterAuthCtx,
enforcer);
} else {
response = liveResponse;
}
log().info("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse,
responseReceiver);
commandResponseContextual = withMessageToReceiver(response, responseReceiver.first());
final var receiver = responseReceiverEntry.get();
log().info("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse, receiver);
commandResponseContextual = withMessageToReceiver(liveResponse, receiver);
} else {
log().info("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), correlationId);
commandResponseContextual = withMessageToReceiver(null, null);
}

return commandResponseContextual;
});
}

private static ThingQueryCommandResponse<?> injectRequestersAuthContext(
final ThingQueryCommandResponse<?> liveResponse,
final AuthorizationContext requesterAuthContext) {

final var dittoHeadersWithResponseReceiverAuthContext = liveResponse.getDittoHeaders()
.toBuilder()
.authorizationContext(requesterAuthContext)
.build();

return liveResponse.setDittoHeaders(dittoHeadersWithResponseReceiverAuthContext);
}

private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final StreamingType streamingType,
final Signal<?> liveSignal, final Enforcer enforcer) {

Expand Down Expand Up @@ -502,59 +473,6 @@ private static ResourceKey extractMessageResourceKey(final MessageCommand<?, ?>
}
}

private CompletionStage<Signal<?>> insertResponseReceiverConflictFree(final Command<?> command,
final Pair<ActorRef, AuthorizationContext> responseReceiver) {

return setUniqueCorrelationIdForGlobalDispatching(command)
.thenApply(commandWithUniqueCorrelationId -> {
responseReceiverCache.putCommand(commandWithUniqueCorrelationId, responseReceiver);
return commandWithUniqueCorrelationId;
});
}

private CompletionStage<Command<?>> setUniqueCorrelationIdForGlobalDispatching(final Command<?> command) {
final var correlationId = SignalInformationPoint.getCorrelationId(command)
.orElseGet(() -> UUID.randomUUID().toString());

return responseReceiverCache.get(correlationId)
.thenCompose(entry -> {
final CompletionStage<String> uniqueCorrelationIdFuture;
if (entry.isPresent()) {
uniqueCorrelationIdFuture = findUniqueCorrelationId(correlationId, getNextSuffix());
} else {
uniqueCorrelationIdFuture = CompletableFuture.completedStage(correlationId);
}
return uniqueCorrelationIdFuture.thenApply(newCorrelationId -> {
final Command<?> result;
if (correlationId.equals(newCorrelationId)) {
result = command;
} else {
result = command.setDittoHeaders(DittoHeaders.newBuilder(command.getDittoHeaders())
// always set "keep-cid" to true because global dispatching is active
.correlationId(newCorrelationId)
.build());
}
return result;
});
});
}

private static String getNextSuffix() {
return Long.toHexString(Double.doubleToRawLongBits(Math.random()));
}

private CompletionStage<String> findUniqueCorrelationId(final String startingId, final String suffix) {
final var nextCorrelationId = startingId + "#x" + suffix;
return responseReceiverCache.get(nextCorrelationId)
.thenCompose(entry -> {
if (entry.isPresent()) {
return findUniqueCorrelationId(startingId, getNextSuffix());
} else {
return CompletableFuture.completedStage(nextCorrelationId);
}
});
}

static Duration getLiveSignalTimeout(final Signal<?> signal) {
return signal.getDittoHeaders().getTimeout().orElse(DEFAULT_LIVE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand All @@ -41,8 +40,11 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;

/**
* A cache of response receivers and their associated correlation ID.
Expand All @@ -57,28 +59,28 @@
* is used.
*/
@NotThreadSafe
final class ResponseReceiverCache {
final class ResponseReceiverCache implements Extension {

private static final ExtensionId EXTENSION_ID = new ExtensionId();
private static final Duration DEFAULT_ENTRY_EXPIRY = Duration.ofMinutes(2L);
private static final ResponseReceiverCache DEFAULT_INSTANCE = newInstance();

private final Duration fallBackEntryExpiry;
private final Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> cache;

private ResponseReceiverCache(final Duration fallBackEntryExpiry,
final Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> cache) {
private final Cache<CorrelationIdKey, ActorRef> cache;

private ResponseReceiverCache(final Duration fallBackEntryExpiry, final Cache<CorrelationIdKey, ActorRef> cache) {
this.fallBackEntryExpiry = fallBackEntryExpiry;
this.cache = cache;
}

/**
* Returns a static default instance of {@code ResponseReceiverCache} with a hard-coded fall-back entry expiry.
* Returns a default instance of {@code ResponseReceiverCache} with a hard-coded fall-back entry expiry
* for an actor system.
*
* @param actorSystem the actor system.
* @return the instance.
*/
static ResponseReceiverCache getDefaultInstance() {
return DEFAULT_INSTANCE;
static ResponseReceiverCache lookup(final ActorSystem actorSystem) {
return EXTENSION_ID.get(actorSystem);
}

/**
Expand Down Expand Up @@ -106,7 +108,7 @@ static ResponseReceiverCache newInstance(final Duration fallBackEntryExpiry) {
return new ResponseReceiverCache(fallBackEntryExpiry, createCache(fallBackEntryExpiry));
}

private static Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> createCache(
private static Cache<CorrelationIdKey, ActorRef> createCache(
final Duration fallBackEntryExpiry
) {
return CaffeineCache.of(Caffeine.newBuilder().expireAfter(new CorrelationIdKeyExpiry(fallBackEntryExpiry)));
Expand All @@ -118,7 +120,7 @@ private static Cache<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> cre
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if the headers of {@code signal} do not contain a correlation ID.
*/
public void putCommand(final Signal<?> signal, final Pair<ActorRef, AuthorizationContext> responseReceiver) {
public void putCommand(final Signal<?> signal, final ActorRef responseReceiver) {
cache.put(getCorrelationIdKeyForInsertion(checkNotNull(signal, "command")),
checkNotNull(responseReceiver, "responseReceiver"));
}
Expand Down Expand Up @@ -147,7 +149,7 @@ private Duration getExpiry(final DittoHeaders commandDittoHeaders) {
* @throws NullPointerException if {@code correlationId} is {@code null}.
* @throws IllegalArgumentException if {@code correlationId} is empty or blank.
*/
public CompletableFuture<Optional<Pair<ActorRef, AuthorizationContext>>> get(final CharSequence correlationId) {
public CompletableFuture<Optional<ActorRef>> get(final CharSequence correlationId) {
final var correlationIdString = String.valueOf(checkNotNull(correlationId, "correlationId"));
ConditionChecker.checkArgument(correlationIdString,
Predicate.not(String::isBlank),
Expand Down Expand Up @@ -189,10 +191,7 @@ public <S extends Signal<?>, T> CompletionStage<T> insertResponseReceiverConflic
return setUniqueCorrelationIdForGlobalDispatching(command, false)
.thenCompose(commandWithUniqueCorrelationId -> {
final ActorRef receiver = receiverCreator.apply(commandWithUniqueCorrelationId);
// TODO change pair simply to receiver
final var responseReceiver =
Pair.create(receiver, command.getDittoHeaders().getAuthorizationContext());
putCommand(commandWithUniqueCorrelationId, responseReceiver);
putCommand(commandWithUniqueCorrelationId, receiver);
return responseHandler.apply(commandWithUniqueCorrelationId, receiver);
});
}
Expand Down Expand Up @@ -269,8 +268,7 @@ public String toString() {
}

@Immutable
private static final class CorrelationIdKeyExpiry
implements Expiry<CorrelationIdKey, Pair<ActorRef, AuthorizationContext>> {
private static final class CorrelationIdKeyExpiry implements Expiry<CorrelationIdKey, ActorRef> {

private final Duration fallBackEntryExpiry;

Expand All @@ -279,33 +277,32 @@ private CorrelationIdKeyExpiry(final Duration fallBackEntryExpiry) {
}

@Override
public long expireAfterCreate(final CorrelationIdKey key,
final Pair<ActorRef, AuthorizationContext> value,
final long currentTime) {

public long expireAfterCreate(final CorrelationIdKey key, final ActorRef value, final long currentTime) {
final var entryExpiry = Objects.requireNonNullElse(key.expiry, fallBackEntryExpiry);

return entryExpiry.toNanos(); // it is crucial to return nanoseconds here
}

@Override
public long expireAfterUpdate(final CorrelationIdKey key,
final Pair<ActorRef, AuthorizationContext> value,
final long currentTime,
public long expireAfterUpdate(final CorrelationIdKey key, final ActorRef value, final long currentTime,
final long currentDuration) {

return currentDuration;
}

@Override
public long expireAfterRead(final CorrelationIdKey key,
final Pair<ActorRef, AuthorizationContext> value,
final long currentTime,
public long expireAfterRead(final CorrelationIdKey key, final ActorRef value, final long currentTime,
final long currentDuration) {

return currentDuration;
}

}

static final class ExtensionId extends AbstractExtensionId<ResponseReceiverCache> {

@Override
public ResponseReceiverCache createExtension(final ExtendedActorSystem system) {
return newInstance();
}
}
}

0 comments on commit fc6f0a2

Please sign in to comment.