diff --git a/model/connectivity/src/main/java/org/eclipse/ditto/model/connectivity/UnresolvedPlaceholderException.java b/model/connectivity/src/main/java/org/eclipse/ditto/model/connectivity/UnresolvedPlaceholderException.java index 77659fb57d..e0af6a0b31 100644 --- a/model/connectivity/src/main/java/org/eclipse/ditto/model/connectivity/UnresolvedPlaceholderException.java +++ b/model/connectivity/src/main/java/org/eclipse/ditto/model/connectivity/UnresolvedPlaceholderException.java @@ -25,7 +25,7 @@ import org.eclipse.ditto.model.base.headers.DittoHeaders; /** - * Thrown if a placeholder in the connection configuration could be resolved. + * Thrown if a placeholder in the connection configuration could not be resolved. */ @Immutable public final class UnresolvedPlaceholderException extends DittoRuntimeException diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java index 790b082aaa..925c76580c 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.java @@ -37,6 +37,8 @@ import org.eclipse.ditto.model.connectivity.Target; import org.eclipse.ditto.model.connectivity.Topic; import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter; +import org.eclipse.ditto.services.connectivity.messaging.validation.CompoundConnectivityCommandInterceptor; +import org.eclipse.ditto.services.connectivity.messaging.validation.DittoConnectivityCommandValidator; import org.eclipse.ditto.services.connectivity.util.ConfigKeys; import org.eclipse.ditto.services.utils.akka.LogUtil; import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter; @@ -101,7 +103,7 @@ * Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the * remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1). */ -final class ConnectionActor extends AbstractPersistentActor { +public final class ConnectionActor extends AbstractPersistentActor { private static final String PERSISTENCE_ID_PREFIX = "connection:"; @@ -120,6 +122,7 @@ final class ConnectionActor extends AbstractPersistentActor { private final long snapshotThreshold; private final SnapshotAdapter snapshotAdapter; private final ConnectionActorPropsFactory propsFactory; + private final Consumer> commandValidator; private final Receive connectionCreatedBehaviour; @Nullable private ActorRef clientActor; @@ -132,11 +135,20 @@ final class ConnectionActor extends AbstractPersistentActor { private Set uniqueTopicPaths = Collections.emptySet(); private ConnectionActor(final String connectionId, final ActorRef pubSubMediator, - final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory) { + final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory, + @Nullable final Consumer> customCommandValidator) { this.connectionId = connectionId; this.pubSubMediator = pubSubMediator; this.conciergeForwarder = conciergeForwarder; this.propsFactory = propsFactory; + final DittoConnectivityCommandValidator + dittoCommandValidator = new DittoConnectivityCommandValidator(propsFactory, conciergeForwarder); + if (customCommandValidator != null) { + this.commandValidator = + new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator); + } else { + this.commandValidator = dittoCommandValidator; + } final Config config = getContext().system().settings().config(); snapshotThreshold = config.getLong(ConfigKeys.Connection.SNAPSHOT_THRESHOLD); @@ -154,13 +166,15 @@ private ConnectionActor(final String connectionId, final ActorRef pubSubMediator * @return the Akka configuration Props object */ public static Props props(final String connectionId, final ActorRef pubSubMediator, - final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory) { + final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory, + @Nullable final Consumer> commandValidator) { return Props.create(ConnectionActor.class, new Creator() { private static final long serialVersionUID = 1L; @Override public ConnectionActor create() { - return new ConnectionActor(connectionId, pubSubMediator, conciergeForwarder, propsFactory); + return new ConnectionActor(connectionId, pubSubMediator, conciergeForwarder, propsFactory, + commandValidator); } }); } @@ -237,9 +251,11 @@ public Receive createReceiveRecover() { @Override public Receive createReceive() { + return ReceiveBuilder.create() - .match(TestConnection.class, this::testConnection) - .match(CreateConnection.class, this::createConnection) + .match(TestConnection.class, testConnection -> validateAndForward(testConnection, this::testConnection)) + .match(CreateConnection.class, + createConnection -> validateAndForward(createConnection, this::createConnection)) .match(ConnectivityCommand.class, this::handleCommandDuringInitialization) .match(Shutdown.class, shutdown -> stopSelf()) .match(Status.Failure.class, f -> log.warning("Got failure in initial behaviour with cause {}: {}", @@ -266,7 +282,8 @@ private Receive createConnectionCreatedBehaviour() { .build(); getSender().tell(conflictException, getSelf()); }) - .match(ModifyConnection.class, this::modifyConnection) + .match(ModifyConnection.class, + modifyConnection -> validateAndForward(modifyConnection, this::modifyConnection)) .match(OpenConnection.class, this::openConnection) .match(CloseConnection.class, this::closeConnection) .match(DeleteConnection.class, this::deleteConnection) @@ -322,10 +339,6 @@ private void handleSignal(final Signal signal) { private void testConnection(final TestConnection command) { final ActorRef origin = getSender(); - if (!isConnectionConfigurationValid(command.getConnection(), origin)) { - return; - } - connection = command.getConnection(); askClientActor(command, response -> { @@ -342,11 +355,19 @@ private void testConnection(final TestConnection command) { }); } - private void createConnection(final CreateConnection command) { + private void validateAndForward(final T command, final Consumer target) { final ActorRef origin = getSender(); - if (!isConnectionConfigurationValid(command.getConnection(), origin)) { - return; + try { + commandValidator.accept(command); + target.accept(command); + } catch (final Exception e) { + handleException(command.getType(), origin, e); + stopSelf(); } + } + + private void createConnection(final CreateConnection command) { + final ActorRef origin = getSender(); final ConnectionCreated connectionCreated = ConnectionCreated.of(command.getConnection(), command.getDittoHeaders()); @@ -370,23 +391,8 @@ private void createConnection(final CreateConnection command) { }); } - private boolean isConnectionConfigurationValid(final Connection connection, final ActorRef origin) { - try { - // try to create actor props before persisting the connection to fail early - propsFactory.getActorPropsForType(connection, conciergeForwarder); - return true; - } catch (final Exception e) { - handleException("connect", origin, e); - stopSelf(); - return false; - } - } - private void modifyConnection(final ModifyConnection command) { final ActorRef origin = getSender(); - if (!isConnectionConfigurationValid(command.getConnection(), origin)) { - return; - } if (connection != null && !connection.getConnectionType().equals(command.getConnection().getConnectionType())) { handleException("modify", origin, ConnectionConfigurationInvalidException diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionSupervisorActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionSupervisorActor.java index 28ca591f37..1757d26f2c 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionSupervisorActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionSupervisorActor.java @@ -25,6 +25,7 @@ import org.eclipse.ditto.model.base.headers.WithDittoHeaders; import org.eclipse.ditto.services.utils.akka.LogUtil; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor; import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionUnavailableException; import akka.actor.AbstractActor; @@ -70,7 +71,8 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy, final double randomFactor, final ActorRef pubSubMediator, final ActorRef conciergeForwarder, - final ConnectionActorPropsFactory propsFactory) { + final ConnectionActorPropsFactory propsFactory, + @Nullable final ConnectivityCommandInterceptor commandValidator) { try { this.connectionId = URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8.name()); } catch (final UnsupportedEncodingException e) { @@ -81,7 +83,7 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy, this.maxBackoff = maxBackoff; this.randomFactor = randomFactor; this.persistenceActorProps = - ConnectionActor.props(connectionId, pubSubMediator, conciergeForwarder, propsFactory); + ConnectionActor.props(connectionId, pubSubMediator, conciergeForwarder, propsFactory, commandValidator); } /** @@ -99,6 +101,7 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy, * @param pubSubMediator the PubSub mediator actor. * @param conciergeForwarder the actor used to send signals to the concierge service. * @param propsFactory the {@link ConnectionActorPropsFactory} + * @param commandValidator a custom command validator for connectivity commands * @return the {@link Props} to create this actor. */ public static Props props(final Duration minBackoff, @@ -106,7 +109,8 @@ public static Props props(final Duration minBackoff, final double randomFactor, final ActorRef pubSubMediator, final ActorRef conciergeForwarder, - final ConnectionActorPropsFactory propsFactory) { + final ConnectionActorPropsFactory propsFactory, + @Nullable final ConnectivityCommandInterceptor commandValidator) { return Props.create(ConnectionSupervisorActor.class, new Creator() { private static final long serialVersionUID = 1L; @@ -121,7 +125,8 @@ public ConnectionSupervisorActor create() { .match(ActorKilledException.class, e -> SupervisorStrategy.stop()) .matchAny(e -> SupervisorStrategy.escalate()) .build()), - minBackoff, maxBackoff, randomFactor, pubSubMediator, conciergeForwarder, propsFactory); + minBackoff, maxBackoff, randomFactor, pubSubMediator, conciergeForwarder, propsFactory, + commandValidator); } }); } diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/CompoundConnectivityCommandInterceptor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/CompoundConnectivityCommandInterceptor.java new file mode 100644 index 0000000000..829b00cf93 --- /dev/null +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/CompoundConnectivityCommandInterceptor.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + * + */ +package org.eclipse.ditto.services.connectivity.messaging.validation; + +import java.util.Arrays; +import java.util.Collection; +import java.util.function.Consumer; + +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor; + +/** + * Accepts multiple {@link ConnectivityCommandInterceptor}s and simply executes all of them. + */ +public class CompoundConnectivityCommandInterceptor implements ConnectivityCommandInterceptor { + + private final Collection>> validators; + + @SafeVarargs + public CompoundConnectivityCommandInterceptor(final Consumer>... validators) { + this.validators = Arrays.asList(validators); + } + + @Override + public void accept(final ConnectivityCommand command) { + validators.forEach(c -> c.accept(command)); + } +} diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/DittoConnectivityCommandValidator.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/DittoConnectivityCommandValidator.java new file mode 100644 index 0000000000..474bcd68af --- /dev/null +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/validation/DittoConnectivityCommandValidator.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + * + */ +package org.eclipse.ditto.services.connectivity.messaging.validation; + +import org.eclipse.ditto.model.connectivity.Connection; +import org.eclipse.ditto.services.connectivity.messaging.ConnectionActorPropsFactory; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor; +import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection; +import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection; +import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection; + +import akka.actor.ActorRef; + +/** + * Checks if the given {@link ConnectivityCommand} is valid by trying to create the client actor props. + */ +public class DittoConnectivityCommandValidator implements ConnectivityCommandInterceptor { + + private final ConnectionActorPropsFactory propsFactory; + private final ActorRef conciergeForwarder; + + public DittoConnectivityCommandValidator( + final ConnectionActorPropsFactory propsFactory, final ActorRef conciergeForwarder) { + this.propsFactory = propsFactory; + this.conciergeForwarder = conciergeForwarder; + } + + @Override + public void accept(final ConnectivityCommand command) { + switch (command.getType()) { + case CreateConnection.TYPE: + case TestConnection.TYPE: + case ModifyConnection.TYPE: + final Connection connection = getConnectionFromCommand(command); + if (connection != null) { + propsFactory.getActorPropsForType(connection, conciergeForwarder); + } + break; + default: //nothing to validate for other commands + } + } +} \ No newline at end of file diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java index 3f36a93bcd..a53ef38dcf 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java @@ -12,6 +12,7 @@ package org.eclipse.ditto.services.connectivity.messaging; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.ditto.services.connectivity.messaging.MockConnectionActor.mockConnectionActorPropsFactory; import java.util.Collections; import java.util.Set; @@ -26,6 +27,7 @@ import org.eclipse.ditto.services.utils.test.Retry; import org.eclipse.ditto.signals.base.Signal; import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException; +import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionUnavailableException; import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnection; import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnectionResponse; import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection; @@ -274,7 +276,7 @@ public void exceptionDuringClientActorPropsCreation() { (connection, conciergeForwarder) -> { throw ConnectionConfigurationInvalidException.newBuilder("validation failed...") .build(); - }); + }, null); // create another actor because this it is stopped and we want to test if the child is terminated final TestKit parent = new TestKit(actorSystem); final ActorRef connectionActorRef = watch(parent.childActorOf(connectionActorProps)); @@ -287,6 +289,38 @@ public void exceptionDuringClientActorPropsCreation() { final Exception exception = parent.expectMsgClass(ConnectionConfigurationInvalidException.class); assertThat(exception).hasMessageContaining("validation failed..."); + // expect the connection actor is terminated + expectTerminated(connectionActorRef); + }}; + } + + @Test + public void exceptionDueToCustomValidator() { + new TestKit(actorSystem) {{ + final Props connectionActorProps = + ConnectionActor.props(TestConstants.createRandomConnectionId(), pubSubMediator, + conciergeForwarder, mockConnectionActorPropsFactory, + command -> { + throw ConnectionUnavailableException.newBuilder(connectionId) + .dittoHeaders(command.getDittoHeaders()) + .message("not valid") + .build(); + }); + + // create another actor because we want to test if the child is terminated + final TestKit parent = new TestKit(actorSystem); + final ActorRef connectionActorRef = watch(parent.childActorOf(connectionActorProps)); + + // create connection + connectionActorRef.tell(createConnection, parent.getRef()); + parent.expectMsgClass(ConnectionSupervisorActor.ManualReset.class); // is sent after "empty" recovery + + // expect ConnectionUnavailableException sent to parent + final ConnectionUnavailableException exception = + parent.expectMsgClass(ConnectionUnavailableException.class); + assertThat(exception).hasMessageContaining("not valid"); + + // expect the connection actor is terminated expectTerminated(connectionActorRef); }}; } diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java index 27a778ef23..3c9feec1e5 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java @@ -163,7 +163,7 @@ static ActorRef createConnectionSupervisorActor(final String connectionId, final final Duration maxBackoff = Duration.ofSeconds(5); final Double randomFactor = 1.0; final Props props = ConnectionSupervisorActor.props(minBackoff, maxBackoff, randomFactor, pubSubMediator, - conciergeForwarder, connectionActorPropsFactory); + conciergeForwarder, connectionActorPropsFactory, null); final int maxAttemps = 5; final long backoffMs = 1000L; diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java index 32ff613bc0..0da82150dd 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Stream; @@ -303,7 +304,7 @@ public void testCloseConnectionFails() throws JMSException { @Test public void testConsumeMessageAndExpectForwardToConciergeForwarder() throws JMSException { - testConsumeMessageAndExpectForwardToConciergeForwarder(connection, + testConsumeMessageAndExpectForwardToConciergeForwarder(connection, 1, c -> assertThat(c.getDittoHeaders().getAuthorizationContext()).isEqualTo( TestConstants.Authorization.AUTHORIZATION_CONTEXT)); } @@ -313,9 +314,25 @@ public void testConsumeMessageForSourcesWithSameAddress() throws JMSException { final Connection connection = TestConstants.createConnection(connectionId, actorSystem, TestConstants.Sources.SOURCES_WITH_SAME_ADDRESS); - testConsumeMessageAndExpectForwardToConciergeForwarder(connection, - c -> assertThat(c.getDittoHeaders().getAuthorizationContext()).isEqualTo( - TestConstants.Authorization.SOURCE_SPECIFIC_CONTEXT)); + + final AtomicBoolean messageReceivedForGlobalContext = new AtomicBoolean(false); + final AtomicBoolean messageReceivedForSourceContext = new AtomicBoolean(false); + + testConsumeMessageAndExpectForwardToConciergeForwarder(connection, 2, + c -> { + if (c.getDittoHeaders() + .getAuthorizationContext() + .equals(TestConstants.Authorization.SOURCE_SPECIFIC_CONTEXT)) { + messageReceivedForSourceContext.set(true); + } + if (c.getDittoHeaders() + .getAuthorizationContext() + .equals(TestConstants.Authorization.AUTHORIZATION_CONTEXT)) { + messageReceivedForGlobalContext.set(true); + } + }); + + assertThat(messageReceivedForGlobalContext.get() && messageReceivedForSourceContext.get()).isTrue(); } @Test @@ -323,13 +340,13 @@ public void testConsumeMessageAndExpectForwardToConciergeForwarderWithCorrectAut final Connection connection = TestConstants.createConnection(connectionId, actorSystem, TestConstants.Sources.SOURCES_WITH_AUTH_CONTEXT); - testConsumeMessageAndExpectForwardToConciergeForwarder(connection, + testConsumeMessageAndExpectForwardToConciergeForwarder(connection, 1, c -> assertThat(c.getDittoHeaders().getAuthorizationContext()).isEqualTo( TestConstants.Authorization.SOURCE_SPECIFIC_CONTEXT)); } private void testConsumeMessageAndExpectForwardToConciergeForwarder(final Connection connection, - final Consumer commandConsumer) throws JMSException { + final int consumers, final Consumer commandConsumer) throws JMSException { new TestKit(actorSystem) {{ final Props props = AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection); @@ -339,14 +356,17 @@ private void testConsumeMessageAndExpectForwardToConciergeForwarder(final Connec expectMsg(CONNECTED_SUCCESS); final ArgumentCaptor captor = ArgumentCaptor.forClass(MessageListener.class); - verify(mockConsumer, timeout(1000).atLeastOnce()).setMessageListener(captor.capture()); - final MessageListener messageListener = captor.getValue(); - messageListener.onMessage(mockMessage()); + verify(mockConsumer, timeout(1000).atLeast(consumers)).setMessageListener(captor.capture()); + for (final MessageListener messageListener : captor.getAllValues()) { + messageListener.onMessage(mockMessage()); + } - final Command command = expectMsgClass(Command.class); - assertThat(command.getId()).isEqualTo(TestConstants.Things.THING_ID); - assertThat(command.getDittoHeaders().getCorrelationId()).contains(TestConstants.CORRELATION_ID); - commandConsumer.accept(command); + for (int i = 0; i < consumers; i++) { + final Command command = expectMsgClass(Command.class); + assertThat(command.getId()).isEqualTo(TestConstants.Things.THING_ID); + assertThat(command.getDittoHeaders().getCorrelationId()).contains(TestConstants.CORRELATION_ID); + commandConsumer.accept(command); + } }}; } diff --git a/services/connectivity/starter/src/main/java/org/eclipse/ditto/services/connectivity/actors/ConnectivityRootActor.java b/services/connectivity/starter/src/main/java/org/eclipse/ditto/services/connectivity/actors/ConnectivityRootActor.java index 1bd6a0e6c0..b742f28699 100644 --- a/services/connectivity/starter/src/main/java/org/eclipse/ditto/services/connectivity/actors/ConnectivityRootActor.java +++ b/services/connectivity/starter/src/main/java/org/eclipse/ditto/services/connectivity/actors/ConnectivityRootActor.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import javax.annotation.Nullable; import javax.jms.JMSRuntimeException; import javax.naming.NamingException; @@ -45,6 +46,7 @@ import org.eclipse.ditto.services.utils.health.routes.StatusRoute; import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientActor; import org.eclipse.ditto.signals.base.Signal; +import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor; import com.typesafe.config.Config; @@ -143,7 +145,8 @@ public final class ConnectivityRootActor extends AbstractActor { private ConnectivityRootActor(final ServiceConfigReader configReader, final ActorRef pubSubMediator, final ActorMaterializer materializer, - final Function, Signal> conciergeForwarderSignalTransformer) { + final Function, Signal> conciergeForwarderSignalTransformer, + @Nullable final ConnectivityCommandInterceptor commandValidator) { final Config config = configReader.getRawConfig(); final boolean healthCheckEnabled = config.getBoolean(ConfigKeys.HealthCheck.ENABLED); @@ -182,7 +185,7 @@ private ConnectivityRootActor(final ServiceConfigReader configReader, final Acto final ConnectionActorPropsFactory propsFactory = DefaultConnectionActorPropsFactory.getInstance(); final Props connectionSupervisorProps = ConnectionSupervisorActor.props(minBackoff, maxBackoff, randomFactor, pubSubMediator, - conciergeForwarder, propsFactory); + conciergeForwarder, propsFactory, commandValidator); final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(actorSystem) @@ -216,6 +219,32 @@ private ConnectivityRootActor(final ServiceConfigReader configReader, final Acto }); } + /** + * Creates Akka configuration object Props for this ConnectivityRootActor. + * + * @param configReader the configuration reader of this service. + * @param pubSubMediator the PubSub mediator Actor. + * @param materializer the materializer for the akka actor system. + * @param conciergeForwarderSignalTransformer a function which transforms signals before forwarding them to the + * concierge service + * @param commandValidator custom command validator for connectivity commands + * @return the Akka configuration Props object. + */ + public static Props props(final ServiceConfigReader configReader, final ActorRef pubSubMediator, + final ActorMaterializer materializer, + final Function, Signal> conciergeForwarderSignalTransformer, + final ConnectivityCommandInterceptor commandValidator) { + return Props.create(ConnectivityRootActor.class, new Creator() { + private static final long serialVersionUID = 1L; + + @Override + public ConnectivityRootActor create() { + return new ConnectivityRootActor(configReader, pubSubMediator, materializer, + conciergeForwarderSignalTransformer, commandValidator); + } + }); + } + /** * Creates Akka configuration object Props for this ConnectivityRootActor. * @@ -235,7 +264,7 @@ public static Props props(final ServiceConfigReader configReader, final ActorRef @Override public ConnectivityRootActor create() { return new ConnectivityRootActor(configReader, pubSubMediator, materializer, - conciergeForwarderSignalTransformer); + conciergeForwarderSignalTransformer, null); } }); } diff --git a/signals/commands/connectivity/src/main/java/org/eclipse/ditto/signals/commands/connectivity/ConnectivityCommandInterceptor.java b/signals/commands/connectivity/src/main/java/org/eclipse/ditto/signals/commands/connectivity/ConnectivityCommandInterceptor.java new file mode 100644 index 0000000000..7e04550b14 --- /dev/null +++ b/signals/commands/connectivity/src/main/java/org/eclipse/ditto/signals/commands/connectivity/ConnectivityCommandInterceptor.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + * + */ +package org.eclipse.ditto.signals.commands.connectivity; + +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import org.eclipse.ditto.model.connectivity.Connection; +import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection; +import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection; +import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection; + +/** + * Intercepts a {@link ConnectivityCommand}s and may throw a {@link org.eclipse.ditto.model.base.exceptions.DittoRuntimeException} + * if the command is invalid. + */ +public interface ConnectivityCommandInterceptor extends Consumer> { + + @Nullable + default Connection getConnectionFromCommand(final ConnectivityCommand command) { + switch (command.getType()) { + case CreateConnection.TYPE: + return ((CreateConnection) command).getConnection(); + case TestConnection.TYPE: + return ((TestConnection) command).getConnection(); + case ModifyConnection.TYPE: + return ((ModifyConnection) command).getConnection(); + default: + return null; + } + } +}