Skip to content

Commit

Permalink
instead of RoundRobinPool for clientActorRouter use ConsistentHashing…
Browse files Browse the repository at this point in the history
…Pool

* and wrap messages to it in ConsistentHashableEnvelope with EntityId as hash key

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Dec 2, 2019
1 parent 4e51eff commit 2cefb64
Showing 1 changed file with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.routing.Broadcast;
import akka.routing.RoundRobinPool;
import akka.routing.ConsistentHashingPool;
import akka.routing.ConsistentHashingRouter;
import akka.routing.Pool;

/**
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
Expand Down Expand Up @@ -150,10 +152,6 @@ public final class ConnectionPersistenceActor

private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);

/**
* Validator of all supported connections.
*/
private final ConnectionValidator connectionValidator;
private final DittoProtocolSub dittoProtocolSub;
private final ActorRef conciergeForwarder;
private final ClientActorPropsFactory propsFactory;
Expand Down Expand Up @@ -192,12 +190,13 @@ private ConnectionPersistenceActor(final ConnectionId connectionId,
);
config = connectivityConfig.getConnectionConfig();

connectionValidator = ConnectionValidator.of(connectivityConfig.getMappingConfig().getMapperLimitsConfig(),
RabbitMQValidator.newInstance(),
AmqpValidator.newInstance(),
MqttValidator.newInstance(),
KafkaValidator.getInstance(),
HttpPushValidator.newInstance());
final ConnectionValidator connectionValidator =
ConnectionValidator.of(connectivityConfig.getMappingConfig().getMapperLimitsConfig(),
RabbitMQValidator.newInstance(),
AmqpValidator.newInstance(),
MqttValidator.newInstance(),
KafkaValidator.getInstance(),
HttpPushValidator.newInstance());

final DittoConnectivityCommandValidator dittoCommandValidator =
new DittoConnectivityCommandValidator(propsFactory, conciergeForwarder, connectionValidator,
Expand Down Expand Up @@ -493,7 +492,9 @@ private void forwardSignalToClientActors(final Signal signal) {
subscribedAndAuthorizedTargets);

final OutboundSignal outbound = OutboundSignalFactory.newOutboundSignal(signal, subscribedAndAuthorizedTargets);
clientActorRouter.tell(outbound, getSender());
final Object msg = new ConsistentHashingRouter.ConsistentHashableEnvelope(outbound,
outbound.getSource().getEntityId().toString());
clientActorRouter.tell(msg, getSender());
}

private void prepareForSignalForwarding(final StagedCommand command) {
Expand Down Expand Up @@ -640,7 +641,9 @@ private void respondWithEmptyLogs(final RetrieveConnectionLogs command, final Ac

private CompletionStage<Object> startAndAskClientActors(final Command<?> cmd, final int clientCount) {
startClientActorsIfRequired(clientCount);
return processClientAskResult(Patterns.ask(clientActorRouter, cmd, clientActorAskTimeout));
final Object msg = new ConsistentHashingRouter.ConsistentHashableEnvelope(cmd,
cmd.getEntityId().toString());
return processClientAskResult(Patterns.ask(clientActorRouter, msg, clientActorAskTimeout));
}

private void broadcastToClientActorsIfStarted(final Command<?> cmd, final ActorRef sender) {
Expand Down Expand Up @@ -802,9 +805,9 @@ private void startClientActorsIfRequired(final int clientCount) {
final ClusterRouterPoolSettings clusterRouterPoolSettings =
new ClusterRouterPoolSettings(clientCount, 1, true,
Collections.singleton(CLUSTER_ROLE));
final RoundRobinPool roundRobinPool = new RoundRobinPool(clientCount);
final Pool pool = new ConsistentHashingPool(clientCount);
final Props clusterRouterPoolProps =
new ClusterRouterPool(roundRobinPool, clusterRouterPoolSettings).props(props);
new ClusterRouterPool(pool, clusterRouterPoolSettings).props(props);

// start client actor without name so it does not conflict with its previous incarnation
clientActorRouter = getContext().actorOf(clusterRouterPoolProps);
Expand Down

0 comments on commit 2cefb64

Please sign in to comment.