Skip to content

Commit

Permalink
Add resub function to StreamingSessionActor.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 18, 2022
1 parent 03135ef commit 7bb78a6
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private Receive createConnectAndMetricsBehavior() {
final String sessionActorName = getUniqueChildActorName(connect.getConnectionCorrelationId());
final ActorRef streamingSessionActor = getContext().actorOf(
StreamingSessionActor.props(connect, dittoProtocolSub,
commandRouter, streamingConfig.getAcknowledgementConfig(), headerTranslator,
commandRouter, streamingConfig, headerTranslator,
subscriptionManagerProps, jwtValidator, jwtAuthenticationResultProvider),
sessionActorName);
getSender().tell(streamingSessionActor, ActorRef.noSender());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
import org.eclipse.ditto.gateway.service.streaming.RefreshSession;
import org.eclipse.ditto.gateway.service.streaming.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.StopStreaming;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.internal.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand Down Expand Up @@ -103,7 +103,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private final DittoProtocolSub dittoProtocolSub;
private final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher;
private final ActorRef commandRouter;
private final AcknowledgementConfig acknowledgementConfig;
private final StreamingConfig streamingConfig;
private final ActorRef subscriptionManager;
private final Set<StreamingType> outstandingSubscriptionAcks;
private final Map<StreamingType, StreamingSession> streamingSessions;
Expand All @@ -119,7 +119,7 @@ final class StreamingSessionActor extends AbstractActorWithTimers {
private StreamingSessionActor(final Connect connect,
final DittoProtocolSub dittoProtocolSub,
final ActorRef commandRouter,
final AcknowledgementConfig acknowledgementConfig,
final StreamingConfig streamingConfig,
final HeaderTranslator headerTranslator,
final Props subscriptionManagerProps,
final JwtValidator jwtValidator,
Expand All @@ -131,14 +131,14 @@ private StreamingSessionActor(final Connect connect,
this.dittoProtocolSub = dittoProtocolSub;
eventAndResponsePublisher = connect.getEventAndResponsePublisher();
this.commandRouter = commandRouter;
this.acknowledgementConfig = acknowledgementConfig;
this.streamingConfig = streamingConfig;
this.jwtValidator = jwtValidator;
this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;
outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);
authorizationContext = connect.getConnectionAuthContext();
streamingSessions = new EnumMap<>(StreamingType.class);
ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(),
acknowledgementConfig,
streamingConfig.getAcknowledgementConfig(),
headerTranslator,
null,
ThingModifyCommandAckRequestSetter.getInstance(),
Expand All @@ -149,6 +149,7 @@ private StreamingSessionActor(final Connect connect,
connect.getSessionExpirationTime().ifPresent(this::startSessionTimeout);
subscriptionManager = getContext().actorOf(subscriptionManagerProps, SubscriptionManager.ACTOR_NAME);
declaredAcks = connect.getDeclaredAcknowledgementLabels();
startSubscriptionRefreshTimer();
}

/**
Expand All @@ -157,7 +158,7 @@ private StreamingSessionActor(final Connect connect,
* @param connect the command to start a streaming session.
* @param dittoProtocolSub manager of subscriptions.
* @param commandRouter the actor who distributes incoming commands in the Ditto cluster.
* @param acknowledgementConfig the config to apply for Acknowledgements.
* @param streamingConfig the config to apply for the streaming session.
* @param headerTranslator translates headers from external sources or to external sources.
* @param subscriptionManagerProps Props of the subscription manager for search protocol.
* @param jwtValidator validator of JWT tokens.
Expand All @@ -167,7 +168,7 @@ private StreamingSessionActor(final Connect connect,
static Props props(final Connect connect,
final DittoProtocolSub dittoProtocolSub,
final ActorRef commandRouter,
final AcknowledgementConfig acknowledgementConfig,
final StreamingConfig streamingConfig,
final HeaderTranslator headerTranslator,
final Props subscriptionManagerProps,
final JwtValidator jwtValidator,
Expand All @@ -177,7 +178,7 @@ static Props props(final Connect connect,
connect,
dittoProtocolSub,
commandRouter,
acknowledgementConfig,
streamingConfig,
headerTranslator,
subscriptionManagerProps,
jwtValidator,
Expand Down Expand Up @@ -355,6 +356,7 @@ private Receive createPubSubBehavior() {
})
.match(ConfirmSubscription.class, msg -> confirmSubscription(msg.getStreamingType()))
.match(ConfirmUnsubscription.class, msg -> confirmUnsubscription(msg.getStreamingType()))
.matchEquals(Control.RESUBSCRIBE, this::resubscribe)
.build();
}

Expand Down Expand Up @@ -466,7 +468,7 @@ private Signal<?> startAckForwarder(final Signal<?> signal) {
sender(),
entityIdWithType,
signal,
acknowledgementConfig,
streamingConfig.getAcknowledgementConfig(),
declaredAcks::contains);
} else {
return signal;
Expand Down Expand Up @@ -694,6 +696,20 @@ private void confirmUnsubscription(final StreamingType streamingType) {
logger.debug("Unsubscribed from Cluster <{}> in <{}> session.", streamingType, type);
}

private void startSubscriptionRefreshTimer() {
final var delay = streamingConfig.getSubscriptionRefreshDelay();
final var randomizedDelay = delay.plus(Duration.ofMillis((long) (delay.toMillis() * Math.random())));
timers().startSingleTimer(Control.RESUBSCRIBE, Control.RESUBSCRIBE, randomizedDelay);
}

private void resubscribe(final Control trigger) {
if (!streamingSessions.isEmpty() && outstandingSubscriptionAcks.isEmpty()) {
dittoProtocolSub.subscribe(streamingSessions.keySet(), authorizationContext.getAuthorizationSubjectIds(),
getSelf(), null, true);
}
startSubscriptionRefreshTimer();
}

private static Optional<DittoHeaderInvalidException> checkForAcksWithoutResponse(final Signal<?> signal) {
final var dittoHeaders = signal.getDittoHeaders();
if (!dittoHeaders.isResponseRequired() && !dittoHeaders.getAcknowledgementRequests().isEmpty()) {
Expand Down Expand Up @@ -750,7 +766,8 @@ private ConfirmUnsubscription(final StreamingType streamingType) {

private enum Control {
TERMINATED,
SESSION_TERMINATION
SESSION_TERMINATION,
RESUBSCRIBE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class DefaultStreamingConfig implements StreamingConfig {
private final int parallelism;
private final AcknowledgementConfig acknowledgementConfig;
private final Duration searchIdleTimeout;
private final Duration subscriptionRefreshDelay;
private final WebsocketConfig websocketConfig;
private final SseConfig sseConfig;
private final GatewaySignalEnrichmentConfig signalEnrichmentConfig;
Expand All @@ -44,6 +45,8 @@ private DefaultStreamingConfig(final ScopedConfig scopedConfig) {
parallelism = scopedConfig.getPositiveIntOrThrow(StreamingConfigValue.PARALLELISM);
acknowledgementConfig = DefaultAcknowledgementConfig.of(scopedConfig);
searchIdleTimeout = scopedConfig.getNonNegativeDurationOrThrow(StreamingConfigValue.SEARCH_IDLE_TIMEOUT);
subscriptionRefreshDelay =
scopedConfig.getNonNegativeDurationOrThrow(StreamingConfigValue.SUBSCRIPTION_REFRESH_DELAY);
websocketConfig = DefaultWebsocketConfig.of(scopedConfig);
sseConfig = DefaultSseConfig.of(scopedConfig);
signalEnrichmentConfig = DefaultGatewaySignalEnrichmentConfig.of(scopedConfig);
Expand Down Expand Up @@ -96,6 +99,11 @@ public Duration getSearchIdleTimeout() {
return searchIdleTimeout;
}

@Override
public Duration getSubscriptionRefreshDelay() {
return subscriptionRefreshDelay;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -107,6 +115,7 @@ public boolean equals(final Object o) {
final DefaultStreamingConfig that = (DefaultStreamingConfig) o;
return parallelism == that.parallelism &&
Objects.equals(searchIdleTimeout, that.searchIdleTimeout) &&
Objects.equals(subscriptionRefreshDelay, that.subscriptionRefreshDelay) &&
Objects.equals(sessionCounterScrapeInterval, that.sessionCounterScrapeInterval) &&
Objects.equals(signalEnrichmentConfig, that.signalEnrichmentConfig) &&
Objects.equals(acknowledgementConfig, that.acknowledgementConfig) &&
Expand All @@ -117,7 +126,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(parallelism, sessionCounterScrapeInterval, signalEnrichmentConfig, acknowledgementConfig,
websocketConfig, sseConfig, searchIdleTimeout);
websocketConfig, sseConfig, searchIdleTimeout, subscriptionRefreshDelay);
}

@Override
Expand All @@ -126,6 +135,7 @@ public String toString() {
"sessionCounterScrapeInterval=" + sessionCounterScrapeInterval +
", parallelism=" + parallelism +
", searchIdleTimeout=" + searchIdleTimeout +
", subscriptionRefreshDelay=" + subscriptionRefreshDelay +
", signalEnrichmentConfig=" + signalEnrichmentConfig +
", acknowledgementConfig=" + acknowledgementConfig +
", websocketConfig=" + websocketConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public interface StreamingConfig {
*/
Duration getSearchIdleTimeout();

/**
* Returns the minimum delay before refreshing the Ditto pubsub subscriptions of a stream.
*
* @return the minimum delay.
*/
Duration getSubscriptionRefreshDelay();

/**
* Render this object into a Config object from which a copy of this object can be constructed.
*
Expand All @@ -94,6 +101,7 @@ default Config render() {
getSessionCounterScrapeInterval().toMillis() + "ms");
map.put(StreamingConfigValue.PARALLELISM.getConfigPath(), getParallelism());
map.put(StreamingConfigValue.SEARCH_IDLE_TIMEOUT.getConfigPath(), getSearchIdleTimeout());
map.put(StreamingConfigValue.SUBSCRIPTION_REFRESH_DELAY.getConfigPath(), getSubscriptionRefreshDelay());
return ConfigFactory.parseMap(map)
.withFallback(getWebsocketConfig().render())
.withFallback(getSignalEnrichmentConfig().render())
Expand All @@ -119,7 +127,12 @@ enum StreamingConfigValue implements KnownConfigValue {
/**
* How long to wait before closing an idle search stream.
*/
SEARCH_IDLE_TIMEOUT("search-idle-timeout", Duration.ofSeconds(45));
SEARCH_IDLE_TIMEOUT("search-idle-timeout", Duration.ofSeconds(45)),

/**
* Minimum delay before refreshing the Ditto pubsub subscriptions of a stream.
*/
SUBSCRIPTION_REFRESH_DELAY("subscription-refresh-delay", Duration.ofMinutes(5));

private final String path;
private final Object defaultValue;
Expand Down
3 changes: 3 additions & 0 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ ditto {
search-idle-timeout = 60s
search-idle-timeout = ${?GATEWAY_STREAMING_SEARCH_IDLE_TIMEOUT}

subscription-refresh-delay = 5m
subscription-refresh-delay = ${?GATEWAY_STREAMING_SUBSCRIPTION_REFRESH_DELAY}

signal-enrichment {
# indicates whether caching should be used for signal enrichment.
caching-enabled = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationResultProvider;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtValidator;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.gateway.service.streaming.Connect;
import org.eclipse.ditto.gateway.service.streaming.IncomingSignal;
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.DefaultStreamingConfig;
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThingResponse;
Expand Down Expand Up @@ -177,7 +177,7 @@ private ActorRef createStreamingSessionActor() {
new Connect(sourceQueue, "connectionCorrelationId", "ws",
JsonSchemaVersion.V_2, null, Set.of(), AuthorizationModelFactory.emptyAuthContext());
final Props props = StreamingSessionActor.props(connect, dittoProtocolSub, commandRouterProbe.ref(),
DefaultAcknowledgementConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),
DefaultStreamingConfig.of(ConfigFactory.empty()), HeaderTranslator.empty(),
Props.create(TestProbeForwarder.class, subscriptionManagerProbe), Mockito.mock(JwtValidator.class),
Mockito.mock(JwtAuthenticationResultProvider.class));
final ActorRef createdActor = actorSystem.actorOf(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.eclipse.ditto.gateway.service.streaming.Jwt;
import org.eclipse.ditto.gateway.service.streaming.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.StreamingAck;
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.DefaultStreamingConfig;
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
Expand Down Expand Up @@ -373,7 +373,7 @@ private Props getProps(final String... declaredAcks) {
return StreamingSessionActor.props(getConnect(getAcknowledgementLabels(declaredAcks)),
mockSub,
commandRouterProbe.ref(),
DefaultAcknowledgementConfig.of(ConfigFactory.empty()),
DefaultStreamingConfig.of(ConfigFactory.empty()),
HeaderTranslator.empty(),
Props.create(Actor.class, () -> new TestActor(new LinkedBlockingDeque<>())),
mockValidator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
softly.assertThat(underTest.getSearchIdleTimeout())
.as(StreamingConfig.StreamingConfigValue.SEARCH_IDLE_TIMEOUT.getConfigPath())
.isEqualTo(StreamingConfig.StreamingConfigValue.SEARCH_IDLE_TIMEOUT.getDefaultValue());
softly.assertThat(underTest.getSubscriptionRefreshDelay())
.as(StreamingConfig.StreamingConfigValue.SUBSCRIPTION_REFRESH_DELAY.getConfigPath())
.isEqualTo(StreamingConfig.StreamingConfigValue.SUBSCRIPTION_REFRESH_DELAY.getDefaultValue());
}

@Test
Expand All @@ -86,6 +89,9 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.getSearchIdleTimeout())
.as(StreamingConfig.StreamingConfigValue.SEARCH_IDLE_TIMEOUT.getConfigPath())
.isEqualTo(Duration.ofHours(7L));
softly.assertThat(underTest.getSubscriptionRefreshDelay())
.as(StreamingConfig.StreamingConfigValue.SUBSCRIPTION_REFRESH_DELAY.getConfigPath())
.isEqualTo(Duration.ofHours(8));
softly.assertThat(underTest.getSignalEnrichmentConfig().isCachingEnabled())
.as(GatewaySignalEnrichmentConfig.CachingSignalEnrichmentFacadeConfigValue.CACHING_ENABLED.getConfigPath())
.isFalse();
Expand Down
2 changes: 2 additions & 0 deletions gateway/service/src/test/resources/streaming-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ streaming {

search-idle-timeout = 7h

subscription-refresh-delay = 8h

signal-enrichment {
caching-enabled = false
ask-timeout = 20s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ private void unsubscribe(final Unsubscribe unsubscribe) {

private void checkForLostSubscriber(final Subscribe subscribe, final boolean changed) {
if (subscribe.isResubscribe() && changed) {
log().error("Subscriber was missing from Ditto Pubsub: <{}>", subscribe.getSubscriber());
log().error("[RESUB] Subscriber was missing: <{}>", subscribe.getSubscriber());
errorCounter++;
} else if (subscribe.isResubscribe()) {
log().debug("[RESUB] Refreshed subscriber <{}>", subscribe.getSubscriber());
}
}

Expand Down

0 comments on commit 7bb78a6

Please sign in to comment.