Skip to content

Commit

Permalink
Replace fix configurable time to wait before retrying with fibonacci …
Browse files Browse the repository at this point in the history
…increment

* Maximum time to wait between retries is capped to 13 seconds, Minimum is 1

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 13, 2020
1 parent b1afafa commit fae4b73
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 35 deletions.
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.client.messaging.internal;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
Expand All @@ -33,52 +32,65 @@
*/
final class Retry<T> implements Supplier<CompletionStage<T>> {

private static final int[] TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS = new int[]{1, 1, 2, 3, 5, 8, 13};
private static final Logger LOGGER = LoggerFactory.getLogger(Retry.class.getName());

private final String sessionId;
private final String nameOfAction;
private final Supplier<T> retriedSupplier;
private final Duration timeToWaitBeforeRetry;
private final ExecutorService executorService;


private Retry(final String nameOfAction,
final String sessionId,
final Supplier<T> retriedSupplier,
final Duration timeToWaitBeforeRetry,
final ExecutorService executorService) {

this.sessionId = sessionId;
this.nameOfAction = nameOfAction;
this.retriedSupplier = retriedSupplier;
this.timeToWaitBeforeRetry = timeToWaitBeforeRetry;
this.executorService = executorService;
}

private static int ensureIndexIntoTimeToWaitBounds(final int index) {
if (index < 0) {
return 0;
} else if (index >= TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS.length) {
return TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS.length - 1;
} else {
return index;
}
}

/**
* Executes the provided supplier until a result is returned.
*
* @return A completion stage which finally completes with the result of the supplier. Result can be null.
*/
@Override
public CompletionStage<T> get() {
return CompletableFuture.supplyAsync(this::doGet, executorService);
return CompletableFuture.supplyAsync(() -> this.doGet(1), executorService);
}

private T doGet() {
private T doGet(int attempt) {
try {
return retriedSupplier.get();
} catch (final RuntimeException e) {
// log error, but try again (don't end loop)
LOGGER.error("Client <{}>: Failed to <{}>: {}.", sessionId, nameOfAction, e.getMessage());
waitFor(timeToWaitBeforeRetry);
return doGet();
waitFor(getTimeToWaitForAttempt(attempt));
return doGet(attempt + 1);
}
}

private Duration getTimeToWaitForAttempt(int attempt) {
final int attemptIndex = ensureIndexIntoTimeToWaitBounds(attempt - 1);
return Duration.ofSeconds(TIME_TO_WAIT_BETWEEN_RETRIES_IN_SECONDS[attemptIndex]);
}

private void waitFor(final Duration timeToWait) {
try {
LOGGER.info("Client <{}>: Waiting for <{}> seconds before retrying to <{}>.",
LOGGER.info("Client <{}>: Waiting for <{}> second(s) before retrying to <{}>.",
sessionId, timeToWait.getSeconds(), nameOfAction);
TimeUnit.SECONDS.sleep(timeToWait.getSeconds());
} catch (final InterruptedException ie) {
Expand Down Expand Up @@ -133,14 +145,6 @@ interface RetryBuilderFinal<T> extends Supplier<CompletionStage<T>> {
*/
RetryBuilderFinal<T> withExecutor(final ExecutorService executorService);

/**
* Specifies the time to wait before a retry should be performed.
*
* @param timeToWaitBeforeRetry the time to wait before a retry should be performed.
* @return a new instance of this builder step.
*/
RetryBuilderFinal<T> andWaitBetweenRetriesFor(Duration timeToWaitBeforeRetry);

/**
* Executes the provided supplier unit the supplier returns a result.
*
Expand All @@ -158,50 +162,39 @@ interface RetryBuilderFinal<T> extends Supplier<CompletionStage<T>> {
*/
static class RetryBuilder<T> implements RetryBuilderStep1<T>, RetryBuilderFinal<T> {

private static final Duration DEFAULT_TIME_TO_WAIT_BETWEEN_RETRIES = Duration.ofSeconds(1);

private final String nameOfAction;
private final Supplier<T> retriedSupplier;
private final String sessionId;
private final Duration timeToWaitBeforeRetry;
private final ExecutorService executorService;

private RetryBuilder(final String nameOfAction, final Supplier<T> retriedSupplier) {
this(nameOfAction, retriedSupplier, "", DEFAULT_TIME_TO_WAIT_BETWEEN_RETRIES, ForkJoinPool.commonPool());
this(nameOfAction, retriedSupplier, "", ForkJoinPool.commonPool());
}

private RetryBuilder(final String nameOfAction,
final Supplier<T> retriedSupplier,
final String sessionId,
final Duration timeToWaitBeforeRetry,
final ExecutorService executorService) {

this.nameOfAction = nameOfAction;
this.retriedSupplier = retriedSupplier;
this.sessionId = sessionId;
this.timeToWaitBeforeRetry = timeToWaitBeforeRetry;
this.executorService = executorService;
}

@Override
public RetryBuilderFinal<T> inClientSession(final String sessionId) {
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, timeToWaitBeforeRetry, executorService);
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, executorService);
}

@Override
public RetryBuilderFinal<T> withExecutor(final ExecutorService executorService) {
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, timeToWaitBeforeRetry, executorService);
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, executorService);
}

@Override
public RetryBuilderFinal<T> andWaitBetweenRetriesFor(final Duration timeToWaitBeforeRetry) {
return new RetryBuilder<>(nameOfAction, retriedSupplier, sessionId, timeToWaitBeforeRetry, executorService);
}


@Override
public CompletionStage<T> get() {
return new Retry<>(nameOfAction, sessionId, retriedSupplier, timeToWaitBeforeRetry, executorService).get();
return new Retry<>(nameOfAction, sessionId, retriedSupplier, executorService).get();
}

}
Expand Down
Expand Up @@ -16,7 +16,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -67,7 +66,6 @@ public final class WebSocketMessagingProvider extends WebSocketAdapter implement
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketMessagingProvider.class);
private static final int CONNECTION_TIMEOUT_MS = 5000;
private static final int RECONNECTION_TIMEOUT_SECONDS = 5;
private static final Duration TIME_BETWEEN_CONNECTION_TRIES = Duration.ofSeconds(1);

private final AdaptableBus adaptableBus;
private final MessagingConfiguration messagingConfiguration;
Expand Down Expand Up @@ -402,7 +400,6 @@ private CompletionStage<WebSocket> connectWithRetries(final Supplier<WebSocket>
.retryTo("initialize WebSocket connection", () -> initiateConnection(webSocket.get()))
.inClientSession(sessionId)
.withExecutor(executorService)
.andWaitBetweenRetriesFor(TIME_BETWEEN_CONNECTION_TRIES)
.get();
}

Expand Down

0 comments on commit fae4b73

Please sign in to comment.