Skip to content

Commit

Permalink
Add PreEnforcer Extension
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed May 20, 2022
1 parent 1c1eae9 commit f51b321
Show file tree
Hide file tree
Showing 29 changed files with 229 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,36 @@ public abstract class DittoRootActor extends AbstractActor {
e.printStackTrace(pw);

log.warning("Illegal Argument in child actor: {}", sw.toString());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(IllegalStateException.class, e -> {
log.warning("Illegal State in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(IndexOutOfBoundsException.class, e -> {
log.warning("IndexOutOfBounds in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(NoSuchElementException.class, e -> {
log.warning("NoSuchElement in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(AskTimeoutException.class, e -> {
log.warning("AskTimeoutException in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(ConnectException.class, e -> {
log.warning("ConnectException in child actor: {}", e.getMessage());
return restartChild();
}).match(InvalidActorNameException.class, e -> {
log.warning("InvalidActorNameException in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(ActorInitializationException.class, e -> {
log.error(e, "ActorInitializationException in child actor: {}", e.getMessage());
return (SupervisorStrategy.Directive) SupervisorStrategy.stop();
return SupervisorStrategy.stop();
}).match(ActorKilledException.class, e -> {
log.error(e, "ActorKilledException in child actor: {}", e.message());
return restartChild();
}).match(DittoRuntimeException.class, e -> {
log.error(e,
"DittoRuntimeException '{}' should not be escalated to ConnectivityRootActor. Simply resuming Actor.",
e.getErrorCode());
return (SupervisorStrategy.Directive) SupervisorStrategy.resume();
return SupervisorStrategy.resume();
}).match(Throwable.class, e -> {
log.error(e, "Escalating above root actor!");
return (SupervisorStrategy.Directive) SupervisorStrategy.escalate();
Expand Down Expand Up @@ -151,7 +151,7 @@ protected ActorRef startChildActor(final String actorName, final Props props) {
*/
protected SupervisorStrategy.Directive restartChild() {
log.info("Restarting child ...");
return (SupervisorStrategy.Directive) SupervisorStrategy.restart();
return SupervisorStrategy.restart();
}

private void startChildActor(final StartChildActor startChildActor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package org.eclipse.ditto.connectivity.service;

import java.util.concurrent.CompletableFuture;

import javax.jms.JMSRuntimeException;
import javax.naming.NamingException;

Expand Down Expand Up @@ -43,7 +41,6 @@
import org.eclipse.ditto.internal.utils.persistentactors.PersistencePingActor;
import org.eclipse.ditto.internal.utils.persistentactors.cleanup.PersistenceCleanupActor;
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -125,12 +122,7 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
private static Props getConnectivitySupervisorActorProps(final ActorRef pubSubMediator,
final ActorRef proxyActor) {

return ConnectionSupervisorActor.props(proxyActor, pubSubMediator, providePreEnforcer());
}

private static PreEnforcer providePreEnforcer() {
return CompletableFuture::completedStage;
// TODO CR-11297 provide extension mechanism here
return ConnectionSupervisorActor.props(proxyActor, pubSubMediator);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.eclipse.ditto.connectivity.service.enforcement.ConnectivityCommandEnforcement;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceSupervisor;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -76,12 +75,8 @@ public final class ConnectionSupervisorActor
private boolean isRegisteredForConnectivityConfigChanges = false;

@SuppressWarnings("unused")
private ConnectionSupervisorActor(
final ActorRef proxyActor,
final ActorRef pubSubMediator,
final PreEnforcer<ConnectivityCommand<?>> preEnforcer) {

super(null, preEnforcer);
private ConnectionSupervisorActor(final ActorRef proxyActor, final ActorRef pubSubMediator) {
super(null);
this.proxyActor = proxyActor;
this.pubSubMediator = pubSubMediator;
}
Expand All @@ -95,15 +90,13 @@ private ConnectionSupervisorActor(
*
* @param proxyActor the actor used to send signals into the ditto cluster..
* @param pubSubMediator pub-sub-mediator for the shutdown behavior.
* @param preEnforcer the PreEnforcer to apply as extension mechanism of the enforcement.
* @return the {@link Props} to create this actor.
*/
public static Props props(final ActorRef proxyActor,
final ActorRef pubSubMediator,
final PreEnforcer<ConnectivityCommand<?>> preEnforcer) {
final ActorRef pubSubMediator) {

return Props.create(ConnectionSupervisorActor.class, proxyActor,
pubSubMediator, preEnforcer);
pubSubMediator);
}

@Override
Expand Down
19 changes: 18 additions & 1 deletion connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ ditto {
signal-transformer = "org.eclipse.ditto.edge.api.dispatching.NoOpSignalTransformer"
root-child-actor-starter = "org.eclipse.ditto.base.service.NoOpRootChildActorStarter"
root-actor-starter = "org.eclipse.ditto.base.service.NoOpRootActorStarter"
pre-enforcer-provider = "org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider"

mongodb {
database = "connectivity"
Expand Down Expand Up @@ -931,7 +932,8 @@ akka {
}

roles = [
"connectivity"
"connectivity",
"blocked-namespaces-aware"
]
}
coordinated-shutdown {
Expand Down Expand Up @@ -1066,4 +1068,19 @@ kafka-producer-dispatcher {
executor = "thread-pool-executor"
}

blocked-namespaces-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 4
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 3.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 32
parallelism-max = ${?DEFAULT_DISPATCHER_PARALLELISM_MAX}
}
throughput = 5
}

include "connectivity-extension"
Original file line number Diff line number Diff line change
Expand Up @@ -918,8 +918,7 @@ public static ActorRef createConnectionSupervisorActor(final ConnectionId connec
final ActorRef pubSubMediator) {

final Props props =
ConnectionSupervisorActor.props(proxyActor, pubSubMediator,
CompletableFuture::completedStage);
ConnectionSupervisorActor.props(proxyActor, pubSubMediator);

final Props shardRegionMockProps = Props.create(ShardRegionMockActor.class, props, connectionId.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging.persistence;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
Expand Down Expand Up @@ -125,7 +124,7 @@ protected ActorRef startEntityActor(final ActorSystem system, final ActorRef pub
// essentially never restart
final TestProbe proxyActorProbe = new TestProbe(system, "proxyActor");
final Props props =
ConnectionSupervisorActor.props(proxyActorProbe.ref(), pubSubMediator, CompletableFuture::completedStage);
ConnectionSupervisorActor.props(proxyActorProbe.ref(), pubSubMediator);

return system.actorOf(props, String.valueOf(id));
}
Expand Down
16 changes: 16 additions & 0 deletions connectivity/service/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "p

ditto {
mapping-strategy.implementation = "org.eclipse.ditto.connectivity.api.ConnectivityMappingStrategies"
pre-enforcer-provider = "org.eclipse.ditto.policies.enforcement.DefaultPreEnforcerProvider"

signal-enrichment = {
caching-signal-enrichment-facade.provider = org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacadeProvider
Expand Down Expand Up @@ -538,3 +539,18 @@ kafka-producer-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}

blocked-namespaces-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 4
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 3.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 32
parallelism-max = ${?DEFAULT_DISPATCHER_PARALLELISM_MAX}
}
throughput = 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.ditto.policies.enforcement.CreationRestrictionEnforcer;
import org.eclipse.ditto.policies.enforcement.DefaultCreationRestrictionEnforcer;
import org.eclipse.ditto.policies.enforcement.PreEnforcer;
import org.eclipse.ditto.policies.enforcement.PreEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEntityCreationConfig;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -92,20 +93,18 @@ public abstract class AbstractPersistenceSupervisor<E extends EntityId, S extend
private ExponentialBackOff backOff;
private boolean waitingForStopBeforeRestart = false;

protected AbstractPersistenceSupervisor(@Nullable final BlockedNamespaces blockedNamespaces,
final PreEnforcer<S> preEnforcer) {
this(null, null, blockedNamespaces, preEnforcer);
protected AbstractPersistenceSupervisor(@Nullable final BlockedNamespaces blockedNamespaces) {
this(null, null, blockedNamespaces);
}

protected AbstractPersistenceSupervisor(@Nullable final ActorRef persistenceActorChild,
@Nullable final ActorRef enforcerChild,
@Nullable final BlockedNamespaces blockedNamespaces,
final PreEnforcer<S> preEnforcer) {
@Nullable final BlockedNamespaces blockedNamespaces) {

this.persistenceActorChild = persistenceActorChild;
this.enforcerChild = enforcerChild;
this.blockedNamespaces = blockedNamespaces;
this.preEnforcer = preEnforcer;
preEnforcer = PreEnforcerProvider.get(getContext().getSystem()).getPreEnforcer();
exponentialBackOffConfig = getExponentialBackOffConfig();
backOff = ExponentialBackOff.initial(exponentialBackOffConfig);
creationRestrictionEnforcer = DefaultCreationRestrictionEnforcer.of(
Expand Down
8 changes: 8 additions & 0 deletions policies/enforcement/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-base-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-base-service</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-policies-model</artifactId>
Expand Down Expand Up @@ -119,6 +123,10 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-namespaces</artifactId>
</dependency>
</dependencies>

<parent>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.internal.utils.namespaces.BlockNamespaceBehavior;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.enforcement.placeholders.PlaceholderSubstitution;
import org.eclipse.ditto.policies.enforcement.validators.CommandWithOptionalEntityValidator;

import akka.actor.ActorSystem;

public class DefaultPreEnforcerProvider implements PreEnforcerProvider{

private final ActorSystem actorSystem;

public DefaultPreEnforcerProvider(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}

@Override
public <T extends DittoHeadersSettable<?>> PreEnforcer<T> getPreEnforcer() {
return dittoHeadersSettable ->
BlockNamespaceBehavior.of(BlockedNamespaces.of(actorSystem))
.block(dittoHeadersSettable)
.thenApply(CommandWithOptionalEntityValidator.createInstance())
.thenApply(PreEnforcer::setOriginatorHeader)
.thenCompose(PlaceholderSubstitution.newInstance());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.enforcement;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.List;

import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;

import akka.actor.ActorSystem;

/**
* Extension to provide the pre-enforcer for a service.
*
* @since 3.0.0
*/
public interface PreEnforcerProvider extends DittoExtensionPoint {

static final String CONFIG_PATH = "pre-enforcer-provider";

/**
* Gets the pre-enforcer.
*/
<T extends DittoHeadersSettable<?>> PreEnforcer<T> getPreEnforcer();

/**
* Loads the implementation of {@code PreEnforcerProvider} which is configured for the
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code PreEnforcerProvider} should be loaded.
* @return the {@code PreEnforcerProvider} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
static PreEnforcerProvider get(final ActorSystem actorSystem) {
checkNotNull(actorSystem, "actorSystem");
final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(actorSystem.settings().config());
final var implementation = dittoScoped.getString(CONFIG_PATH);

return AkkaClassLoader.instantiate(actorSystem, PreEnforcerProvider.class,
implementation,
List.of(ActorSystem.class),
List.of(actorSystem));
}
}
Loading

0 comments on commit f51b321

Please sign in to comment.