diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientActor.java index 9783f36118..65ffffdd1e 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientActor.java @@ -345,7 +345,7 @@ protected final ActorRef startChildActorConflictFree(final String prefix, final * * @param actor the ActorRef */ - protected final void stopChildActor(final ActorRef actor) { + protected final void stopChildActor(@Nullable final ActorRef actor) { if (actor != null) { log.debug("Stopping child actor <{}>.", actor.path()); getContext().stop(actor); @@ -505,7 +505,6 @@ private FSM.State publishMappedMessage(final Pu getPublisherActor().forward(message.getOutboundSignal(), getContext()); } else { log.warning("No publisher actor available, dropping message."); - connectionLogger.failure("No publisher actor available, dropping message."); } return stay(); } @@ -754,8 +753,26 @@ private State handleInitializationResult( } } + /** + * Subclasses should start their publisher actor in the implementation of this method and report success or + * failure in the returned {@link CompletionStage}. {@link BaseClientActor} calls this method when the client is + * connected. + * + * @return a completion stage that completes either successfully when the publisher actor was started + * successfully or exceptionally when the publisher actor could not be started successfully + */ protected abstract CompletionStage startPublisherActor(); + /** + * Subclasses should start their consumer actors in the implementation of this method and report success or + * failure in the returned {@link CompletionStage}. {@link BaseClientActor} calls this method when the client is + * connected and the publisher actor was started (this is important otherwise we are not able to publish + * potential error responses for consumed messages). + * + * @param clientConnected message indicating that the client has successfully been connected to the external system + * @return a completion stage that completes either successfully when all consumers were started + * successfully or exceptionally when starting a consumer actor failed + */ protected CompletionStage startConsumerActors(final ClientConnected clientConnected) { return CompletableFuture.completedFuture(new Status.Success(Done.getInstance())); } diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientData.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientData.java index 6f4e446048..bfe7aac857 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientData.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/BaseClientData.java @@ -58,7 +58,7 @@ public final class BaseClientData { @Nullable final String connectionStatusDetails, final Instant inConnectionStatusSince, @Nullable final ActorRef sessionSender, - @Nullable DittoHeaders sessionHeaders) { + @Nullable final DittoHeaders sessionHeaders) { this.connectionId = connectionId; this.connection = connection; this.connectionStatus = connectionStatus; diff --git a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java index 23e9093e20..bcd62b8442 100644 --- a/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java +++ b/services/connectivity/messaging/src/main/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.java @@ -262,6 +262,7 @@ protected void allocateResourcesOnConnection(final ClientConnected clientConnect } } + @Override protected CompletionStage startPublisherActor() { final CompletableFuture future = new CompletableFuture<>(); stopChildActor(amqpPublisherActor);