Skip to content

Commit

Permalink
Wait for active acknowledgement aggregator actors for a short time du…
Browse files Browse the repository at this point in the history
…ring coordinated shutdown.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 30, 2022
1 parent f54f063 commit 259c394
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec
*/
boolean areAllClientActorsOnOneNode();

/**
* Get the timeout waiting for responses and acknowledgements during coordinated shutdown.
*
* @return The timeout.
*/
Duration getShutdownTimeout();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectionConfig}.
Expand Down Expand Up @@ -213,7 +220,13 @@ enum ConnectionConfigValue implements KnownConfigValue {
/**
* How often the priority of a connection is getting updated.
*/
PRIORITY_UPDATE_INTERVAL("priority-update-interval", Duration.ofHours(24L));
PRIORITY_UPDATE_INTERVAL("priority-update-interval", Duration.ofHours(24L)),

/**
* Timeout waiting for responses and acknowledgements during coordinated shutdown.
*/
SHUTDOWN_TIMEOUT("shutdown-timeout", Duration.ofSeconds(3));


private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class DefaultConnectionConfig implements ConnectionConfig {
private final Duration ackLabelDeclareInterval;
private final Duration priorityUpdateInterval;
private final boolean allClientActorsOnOneNode;
private final Duration shutdownTimeout;

private DefaultConnectionConfig(final ConfigWithFallback config) {
clientActorAskTimeout =
Expand Down Expand Up @@ -88,6 +89,7 @@ private DefaultConnectionConfig(final ConfigWithFallback config) {
config.getBoolean(ConnectionConfigValue.ALL_CLIENT_ACTORS_ON_ONE_NODE.getConfigPath());
priorityUpdateInterval =
config.getNonNegativeAndNonZeroDurationOrThrow(ConnectionConfigValue.PRIORITY_UPDATE_INTERVAL);
shutdownTimeout = config.getDuration(ConnectionConfigValue.SHUTDOWN_TIMEOUT.getConfigPath());
}

/**
Expand Down Expand Up @@ -204,6 +206,11 @@ public boolean areAllClientActorsOnOneNode() {
return allClientActorsOnOneNode;
}

@Override
public Duration getShutdownTimeout() {
return shutdownTimeout;
}

@Override
public ActivityCheckConfig getActivityCheckConfig() {
return activityCheckConfig;
Expand Down Expand Up @@ -243,6 +250,7 @@ public boolean equals(final Object o) {
Objects.equals(maxNumberOfSources, that.maxNumberOfSources) &&
Objects.equals(ackLabelDeclareInterval, that.ackLabelDeclareInterval) &&
Objects.equals(priorityUpdateInterval, that.priorityUpdateInterval) &&
Objects.equals(shutdownTimeout, that.shutdownTimeout) &&
allClientActorsOnOneNode == that.allClientActorsOnOneNode;
}

Expand All @@ -252,7 +260,7 @@ public int hashCode() {
blockedHostnames, blockedSubnets, blockedHostRegex, supervisorConfig, snapshotConfig,
acknowledgementConfig, cleanupConfig, maxNumberOfTargets, maxNumberOfSources, activityCheckConfig,
amqp10Config, amqp091Config, mqttConfig, kafkaConfig, httpPushConfig, ackLabelDeclareInterval,
priorityUpdateInterval, allClientActorsOnOneNode);
priorityUpdateInterval, allClientActorsOnOneNode, shutdownTimeout);
}

@Override
Expand All @@ -279,6 +287,7 @@ public String toString() {
", ackLabelDeclareInterval=" + ackLabelDeclareInterval +
", priorityUpdateInterval=" + priorityUpdateInterval +
", allClientActorsOnOneNode=" + allClientActorsOnOneNode +
", shutdownTimeout=" + shutdownTimeout +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private ActorRef outboundDispatchingActor;
private ActorRef subscriptionManager;
private ActorRef tunnelActor;
private int ackregatorCount = 0;
private boolean shuttingDown = false;

protected BaseClientActor(final Connection connection,
final ActorRef commandForwarderActor,
Expand Down Expand Up @@ -364,8 +366,8 @@ private void addCoordinatedShutdownTasks() {
final var id = getSelf().path().toString();
cancelOnStopTasks.add(coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceUnbind(),
"service-unbind-" + id, askSelfShutdownTask(Control.SERVICE_UNBIND)));
cancelOnStopTasks.add(coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
"service-requests-done" + id, askSelfShutdownTask(Control.SERVICE_REQUESTS_DONE)));
coordinatedShutdown.addActorTerminationTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
"service-requests-done" + id, getSelf(), Optional.of(Control.SERVICE_REQUESTS_DONE));
}

private boolean shouldAnyTargetSendConnectionAnnouncements() {
Expand Down Expand Up @@ -515,7 +517,11 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
.event(PublishMappedMessage.class, this::publishMappedMessage)
.event(ConnectivityCommand.class, this::onUnknownEvent) // relevant connectivity commands were handled
.event(Signal.class, this::handleSignal)
.event(FatalPubSubException.class, this::failConnectionDueToPubSubException);
.event(FatalPubSubException.class, this::failConnectionDueToPubSubException)
.eventEquals(Control.ACKREGATOR_STARTED, this::ackregatorStarted)
.eventEquals(Control.ACKREGATOR_STOPPED, this::ackregatorStopped)
.eventEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.eventEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownTimeout);
}

/**
Expand Down Expand Up @@ -2074,6 +2080,44 @@ private Supplier<CompletionStage<Done>> askSelfShutdownTask(final Object questio
return () -> Patterns.ask(getSelf(), question, shutdownTimeout).thenApply(answer -> Done.done());
}

private FSM.State<BaseClientState, BaseClientData> ackregatorStarted(final Control event,
final BaseClientData data) {
++ackregatorCount;
return stay();
}

private FSM.State<BaseClientState, BaseClientData> ackregatorStopped(final Control event,
final BaseClientData data) {
--ackregatorCount;
if (shuttingDown && ackregatorCount == 0) {
logger.info("{}: finished waiting for ackregators", Control.SERVICE_REQUESTS_DONE);
return stop();
} else {
return stay();
}
}

private FSM.State<BaseClientState, BaseClientData> serviceRequestsDone(final Control event,
final BaseClientData data) {

shuttingDown = true;
logger.info("{}: ackregatorCount={}", Control.SERVICE_REQUESTS_DONE, ackregatorCount);
if (ackregatorCount == 0) {
return stop();
} else {
final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout();
startSingleTimer(Control.SHUTDOWN_TIMEOUT.name(), Control.SHUTDOWN_TIMEOUT, shutdownTimeout);
return stay();
}
}

private FSM.State<BaseClientState, BaseClientData> shutdownTimeout(final Control event,
final BaseClientData data) {
final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout();
logger.warning("Shutdown timeout <{}> reached; aborting <{}> ackregators", shutdownTimeout, ackregatorCount);
return stop();
}

private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
return switch (topic) {
case POLICY_ANNOUNCEMENTS -> Optional.of(StreamingType.POLICY_ANNOUNCEMENTS);
Expand Down Expand Up @@ -2316,7 +2360,10 @@ public enum Control {
GOTO_CONNECTED_AFTER_INITIALIZATION,
RESUBSCRIBE,
SERVICE_UNBIND,
SERVICE_REQUESTS_DONE
SERVICE_REQUESTS_DONE,
ACKREGATOR_STARTED,
ACKREGATOR_STOPPED,
SHUTDOWN_TIMEOUT
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,11 @@ private void startAckregatorAndForwardSignal(final EntityId entityId,
if (sender != null) {
sender.tell(responseSignal, ActorRef.noSender());
}

clientActor.tell(BaseClientActor.Control.ACKREGATOR_STOPPED, ActorRef.noSender());
},
(ackregator, adjustedSignal) -> {
clientActor.tell(BaseClientActor.Control.ACKREGATOR_STARTED, ActorRef.noSender());
proxyActor.tell(adjustedSignal, ackregator);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ private static Connection validateConnection(final Connection connection, final

@Override
public void postStop() {
ensureJmsConnectionClosed();
super.postStop();
ensureJmsConnectionClosed();
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ ditto {
all-client-actors-on-one-node = false
all-client-actors-on-one-node = ${?CONNECTIVITY_ALL_CLIENT_ACTORS_ON_ONE_NODE}

shutdown-timeout = 3s
shutdown-timeout = ${?CONNECTION_SHUTDOWN_TIMEOUT}

acknowledgement {
# lifetime of ack forwarder. Must be bigger than the largest possible command timeout (60s)
forwarder-fallback-timeout = 65s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.areAllClientActorsOnOneNode())
.as(ConnectionConfig.ConnectionConfigValue.ALL_CLIENT_ACTORS_ON_ONE_NODE.getConfigPath())
.isEqualTo(true);

softly.assertThat(underTest.getShutdownTimeout())
.as(ConnectionConfig.ConnectionConfigValue.SHUTDOWN_TIMEOUT.getConfigPath())
.isEqualTo(Duration.ofMinutes(7));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void testExternalMessageInDittoProtocolIsProcessed(
ActorRef.noSender());

if (expectSuccess) {
final ModifyAttribute modifyAttribute = expectMsgClass(ModifyAttribute.class);
final ModifyAttribute modifyAttribute = fishForMsg(this, ModifyAttribute.class);
assertThat(modifyAttribute.getType()).isEqualTo(ModifyAttribute.TYPE);
assertThat(modifyAttribute.getDittoHeaders().getCorrelationId()).contains(
modifyCommand.getDittoHeaders().getCorrelationId().orElse(null));
Expand Down Expand Up @@ -288,7 +288,7 @@ <T> void testMessageMappingWithoutCorrelationId(
new ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
ActorRef.noSender());

final T received = expectMsgClass(expectedMessageClass);
final T received = fishForMsg(this, expectedMessageClass);
verifyReceivedMessage.accept(received);
}};
}
Expand Down Expand Up @@ -322,7 +322,7 @@ <T> void testMessageMapping(final String correlationId,
new ExternalMessageWithSender(externalMessage, collectorProbe.ref()),
ActorRef.noSender());

final T received = expectMsgClass(expectedMessageClass);
final T received = fishForMsg(this, expectedMessageClass);
verifyReceivedMessage.accept(received);
}};
}
Expand Down Expand Up @@ -448,6 +448,10 @@ private ModifyAttribute createModifyAttributeCommand() {
.build());
}

static <T> T fishForMsg(final TestKit testKit, final Class<T> clazz) {
return clazz.cast(testKit.fishForMessage(Duration.ofSeconds(3L), clazz.getSimpleName(), clazz::isInstance));
}

static ThreadSafeDittoLoggingAdapter mockLoggingAdapter() {
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withMdcEntry(Mockito.any(CharSequence.class), Mockito.nullable(CharSequence.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void run() {

// transport-layer settlement based on requested-acks alone
if (settleImmediately && !isBadRequest) {
collectorProbe.expectMsg(FiniteDuration.apply(20l, TimeUnit.SECONDS), ResponseCollectorActor.setCount(0));
collectorProbe.expectMsg(FiniteDuration.apply(20L, TimeUnit.SECONDS),
ResponseCollectorActor.setCount(0));
} else if (isBadRequest) {
// bad requests should settle immediately because no command is forwarded
collectorProbe.expectMsgClass(DittoHeaderInvalidException.class);
Expand All @@ -108,7 +109,7 @@ public void run() {

if (!isBadRequest) {
// command forwarded for non-bad requests.
final ModifyThing forwardedModifyThing = expectMsgClass(ModifyThing.class);
final ModifyThing forwardedModifyThing = fishForMsg(this, ModifyThing.class);

// send a response always - MessageMappingProcessorActor should drop it if not wanted.
final Object response = getModifyThingResponse(forwardedModifyThing);
Expand All @@ -123,13 +124,10 @@ public void run() {
if (expectedStatusCode.isPresent()) {
// check published response for expected status
final BaseClientActor.PublishMappedMessage publish =
expectMsgClass(BaseClientActor.PublishMappedMessage.class);
fishForMsg(this, BaseClientActor.PublishMappedMessage.class);
final HttpStatus publishedStatusCode =
((CommandResponse<?>) publish.getOutboundSignal().getSource()).getHttpStatus();
assertThat(publishedStatusCode).isEqualTo(expectedStatusCode.get());
} else {
// check that no response is published
expectNoMessage(Duration.ofMillis(250L));
}
}};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ public void testAggregationOfAcknowledgements() {
ActorRef.noSender()
);

final ModifyAttribute modifyAttribute = expectMsgClass(ModifyAttribute.class);
final ModifyAttribute modifyAttribute = fishForMsg(this, ModifyAttribute.class);
assertThat(modifyAttribute.getDittoHeaders().getAcknowledgementRequests()).isEqualTo(validationSet);
}};
}
Expand Down
2 changes: 2 additions & 0 deletions connectivity/service/src/test/resources/connection-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ connection {

ack-label-declare-interval = 99s

shutdown-timeout = 7m

mqtt {
# maximum mumber of MQTT messages to buffer in a source (presumably for at-least-once and exactly-once delivery)
source-buffer-size = 7
Expand Down

0 comments on commit 259c394

Please sign in to comment.