Skip to content

Commit

Permalink
refactor validation of connectivity commands to be extensible
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Jun 21, 2018
1 parent c6917a5 commit 13aa6bd
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 52 deletions.
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.ditto.model.base.headers.DittoHeaders;

/**
* Thrown if a placeholder in the connection configuration could be resolved.
* Thrown if a placeholder in the connection configuration could not be resolved.
*/
@Immutable
public final class UnresolvedPlaceholderException extends DittoRuntimeException
Expand Down
Expand Up @@ -37,6 +37,8 @@
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.eclipse.ditto.services.connectivity.messaging.validation.CompoundConnectivityCommandInterceptor;
import org.eclipse.ditto.services.connectivity.messaging.validation.DittoConnectivityCommandValidator;
import org.eclipse.ditto.services.connectivity.util.ConfigKeys;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
Expand Down Expand Up @@ -101,7 +103,7 @@
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
* remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1).
*/
final class ConnectionActor extends AbstractPersistentActor {
public final class ConnectionActor extends AbstractPersistentActor {

private static final String PERSISTENCE_ID_PREFIX = "connection:";

Expand All @@ -120,6 +122,7 @@ final class ConnectionActor extends AbstractPersistentActor {
private final long snapshotThreshold;
private final SnapshotAdapter<Connection> snapshotAdapter;
private final ConnectionActorPropsFactory propsFactory;
private final Consumer<ConnectivityCommand<?>> commandValidator;
private final Receive connectionCreatedBehaviour;

@Nullable private ActorRef clientActor;
Expand All @@ -132,11 +135,20 @@ final class ConnectionActor extends AbstractPersistentActor {
private Set<Topic> uniqueTopicPaths = Collections.emptySet();

private ConnectionActor(final String connectionId, final ActorRef pubSubMediator,
final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory) {
final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> customCommandValidator) {
this.connectionId = connectionId;
this.pubSubMediator = pubSubMediator;
this.conciergeForwarder = conciergeForwarder;
this.propsFactory = propsFactory;
final DittoConnectivityCommandValidator
dittoCommandValidator = new DittoConnectivityCommandValidator(propsFactory, conciergeForwarder);
if (customCommandValidator != null) {
this.commandValidator =
new CompoundConnectivityCommandInterceptor(dittoCommandValidator, customCommandValidator);
} else {
this.commandValidator = dittoCommandValidator;
}

final Config config = getContext().system().settings().config();
snapshotThreshold = config.getLong(ConfigKeys.Connection.SNAPSHOT_THRESHOLD);
Expand All @@ -154,13 +166,15 @@ private ConnectionActor(final String connectionId, final ActorRef pubSubMediator
* @return the Akka configuration Props object
*/
public static Props props(final String connectionId, final ActorRef pubSubMediator,
final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory) {
final ActorRef conciergeForwarder, final ConnectionActorPropsFactory propsFactory,
@Nullable final Consumer<ConnectivityCommand<?>> commandValidator) {
return Props.create(ConnectionActor.class, new Creator<ConnectionActor>() {
private static final long serialVersionUID = 1L;

@Override
public ConnectionActor create() {
return new ConnectionActor(connectionId, pubSubMediator, conciergeForwarder, propsFactory);
return new ConnectionActor(connectionId, pubSubMediator, conciergeForwarder, propsFactory,
commandValidator);
}
});
}
Expand Down Expand Up @@ -237,9 +251,11 @@ public Receive createReceiveRecover() {

@Override
public Receive createReceive() {

return ReceiveBuilder.create()
.match(TestConnection.class, this::testConnection)
.match(CreateConnection.class, this::createConnection)
.match(TestConnection.class, testConnection -> validateAndForward(testConnection, this::testConnection))
.match(CreateConnection.class,
createConnection -> validateAndForward(createConnection, this::createConnection))
.match(ConnectivityCommand.class, this::handleCommandDuringInitialization)
.match(Shutdown.class, shutdown -> stopSelf())
.match(Status.Failure.class, f -> log.warning("Got failure in initial behaviour with cause {}: {}",
Expand All @@ -266,7 +282,8 @@ private Receive createConnectionCreatedBehaviour() {
.build();
getSender().tell(conflictException, getSelf());
})
.match(ModifyConnection.class, this::modifyConnection)
.match(ModifyConnection.class,
modifyConnection -> validateAndForward(modifyConnection, this::modifyConnection))
.match(OpenConnection.class, this::openConnection)
.match(CloseConnection.class, this::closeConnection)
.match(DeleteConnection.class, this::deleteConnection)
Expand Down Expand Up @@ -322,10 +339,6 @@ private void handleSignal(final Signal<?> signal) {

private void testConnection(final TestConnection command) {
final ActorRef origin = getSender();
if (!isConnectionConfigurationValid(command.getConnection(), origin)) {
return;
}

connection = command.getConnection();

askClientActor(command, response -> {
Expand All @@ -342,11 +355,19 @@ private void testConnection(final TestConnection command) {
});
}

private void createConnection(final CreateConnection command) {
private <T extends ConnectivityCommand> void validateAndForward(final T command, final Consumer<T> target) {
final ActorRef origin = getSender();
if (!isConnectionConfigurationValid(command.getConnection(), origin)) {
return;
try {
commandValidator.accept(command);
target.accept(command);
} catch (final Exception e) {
handleException(command.getType(), origin, e);
stopSelf();
}
}

private void createConnection(final CreateConnection command) {
final ActorRef origin = getSender();

final ConnectionCreated connectionCreated =
ConnectionCreated.of(command.getConnection(), command.getDittoHeaders());
Expand All @@ -370,23 +391,8 @@ private void createConnection(final CreateConnection command) {
});
}

private boolean isConnectionConfigurationValid(final Connection connection, final ActorRef origin) {
try {
// try to create actor props before persisting the connection to fail early
propsFactory.getActorPropsForType(connection, conciergeForwarder);
return true;
} catch (final Exception e) {
handleException("connect", origin, e);
stopSelf();
return false;
}
}

private void modifyConnection(final ModifyConnection command) {
final ActorRef origin = getSender();
if (!isConnectionConfigurationValid(command.getConnection(), origin)) {
return;
}

if (connection != null && !connection.getConnectionType().equals(command.getConnection().getConnectionType())) {
handleException("modify", origin, ConnectionConfigurationInvalidException
Expand Down
Expand Up @@ -25,6 +25,7 @@

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionUnavailableException;

import akka.actor.AbstractActor;
Expand Down Expand Up @@ -70,7 +71,8 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy,
final double randomFactor,
final ActorRef pubSubMediator,
final ActorRef conciergeForwarder,
final ConnectionActorPropsFactory propsFactory) {
final ConnectionActorPropsFactory propsFactory,
@Nullable final ConnectivityCommandInterceptor commandValidator) {
try {
this.connectionId = URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8.name());
} catch (final UnsupportedEncodingException e) {
Expand All @@ -81,7 +83,7 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy,
this.maxBackoff = maxBackoff;
this.randomFactor = randomFactor;
this.persistenceActorProps =
ConnectionActor.props(connectionId, pubSubMediator, conciergeForwarder, propsFactory);
ConnectionActor.props(connectionId, pubSubMediator, conciergeForwarder, propsFactory, commandValidator);
}

/**
Expand All @@ -99,14 +101,16 @@ private ConnectionSupervisorActor(final SupervisorStrategy supervisorStrategy,
* @param pubSubMediator the PubSub mediator actor.
* @param conciergeForwarder the actor used to send signals to the concierge service.
* @param propsFactory the {@link ConnectionActorPropsFactory}
* @param commandValidator a custom command validator for connectivity commands
* @return the {@link Props} to create this actor.
*/
public static Props props(final Duration minBackoff,
final Duration maxBackoff,
final double randomFactor,
final ActorRef pubSubMediator,
final ActorRef conciergeForwarder,
final ConnectionActorPropsFactory propsFactory) {
final ConnectionActorPropsFactory propsFactory,
@Nullable final ConnectivityCommandInterceptor commandValidator) {

return Props.create(ConnectionSupervisorActor.class, new Creator<ConnectionSupervisorActor>() {
private static final long serialVersionUID = 1L;
Expand All @@ -121,7 +125,8 @@ public ConnectionSupervisorActor create() {
.match(ActorKilledException.class, e -> SupervisorStrategy.stop())
.matchAny(e -> SupervisorStrategy.escalate())
.build()),
minBackoff, maxBackoff, randomFactor, pubSubMediator, conciergeForwarder, propsFactory);
minBackoff, maxBackoff, randomFactor, pubSubMediator, conciergeForwarder, propsFactory,
commandValidator);
}
});
}
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.connectivity.messaging.validation;

import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;

import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;

/**
* Accepts multiple {@link ConnectivityCommandInterceptor}s and simply executes all of them.
*/
public class CompoundConnectivityCommandInterceptor implements ConnectivityCommandInterceptor {

private final Collection<Consumer<ConnectivityCommand<?>>> validators;

@SafeVarargs
public CompoundConnectivityCommandInterceptor(final Consumer<ConnectivityCommand<?>>... validators) {
this.validators = Arrays.asList(validators);
}

@Override
public void accept(final ConnectivityCommand<?> command) {
validators.forEach(c -> c.accept(command));
}
}
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*
*/
package org.eclipse.ditto.services.connectivity.messaging.validation;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.connectivity.messaging.ConnectionActorPropsFactory;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection;

import akka.actor.ActorRef;

/**
* Checks if the given {@link ConnectivityCommand} is valid by trying to create the client actor props.
*/
public class DittoConnectivityCommandValidator implements ConnectivityCommandInterceptor {

private final ConnectionActorPropsFactory propsFactory;
private final ActorRef conciergeForwarder;

public DittoConnectivityCommandValidator(
final ConnectionActorPropsFactory propsFactory, final ActorRef conciergeForwarder) {
this.propsFactory = propsFactory;
this.conciergeForwarder = conciergeForwarder;
}

@Override
public void accept(final ConnectivityCommand<?> command) {
switch (command.getType()) {
case CreateConnection.TYPE:
case TestConnection.TYPE:
case ModifyConnection.TYPE:
final Connection connection = getConnectionFromCommand(command);
if (connection != null) {
propsFactory.getActorPropsForType(connection, conciergeForwarder);
}
break;
default: //nothing to validate for other commands
}
}
}
Expand Up @@ -12,6 +12,7 @@
package org.eclipse.ditto.services.connectivity.messaging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.ditto.services.connectivity.messaging.MockConnectionActor.mockConnectionActorPropsFactory;

import java.util.Collections;
import java.util.Set;
Expand All @@ -26,6 +27,7 @@
import org.eclipse.ditto.services.utils.test.Retry;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionUnavailableException;
import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
Expand Down Expand Up @@ -274,7 +276,7 @@ public void exceptionDuringClientActorPropsCreation() {
(connection, conciergeForwarder) -> {
throw ConnectionConfigurationInvalidException.newBuilder("validation failed...")
.build();
});
}, null);
// create another actor because this it is stopped and we want to test if the child is terminated
final TestKit parent = new TestKit(actorSystem);
final ActorRef connectionActorRef = watch(parent.childActorOf(connectionActorProps));
Expand All @@ -287,6 +289,38 @@ public void exceptionDuringClientActorPropsCreation() {
final Exception exception = parent.expectMsgClass(ConnectionConfigurationInvalidException.class);
assertThat(exception).hasMessageContaining("validation failed...");

// expect the connection actor is terminated
expectTerminated(connectionActorRef);
}};
}

@Test
public void exceptionDueToCustomValidator() {
new TestKit(actorSystem) {{
final Props connectionActorProps =
ConnectionActor.props(TestConstants.createRandomConnectionId(), pubSubMediator,
conciergeForwarder, mockConnectionActorPropsFactory,
command -> {
throw ConnectionUnavailableException.newBuilder(connectionId)
.dittoHeaders(command.getDittoHeaders())
.message("not valid")
.build();
});

// create another actor because we want to test if the child is terminated
final TestKit parent = new TestKit(actorSystem);
final ActorRef connectionActorRef = watch(parent.childActorOf(connectionActorProps));

// create connection
connectionActorRef.tell(createConnection, parent.getRef());
parent.expectMsgClass(ConnectionSupervisorActor.ManualReset.class); // is sent after "empty" recovery

// expect ConnectionUnavailableException sent to parent
final ConnectionUnavailableException exception =
parent.expectMsgClass(ConnectionUnavailableException.class);
assertThat(exception).hasMessageContaining("not valid");

// expect the connection actor is terminated
expectTerminated(connectionActorRef);
}};
}
Expand Down
Expand Up @@ -163,7 +163,7 @@ static ActorRef createConnectionSupervisorActor(final String connectionId, final
final Duration maxBackoff = Duration.ofSeconds(5);
final Double randomFactor = 1.0;
final Props props = ConnectionSupervisorActor.props(minBackoff, maxBackoff, randomFactor, pubSubMediator,
conciergeForwarder, connectionActorPropsFactory);
conciergeForwarder, connectionActorPropsFactory, null);

final int maxAttemps = 5;
final long backoffMs = 1000L;
Expand Down

0 comments on commit 13aa6bd

Please sign in to comment.