Skip to content

Commit

Permalink
fix review findings
Browse files Browse the repository at this point in the history
  • Loading branch information
dguggemos committed Sep 27, 2019
1 parent ef71266 commit 6703772
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Expand Up @@ -345,7 +345,7 @@ protected final ActorRef startChildActorConflictFree(final String prefix, final
*
* @param actor the ActorRef
*/
protected final void stopChildActor(final ActorRef actor) {
protected final void stopChildActor(@Nullable final ActorRef actor) {
if (actor != null) {
log.debug("Stopping child actor <{}>.", actor.path());
getContext().stop(actor);
Expand Down Expand Up @@ -505,7 +505,6 @@ private FSM.State<BaseClientState, BaseClientData> publishMappedMessage(final Pu
getPublisherActor().forward(message.getOutboundSignal(), getContext());
} else {
log.warning("No publisher actor available, dropping message.");
connectionLogger.failure("No publisher actor available, dropping message.");
}
return stay();
}
Expand Down Expand Up @@ -754,8 +753,26 @@ private State<BaseClientState, BaseClientData> handleInitializationResult(
}
}

/**
* Subclasses should start their publisher actor in the implementation of this method and report success or
* failure in the returned {@link CompletionStage}. {@link BaseClientActor} calls this method when the client is
* connected.
*
* @return a completion stage that completes either successfully when the publisher actor was started
* successfully or exceptionally when the publisher actor could not be started successfully
*/
protected abstract CompletionStage<Status.Status> startPublisherActor();

/**
* Subclasses should start their consumer actors in the implementation of this method and report success or
* failure in the returned {@link CompletionStage}. {@link BaseClientActor} calls this method when the client is
* connected and the publisher actor was started (this is important otherwise we are not able to publish
* potential error responses for consumed messages).
*
* @param clientConnected message indicating that the client has successfully been connected to the external system
* @return a completion stage that completes either successfully when all consumers were started
* successfully or exceptionally when starting a consumer actor failed
*/
protected CompletionStage<Status.Status> startConsumerActors(final ClientConnected clientConnected) {
return CompletableFuture.completedFuture(new Status.Success(Done.getInstance()));
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ public final class BaseClientData {
@Nullable final String connectionStatusDetails,
final Instant inConnectionStatusSince,
@Nullable final ActorRef sessionSender,
@Nullable DittoHeaders sessionHeaders) {
@Nullable final DittoHeaders sessionHeaders) {
this.connectionId = connectionId;
this.connection = connection;
this.connectionStatus = connectionStatus;
Expand Down
Expand Up @@ -262,6 +262,7 @@ protected void allocateResourcesOnConnection(final ClientConnected clientConnect
}
}

@Override
protected CompletionStage<Status.Status> startPublisherActor() {
final CompletableFuture<Status.Status> future = new CompletableFuture<>();
stopChildActor(amqpPublisherActor);
Expand Down

0 comments on commit 6703772

Please sign in to comment.