Skip to content

Commit

Permalink
MQTT: Do not suppress reconnection for redelivery.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jul 18, 2021
1 parent 38e7007 commit 03a77da
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ abstract ActorRef startConsumerActor(boolean dryRun, Source source, ActorRef inb
@Override
public void postStop() {
logger.info("actor stopped, stopping clients");
safelyDisconnectClient(client, CONSUMER);
safelyDisconnectClient(publisherClient, PUBLISHER);
safelyDisconnectClient(client, CONSUMER, true);
safelyDisconnectClient(publisherClient, PUBLISHER, true);
super.postStop();
}

Expand Down Expand Up @@ -201,7 +201,7 @@ private FSM.State<BaseClientState, BaseClientData> doReconnectConsumerClient(fin
final BaseClientData data) {

final ClientWithCancelSwitch oldClient = getClient();
safelyDisconnectClient(oldClient, CONSUMER);
safelyDisconnectClient(oldClient, CONSUMER, false);
return stay();
}

Expand Down Expand Up @@ -387,7 +387,8 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
}
stopCommandConsumers(testSubscriptions);
stopChildActor(publisherActor);
safelyDisconnectClient(new ClientWithCancelSwitch(testClient, new AtomicBoolean(false)), "test");
safelyDisconnectClient(new ClientWithCancelSwitch(testClient, new AtomicBoolean(false)), "test",
true);
return status;
});
}
Expand Down Expand Up @@ -515,8 +516,8 @@ protected void doDisconnectClient(final Connection connection, @Nullable final A
protected void cleanupResourcesForConnection() {
stopCommandConsumers(subscriptionHandler);
stopChildActor(publisherActor);
safelyDisconnectClient(client, CONSUMER);
safelyDisconnectClient(publisherClient, PUBLISHER);
safelyDisconnectClient(client, CONSUMER, true);
safelyDisconnectClient(publisherClient, PUBLISHER, true);
resetClientAndSubscriptionHandler();
}

Expand All @@ -532,13 +533,14 @@ protected ActorRef getPublisherActor() {
*
* @param clientToDisconnect the client to disconnect
* @param name a name describing the client's purpose
* @param prevenntAutomaticReconnect whether automatic reconnect should be disabled.
*/
private void safelyDisconnectClient(@Nullable final ClientWithCancelSwitch clientToDisconnect,
final String name) {
final String name, final boolean prevenntAutomaticReconnect) {
if (clientToDisconnect != null) {
try {
logger.info("Disconnecting mqtt <{}> client, ignoring any errors.", name);
clientToDisconnect.disconnect(true).exceptionally(error -> {
clientToDisconnect.disconnect(prevenntAutomaticReconnect).exceptionally(error -> {
logger.error(error, "Failed to disconnect client <{}>", clientToDisconnect.mqttClient);
return null;
});
Expand Down

0 comments on commit 03a77da

Please sign in to comment.