Skip to content

Commit

Permalink
Issue #106 Enforce requesters auth context for live retrieve commands
Browse files Browse the repository at this point in the history
* Extend response-receiver cache by auth context
* use cached auth context to filter the response

Signed-off-by: Joel Bartelheimer <joel.bartelheimer@bosch.io>
  • Loading branch information
jbartelh committed Nov 2, 2021
1 parent 407985c commit 3a45ca5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.concierge.service.common.ConciergeConfig;
import org.eclipse.ditto.concierge.service.common.DittoConciergeConfig;
Expand All @@ -34,6 +35,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.javadsl.Sink;

Expand Down Expand Up @@ -130,7 +132,7 @@ protected Contextual<WithDittoHeaders> mapMessage(final WithDittoHeaders message
}

@Nullable
private static Cache<String, ActorRef> createResponseReceiverCache(final ConciergeConfig conciergeConfig) {
private static Cache<String, Pair<ActorRef, AuthorizationContext>> createResponseReceiverCache(final ConciergeConfig conciergeConfig) {
if (conciergeConfig.getEnforcementConfig().shouldDispatchLiveResponsesGlobally()) {
return CaffeineCache.of(Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(120L)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -35,6 +36,7 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import akka.japi.Pair;

/**
* A message together with contextual information about the actor processing it.
Expand All @@ -59,7 +61,7 @@ public final class Contextual<T extends WithDittoHeaders> implements WithSender<

// for live signal enforcement
@Nullable
private final Cache<String, ActorRef> responseReceivers;
private final Cache<String, Pair<ActorRef, AuthorizationContext>> responseReceivers;

@Nullable
private final Supplier<CompletionStage<Object>> askFuture;
Expand All @@ -76,7 +78,7 @@ private Contextual(@Nullable final T message,
@Nullable final EnforcementCacheKey cacheKey,
@Nullable final ActorRef receiver,
@Nullable final Function<Object, Object> receiverWrapperFunction,
@Nullable final Cache<String, ActorRef> responseReceivers,
@Nullable final Cache<String, Pair<ActorRef, AuthorizationContext>> responseReceivers,
@Nullable final Supplier<CompletionStage<Object>> askFuture) {
this.message = message;
this.self = self;
Expand All @@ -100,7 +102,7 @@ static <T extends WithDittoHeaders> Contextual<T> forActor(final ActorRef self,
final ActorRef conciergeForwarder,
final AskWithRetryConfig askWithRetryConfig,
final ThreadSafeDittoLoggingAdapter log,
@Nullable final Cache<String, ActorRef> responseReceivers) {
@Nullable final Cache<String, Pair<ActorRef, AuthorizationContext>> responseReceivers) {

return new Contextual<>(null,
self,
Expand Down Expand Up @@ -229,7 +231,7 @@ Function<Object, Object> getReceiverWrapperFunction() {
return receiverWrapperFunction != null ? receiverWrapperFunction : Function.identity();
}

Optional<Cache<String, ActorRef>> getResponseReceivers() {
Optional<Cache<String, Pair<ActorRef, AuthorizationContext>>> getResponseReceivers() {
return Optional.ofNullable(responseReceivers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -177,9 +178,10 @@ private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(final SignalWith
private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(
final CommandResponse<?> liveResponse, final String correlationId, final Enforcer enforcer) {

final Optional<Cache<String, ActorRef>> responseReceiversOptional = context.getResponseReceivers();
final Optional<Cache<String, Pair<ActorRef, AuthorizationContext>>> responseReceiversOptional =
context.getResponseReceivers();
if (responseReceiversOptional.isPresent()) {
final Cache<String, ActorRef> responseReceivers = responseReceiversOptional.get();
final Cache<String, Pair<ActorRef, AuthorizationContext>> responseReceivers = responseReceiversOptional.get();
return returnCommandResponseContextual(responseReceivers, liveResponse, correlationId, enforcer);
} else {
log().info("Got live response when global dispatching is inactive: <{}> with correlation ID <{}>",
Expand All @@ -191,23 +193,27 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse
}

private CompletionStage<Contextual<WithDittoHeaders>> returnCommandResponseContextual(
final Cache<String, ActorRef> responseReceivers, final CommandResponse<?> liveResponse,
final Cache<String, Pair<ActorRef, AuthorizationContext>> responseReceivers, final CommandResponse<?> liveResponse,
final String correlationId, final Enforcer enforcer) {
return responseReceivers.get(correlationId).thenApply(responseReceiverEntry -> {
final Contextual<WithDittoHeaders> commandResponseContextual;
if (responseReceiverEntry.isPresent()) {
responseReceivers.invalidate(correlationId);
final ActorRef responseReceiver = responseReceiverEntry.get();
final Pair<ActorRef, AuthorizationContext> responseReceiver = responseReceiverEntry.get();
final CommandResponse<?> response;
if (liveResponse instanceof ThingQueryCommandResponse) {
final var dittoHeadersWithResponseReceiverAuthContext = liveResponse.getDittoHeaders()
.toBuilder()
.authorizationContext(responseReceiver.second())
.build();
response = ThingCommandEnforcement.buildJsonViewForThingQueryCommandResponse(
(ThingQueryCommandResponse<?>) liveResponse, enforcer);
(ThingQueryCommandResponse<?>) liveResponse.setDittoHeaders(dittoHeadersWithResponseReceiverAuthContext), enforcer);
} else {
response = liveResponse;
}
log().info("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse,
responseReceiver);
commandResponseContextual = withMessageToReceiver(response, responseReceiver);
commandResponseContextual = withMessageToReceiver(response, responseReceiver.first());
} else {
log().info("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), correlationId);
commandResponseContextual = withMessageToReceiver(null, null);
Expand Down Expand Up @@ -344,9 +350,10 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
}

private CompletionStage<Signal<?>> addToResponseReceiver(final Signal<?> signal) {
final Optional<Cache<String, ActorRef>> cacheOptional = context.getResponseReceivers();
final var cacheOptional = context.getResponseReceivers();
if (cacheOptional.isPresent() && signal instanceof Command && signal.getDittoHeaders().isResponseRequired()) {
return insertResponseReceiverConflictFree(cacheOptional.get(), signal, sender());
return insertResponseReceiverConflictFree(cacheOptional.get(), signal,
Pair.create(sender(), signal.getDittoHeaders().getAuthorizationContext()));
} else {
return CompletableFuture.completedStage(signal);
}
Expand All @@ -370,8 +377,8 @@ private static ResourceKey extractMessageResourceKey(final MessageCommand<?, ?>
}
}

private static CompletionStage<Signal<?>> insertResponseReceiverConflictFree(final Cache<String, ActorRef> cache,
final Signal<?> signal, final ActorRef responseReceiver) {
private static CompletionStage<Signal<?>> insertResponseReceiverConflictFree(final Cache<String, Pair<ActorRef, AuthorizationContext>> cache,
final Signal<?> signal, final Pair<ActorRef, AuthorizationContext> responseReceiver) {

return setUniqueCorrelationIdForGlobalDispatching(cache, signal)
.thenApply(correlationIdAndSignal -> {
Expand Down

0 comments on commit 3a45ca5

Please sign in to comment.