Skip to content

Commit

Permalink
Add serializer for ClientActorPropsArgs.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 11, 2022
1 parent 4ce5f39 commit f5af50e
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.json.JsonObject;

import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.serialization.ByteBufferSerializer;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.serialization.SerializerWithStringManifest;
import akka.serialization.Serializers;

/**
* Serializer of {@link org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgsSerializer}.
*/
public final class ClientActorPropsArgsSerializer
extends SerializerWithStringManifest implements ByteBufferSerializer {

private static final int UNIQUE_IDENTIFIER = 597861065;
private static final Charset CHARSET = StandardCharsets.UTF_8;

private final ActorSystem actorSystem;
@Nullable private Serialization serialization;

/**
* Constructs a new sharded binary serializer.
*
* @param actorSystem the ExtendedActorSystem to use for serialization of wrapped messages.
*/
public ClientActorPropsArgsSerializer(final ExtendedActorSystem actorSystem) {
this.actorSystem = actorSystem;
}

@Override
public int identifier() {
return UNIQUE_IDENTIFIER;
}

@Override
public String manifest(final Object o) {
return ClientActorPropsArgs.class.getSimpleName();
}

@Override
public void toBinary(final Object o, final ByteBuffer buf) {
buf.put(toBinary(o));
}

@Override
public byte[] toBinary(final Object o) {
final var args = (ClientActorPropsArgs) o;
final var connection = new Field(args.connection().toJsonString());
final var commandForwarder = toFieldWithManifest(args.commandForwarderActor());
final var connectionActor = toField(args.connectionActor());
final var dittoHeaders = new Field(args.dittoHeaders().toJsonString());
final var connectivityConfigOverwrites = toFieldWithManifest(args.connectivityConfigOverwrites());

final var buffer = ByteBuffer.allocate(
connection.length() +
commandForwarder.length() +
connectionActor.length() +
dittoHeaders.length() +
connectivityConfigOverwrites.length()
);

connection.write(buffer);
commandForwarder.write(buffer);
connectionActor.write(buffer);
dittoHeaders.write(buffer);
connectivityConfigOverwrites.write(buffer);
return buffer.array();
}

@Override
public Object fromBinary(final byte[] bytes, final String manifest) {
return fromBinary(ByteBuffer.wrap(bytes), manifest);
}

@Override
public Object fromBinary(final ByteBuffer buf, final String manifest) {
final var connection = Field.read(buf);
final var commandForwarder = FieldWithManifest.read(buf);
final var connectionActor = Field.read(buf);
final var dittoHeaders = Field.read(buf);
final var connectivityConfigOverwrites = FieldWithManifest.read(buf);
return new ClientActorPropsArgs(
ConnectivityModelFactory.connectionFromJson(JsonObject.of(connection.asString())),
toActorRef(commandForwarder, commandForwarder.value()),
toActorRef(commandForwarder, connectionActor),
DittoHeaders.newBuilder(JsonObject.of(dittoHeaders.asString())).build(),
toConfig(connectivityConfigOverwrites)
);
}

private Serialization getSerialization() {
if (serialization == null) {
serialization = SerializationExtension.get(actorSystem);
}
return serialization;
}

private ActorRef toActorRef(final FieldWithManifest meta, final Field value) {
return (ActorRef) getSerialization().deserialize(value.bytes(), meta.id(), meta.manifest().asString()).get();
}

private Config toConfig(final FieldWithManifest field) {
return (Config) getSerialization()
.deserialize(field.value().bytes(), field.id(), field.manifest().asString()).get();
}

private Field toField(final Object value) {
return new Field(getSerialization().serialize(value).get());
}

private FieldWithManifest toFieldWithManifest(final Object value) {
final var serialization = getSerialization();
final var serializer = serialization.findSerializerFor(value);
final int id = serializer.identifier();
final var manifest = new Field(Serializers.manifestFor(serializer, value));
final var valueField = new Field(serialization.serialize(value).get());
return new FieldWithManifest(id, manifest, valueField);
}

private record Field(byte[] bytes) {

private Field(final String string) {
this(string.getBytes(CHARSET));
}

private void write(final ByteBuffer buffer) {
buffer.putInt(bytes.length);
buffer.put(bytes);
}

private String asString() {
return new String(bytes, CHARSET);
}

private int length() {
return 4 + bytes.length;
}

private static Field read(final ByteBuffer buffer) {
final var bytes = new byte[buffer.getInt()];
buffer.get(bytes);
return new Field(bytes);
}
}

private record FieldWithManifest(int id, Field manifest, Field value) {

private void write(final ByteBuffer buffer) {
buffer.putInt(id);
manifest.write(buffer);
value.write(buffer);
}

private static FieldWithManifest read(final ByteBuffer buffer) {
final var id = buffer.getInt();
final var manifest = Field.read(buffer);
final var value = Field.read(buffer);
return new FieldWithManifest(id, manifest, value);
}

private int length() {
return 4 + manifest.length() + value.length();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.cluster.ShardedBinaryEnvelope;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
Expand Down Expand Up @@ -138,7 +139,6 @@
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.routing.ConsistentHashingRouter;

/**
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
Expand Down Expand Up @@ -925,14 +925,14 @@ private CompletionStage<Object> startAndAskClientActors(final SignalWithEntityId
return askAllClientActors(cmd);
}

private static Object consistentHashableEnvelope(final Object message, final ClientActorId clientActorId) {
return new ConsistentHashingRouter.ConsistentHashableEnvelope(message, clientActorId.toString());
private static Object shardedBinaryEnvelope(final Object message, final ClientActorId clientActorId) {
return new ShardedBinaryEnvelope(message, clientActorId.toString());
}

private void broadcastToClientActors(final Object cmd, final ActorRef sender) {
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
final var envelope = shardedBinaryEnvelope(cmd, clientActorId);
clientShardRegion.tell(envelope, sender);
}
}
Expand All @@ -944,7 +944,7 @@ private CompletionStage<Object> askAllClientActors(final Signal<?> cmd) {
CompletionStage<Object> askFuture = CompletableFuture.completedStage(null);
for (int i = 0; i < getClientCount(); ++i) {
final var clientActorId = new ClientActorId(entityId, i);
final var envelope = consistentHashableEnvelope(cmd, clientActorId);
final var envelope = shardedBinaryEnvelope(cmd, clientActorId);
askFuture = askFuture.thenCombine(
processClientAskResult(Patterns.ask(clientShardRegion, envelope, clientActorAskTimeout)),
(left, right) -> right
Expand Down
3 changes: 3 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1121,4 +1121,7 @@ blocked-namespaces-dispatcher {
throughput = 5
}

akka.actor.serializers.client-actor-props = "org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgsSerializer"
akka.actor.serialization-bindings."org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs" = "client-actor-props"

include "connectivity-extension.conf"
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import static org.assertj.core.api.Assertions.assertThat;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.junit.After;
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.serialization.SerializationExtension;
import akka.serialization.Serializers;
import akka.testkit.TestProbe;

/**
* Tests {@link ClientActorPropsArgsSerializer}.
*/
public final class ClientActorPropsArgsSerializerTest {

private final Config config = ConfigFactory.parseString("""
akka.actor {
enable-additional-serialization-bindings = on
serializers {
client-actor-props = "org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgsSerializer"
}
serialization-bindings {
"org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs" = "client-actor-props"
}
}
""");

private final ActorSystem actorSystem = ActorSystem.create("ClientActorPropsArgsSerializerTest", config);

@After
public void termiante() {
actorSystem.terminate();
}

@Test
public void serializeClientActorPropsArgs() {
final var serialization = SerializationExtension.get(actorSystem);

final var original = new ClientActorPropsArgs(
TestConstants.createConnection().toBuilder().lifecycle(null).build(),
TestProbe.apply(actorSystem).testActor(),
TestProbe.apply(actorSystem).testActor(),
DittoHeaders.newBuilder().randomCorrelationId().build(),
ConfigFactory.parseString("dummy-key = 'dummy-value'")
);

final var serializer = serialization.findSerializerFor(original);
final int id = serializer.identifier();
final var manifest = Serializers.manifestFor(serializer, original);
final var bytes = serialization.serialize(original).get();

final var deserialized = serialization.deserialize(bytes, id, manifest).get();
assertThat(deserialized).isEqualTo(original);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import akka.actor.FSM;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.Cluster;
import akka.pattern.AskTimeoutException;
import akka.testkit.TestActorRef;
import akka.testkit.TestProbe;
Expand Down Expand Up @@ -174,6 +175,7 @@ public void setUp() {
@After
public void tearDown() {
if (actorSystem != null) {
Cluster.get(actorSystem).prepareForFullClusterShutdown();
actorSystem.terminate();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.Cluster;
import akka.http.javadsl.ConnectionContext;
import akka.http.javadsl.Http;
import akka.http.javadsl.HttpsConnectionContext;
Expand Down Expand Up @@ -114,6 +115,7 @@ public void createActorSystem() {
@After
public void stopActorSystem() {
if (actorSystem != null) {
Cluster.get(actorSystem).prepareForFullClusterShutdown();
actorSystem.terminate();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.Cluster;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -102,7 +103,10 @@ private static void startMockServer() {

@AfterClass
public static void tearDown() {
actorSystem.terminate();
if (actorSystem != null) {
Cluster.get(actorSystem).prepareForFullClusterShutdown();
actorSystem.terminate();
}
stopMockServer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.Cluster;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.javadsl.TestKit;

Expand Down Expand Up @@ -105,7 +106,10 @@ public void setUp() {

@After
public void tearDown() {
actorSystem.terminate();
if (actorSystem != null) {
Cluster.get(actorSystem).prepareForFullClusterShutdown();
actorSystem.terminate();
}
}

@Before
Expand Down
Loading

0 comments on commit f5af50e

Please sign in to comment.