Skip to content

Commit

Permalink
Add resubscription to BaseClientActor.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 18, 2022
1 parent 6b6ae4b commit 03135ef
Show file tree
Hide file tree
Showing 23 changed files with 117 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public interface ClientConfig {
*/
Duration getClientActorRefsNotificationDelay();

/**
* Min delay to refresh each client actor's knowledge of other client actors when client count of the connection
* is above 1. The actual delay is random between 1x and 2x this value.
*
* @return the subscription refresh interval.
*/
Duration getSubscriptionRefreshDelay();

/**
* An enumeration of the known config path expressions and their associated default values for {@code ClientConfig}.
*/
Expand Down Expand Up @@ -161,7 +169,12 @@ enum ClientConfigValue implements KnownConfigValue {
/**
* See documentation on {@link ClientConfig#getClientActorRefsNotificationDelay()}.
*/
CLIENT_ACTOR_REFS_NOTIFICATION_DELAY("client-actor-refs-notification-delay", Duration.ofMinutes(5L));
CLIENT_ACTOR_REFS_NOTIFICATION_DELAY("client-actor-refs-notification-delay", Duration.ofMinutes(5L)),

/**
* See documentation on {@link ClientConfig#getSubscriptionRefreshDelay()}.
*/
SUBSCRIPTION_REFRESH_DELAY("subscription-refresh-delay", Duration.ofMinutes(5L));

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class DefaultClientConfig implements ClientConfig {
private final Duration minBackoff;
private final Duration maxBackoff;
private final Duration clientActorRefsNotificationDelay;
private final Duration subscriptionRefreshDelay;

private DefaultClientConfig(final ScopedConfig config) {
initTimeout = config.getNonNegativeDurationOrThrow(ClientConfigValue.INIT_TIMEOUT);
Expand All @@ -59,6 +60,7 @@ private DefaultClientConfig(final ScopedConfig config) {
maxBackoff = config.getNonNegativeAndNonZeroDurationOrThrow(ClientConfigValue.MAX_BACKOFF);
clientActorRefsNotificationDelay =
config.getNonNegativeDurationOrThrow(ClientConfigValue.CLIENT_ACTOR_REFS_NOTIFICATION_DELAY);
subscriptionRefreshDelay = config.getNonNegativeDurationOrThrow(ClientConfigValue.SUBSCRIPTION_REFRESH_DELAY);
}

/**
Expand Down Expand Up @@ -112,6 +114,11 @@ public Duration getClientActorRefsNotificationDelay() {
return clientActorRefsNotificationDelay;
}

@Override
public Duration getSubscriptionRefreshDelay() {
return subscriptionRefreshDelay;
}

@Override
public Duration getMinBackoff() {
return minBackoff;
Expand Down Expand Up @@ -141,14 +148,15 @@ public boolean equals(@Nullable final Object o) {
Objects.equals(minBackoff, that.minBackoff) &&
Objects.equals(maxBackoff, that.maxBackoff) &&
Objects.equals(subscriptionManagerTimeout, that.subscriptionManagerTimeout) &&
Objects.equals(clientActorRefsNotificationDelay, that.clientActorRefsNotificationDelay);
Objects.equals(clientActorRefsNotificationDelay, that.clientActorRefsNotificationDelay) &&
Objects.equals(subscriptionRefreshDelay, that.subscriptionRefreshDelay);
}

@Override
public int hashCode() {
return Objects.hash(initTimeout, connectingMinTimeout, connectingMaxTimeout, disconnectingMaxTimeout,
disconnectAnnouncementTimeout, connectingMaxTries, testingTimeout, minBackoff, maxBackoff,
subscriptionManagerTimeout, clientActorRefsNotificationDelay);
subscriptionManagerTimeout, clientActorRefsNotificationDelay, subscriptionRefreshDelay);
}

@Override
Expand All @@ -165,6 +173,7 @@ public String toString() {
", maxBackoff=" + maxBackoff +
", subscriptionManagerTimeout=" + subscriptionManagerTimeout +
", clientActorRefsNotificationDelay=" + clientActorRefsNotificationDelay +
", subscriptionRefreshDelay=" + subscriptionRefreshDelay +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSub;
import akka.http.javadsl.ConnectionContext;
import akka.japi.Pair;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.FSMStateFunctionBuilder;
Expand All @@ -165,7 +164,6 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ThreadSafeDittoLoggingAdapter logger;
protected static final Status.Success DONE = new Status.Success(Done.getInstance());

protected ConnectionContext connectionContext;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;

Expand Down Expand Up @@ -654,6 +652,7 @@ private void onTransition(final BaseClientState from, final BaseClientState to)
if (to == CONNECTED || to == DISCONNECTED || to == INITIALIZED) {
cancelStateTimeout();
}
maintainResubscribeTimer(to);
}

private void publishConnectionOpenedAnnouncement() {
Expand Down Expand Up @@ -737,7 +736,8 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::connectionAlreadyOpen)
.event(ConnectionFailure.class, this::connectedConnectionFailed);
.event(ConnectionFailure.class, this::connectedConnectionFailed)
.eventEquals(Control.RESUBSCRIBE, this::resubscribe);
}

@Nullable
Expand Down Expand Up @@ -1852,6 +1852,12 @@ private FSM.State<BaseClientState, BaseClientData> forwardThingSearchCommand(fin
return stay();
}

private FSM.State<BaseClientState, BaseClientData> resubscribe(final Control trigger, final BaseClientData data) {
subscribeAndDeclareAcknowledgementLabels(dryRun, true);
startSubscriptionRefreshTimer();
return stay();
}

protected boolean isDryRun() {
return dryRun;
}
Expand Down Expand Up @@ -1965,7 +1971,7 @@ private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels(final boo
final String group = getPubsubGroup();
final CompletionStage<Void> subscribe = subscribeToStreamingTypes(group, resubscribe);
final CompletionStage<Void> declare =
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group, resubscribe);
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group);
return declare.thenCompose(unused -> subscribe);
}
}
Expand Down Expand Up @@ -2005,6 +2011,20 @@ private Set<StreamingType> getUniqueStreamingTypes() {
.collect(Collectors.toSet());
}

private void maintainResubscribeTimer(final BaseClientState nextState) {
if (nextState == CONNECTED) {
startSubscriptionRefreshTimer();
} else {
cancelTimer(Control.RESUBSCRIBE.name());
}
}

private void startSubscriptionRefreshTimer() {
final var delay = connectivityConfig.getClientConfig().getSubscriptionRefreshDelay();
final var randomizedDelay = delay.plus(Duration.ofMillis((long) (delay.toMillis() * Math.random())));
startSingleTimer(Control.RESUBSCRIBE.name(), Control.RESUBSCRIBE, randomizedDelay);
}

private static Optional<StreamingType> toStreamingTypes(final Topic topic) {
switch (topic) {
case POLICY_ANNOUNCEMENTS:
Expand Down Expand Up @@ -2223,7 +2243,8 @@ public String toString() {
private enum Control {
INIT_COMPLETE,
CONNECT_AFTER_TUNNEL_ESTABLISHED,
GOTO_CONNECTED_AFTER_INITIALIZATION
GOTO_CONNECTED_AFTER_INITIALIZATION,
RESUBSCRIBE
}

private static final Object SEND_DISCONNECT_ANNOUNCEMENT = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private static Duration randomize(final Duration base) {
}

@Override
public void preStart() throws Exception {
public void preStart() {
when(AGGREGATED, inAggregatedState());
when(AGGREGATING, inAggregatingState());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,11 @@ private void startClientActorsIfRequired(final int clientCount, final DittoHeade
new ClusterRouterPool(pool, clusterRouterPoolSettings).props(props);

// start client actor without name so it does not conflict with its previous incarnation
clientActorRouter = getContext().actorOf(clusterRouterPoolProps);
final var routerPool = getContext().actorOf(clusterRouterPoolProps);
clientActorRouter = routerPool;
if (clientCount > 1) {
clientActorRefsAggregationActor = getContext().actorOf(
ClientActorRefsAggregationActor.props(clientCount, getSelf(), clientActorRouter,
ClientActorRefsAggregationActor.props(clientCount, getSelf(), routerPool,
connectivityConfig.getClientConfig().getClientActorRefsNotificationDelay(),
clientActorAskTimeout));
}
Expand Down
3 changes: 3 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,9 @@ ditto {
max-backoff = ${?CONNECTIVITY_CLIENT_MAX_BACKOFF}
# How often to refresh the cache of other client actors if client count > 1
client-actor-refs-notification-delay = 5m
# How often to refresh Ditto pubsub subscriptions
subscription-refresh-delay = 5m
subscription-refresh-delay = ${?CONNECTIVITY_CLIENT_SUBSCRIPTION_REFRESH_INTERVAL}
}

monitoring {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
softly.assertThat(underTest.getClientActorRefsNotificationDelay())
.as(ClientConfig.ClientConfigValue.CLIENT_ACTOR_REFS_NOTIFICATION_DELAY.getConfigPath())
.isEqualTo(ClientConfig.ClientConfigValue.CLIENT_ACTOR_REFS_NOTIFICATION_DELAY.getDefaultValue());
softly.assertThat(underTest.getSubscriptionRefreshDelay())
.as(ClientConfig.ClientConfigValue.SUBSCRIPTION_REFRESH_DELAY.getConfigPath())
.isEqualTo(ClientConfig.ClientConfigValue.SUBSCRIPTION_REFRESH_DELAY.getDefaultValue());
}

@Test
Expand Down Expand Up @@ -122,6 +125,9 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.getClientActorRefsNotificationDelay())
.as(ClientConfig.ClientConfigValue.CLIENT_ACTOR_REFS_NOTIFICATION_DELAY.getConfigPath())
.isEqualTo(Duration.ofMinutes(8L));
softly.assertThat(underTest.getSubscriptionRefreshDelay())
.as(ClientConfig.ClientConfigValue.SUBSCRIPTION_REFRESH_DELAY.getConfigPath())
.isEqualTo(Duration.ofMinutes(9L));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef s
@Override
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group, final boolean resubscribe) {
@Nullable final String group) {
if (delegate != null) {
return delegate.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group, resubscribe);
return delegate.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group);
} else {
return CompletableFuture.completedStage(null);
}
Expand Down
1 change: 1 addition & 0 deletions connectivity/service/src/test/resources/client-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ client {
min-backoff = 6s
max-backoff = 7s
client-actor-refs-notification-delay = 8m
subscription-refresh-delay = 9m
}
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ private void refreshWebSocketSession(final Jwt jwt) {
private void declareAcknowledgementLabels(final Collection<AcknowledgementLabel> acknowledgementLabels) {
final ActorRef self = getSelf();
logger.info("Declaring acknowledgement labels <{}>", acknowledgementLabels);
dittoProtocolSub.declareAcknowledgementLabels(acknowledgementLabels, self, null, false)
dittoProtocolSub.declareAcknowledgementLabels(acknowledgementLabels, self, null)
.thenAccept(unused -> logger.info("Acknowledgement label declaration successful for labels: <{}>",
acknowledgementLabels))
.exceptionally(error -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;

import java.time.Duration;
Expand Down Expand Up @@ -140,7 +139,7 @@ public static void shutdown() {

@Before
public void setup() {
when(dittoProtocolSub.declareAcknowledgementLabels(any(), any(), any(), anyBoolean()))
when(dittoProtocolSub.declareAcknowledgementLabels(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.gateway.service.streaming.actors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -124,7 +123,7 @@ public static void beforeClass() {
public void before() {
commandRouterProbe = actorSystemResource.newTestProbe("commandRouter");

Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any(), anyBoolean()))
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(CompletableFuture.completedFuture(null));

final Sink<SessionedJsonifiable, TestSubscriber.Probe<SessionedJsonifiable>> sink =
Expand Down Expand Up @@ -382,7 +381,7 @@ private Props getProps(final String... declaredAcks) {
}

private void onDeclareAckLabels(final CompletionStage<Void> answer) {
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any(), anyBoolean()))
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(answer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public interface DistributedAcks extends Extension {
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
CompletionStage<AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels, ActorRef subscriber, @Nullable String group,
final boolean resubscribe);
Collection<AcknowledgementLabel> acknowledgementLabels, ActorRef subscriber, @Nullable String group);

/**
* Declare labels of acknowledgements that a subscriber may send.
Expand All @@ -82,7 +81,7 @@ default CompletionStage<AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber) {

return declareAcknowledgementLabels(acknowledgementLabels, subscriber, null, false);
return declareAcknowledgementLabels(acknowledgementLabels, subscriber, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void removeSubscriber(final ActorRef subscriber) {
@Override
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group, final boolean resubscribe) {
@Nullable final String group) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,21 @@ public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
final AckRequest request =
DeclareAcks.of(subscriber, null, ackLabelStrings, false);
DeclareAcks.of(subscriber, null, ackLabelStrings);
return askSupervisor(request);
}

@Override
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group, final boolean resubscribe) {
@Nullable final String group) {
if (group != null) {
ConditionChecker.checkNotEmpty(group, "group");
}
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
return askSupervisor(DeclareAcks.of(subscriber, group, ackLabelStrings, resubscribe));
return askSupervisor(DeclareAcks.of(subscriber, group, ackLabelStrings));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ CompletionStage<Void> updateLiveSubscriptions(Collection<StreamingType> types, C
* {@code AcknowledgementLabelNotUniqueException} later.
*/
CompletionStage<Void> declareAcknowledgementLabels(Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber, @Nullable String group, final boolean resubscribe);
ActorRef subscriber, @Nullable String group);

/**
* Relinquish any acknowledgement labels declared by a subscriber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef s
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber,
@Nullable final String group,
final boolean resubscribe) {
@Nullable final String group) {
if (acknowledgementLabels.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -131,7 +130,7 @@ public CompletionStage<Void> declareAcknowledgementLabels(
// via the actor supervision strategy
ensureAcknowledgementLabelsAreFullyResolved(acknowledgementLabels);

return distributedAcks.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group, resubscribe)
return distributedAcks.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group)
.thenApply(ack -> null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class AckUpdater extends AbstractActorWithTimers implements Cluster
*/
public static final String ACTOR_NAME_PREFIX = "ackUpdater";

protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

private final GroupedRelation<ActorRef, String> localAckLabels;
private final Address ownAddress;
Expand Down
Loading

0 comments on commit 03135ef

Please sign in to comment.