From 330794da73c96aec810cbaaa95b0e4d66318054c Mon Sep 17 00:00:00 2001 From: Stefan Maute Date: Thu, 27 Oct 2022 07:57:19 +0200 Subject: [PATCH] review: fixed some minor findings and adjusted code style; Signed-off-by: Stefan Maute --- .../sudo/SudoRetrieveConnectionStatus.java | 3 -- .../SudoRetrieveConnectionStatusResponse.java | 2 +- .../service/ConnectivityRootActor.java | 1 + .../service/messaging/BaseClientActor.java | 9 +++-- .../service/messaging/ClientActorId.java | 1 + .../ClientActorPropsArgsSerializer.java | 15 +++++++-- .../service/messaging/ClientSupervisor.java | 21 +++++++----- .../ConnectionPersistenceActor.java | 4 +-- .../SudoRetrieveConnectionStatusStrategy.java | 3 +- .../kafka/KafkaConsumerActorTest.java | 2 +- .../mqtt/hivemq/MqttClientActorTest.java | 33 ++++++++++--------- .../ConnectionPersistenceActorTest.java | 18 ---------- .../service/src/test/resources/test.conf | 4 +-- .../cluster/ShardedBinarySerializer.java | 7 +++- 14 files changed, 65 insertions(+), 58 deletions(-) diff --git a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatus.java b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatus.java index 063661442b..1e6f7df88c 100644 --- a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatus.java +++ b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatus.java @@ -27,7 +27,6 @@ import org.eclipse.ditto.base.model.signals.commands.CommandJsonDeserializer; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.WithConnectionId; -import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonField; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonObjectBuilder; @@ -65,7 +64,6 @@ public static SudoRetrieveConnectionStatus of(final ConnectionId connectionId, f return new SudoRetrieveConnectionStatus(connectionId, dittoHeaders); } - /** * Creates a new {@code SudoRetrieveConnectionStatus} from a JSON object. * @@ -89,7 +87,6 @@ public static SudoRetrieveConnectionStatus fromJson(final JsonObject jsonObject, @Override protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion, final Predicate thePredicate) { - jsonObjectBuilder.set(ConnectivitySudoCommand.JsonFields.JSON_CONNECTION_ID, connectionId.toString(), schemaVersion.and(thePredicate)); } diff --git a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatusResponse.java b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatusResponse.java index e5c156989a..9a43c62541 100644 --- a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatusResponse.java +++ b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/commands/sudo/SudoRetrieveConnectionStatusResponse.java @@ -36,7 +36,7 @@ /** * Response to a {@link SudoRetrieveConnectionStatus} command. * - * @since 3.0.0 + * @since 3.1.0 */ @Immutable @JsonParsableCommandResponse(type = SudoRetrieveConnectionStatusResponse.TYPE) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java index 9dcb3f15d4..adec9da5fd 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java @@ -190,4 +190,5 @@ private static ActorRef startClientShardRegion(final ActorSystem actorSystem, fi return ShardRegionCreator.start(actorSystem, ConnectivityMessagingConstants.CLIENT_SHARD_REGION, props, numberOfShards, ConnectivityMessagingConstants.CLUSTER_ROLE); } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 173e123acc..81517023bc 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -2050,6 +2050,7 @@ private void startSubscriptionRefreshTimer() { private Supplier> askSelfServiceUnbind() { final var shutdownTimeout = Duration.ofMinutes(2); + return () -> Patterns.ask(getSelf(), Control.SERVICE_UNBIND, shutdownTimeout).thenApply(answer -> Done.done()); } @@ -2072,7 +2073,6 @@ private FSM.State ackregatorStopped(final Contr private FSM.State serviceRequestsDone(final Control event, final BaseClientData data) { - shuttingDown = true; logger.info("{}: ackregatorCount={}", Control.STOP_SHARDED_ACTOR, ackregatorCount); if (ackregatorCount == 0) { @@ -2080,6 +2080,7 @@ private FSM.State serviceRequestsDone(final Con } else { final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout(); startSingleTimer(Control.SHUTDOWN_TIMEOUT.name(), Control.SHUTDOWN_TIMEOUT, shutdownTimeout); + return stay(); } } @@ -2088,6 +2089,7 @@ private FSM.State shutdownTimeout(final Control final BaseClientData data) { final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout(); logger.warning("Shutdown timeout <{}> reached; aborting <{}> ackregators", shutdownTimeout, ackregatorCount); + return stop(); } @@ -2098,6 +2100,7 @@ private CompletionStage sendDisconnectAnnouncementInCoordinatedShutdown() { getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, ActorRef.noSender()); final CompletableFuture waitForAnnouncementFuture = new CompletableFuture<>(); waitForAnnouncementFuture.completeOnTimeout(null, disconnectAnnouncementTimeout, TimeUnit.MILLISECONDS); + return waitForAnnouncementFuture; } else { return CompletableFuture.completedStage(null); @@ -2189,6 +2192,7 @@ public void reset() { @Override public Duration getNextTimeout() { increaseTimeoutAfterRecovery(); + return currentTimeout; } @@ -2197,6 +2201,7 @@ public Duration getNextBackoff() { // no need to perform recovery here because timeout always happens after a backoff final Duration result = nextBackoff; nextBackoff = minDuration(maxBackoff, nextBackoff.multipliedBy(2L)); + return result; } @@ -2235,6 +2240,7 @@ private static boolean isLonger(final Duration d1, final Duration d2) { private static Predicate isLowerThanOrEqual(final Duration otherDuration) { return arg -> { final Duration minus = arg.minus(otherDuration); + return minus.isNegative() || minus.isZero(); }; } @@ -2266,7 +2272,6 @@ public String toString() { "outboundSignal=" + outboundSignal + "]"; } - } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorId.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorId.java index 0c2c46a8d9..2267ad0730 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorId.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorId.java @@ -48,6 +48,7 @@ public static ClientActorId fromActorName(final ActorRef actorRef) { } final var connectionId = ConnectionId.of(actorName.substring(0, separatorIndex)); final int clientNumber = Integer.parseInt(actorName.substring(separatorIndex + 1)); + return new ClientActorId(connectionId, clientNumber); } } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorPropsArgsSerializer.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorPropsArgsSerializer.java index c704150630..5556069378 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorPropsArgsSerializer.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientActorPropsArgsSerializer.java @@ -35,7 +35,7 @@ import akka.serialization.Serializers; /** - * Serializer of {@link org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgsSerializer}. + * Serializer of {@link org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs}. */ public final class ClientActorPropsArgsSerializer extends SerializerWithStringManifest implements ByteBufferSerializer { @@ -75,10 +75,12 @@ public void toBinary(final Object o, final ByteBuffer buf) { public byte[] toBinary(final Object o) { final var args = (ClientActorPropsArgs) o; final var connection = new Field(args.connection().toJsonString()); - final var commandForwarder = toFieldWithManifest(args.commandForwarderActor()); + final var commandForwarder = + toFieldWithManifest(args.commandForwarderActor()); final var connectionActor = toField(args.connectionActor()); final var dittoHeaders = new Field(args.dittoHeaders().toJsonString()); - final var connectivityConfigOverwrites = toFieldWithManifest(args.connectivityConfigOverwrites()); + final var connectivityConfigOverwrites = + toFieldWithManifest(args.connectivityConfigOverwrites()); final var buffer = ByteBuffer.allocate( connection.length() + @@ -94,6 +96,7 @@ public byte[] toBinary(final Object o) { connectionActor.write(buffer); dittoHeaders.write(buffer); connectivityConfigOverwrites.write(buffer); + return buffer.array(); } @@ -112,6 +115,7 @@ public Object fromBinary(final ByteBuffer buf, final String manifest) { final var dittoHeaders = Field.read(buf); final var connectivityConfigOverwrites = FieldWithManifest.read(buf); buf.order(originalByteOrder); + return new ClientActorPropsArgs( ConnectivityModelFactory.connectionFromJson(JsonObject.of(connection.asString())), toActorRef(commandForwarder, commandForwarder.value()), @@ -125,6 +129,7 @@ private Serialization getSerialization() { if (serialization == null) { serialization = SerializationExtension.get(actorSystem); } + return serialization; } @@ -147,6 +152,7 @@ private FieldWithManifest toFieldWithManifest(final Object value) { final int id = serializer.identifier(); final var manifest = new Field(Serializers.manifestFor(serializer, value)); final var valueField = new Field(serialization.serialize(value).get()); + return new FieldWithManifest(id, manifest, valueField); } @@ -172,6 +178,7 @@ private int length() { private static Field read(final ByteBuffer buffer) { final var bytes = new byte[buffer.getInt()]; buffer.get(bytes); + return new Field(bytes); } } @@ -188,6 +195,7 @@ private static FieldWithManifest read(final ByteBuffer buffer) { final var id = buffer.getInt(); final var manifest = Field.read(buffer); final var value = Field.read(buffer); + return new FieldWithManifest(id, manifest, value); } @@ -195,4 +203,5 @@ private int length() { return 4 + manifest.length() + value.length(); } } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientSupervisor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientSupervisor.java index 7dd1fb9594..bc0de411ac 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientSupervisor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/ClientSupervisor.java @@ -49,7 +49,7 @@ public final class ClientSupervisor extends AbstractActorWithTimers { private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); - ; + private final ClientActorId clientActorId = ClientActorId.fromActorName(getSelf()); private final SudoRetrieveConnectionStatus sudoRetrieveConnectionStatus = SudoRetrieveConnectionStatus.of(clientActorId.connectionId(), @@ -60,17 +60,18 @@ public final class ClientSupervisor extends AbstractActorWithTimers { private Props props; private ActorRef clientActor; + @SuppressWarnings({"unused"}) private ClientSupervisor(final int numberOfShards, final Duration statusCheckInterval) { this.statusCheckInterval = statusCheckInterval; final var actorSystem = getContext().getSystem(); final var clusterSharding = ClusterSharding.get(actorSystem); final var extractor = ShardRegionExtractor.of(numberOfShards, actorSystem); connectionShardRegion = clusterSharding.startProxy(ConnectivityMessagingConstants.SHARD_REGION, - Optional.of(ConnectivityMessagingConstants.CLUSTER_ROLE), - extractor); + Optional.of(ConnectivityMessagingConstants.CLUSTER_ROLE), extractor); propsFactory = getClientActorPropsFactory(actorSystem); } + @SuppressWarnings({"unused"}) // constructor for unit tests private ClientSupervisor(final Duration statusCheckInterval, final ActorRef connectionShardRegion) { this.statusCheckInterval = statusCheckInterval; @@ -147,18 +148,19 @@ private void childTerminated(final Terminated terminated) { } private void startClientActor(final ClientActorPropsArgs propsArgs) { - final var props = propsFactory.getActorProps(propsArgs, getContext().getSystem()); - if (props.equals(this.props)) { - logger.debug("Refreshing props"); + final var actorProps = propsFactory.getActorProps(propsArgs, getContext().getSystem()); + if (actorProps.equals(this.props)) { + logger.debug("Refreshing actorProps"); } else { final var oldClientActor = clientActor; if (oldClientActor != null) { getContext().unwatch(oldClientActor); getContext().stop(oldClientActor); } - this.props = props; - clientActor = getContext().watch(getContext().actorOf(props)); - logger.debug("New props received; stopped client actor <{}> and started <{}>", oldClientActor, clientActor); + this.props = actorProps; + clientActor = getContext().watch(getContext().actorOf(actorProps)); + logger.debug("New actorProps received; stopped client actor <{}> and started <{}>", oldClientActor, + clientActor); } } @@ -228,4 +230,5 @@ private static ClientActorPropsFactory getClientActorPropsFactory(final ActorSys private enum Control { STATUS_CHECK } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java index c53c1b6838..8dd9a6cf3f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java @@ -634,7 +634,7 @@ private void interpretStagedCommand(final StagedCommand command) { becomeDeletedHandler(); interpretStagedCommand(command.next()); } - case CHECK_LOGGING_ENABLED -> CHECK_LOGGING_ENABLED(command.next()); + case CHECK_LOGGING_ENABLED -> checkLoggingEnabled(command.next()); case BROADCAST_TO_CLIENT_ACTORS_IF_STARTED -> { broadcastToClientActors(command.getCommand(), getSelf()); interpretStagedCommand(command.next()); @@ -763,7 +763,7 @@ private void updatePriority(final UpdatePriority updatePriority) { } } - private void CHECK_LOGGING_ENABLED(final StagedCommand command) { + private void checkLoggingEnabled(final StagedCommand command) { if (isDesiredStateOpen()) { startEnabledLoggingChecker(); updateLoggingIfEnabled(); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/strategies/commands/SudoRetrieveConnectionStatusStrategy.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/strategies/commands/SudoRetrieveConnectionStatusStrategy.java index 6bf262c8d8..d24a610b2a 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/strategies/commands/SudoRetrieveConnectionStatusStrategy.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/strategies/commands/SudoRetrieveConnectionStatusStrategy.java @@ -42,12 +42,13 @@ protected Result> doApply(final Context co final long nextRevision, final SudoRetrieveConnectionStatus command, @Nullable final Metadata metadata) { - final var connectionStatus = entity != null && !entity.hasLifecycle(ConnectionLifecycle.DELETED) ? entity.getConnectionStatus() : ConnectivityStatus.CLOSED; final var clientCount = entity != null ? entity.getClientCount() : 0; + return ResultFactory.newQueryResult(command, SudoRetrieveConnectionStatusResponse.of(connectionStatus, clientCount, command.getDittoHeaders())); } + } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java index bfaab3325a..3fedef32d3 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActorTest.java @@ -140,7 +140,7 @@ protected Props getConsumerActorProps(final Sink inboundMapping )); final HeaderMapping mappingWithSpecialKafkaHeaders = ConnectivityModelFactory.newHeaderMapping(map); - AtMostOnceKafkaConsumerSourceSupplier sourceSupplier = mock(AtMostOnceKafkaConsumerSourceSupplier.class); + final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier = mock(AtMostOnceKafkaConsumerSourceSupplier.class); when(sourceSupplier.get()).thenReturn(source); final String address = "kafka"; final org.eclipse.ditto.connectivity.model.Source connectionSource = ConnectivityModelFactory.newSourceBuilder() diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorTest.java index cf77b42a00..bfe5e9af90 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorTest.java @@ -14,7 +14,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; @@ -80,7 +84,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import com.hivemq.client.mqtt.datatypes.MqttQos; @@ -165,13 +168,13 @@ public void before() { } private void enableGenericMqttClientMethodStubbing() { - Mockito.when(genericMqttClient.connect()).thenReturn(CompletableFuture.completedFuture(null)); - Mockito.when(genericMqttClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); - Mockito.when(genericMqttClient.subscribe(any())) - .thenReturn(Single.just(Mockito.mock(GenericMqttSubAck.class))); - Mockito.when(genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()) + when(genericMqttClient.connect()).thenReturn(CompletableFuture.completedFuture(null)); + when(genericMqttClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); + when(genericMqttClient.subscribe(any())) + .thenReturn(Single.just(mock(GenericMqttSubAck.class))); + when(genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()) .thenReturn(Flowable.never()); - Mockito.when(genericMqttClient.publish(any())) + when(genericMqttClient.publish(any())) .thenAnswer(invocation -> { final GenericMqttPublish genericMqttPublish = invocation.getArgument(0); final var commandForwarderRef = commandForwarder.ref(); @@ -181,7 +184,7 @@ private void enableGenericMqttClientMethodStubbing() { } private void enableGenericMqttClientFactoryMethodStubbing() { - Mockito.when(genericMqttClientFactory.getGenericMqttClient(any())).thenReturn(genericMqttClient); + when(genericMqttClientFactory.getGenericMqttClient(any())).thenReturn(genericMqttClient); genericMqttClientFactoryMock.when(() -> GenericMqttClientFactory.newInstance()) .thenReturn(genericMqttClientFactory); } @@ -243,7 +246,7 @@ private DittoHeaders getDittoHeadersWithCorrelationId() { @Test public void subscribeFails() { final var mqttSubscribeException = new MqttSubscribeException("Quisquam omnis in quia hic et libero.", null); - Mockito.when(genericMqttClient.subscribe(any())).thenReturn(Single.error(mqttSubscribeException)); + when(genericMqttClient.subscribe(any())).thenReturn(Single.error(mqttSubscribeException)); final var underTest = TestActorRef.apply( createClientActor(commandForwarder.ref(), ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) @@ -298,7 +301,7 @@ public void testConnectionIsSuccessful() { final var connection = ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) .connectionStatus(ConnectivityStatus.CLOSED) .build(); - Mockito.when(genericMqttClient.connect(any())).thenReturn(CompletableFuture.completedStage(null)); + when(genericMqttClient.connect(any())).thenReturn(CompletableFuture.completedStage(null)); final var testKit = actorSystemResource.newTestKit(); final var underTest = testKit.watch(TestActorRef.apply( createClientActor(commandForwarder.ref(), connection), @@ -316,7 +319,7 @@ public void testConnectionIsSuccessful() { @Test public void testConnectionFails() { final var mqttClientConnectException = new MqttClientConnectException("Failed to connect.", null); - Mockito.when(genericMqttClient.connect(any())).thenThrow(mqttClientConnectException); + when(genericMqttClient.connect(any())).thenThrow(mqttClientConnectException); final var connection = ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) .connectionStatus(ConnectivityStatus.CLOSED) .build(); @@ -336,7 +339,7 @@ public void testConnectionFails() { assertThat(connectionFailedException).hasCause(mqttClientConnectException); }); testKit.expectTerminated(Duration.ofSeconds(5L), underTest); - verify(genericMqttClient, Mockito.never()).disconnect(); + verify(genericMqttClient, never()).disconnect(); } private String getSerializedModifyThingCommand(final Object... correlationIdSuffixes) { @@ -361,7 +364,7 @@ private static GenericMqttPublish getMqttPublish(final String topic, final Strin } private void enableSubscribingAndConsumingMethodStubbing(final GenericMqttPublish... incomingPublishes) { - Mockito.doAnswer(invocation -> { + doAnswer(invocation -> { final GenericMqttSubscribe genericMqttSubscribe = invocation.getArgument(0); final var subscribedMqttTopicFilters = genericMqttSubscribe.genericMqttSubscriptions() @@ -369,7 +372,7 @@ private void enableSubscribingAndConsumingMethodStubbing(final GenericMqttPublis .collect(Collectors.toList()); // This needs to be a side effect, unfortunately. - Mockito.when(genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()) + when(genericMqttClient.consumeSubscribedPublishesWithManualAcknowledgement()) .thenReturn(Flowable.fromIterable( Stream.of(incomingPublishes) .filter(incoming -> subscribedMqttTopicFilters.stream() @@ -377,7 +380,7 @@ private void enableSubscribingAndConsumingMethodStubbing(final GenericMqttPublis .collect(Collectors.toList()) )); - return Single.just(Mockito.mock(GenericMqttSubAck.class)); + return Single.just(mock(GenericMqttSubAck.class)); }) .when(genericMqttClient).subscribe(any()); } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActorTest.java index dbd49972e2..4c9515f5b6 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActorTest.java @@ -86,14 +86,12 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; -import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; import akka.cluster.Cluster; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; -import akka.japi.pf.ReceiveBuilder; import akka.testkit.TestActor; import akka.testkit.TestProbe; import scala.PartialFunction; @@ -248,21 +246,6 @@ public void tryToSendOtherCommandThanCreateDuringInitialization() { .build()); } - private static final class DummyActor extends AbstractActor { - - @Override - public void preStart() { - System.out.println("--------------- ACTOR START ---------------"); - } - - @Override - public Receive createReceive() { - return ReceiveBuilder.create() - .matchAny(msg -> getSender().tell(getSelf(), getSelf())) - .build(); - } - } - @Test public void manageConnection() { final var underTest = createSupervisor(); @@ -468,7 +451,6 @@ public void createClosedConnectionWithUnknownHost() { mockClientActorProbe.expectNoMessage(); } - @Test public void testConnectionWithUnknownHost() { final var testConnectionWithUnknownHost = TestConnection.of( diff --git a/connectivity/service/src/test/resources/test.conf b/connectivity/service/src/test/resources/test.conf index eee2bef130..08936d72cb 100644 --- a/connectivity/service/src/test/resources/test.conf +++ b/connectivity/service/src/test/resources/test.conf @@ -1,4 +1,4 @@ -loggersakka { +akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" @@ -13,7 +13,7 @@ loggersakka { } } -// dead lettters log disabled because actors are being killed all the time +// dead letters log disabled because actors are being killed all the time akka.log-dead-letters = 0 akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "policy-announcement-aware", diff --git a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/ShardedBinarySerializer.java b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/ShardedBinarySerializer.java index 4fdb6e73a1..8ef4092c38 100644 --- a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/ShardedBinarySerializer.java +++ b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/ShardedBinarySerializer.java @@ -29,7 +29,7 @@ import akka.serialization.Serializers; /** - * Serializer of {@link ShardedBinarySerializer}. + * Serializer of {@link ShardedBinaryEnvelope}. */ public final class ShardedBinarySerializer extends SerializerWithStringManifest implements ByteBufferSerializer { @@ -59,6 +59,7 @@ public int identifier() { public String manifest(final Object o) { final var envelope = (ShardedBinaryEnvelope) o; final var message = envelope.message(); + return Serializers.manifestFor(getSerialization().findSerializerFor(message), message); } @@ -81,6 +82,7 @@ public byte[] toBinary(final Object o) { buffer.putInt(messageBytes.length); buffer.put(entityNameBytes); buffer.put(messageBytes); + return buffer.array(); } @@ -105,6 +107,7 @@ public Object fromBinary(final ByteBuffer buf, final String manifest) { final var message = getSerialization().deserialize(messageBytes, serializerId, manifest).get(); final var entityName = new String(entityNameBytes, CHARSET); buf.order(originalByteOrder); + return new ShardedBinaryEnvelope(message, entityName); } catch (final RuntimeException e) { final var bytes = buf.array(); @@ -118,6 +121,8 @@ private Serialization getSerialization() { if (serialization == null) { serialization = SerializationExtension.get(actorSystem); } + return serialization; } + }