Skip to content

Commit

Permalink
PreEnforcerProvider should directly apply to the signal
Browse files Browse the repository at this point in the history
Saves the step of first getting the pre-enforcer and applying it afterwards.
Additionally make diffferent PreEnforcers implement PreEnforcer Interface, to make it easier viewing the different exisitng pre-enforcers.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed May 24, 2022
1 parent 966c53d commit 51f9f21
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 74 deletions.
Expand Up @@ -81,7 +81,6 @@ public abstract class AbstractPersistenceSupervisor<E extends EntityId, S extend
protected final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

@Nullable protected final BlockedNamespaces blockedNamespaces;
protected final PreEnforcer<S> preEnforcer;
protected final CreationRestrictionEnforcer creationRestrictionEnforcer;

@Nullable protected E entityId;
Expand All @@ -104,7 +103,6 @@ protected AbstractPersistenceSupervisor(@Nullable final ActorRef persistenceActo
this.persistenceActorChild = persistenceActorChild;
this.enforcerChild = enforcerChild;
this.blockedNamespaces = blockedNamespaces;
preEnforcer = PreEnforcerProvider.get(getContext().getSystem()).getPreEnforcer();
exponentialBackOffConfig = getExponentialBackOffConfig();
backOff = ExponentialBackOff.initial(exponentialBackOffConfig);
creationRestrictionEnforcer = DefaultCreationRestrictionEnforcer.of(
Expand Down Expand Up @@ -558,7 +556,7 @@ private BiConsumer<Object, Throwable> handleSignalEnforcementResponse(final With
protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S signal, final ActorRef sender) {

if (null != enforcerChild) {
return preEnforcer.apply(signal).thenCompose(preEnforcedCommand ->
return PreEnforcerProvider.get(getContext().getSystem()).apply(signal).thenCompose(preEnforcedCommand ->
askEnforcerChild(preEnforcedCommand)
.thenCompose(this::modifyEnforcerActorEnforcedSignalResponse)
.thenCompose(enforcedCommand -> enforcerResponseToTargetActor(
Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.internal.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
Expand All @@ -29,12 +31,12 @@ public DefaultPreEnforcerProvider(final ActorSystem actorSystem) {
}

@Override
public <T extends DittoHeadersSettable<?>> PreEnforcer<T> getPreEnforcer() {
return dittoHeadersSettable ->
BlockNamespaceBehavior.of(BlockedNamespaces.of(actorSystem))
.block(dittoHeadersSettable)
.thenApply(CommandWithOptionalEntityValidator.createInstance())
.thenApply(PreEnforcer::setOriginatorHeader)
public <T extends DittoHeadersSettable<?>> CompletionStage<T> apply(T signal) {
return BlockNamespaceBehavior.of(BlockedNamespaces.of(actorSystem))
.block(signal)
.thenCompose(CommandWithOptionalEntityValidator.createInstance())
.thenCompose(new HeaderSetter<>())
.thenCompose(PlaceholderSubstitution.newInstance());
}

}
Expand Up @@ -82,7 +82,8 @@ default boolean changesAuthorization(final T signal) {
* @return the stream.
*/
default Flow<Contextual<WithDittoHeaders>, EnforcementTask, NotUsed> createEnforcementTask(
final PreEnforcer preEnforcer) {
final PreEnforcer<T> preEnforcer) {

return Flow.<Contextual<WithDittoHeaders>, Optional<Contextual<T>>>fromFunction(
contextual -> contextual.tryToMapMessage(this::mapToHandledClass))
.filter(Optional::isPresent)
Expand All @@ -93,7 +94,8 @@ default Flow<Contextual<WithDittoHeaders>, EnforcementTask, NotUsed> createEnfor
}

private Optional<EnforcementTask> buildEnforcementTask(final Contextual<T> contextual,
final PreEnforcer preEnforcer) {
final PreEnforcer<T> preEnforcer) {

final T message = contextual.getMessage();
final boolean changesAuthorization = changesAuthorization(message);

Expand All @@ -109,7 +111,7 @@ private Optional<EnforcementTask> buildEnforcementTask(final Contextual<T> conte
return Optional.of(EnforcementTask.of(entityId, changesAuthorization,
() -> preEnforcer.withErrorHandlingAsync(contextual.setMessage(messageWithTraceContext),
contextual.setMessage(null).withReceiver(null),
converted -> createEnforcement((Contextual<T>) converted).enforceSafely())
converted -> createEnforcement(converted).enforceSafely())
.whenComplete((result, error) -> {
timer.tag("outcome", error != null ? "fail" : "success");
timer.stop();
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;

/**
* Sets additional headers to a signal.
* @param <T> the signal.
*/
public class HeaderSetter<T extends DittoHeadersSettable<?>> implements PreEnforcer<T> {

@Override
public CompletionStage<T> apply(final T originalSignal) {
return CompletableFuture.completedFuture(setOriginatorHeader(originalSignal));
}

/**
* Set the "ditto-originator" header to the primary authorization subject of a signal.
*
* @param originalSignal A signal with authorization context.
* @param <T> the type of the {@code originalSignal} to preserve in the response.
* @return A copy of the signal with the header "ditto-originator" set.
* @since 3.0.0
*/
@SuppressWarnings("unchecked")
public static <T extends DittoHeadersSettable<?>> T setOriginatorHeader(final T originalSignal) {
final DittoHeaders dittoHeaders = originalSignal.getDittoHeaders();
final AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
return authorizationContext.getFirstAuthorizationSubject()
.map(AuthorizationSubject::getId)
.map(originatorSubjectId -> DittoHeaders.newBuilder(dittoHeaders)
.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(), originatorSubjectId)
.build())
.map(originatorHeader -> (T) originalSignal.setDittoHeaders(originatorHeader))
.orElse(originalSignal);
}
}
Expand Up @@ -34,24 +34,13 @@
*
* @param <T> the type of the signals to pre-enforce.
*/
@FunctionalInterface
public interface PreEnforcer<T extends DittoHeadersSettable<?>> {
public interface PreEnforcer<T extends DittoHeadersSettable<?>> extends Function<T, CompletionStage<T>>{

/**
* Logger of pre-enforcers.
*/
DittoLogger LOGGER = DittoLoggerFactory.getLogger(PreEnforcer.class);

/**
* Apply pre-enforcement.
* Post-condition: the type of signal in the future of the return value is identical to the type of
* signal in the argument.
*
* @param signal the signal.
* @return future result of the pre-enforcement.
*/
CompletionStage<T> apply(T signal);

/**
* Perform pre-enforcement with error handling.
*
Expand Down Expand Up @@ -92,25 +81,4 @@ default <S extends WithSender<? extends DittoHeadersSettable<?>>, F> CompletionS
});
}

/**
* Set the "ditto-originator" header to the primary authorization subject of a signal.
*
* @param originalSignal A signal with authorization context.
* @param <T> the type of the {@code originalSignal} to preserve in the response.
* @return A copy of the signal with the header "ditto-originator" set.
* @since 3.0.0
*/
@SuppressWarnings("unchecked")
static <T extends DittoHeadersSettable<?>> T setOriginatorHeader(final T originalSignal) {
final DittoHeaders dittoHeaders = originalSignal.getDittoHeaders();
final AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
return authorizationContext.getFirstAuthorizationSubject()
.map(AuthorizationSubject::getId)
.map(originatorSubjectId -> DittoHeaders.newBuilder(dittoHeaders)
.putHeader(DittoHeaderDefinition.ORIGINATOR.getKey(), originatorSubjectId)
.build())
.map(originatorHeader -> (T) originalSignal.setDittoHeaders(originatorHeader))
.orElse(originalSignal);
}

}
Expand Up @@ -14,12 +14,12 @@

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.BinaryOperator;

import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import akka.actor.ActorSystem;

Expand All @@ -28,12 +28,13 @@
*
* @since 3.0.0
*/
public interface PreEnforcerProvider extends DittoExtensionPoint {
public interface PreEnforcerProvider extends DittoExtensionPoint{

/**
* Gets the pre-enforcer.
* Applies the pre-enforcement to the signal.
* @param signal the signal the pre-enforcement is executed for.
*/
<T extends DittoHeadersSettable<?>> PreEnforcer<T> getPreEnforcer();
<T extends DittoHeadersSettable<?>> CompletionStage<T> apply(T signal);

/**
* Loads the implementation of {@code PreEnforcerProvider} which is configured for the
Expand Down
Expand Up @@ -26,6 +26,7 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;
import org.eclipse.ditto.policies.enforcement.placeholders.strategies.SubstitutionStrategy;
import org.eclipse.ditto.policies.enforcement.placeholders.strategies.SubstitutionStrategyRegistry;

Expand All @@ -36,8 +37,7 @@
* @param <T> the subtype of {@link DittoHeadersSettable} handled by the PlaceholderSubstitution.
*/
@Immutable
public final class PlaceholderSubstitution<T extends DittoHeadersSettable<?>>
implements Function<T, CompletionStage<T>> {
public final class PlaceholderSubstitution<T extends DittoHeadersSettable<?>> implements PreEnforcer<T> {

private final HeaderBasedPlaceholderSubstitutionAlgorithm substitutionAlgorithm;
private final SubstitutionStrategyRegistry substitutionStrategyRegistry;
Expand Down
Expand Up @@ -13,7 +13,8 @@
package org.eclipse.ditto.policies.enforcement.validators;

import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -23,29 +24,31 @@
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;

/**
* Checks that commands that modify entities cause no harm downstream.
*/
public final class CommandWithOptionalEntityValidator<T extends DittoHeadersSettable<?>> implements UnaryOperator<T> {
public final class CommandWithOptionalEntityValidator<T extends DittoHeadersSettable<?>> implements PreEnforcer<T> {

public static <T extends DittoHeadersSettable<?>> CommandWithOptionalEntityValidator<T> createInstance() {
return new CommandWithOptionalEntityValidator<>();
}

@Override
public T apply(final T withDittoHeaders) {
public CompletionStage<T> apply(final T withDittoHeaders) {
return checkForHarmfulEntity(withDittoHeaders);
}

private static <T extends DittoHeadersSettable<?>> T checkForHarmfulEntity(final T withDittoHeaders) {
private static <T extends DittoHeadersSettable<?>> CompletionStage<T> checkForHarmfulEntity(
final T withDittoHeaders) {
if (withDittoHeaders instanceof Command && withDittoHeaders instanceof WithOptionalEntity) {
final Optional<JsonValue> optionalEntity = ((WithOptionalEntity) withDittoHeaders).getEntity();
if (optionalEntity.isPresent() && isJsonValueIllegal(optionalEntity.get())) {
throw buildError(withDittoHeaders);
}
}
return withDittoHeaders;
return CompletableFuture.completedFuture(withDittoHeaders);
}

private static boolean isJsonValueIllegal(final JsonValue entity) {
Expand Down
Expand Up @@ -162,21 +162,6 @@ private static Props getPolicySupervisorActorProps(final SnapshotAdapter<Policy>
return PolicySupervisorActor.props(pubSubMediator, snapshotAdapter, policyAnnouncementPub, blockedNamespaces);
}

/**
* TODO CR-11297 provide extension mechanism here
* TODO CR-11297 consolidate with ThingsRootActor.newPreEnforcer
*/
private static <T extends DittoHeadersSettable<?>> PreEnforcer<T> newPreEnforcer(
final BlockedNamespaces blockedNamespaces) {

return dittoHeadersSettable ->
BlockNamespaceBehavior.of(blockedNamespaces)
.block(dittoHeadersSettable)
.thenApply(CommandWithOptionalEntityValidator.createInstance())
.thenApply(PreEnforcer::setOriginatorHeader)
.thenCompose(PlaceholderSubstitution.newInstance());
}

/**
* Creates Akka configuration object Props for this PoliciesRootActor.
*
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.policies.enforcement.HeaderSetter;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;

Expand Down Expand Up @@ -119,7 +120,7 @@ private void onAcknowledgements(final Acknowledgements acks) {
}

private void onCommandResponse(final CommandResponse<?> incomingResponse) {
final CommandResponse<?> response = PreEnforcer.setOriginatorHeader(incomingResponse);
final CommandResponse<?> response = HeaderSetter.setOriginatorHeader(incomingResponse);
final boolean validResponse = isValidResponse(response);
log.debug("Got <{}>, valid=<{}>", response, validResponse);
if (validResponse) {
Expand Down

0 comments on commit 51f9f21

Please sign in to comment.