Skip to content

Commit

Permalink
use min interval from backoffConfig to create RestartSettings;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed May 18, 2021
1 parent 8b177c9 commit 82ddf52
Showing 1 changed file with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.service.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamCacheConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamStageConfig;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
Expand All @@ -32,7 +31,6 @@
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;
import akka.stream.Attributes;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
Expand Down Expand Up @@ -92,19 +90,19 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
final MongoDatabase database,
final BlockedNamespaces blockedNamespaces) {

final StreamConfig streamConfig = updaterConfig.getStreamConfig();
final var streamConfig = updaterConfig.getStreamConfig();

final StreamCacheConfig cacheConfig = streamConfig.getCacheConfig();
final String dispatcherName = cacheConfig.getDispatcherName();
final MessageDispatcher messageDispatcher = actorSystem.dispatchers().lookup(dispatcherName);
final var messageDispatcher = actorSystem.dispatchers().lookup(dispatcherName);

final EnforcementFlow enforcementFlow =
final var enforcementFlow =
EnforcementFlow.of(streamConfig, thingsShard, policiesShard, messageDispatcher);

final MongoSearchUpdaterFlow mongoSearchUpdaterFlow = MongoSearchUpdaterFlow.of(database,
final var mongoSearchUpdaterFlow = MongoSearchUpdaterFlow.of(database,
streamConfig.getPersistenceConfig());

final BulkWriteResultAckFlow bulkWriteResultAckFlow = BulkWriteResultAckFlow.of(updaterShard);
final var bulkWriteResultAckFlow = BulkWriteResultAckFlow.of(updaterShard);

return new SearchUpdaterStream(updaterConfig, enforcementFlow, mongoSearchUpdaterFlow, bulkWriteResultAckFlow,
changeQueueActor, blockedNamespaces);
Expand All @@ -129,7 +127,7 @@ public KillSwitch start(final ActorContext actorContext, final boolean withAckno

private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource(
final boolean shouldAcknowledge) {
final StreamConfig streamConfig = updaterConfig.getStreamConfig();
final var streamConfig = updaterConfig.getStreamConfig();
final StreamStageConfig retrievalConfig = streamConfig.getRetrievalConfig();

final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> source =
Expand All @@ -141,13 +139,13 @@ private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource

final var backOffConfig = retrievalConfig.getExponentialBackOffConfig();
return RestartSource.withBackoff(
RestartSettings.create(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
RestartSettings.create(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> source);
}

private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink(
final boolean shouldAcknowledge) {
final StreamConfig streamConfig = updaterConfig.getStreamConfig();
final var streamConfig = updaterConfig.getStreamConfig();
final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

final int parallelism = persistenceConfig.getParallelism();
Expand All @@ -165,7 +163,7 @@ private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink(

final var backOffConfig = persistenceConfig.getExponentialBackOffConfig();
return RestartSink.withBackoff(
RestartSettings.create(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
RestartSettings.create(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> sink);
}

Expand Down

0 comments on commit 82ddf52

Please sign in to comment.