Skip to content

Commit

Permalink
Perform exponential backoff when retrying search updates.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Apr 10, 2022
1 parent c68eb06 commit 1c44efe
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOff;
import org.eclipse.ditto.internal.models.streaming.IdentifiableStreamingMessage;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
Expand Down Expand Up @@ -81,9 +82,6 @@ public final class ThingUpdater extends AbstractFSMWithStash<ThingUpdater.State,
private static final Counter INCORRECT_PATCH_UPDATE_COUNT = DittoMetrics.counter("search_incorrect_patch_updates");
private static final Counter UPDATE_FAILURE_COUNT = DittoMetrics.counter("search_update_failures");

// TODO: use circuit breaker
private static final Duration RETRY_DELAY = Duration.ofSeconds(10L);

private static final Duration BLOCK_NAMESPACE_SHUTDOWN_DELAY = Duration.ofMinutes(2);

// alias Ditto Shutdown class because FSM shadows it
Expand All @@ -104,6 +102,7 @@ public final class ThingUpdater extends AbstractFSMWithStash<ThingUpdater.State,
private final Flow<Data, Result, NotUsed> flow;
private final Materializer materializer;
private final Duration writeInterval;
private ExponentialBackOff backOff;
private boolean shuttingDown = false;
@Nullable private UniqueKillSwitch killSwitch;

Expand Down Expand Up @@ -153,6 +152,8 @@ private ThingUpdater(final Flow<Data, Result, NotUsed> flow,
this.flow = flow;
materializer = Materializer.createMaterializer(getContext());
writeInterval = config.getUpdaterConfig().getStreamConfig().getWriteInterval();
backOff = ExponentialBackOff.initial(
config.getUpdaterConfig().getStreamConfig().getPersistenceConfig().getExponentialBackOffConfig());

getContext().setReceiveTimeout(config.getUpdaterConfig().getMaxIdleTime());

Expand Down Expand Up @@ -270,7 +271,13 @@ private FSM.State<State, Data> shutdown(final Object trigger, final Data data) {
private void handleTransition(final State previousState, final State nextState) {
switch (nextState) {
case READY, RETRYING -> {
final var delay = nextState == State.READY ? writeInterval : RETRY_DELAY;
final Duration delay;
if (nextState == State.READY) {
delay = writeInterval;
} else {
backOff = backOff.calculateNextBackOff();
delay = backOff.getRestartDelay();
}
startSingleTimer(Control.TICK.name(), Control.TICK, delay);
unstashAll();
}
Expand Down

0 comments on commit 1c44efe

Please sign in to comment.