Skip to content

Commit

Permalink
Make PreEnforcerProvider load all PreEnforcer via config
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed May 25, 2022
1 parent 17d9ecd commit ab05043
Show file tree
Hide file tree
Showing 24 changed files with 229 additions and 113 deletions.
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.connectivity.model.ConnectionId;
//todo dgs: Extension
/**
* Provides the priority of a given connection.
*/
Expand Down
8 changes: 7 additions & 1 deletion connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -3,7 +3,13 @@ ditto {
signal-transformer = "org.eclipse.ditto.edge.api.dispatching.NoOpSignalTransformer"
root-child-actor-starter = "org.eclipse.ditto.base.service.NoOpRootChildActorStarter"
root-actor-starter = "org.eclipse.ditto.base.service.NoOpRootActorStarter"
pre-enforcer-provider = "org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider"
#todo dgs: adjust (blockingNamespace likely not needed)
pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.BlockedNamespacePreEnforcer",
"org.eclipse.ditto.policies.enforcement.validators.CommandWithOptionalEntityValidator",
"org.eclipse.ditto.policies.enforcement.HeaderSetter",
"org.eclipse.ditto.policies.enforcement.placeholders.PlaceholderSubstitution"
]

mongodb {
database = "connectivity"
Expand Down
Expand Up @@ -17,7 +17,6 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,7 +38,6 @@
import org.junit.BeforeClass;
import org.junit.Test;

import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -74,9 +72,7 @@ public final class ConnectionPersistenceActorRecoveryTest extends WithMockServer

@BeforeClass
public static void setUp() {
actorSystem = ActorSystem.create("AkkaTestSystem",
ConfigFactory.parseMap(Map.of("ditto.pre-enforcer-provider","org.eclipse.ditto.policies.enforcement." +
"DefaultPreEnforcerProvider")).withFallback(TestConstants.CONFIG));
actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);
pubSubMediator = DistributedPubSub.get(actorSystem).mediator();
proxyActor = actorSystem.actorOf(TestConstants.ProxyActorMock.props());
}
Expand Down Expand Up @@ -127,9 +123,7 @@ public void testRecoveryOfDeletedConnectionsWithoutSnapshot() {
public void testRecoveryOfConnectionWithBlockedHost() {
final ActorSystem akkaTestSystem =
ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG.withValue("ditto.connectivity.connection" +
".blocked-hostnames", ConfigValueFactory.fromAnyRef("127.0.0.1"))
.withValue("ditto.pre-enforcer-provider",ConfigValueFactory.fromAnyRef(
"org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider")));
".blocked-hostnames", ConfigValueFactory.fromAnyRef("127.0.0.1")));
final ActorRef mediator = DistributedPubSub.get(akkaTestSystem).mediator();
final ActorRef proxyActor = actorSystem.actorOf(TestConstants.ProxyActorMock.props());

Expand Down
Expand Up @@ -54,9 +54,7 @@ public void setUp(final boolean allowFirstCreateCommand, final boolean allowClos
Map.of("ditto.connectivity.connection.client-actor-props-factory",
"org.eclipse.ditto.connectivity.service.messaging.FaultyClientActorPropsFactory",
"allowFirstCreateCommand", allowFirstCreateCommand, "allowCloseCommands",
allowCloseCommands, "ditto.pre-enforcer-provider","org.eclipse.ditto.policies" +
".enforcement." +
"DefaultPreEnforcerProvider"))
allowCloseCommands))
.withFallback(TestConstants.CONFIG));
final DittoConnectivityConfig connectivityConfig =
DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config()));
Expand Down
7 changes: 6 additions & 1 deletion connectivity/service/src/test/resources/test.conf
Expand Up @@ -20,7 +20,12 @@ akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "p

ditto {
mapping-strategy.implementation = "org.eclipse.ditto.connectivity.api.ConnectivityMappingStrategies"
pre-enforcer-provider = "org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider"
pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.BlockedNamespacePreEnforcer",
"org.eclipse.ditto.policies.enforcement.validators.CommandWithOptionalEntityValidator",
"org.eclipse.ditto.policies.enforcement.HeaderSetter",
"org.eclipse.ditto.policies.enforcement.placeholders.PlaceholderSubstitution"
]

signal-enrichment = {
caching-signal-enrichment-facade.provider = org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacadeProvider
Expand Down
1 change: 1 addition & 0 deletions deployment/docker/nginx.htpasswd
@@ -1,2 +1,3 @@
# this file contains sample users and their hashed password
ditto:A6BgmB8IEtPTs
david:xpe1zRdjOlAhE
Expand Up @@ -189,7 +189,6 @@ private Config getEventSourcingConfiguration() {
"akka.cluster.seed-nodes=[]\n" +
"akka.coordinated-shutdown.exit-jvm=off\n" +
"ditto.things.log-incoming-messages=true\n" +
"ditto.pre-enforcer-provider=org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider\n" +
"akka.contrib.persistence.mongodb.mongo.mongouri=\"" + mongoDbUri + "\"\n";

// load the service config for info about event journal, snapshot store and metadata
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.enforcement.CreationRestrictionEnforcer;
import org.eclipse.ditto.policies.enforcement.DefaultCreationRestrictionEnforcer;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;
import org.eclipse.ditto.policies.enforcement.PreEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEntityCreationConfig;

Expand Down Expand Up @@ -557,7 +556,7 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s

if (null != enforcerChild) {
return PreEnforcerProvider.get(getContext().getSystem()).apply(signal).thenCompose(preEnforcedCommand ->
askEnforcerChild(preEnforcedCommand)
askEnforcerChild((Signal<?>) preEnforcedCommand)
.thenCompose(this::modifyEnforcerActorEnforcedSignalResponse)
.thenCompose(enforcedCommand -> enforcerResponseToTargetActor(
preEnforcedCommand.getDittoHeaders(),
Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.internal.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;

import akka.actor.ActorSystem;

public final class BlockedNamespacePreEnforcer implements PreEnforcer{

final BlockNamespaceBehavior blockNamespaceBehavior;

public BlockedNamespacePreEnforcer(final ActorSystem actorSystem) {
blockNamespaceBehavior = BlockNamespaceBehavior.of(BlockedNamespaces.of(actorSystem));
}

@Override
public CompletionStage<DittoHeadersSettable<?>> apply(final DittoHeadersSettable<?> t) {
return blockNamespaceBehavior.block(t);
}
}

This file was deleted.

Expand Up @@ -82,7 +82,7 @@ default boolean changesAuthorization(final T signal) {
* @return the stream.
*/
default Flow<Contextual<WithDittoHeaders>, EnforcementTask, NotUsed> createEnforcementTask(
final PreEnforcer<T> preEnforcer) {
final PreEnforcer preEnforcer) {

return Flow.<Contextual<WithDittoHeaders>, Optional<Contextual<T>>>fromFunction(
contextual -> contextual.tryToMapMessage(this::mapToHandledClass))
Expand All @@ -94,7 +94,7 @@ default Flow<Contextual<WithDittoHeaders>, EnforcementTask, NotUsed> createEnfor
}

private Optional<EnforcementTask> buildEnforcementTask(final Contextual<T> contextual,
final PreEnforcer<T> preEnforcer) {
final PreEnforcer preEnforcer) {

final T message = contextual.getMessage();
final boolean changesAuthorization = changesAuthorization(message);
Expand Down
Expand Up @@ -21,14 +21,19 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;

import akka.actor.ActorSystem;

/**
* Sets additional headers to a signal.
* @param <T> the signal.
*/
public class HeaderSetter<T extends DittoHeadersSettable<?>> implements PreEnforcer<T> {
public class HeaderSetter implements PreEnforcer {

public HeaderSetter(final ActorSystem actorSystem) {

}

@Override
public CompletionStage<T> apply(final T originalSignal) {
public CompletionStage<DittoHeadersSettable<?>> apply(final DittoHeadersSettable<?> originalSignal) {
return CompletableFuture.completedFuture(setOriginatorHeader(originalSignal));
}

Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.policies.enforcement;

import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
Expand All @@ -27,19 +28,23 @@
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;

/**
* Create processing units of Akka stream graph before enforcement from an asynchronous function that may abort
* enforcement by throwing exceptions.
*
* @param <T> the type of the signals to pre-enforce.
*/
public interface PreEnforcer<T extends DittoHeadersSettable<?>> extends Function<T, CompletionStage<T>> {
public interface PreEnforcer extends Function<DittoHeadersSettable<?>, CompletionStage<DittoHeadersSettable<?>>>,
DittoExtensionPoint {

/**
* Logger of pre-enforcers.
Expand Down Expand Up @@ -91,7 +96,7 @@ default <S extends WithSender<? extends DittoHeadersSettable<?>>, F> CompletionS
final F onError,
final Function<S, CompletionStage<F>> andThen) {

final T message = (T) withSender.getMessage();
final DittoHeadersSettable message = withSender.getMessage();
return apply(message)
// the cast to (S) is safe if the post-condition of this.apply(DittoHeadersSettable<?>) holds.
.thenCompose(msg -> andThen.apply((S) ((WithSender) withSender).withMessage(msg)))
Expand All @@ -115,4 +120,25 @@ default <S extends WithSender<? extends DittoHeadersSettable<?>>, F> CompletionS
});
}

final class ExtensionId extends AbstractExtensionId<PreEnforcer> {

private final String implementation;

ExtensionId(final String implementation) {
this.implementation = implementation;
}

static ExtensionId get(final String implementation, final ActorSystem actorSystem) {
return PreEnforcerExtensionIds.INSTANCE.get(actorSystem).get(implementation);
}

@Override
public PreEnforcer createExtension(final ExtendedActorSystem system) {

return AkkaClassLoader.instantiate(system, PreEnforcer.class,
implementation,
List.of(ActorSystem.class),
List.of(system));
}
}
}
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;

import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;

final class PreEnforcerExtensionIds implements Extension {

static final ExtensionId INSTANCE = new ExtensionId();
private final Map<String, PreEnforcer.ExtensionId> extensionIds = new HashMap<>();

PreEnforcer.ExtensionId get(final String implementation) {
return extensionIds.computeIfAbsent(implementation, PreEnforcer.ExtensionId::new);
}

static final class ExtensionId extends AbstractExtensionId<PreEnforcerExtensionIds> {

@Override
public PreEnforcerExtensionIds createExtension(final ExtendedActorSystem system) {

return AkkaClassLoader.instantiate(system, PreEnforcerExtensionIds.class,
PreEnforcerExtensionIds.class.getCanonicalName(),
List.of(),
List.of());
}
}

}

0 comments on commit ab05043

Please sign in to comment.