Skip to content

Commit

Permalink
Replace consistent hashable envelope by a dedicated binary message en…
Browse files Browse the repository at this point in the history
…velope for communication between the connection persistence actor and its client actors.

Reason: Consistent hashable envelope is not serializable.

Still to do: Make all messages wrapped in ShardedBinaryEnvelope serializable, i. e., ClientActorPropsArgs.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 10, 2022
1 parent 1e0c216 commit 4ce5f39
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;

import com.typesafe.config.Config;

import akka.actor.ActorRef;

/**
* Arguments to create a client actor props object.
*
* @param connection the connection.
* @param commandForwarderActor the actor used to send signals into the ditto cluster..
* @param connectionActor the connectionPersistenceActor which creates this client.
* @param dittoHeaders Ditto headers of the command that caused the client actors to be created.
*/
@Immutable
public record ClientActorPropsArgs(Connection connection, ActorRef commandForwarderActor, ActorRef connectionActor,
DittoHeaders dittoHeaders, Config connectivityConfigOverwrites) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;

import com.typesafe.config.Config;

Expand Down Expand Up @@ -47,6 +47,18 @@ Props getActorPropsForType(Connection connection,
DittoHeaders dittoHeaders,
Config connectivityConfigOverwrites);

/**
* Create actor {@link Props} for a connection.
*
* @param args arguments of the client actor props.
* @param actorSystem the actorSystem.
* @return the actor props
*/
default Props getActorProps(final ClientActorPropsArgs args, final ActorSystem actorSystem) {
return getActorPropsForType(args.connection(), args.commandForwarderActor(), args.connectionActor(),
actorSystem, args.dittoHeaders(), args.connectivityConfigOverwrites());
}

/**
* Loads the implementation of {@code ClientActorPropsFactory} which is configured for the
* {@code ActorSystem}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.internal.utils.cluster.ShardedBinaryEnvelope;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
Expand All @@ -52,6 +57,7 @@ public final class ClientSupervisor extends AbstractActorWithTimers {
DittoHeaders.newBuilder().correlationId(clientActorId.toString()).build());
private final Duration statusCheckInterval;
private final ActorRef connectionShardRegion;
private final ClientActorPropsFactory propsFactory;
private Props props;
private ActorRef clientActor;

Expand All @@ -64,12 +70,14 @@ private ClientSupervisor(final int numberOfShards, final Duration statusCheckInt
connectionShardRegion = clusterSharding.startProxy(ConnectivityMessagingConstants.SHARD_REGION,
Optional.of(ConnectivityMessagingConstants.CLUSTER_ROLE),
extractor);
propsFactory = getClientActorPropsFactory(actorSystem);
}

// constructor for unit tests
private ClientSupervisor(final Duration statusCheckInterval, final ActorRef connectionShardRegion) {
this.statusCheckInterval = statusCheckInterval;
this.connectionShardRegion = connectionShardRegion;
propsFactory = getClientActorPropsFactory(getContext().getSystem());
}

/**
Expand Down Expand Up @@ -107,13 +115,13 @@ public void postStop() {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(Props.class, this::startClientActor)
.match(ClientActorPropsArgs.class, this::startClientActor)
.matchEquals(Control.STATUS_CHECK, this::startStatusCheck)
.match(SudoRetrieveConnectionStatusResponse.class, this::checkConnectionStatus)
.match(ConnectionNotAccessibleException.class, this::connectionNotAccessible)
.match(Terminated.class, this::childTerminated)
.match(CloseConnection.class, this::isNoClientActorStarted, this::respondAndStop)
.match(ConsistentHashingRouter.ConsistentHashableEnvelope.class, this::extractFromEnvelope)
.match(ShardedBinaryEnvelope.class, this::extractFromEnvelope)
.match(StopShardedActor.class, this::restartIfOpen)
.matchAny(this::forwardToClientActor)
.build();
Expand All @@ -127,7 +135,7 @@ public SupervisorStrategy supervisorStrategy() {
}).build());
}

private void extractFromEnvelope(final ConsistentHashingRouter.ConsistentHashableEnvelope envelope) {
private void extractFromEnvelope(final ShardedBinaryEnvelope envelope) {
getSelf().forward(envelope.message(), getContext());
}

Expand All @@ -140,10 +148,10 @@ private void childTerminated(final Terminated terminated) {
}
}

private void startClientActor(final Props props) {
private void startClientActor(final ClientActorPropsArgs propsArgs) {
final var props = propsFactory.getActorProps(propsArgs, getContext().getSystem());
if (props.equals(this.props)) {
logger.debug("Refreshing props");
Object x = props;
} else {
final var oldClientActor = clientActor;
if (oldClientActor != null) {
Expand Down Expand Up @@ -215,6 +223,11 @@ private void restartIfOpen(final StopShardedActor stopShardedActor) {
}
}

private static ClientActorPropsFactory getClientActorPropsFactory(final ActorSystem actorSystem) {
final Config dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
return ClientActorPropsFactory.get(actorSystem, dittoExtensionConfig);
}

private enum Control {
STATUS_CHECK
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorId;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpValidator;
import org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushValidator;
Expand Down Expand Up @@ -911,9 +912,9 @@ private void respondWithEmptyLogs(final WithDittoHeaders command, final ActorRef
}

private CompletionStage<Object> startAndAskClientActorForTest(final TestConnection cmd) {
final Props props = propsFactory.getActorPropsForType(cmd.getConnection(), commandForwarderActor, getSelf(),
getContext().getSystem(), cmd.getDittoHeaders(), connectivityConfigOverwrites);
final ActorRef clientActor = getContext().actorOf(props);
final var args = new ClientActorPropsArgs(cmd.getConnection(), commandForwarderActor, getSelf(),
cmd.getDittoHeaders(), connectivityConfigOverwrites);
final ActorRef clientActor = getContext().actorOf(propsFactory.getActorProps(args, getContext().getSystem()));
final var resultFuture = processClientAskResult(Patterns.ask(clientActor, cmd, clientActorAskTimeout));
resultFuture.whenComplete((result, error) -> clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()));
return resultFuture;
Expand All @@ -924,14 +925,14 @@ private CompletionStage<Object> startAndAskClientActors(final SignalWithEntityId
return askAllClientActors(cmd);
}

private static Object consistentHashableEnvelope(final Object message, final Object hashKey) {
return new ConsistentHashingRouter.ConsistentHashableEnvelope(message, hashKey);
private static Object consistentHashableEnvelope(final Object message, final ClientActorId clientActorId) {
return new ConsistentHashingRouter.ConsistentHashableEnvelope(message, clientActorId.toString());
}

private void broadcastToClientActors(final Object cmd, final ActorRef sender) {
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId.toString());
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
clientShardRegion.tell(envelope, sender);
}
}
Expand All @@ -943,7 +944,7 @@ private CompletionStage<Object> askAllClientActors(final Signal<?> cmd) {
CompletionStage<Object> askFuture = CompletableFuture.completedStage(null);
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId.toString());
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
askFuture = askFuture.thenCombine(
processClientAskResult(Patterns.ask(clientShardRegion, envelope, clientActorAskTimeout)),
(left, right) -> right
Expand Down Expand Up @@ -1067,9 +1068,9 @@ public SupervisorStrategy supervisorStrategy() {
private void startClientActors(final int clientCount, final DittoHeaders dittoHeaders) {
if (entity != null && clientCount > 0) {
log.info("Starting ClientActor for connection <{}> with <{}> clients.", entityId, clientCount);
final Props props = propsFactory.getActorPropsForType(entity, commandForwarderActor, getSelf(),
getContext().getSystem(), dittoHeaders, connectivityConfigOverwrites);
broadcastToClientActors(props, ActorRef.noSender());
final var args = new ClientActorPropsArgs(entity, commandForwarderActor, getSelf(), dittoHeaders,
connectivityConfigOverwrites);
broadcastToClientActors(args, ActorRef.noSender());
updateLoggingIfEnabled();
} else {
log.error(new IllegalStateException(), "Trying to start client actor without a connection");
Expand All @@ -1081,7 +1082,6 @@ private int getClientCount() {
}

private void stopClientActors() {
// TODO: verify poison pill is serializable
log.debug("Stopping the client actors.");
broadcastToClientActors(PoisonPill.getInstance(), ActorRef.noSender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import akka.actor.ActorSystem;
import akka.cluster.sharding.ShardRegion;
import akka.routing.ConsistentHashingRouter;

/**
* Implementation of {@link ShardRegion.MessageExtractor} which does a {@code hashCode} based sharding with the
Expand Down Expand Up @@ -82,15 +81,14 @@ public String entityId(final Object message) {
result = entityId.toString();
} else if (message instanceof ShardRegion.StartEntity startEntity) {
result = startEntity.entityId();
} else if (message instanceof ConsistentHashingRouter.ConsistentHashableEnvelope envelope) {
result = envelope.hashKey().toString();
} else if (message instanceof ShardedBinaryEnvelope envelope) {
result = envelope.entityName();
} else {
result = null;
}
return result;
}

@Nullable
@Override
public Object entityMessage(final Object message) {
final Object entity;
Expand All @@ -102,7 +100,7 @@ public Object entityMessage(final Object message) {
} else if (message instanceof ShardedMessageEnvelope shardedMessageEnvelope) {
// message was sent from the same cluster node
entity = createJsonifiableFrom(shardedMessageEnvelope);
} else if (message instanceof ConsistentHashingRouter.ConsistentHashableEnvelope envelope) {
} else if (message instanceof ShardedBinaryEnvelope envelope) {
entity = envelope.message();
} else {
entity = message;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cluster;

import javax.annotation.concurrent.Immutable;

/**
* Envelope to hold a binary message to a sharded entity. Serialization is performed recursively.
*
* @param message Message to the sharded entity.
* @param entityName Name of the recipient entity.
* @since 3.1.0
*/
@Immutable
public record ShardedBinaryEnvelope(Object message, String entityName) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.cluster;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.serialization.ByteBufferSerializer;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.serialization.SerializerWithStringManifest;
import akka.serialization.Serializers;

/**
* Serializer of {@link ShardedBinarySerializer}.
*/
public final class ShardedBinarySerializer
extends SerializerWithStringManifest implements ByteBufferSerializer {

private static final int UNIQUE_IDENTIFIER = 1259836351;
private static final Charset CHARSET = StandardCharsets.UTF_8;

private final ActorSystem actorSystem;
@Nullable private Serialization serialization;

/**
* Constructs a new sharded binary serializer.
*
* @param actorSystem the ExtendedActorSystem to use for serialization of wrapped messages.
*/
public ShardedBinarySerializer(final ExtendedActorSystem actorSystem) {
this.actorSystem = actorSystem;
}

@Override
public int identifier() {
return UNIQUE_IDENTIFIER;
}

@Override
public String manifest(final Object o) {
final var envelope = (ShardedBinaryEnvelope) o;
final var message = envelope.message();
return Serializers.manifestFor(getSerialization().findSerializerFor(message), message);
}

@Override
public void toBinary(final Object o, final ByteBuffer buf) {
buf.put(toBinary(o));
}

@Override
public byte[] toBinary(final Object o) {
final var envelope = (ShardedBinaryEnvelope) o;
final var message = envelope.message();
final var serialization = getSerialization();
final int serializerId = serialization.findSerializerFor(message).identifier();
final byte[] messageBytes = serialization.serialize(message).get();
final byte[] entityNameBytes = envelope.entityName().getBytes(CHARSET);
final var buffer = ByteBuffer.allocate(4 + 4 + entityNameBytes.length + messageBytes.length);
buffer.putInt(serializerId);
buffer.putInt(entityNameBytes.length);
buffer.put(entityNameBytes);
buffer.put(messageBytes);
return buffer.array();
}

@Override
public Object fromBinary(final byte[] bytes, final String manifest) {
return fromBinary(ByteBuffer.wrap(bytes), manifest);
}

@Override
public Object fromBinary(final ByteBuffer buf, final String manifest) {
final int serializerId = buf.getInt();
final int entityNameLength = buf.getInt();
final byte[] entityNameBytes = new byte[entityNameLength];
buf.get(entityNameBytes);
final byte[] messageBytes = new byte[buf.remaining()];
buf.get(messageBytes);
final var message = getSerialization().deserialize(messageBytes, serializerId, manifest).get();
final var entityName = new String(entityNameBytes, CHARSET);
return new ShardedBinaryEnvelope(message, entityName);
}

private Serialization getSerialization() {
if (serialization == null) {
serialization = SerializationExtension.get(actorSystem);
}
return serialization;
}
}
Loading

0 comments on commit 4ce5f39

Please sign in to comment.