Skip to content

Commit

Permalink
added application of preEnforcer in AbstractPersistenceSupervisor
Browse files Browse the repository at this point in the history
* fixed some open TODOs
* configured dispatcher for AskWithRetry in new reference.conf of that module
* configured dispatcher for PolicyEnforcerCacheLoader in new reference.conf of that module

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 6, 2022
1 parent 1b0f928 commit 64bea6c
Show file tree
Hide file tree
Showing 22 changed files with 134 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,20 +347,6 @@ public enum DittoHeaderDefinition implements HeaderDefinition {
*/
WEAK_ACK("ditto-weak-ack", boolean.class, false, true, HeaderValueValidators.getBooleanValidator()),

/**
* Header definition to identify when a Policy changing command did pre-emptively invalidate the caches related to
* the Policy enforcement in concierge.
* When this is {@code true}, no further policy enforcer invalidation has to be done.
* TODO TJ candidate for removal
*
* @since 2.0.0
*/
POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY("ditto-policy-enforcer-invalidated-preemptively",
boolean.class,
false,
false,
HeaderValueValidators.getBooleanValidator()),

/**
* Internal header which may be set by PersistenceActors in order to declare tags to be stored by the event
* journaling {@code EventAdapter} as {@code tag} fields in the journal persistence.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.junit.Ignore;
import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;
Expand Down Expand Up @@ -107,7 +106,6 @@ public final class ImmutableDittoHeadersTest {
private static final MetadataHeaders KNOWN_METADATA_HEADERS;
private static final boolean KNOWN_ALLOW_POLICY_LOCKOUT = true;
private static final boolean KNOWN_IS_WEAK_ACK = false;
private static final boolean KNOWN_POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY = true;
private static final List<String> KNOWN_JOURNAL_TAGS = Lists.list("tag-a", "tag-b");
private static final boolean KNOWN_IS_SUDO = true;
private static final String KNOWN_CONDITION = "eq(attributes/value)";
Expand Down Expand Up @@ -170,8 +168,6 @@ public void settingAllKnownHeadersWorksAsExpected() {
.expectedResponseTypes(KNOWN_EXPECTED_RESPONSE_TYPES)
.allowPolicyLockout(KNOWN_ALLOW_POLICY_LOCKOUT)
.putHeader(DittoHeaderDefinition.WEAK_ACK.getKey(), String.valueOf(KNOWN_IS_WEAK_ACK))
.putHeader(DittoHeaderDefinition.POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY.getKey(),
String.valueOf(KNOWN_POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY))
.putHeader(DittoHeaderDefinition.EVENT_JOURNAL_TAGS.getKey(),
charSequencesToJsonArray(KNOWN_JOURNAL_TAGS).toString())
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), String.valueOf(KNOWN_IS_SUDO))
Expand Down Expand Up @@ -475,8 +471,6 @@ public void toJsonReturnsExpected() {
.set(DittoHeaderDefinition.PUT_METADATA.getKey(), KNOWN_METADATA_HEADERS.toJson())
.set(DittoHeaderDefinition.ALLOW_POLICY_LOCKOUT.getKey(), KNOWN_ALLOW_POLICY_LOCKOUT)
.set(DittoHeaderDefinition.WEAK_ACK.getKey(), KNOWN_IS_WEAK_ACK)
.set(DittoHeaderDefinition.POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY.getKey(),
KNOWN_POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY)
.set(DittoHeaderDefinition.EVENT_JOURNAL_TAGS.getKey(),
charSequencesToJsonArray(KNOWN_JOURNAL_TAGS))
.set(DittoHeaderDefinition.DITTO_SUDO.getKey(), KNOWN_IS_SUDO)
Expand Down Expand Up @@ -709,8 +703,6 @@ private static Map<String, String> createMapContainingAllKnownHeaders() {
result.put(DittoHeaderDefinition.PUT_METADATA.getKey(), KNOWN_METADATA_HEADERS.toJsonString());
result.put(DittoHeaderDefinition.ALLOW_POLICY_LOCKOUT.getKey(), String.valueOf(KNOWN_ALLOW_POLICY_LOCKOUT));
result.put(DittoHeaderDefinition.WEAK_ACK.getKey(), String.valueOf(KNOWN_IS_WEAK_ACK));
result.put(DittoHeaderDefinition.POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY.getKey(),
String.valueOf(KNOWN_POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY));
result.put(DittoHeaderDefinition.EVENT_JOURNAL_TAGS.getKey(),
charSequencesToJsonArray(KNOWN_JOURNAL_TAGS).toString());
result.put(DittoHeaderDefinition.DITTO_SUDO.getKey(), String.valueOf(KNOWN_IS_SUDO));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ private StreamingActor(final DittoProtocolSub dittoProtocolSub,
*
* @param dittoProtocolSub the Ditto protocol sub access.
* @param commandRouter the command router used to send signals into the cluster.
* @param jwtValidator
* @param jwtAuthenticationResultProvider
* @param jwtValidator the validator of JWTs to use.
* @param jwtAuthenticationResultProvider the JwtAuthenticationResultProvider.
* @param streamingConfig the streaming config.
* @param headerTranslator translates headers from external sources or to external sources.
* @param pubSubMediator
* @param commandForwarder TODO TJ doc
* @param pubSubMediator the ActorRef to the Akka pub/sub mediator.
* @param commandForwarder the ActorRef of the actor to forward commands to.
* @return the Akka configuration Props object.
*/
public static Props props(final DittoProtocolSub dittoProtocolSub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

/**
* Supertype of commands to request the start of a stream.
* // TODO TJ extend SudoCommand?
*/
public interface StartStreamRequest extends StreamingMessage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
Expand All @@ -57,6 +58,42 @@ private AskWithRetry() {
throw new AssertionError();
}

/**
* Dispatcher name to use for AskWithRetry.
*/
public static final String ASK_WITH_RETRY_DISPATCHER = "ask-with-retry-dispatcher";

/**
* Performs the "ask with retry" pattern by asking the passed in {@code actorToAsk} the passed in {@code message},
* mapping a successful response with the provided {@code responseMapper} and retrying the operation on Exceptions
* which are not {@link DittoRuntimeException}s based on the given {@code config}.
*
* @param actorToAsk the actor to ask the message.
* @param message the message to ask.
* @param config the "ask with retry" configuration to apply, e.g. whether to do retries at all,
* with which timeouts, with how many retries and delays, etc.
* @param actorSystem the actorSystem for looking up the scheduler and dispatcher to use.
* @param responseMapper a function converting the response of the asked message.
* @param <M> the type of the message to ask.
* @param <A> the type of the answer.
* @return a CompletionStage which is completed by applying the passed in {@code responseMapper} function on the
* response of the asked message or which is completed exceptionally with the Exception.
*/
public static <M, A> CompletionStage<A> askWithRetry(final ActorRef actorToAsk,
final M message,
final AskWithRetryConfig config,
final ActorSystem actorSystem,
final Function<Object, A> responseMapper) {

return askWithRetry(actorToAsk,
message,
config,
actorSystem.getScheduler(),
actorSystem.dispatchers().lookup(ASK_WITH_RETRY_DISPATCHER),
responseMapper
);
}

/**
* Performs the "ask with retry" pattern by asking the passed in {@code actorToAsk} the passed in {@code message},
* mapping a successful response with the provided {@code responseMapper} and retrying the operation on Exceptions
Expand All @@ -82,8 +119,8 @@ public static <M, A> CompletionStage<A> askWithRetry(final ActorRef actorToAsk,
final Function<Object, A> responseMapper) {

final DittoHeaders dittoHeaders;
if (message instanceof WithDittoHeaders) {
dittoHeaders = ((WithDittoHeaders) message).getDittoHeaders();
if (message instanceof WithDittoHeaders withDittoHeaders) {
dittoHeaders = withDittoHeaders.getDittoHeaders();
} else {
dittoHeaders = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

ask-with-retry-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,15 @@ private CompletionStage<Object> enforceCommandAndForwardToPersistenceActorIfAuth
final Command<?> command) {

if (null != enforcerChild) {
return Patterns.ask(enforcerChild, command, DEFAULT_LOCAL_ASK_TIMEOUT)
.thenCompose(this::modifyEnforcerActorEnforcedCommandResponse)
.handle((enforcerResponse, enforcerThrowable) ->
handleEnforcerResponse(sender, enforcerResponse, enforcerThrowable,
command.getDittoHeaders())
);
return preEnforcer.apply(command).thenCompose(preEnforcedCommand ->
Patterns.ask(enforcerChild, preEnforcedCommand, DEFAULT_LOCAL_ASK_TIMEOUT)
.thenCompose(this::modifyEnforcerActorEnforcedCommandResponse)
.handle((enforcerResponse, enforcerThrowable) ->
handleEnforcerResponse(sender, enforcerResponse, enforcerThrowable,
preEnforcedCommand.getDittoHeaders())
)
);

} else {
log.withCorrelationId(command)
.error("Could not enforce command because enforcerChild was not present");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
@Immutable
public final class PolicyEnforcerCacheLoader implements AsyncCacheLoader<EnforcementCacheKey, Entry<PolicyEnforcer>> {

/**
* Dispatcher name for policy enforcer cache loader.
*/
public static final String ENFORCEMENT_CACHE_DISPATCHER = "enforcement-cache-dispatcher";

private final ActorAskCacheLoader<PolicyEnforcer, Command<?>, EnforcementContext> delegate;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.internal.utils.akka.controlflow.WithSender;
Expand Down Expand Up @@ -86,4 +89,24 @@ default <S extends WithSender<? extends DittoHeadersSettable<?>>, T> CompletionS
});
}

/**
* Set the "ditto-originator" header to the primary authorization subject of a signal.
*
* @param originalSignal A signal with authorization context.
* @return A copy of the signal with the header "ditto-originator" set.
* @since 3.0.0
*/
@SuppressWarnings("unchecked")
static <T extends DittoHeadersSettable<?>> T setOriginatorHeader(final T originalSignal) {
final DittoHeaders dittoHeaders = originalSignal.getDittoHeaders();
final AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
return authorizationContext.getFirstAuthorizationSubject()
.map(AuthorizationSubject::getId)
.map(originatorSubjectId -> DittoHeaders.newBuilder(dittoHeaders)
.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(), originatorSubjectId)
.build())
.map(originatorHeader -> (T) originalSignal.setDittoHeaders(originatorHeader))
.orElse(originalSignal);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import javax.annotation.concurrent.Immutable;
Expand All @@ -39,7 +38,7 @@
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;

import akka.actor.ActorRef;
import akka.actor.Scheduler;
import akka.actor.ActorSystem;

/**
* Responsible for resolving a policy id of a referenced entity.
Expand All @@ -51,18 +50,16 @@ public final class PolicyIdReferencePlaceholderResolver implements ReferencePlac

private final ActorRef commandForwarderActor;
private final AskWithRetryConfig askWithRetryConfig;
private final Scheduler scheduler;
private final Executor executor;
private final ActorSystem actorSystem;
private final Map<ReferencePlaceholder.ReferencedEntityType, ResolveEntityReferenceStrategy>
supportedEntityTypesToActionMap = new EnumMap<>(ReferencePlaceholder.ReferencedEntityType.class);
private final Set<CharSequence> supportedEntityTypeNames;

private PolicyIdReferencePlaceholderResolver(final ActorRef commandForwarderActor,
final AskWithRetryConfig askWithRetryConfig, final Scheduler scheduler, final Executor executor) {
final AskWithRetryConfig askWithRetryConfig, final ActorSystem actorSystem) {
this.commandForwarderActor = commandForwarderActor;
this.askWithRetryConfig = askWithRetryConfig;
this.scheduler = scheduler;
this.executor = executor;
this.actorSystem = actorSystem;
initializeSupportedEntityTypeReferences();
this.supportedEntityTypeNames =
this.supportedEntityTypesToActionMap.keySet().stream().map(Enum::name).collect(Collectors.toSet());
Expand Down Expand Up @@ -108,17 +105,16 @@ private CompletionStage<String> handlePolicyIdReference(final ReferencePlacehold
.withSelectedFields(referencePlaceholder.getReferencedField().toFieldSelector())
.build();

return AskWithRetry.askWithRetry(commandForwarderActor, retrieveThingCommand, askWithRetryConfig, scheduler,
executor,
return AskWithRetry.askWithRetry(commandForwarderActor, retrieveThingCommand, askWithRetryConfig, actorSystem,
response -> handleRetrieveThingResponse(response, referencePlaceholder, dittoHeaders)
);
}

private static String handleRetrieveThingResponse(final Object response,
final ReferencePlaceholder referencePlaceholder, final DittoHeaders dittoHeaders) {

if (response instanceof RetrieveThingResponse) {
final JsonValue entity = ((RetrieveThingResponse) response).getEntity();
if (response instanceof RetrieveThingResponse retrieveThingResponse) {
final JsonValue entity = retrieveThingResponse.getEntity();
if (!entity.isObject()) {
LOGGER.withCorrelationId(dittoHeaders)
.error("Expected RetrieveThingResponse to contain a JsonObject as Entity but was: {}", entity);
Expand All @@ -128,19 +124,19 @@ private static String handleRetrieveThingResponse(final Object response,
.getValue(JsonFieldDefinition.ofString(referencePlaceholder.getReferencedField()))
.orElseThrow(() -> unknownFieldException(referencePlaceholder, dittoHeaders));

} else if (response instanceof ThingErrorResponse) {
} else if (response instanceof ThingErrorResponse thingErrorResponse) {
LOGGER.withCorrelationId(dittoHeaders)
.info("Got ThingErrorResponse when waiting on RetrieveThingResponse when resolving policy ID" +
" placeholder reference <{}>: {}", referencePlaceholder, response);
" placeholder reference <{}>: {}", referencePlaceholder, thingErrorResponse);
throw ((ThingErrorResponse) response).getDittoRuntimeException();
} else if (response instanceof DittoRuntimeException) {
} else if (response instanceof DittoRuntimeException dre) {
// ignore warning that second argument isn't used. Runtime exceptions will have their stacktrace printed
// in the logs according to https://www.slf4j.org/faq.html#paramException
LOGGER.withCorrelationId(dittoHeaders)
.info("Got Exception when waiting on RetrieveThingResponse when resolving policy ID placeholder reference <{}> - {}: {}",
referencePlaceholder, response.getClass().getSimpleName(),
((Throwable) response).getMessage());
throw (DittoRuntimeException) response;
referencePlaceholder, dre.getClass().getSimpleName(),
dre.getMessage());
throw dre;
} else {
LOGGER.withCorrelationId(dittoHeaders)
.error("Did not retrieve expected RetrieveThingResponse when resolving policy ID placeholder reference <{}>: {}",
Expand Down Expand Up @@ -175,15 +171,13 @@ private PlaceholderReferenceNotSupportedException notSupportedException(
* @param commandForwarderActor the ActorRef of the {@code ConciergeForwarderActor} which to ask for "retrieve"
* commands.
* @param askWithRetryConfig the configuration for the "ask with retry" pattern applied when asking for retrieves.
* @param scheduler the scheduler to use for the "ask with retry" for retries.
* @param executor the executor to use for the "ask with retry" for retries.
* @param actorSystem the actorSystem to load scheduler and dispatcher to use for the "ask with retry" pattern from.
* @return the created PolicyIdReferencePlaceholderResolver instance.
*/
public static PolicyIdReferencePlaceholderResolver of(final ActorRef commandForwarderActor,
final AskWithRetryConfig askWithRetryConfig, final Scheduler scheduler, final Executor executor) {
final AskWithRetryConfig askWithRetryConfig, final ActorSystem actorSystem) {

return new PolicyIdReferencePlaceholderResolver(commandForwarderActor, askWithRetryConfig, scheduler,
executor);
return new PolicyIdReferencePlaceholderResolver(commandForwarderActor, askWithRetryConfig, actorSystem);
}

interface ResolveEntityReferenceStrategy {
Expand Down
7 changes: 7 additions & 0 deletions policies/enforcement/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

enforcement-cache-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public static void stopActorSystem() {
public void setup() {
commandForwarderActorProbe = TestProbe.apply(actorSystem);
sut = PolicyIdReferencePlaceholderResolver.of(commandForwarderActorProbe.testActor(),
DefaultAskWithRetryConfig.of(ConfigFactory.empty(), "test"), actorSystem.scheduler(),
actorSystem.dispatcher());
DefaultAskWithRetryConfig.of(ConfigFactory.empty(), "test"), actorSystem);
}

@Test
Expand Down
Loading

0 comments on commit 64bea6c

Please sign in to comment.