Skip to content

Commit

Permalink
Cap max RetryableAction wait time/timeout. (#74940) (#76152)
Browse files Browse the repository at this point in the history
RetryableAction uses randomized exponential backoff. If unlucky, the
randomization could produce a series of very short waits, each of which
still doubled the bound, risking a subsequent very long wait. Now the
delay is randomized within the upper half of the bound, (bound/2, bound].

Closes #70996
  • Loading branch information
henningandersen committed Aug 5, 2021
1 parent 54f28ee commit 7dd1a22
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public void onRejection(Exception e) {

public abstract boolean shouldRetry(Exception e);

protected long calculateDelay(long previousDelay) {
return Math.min(previousDelay * 2, Integer.MAX_VALUE);
protected long calculateDelayBound(long previousDelayBound) {
return Math.min(previousDelayBound * 2, Integer.MAX_VALUE);
}

protected long minimumDelayMillis() {
return 1L;
return 0L;
}

public void onFinished() {
Expand Down Expand Up @@ -145,10 +145,12 @@ public void onFailure(Exception e) {
} else {
addException(e);

final long nextDelayMillisBound = calculateDelay(delayMillisBound);
final long nextDelayMillisBound = calculateDelayBound(delayMillisBound);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + minimumDelayMillis();
int range = Math.toIntExact((delayMillisBound + 1) / 2);
final long delayMillis = Randomness.get().nextInt(range) + delayMillisBound - range + 1L;
assert delayMillis > 0;
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void testRetryableActionTimeout() {
final AtomicInteger retryCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(1), future) {
TimeValue.timeValueMillis(randomFrom(1, 10, randomIntBetween(100, 2000))), TimeValue.timeValueSeconds(1), future) {

@Override
public void tryAction(ActionListener<Boolean> listener) {
Expand All @@ -122,6 +122,7 @@ public boolean shouldRetry(Exception e) {
return e instanceof EsRejectedExecutionException;
}
};
long begin = taskQueue.getCurrentTimeMillis();
retryableAction.run();
taskQueue.runAllRunnableTasks();
long previousDeferredTime = 0;
Expand All @@ -136,6 +137,10 @@ public boolean shouldRetry(Exception e) {
assertFalse(taskQueue.hasRunnableTasks());

expectThrows(EsRejectedExecutionException.class, future::actionGet);

long end = taskQueue.getCurrentTimeMillis();
// At most 3x the timeout, since every retry waits at least half of its delay bound.
assertThat(end - begin, lessThanOrEqualTo(3000L));
}

public void testTimeoutOfZeroMeansNoRetry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ private abstract class MlRetryableAction<Request, Response> extends RetryableAct
final Consumer<String> msgHandler;
final BiConsumer<Request, ActionListener<Response>> action;
volatile int currentAttempt = 0;
volatile long currentMin = MIN_RETRY_SLEEP_MILLIS;
volatile long currentMax = MIN_RETRY_SLEEP_MILLIS;

MlRetryableAction(String jobId,
Expand Down Expand Up @@ -453,30 +452,21 @@ public boolean shouldRetry(Exception e) {
}

@Override
protected long calculateDelay(long previousDelay) {
// Since we exponentially increase, we don't want force randomness to have an excessively long sleep
if (currentMax < MAX_RETRY_SLEEP_MILLIS) {
currentMin = currentMax;
}
protected long calculateDelayBound(long previousDelayBound) {
// Exponential backoff calculation taken from: https://en.wikipedia.org/wiki/Exponential_backoff
int uncappedBackoff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * (50);
currentMax = Math.min(uncappedBackoff, MAX_RETRY_SLEEP_MILLIS);
// It's good to have a random window along the exponentially increasing curve
// so that not all bulk requests rest for the same amount of time
int randBound = (int)(1 + (currentMax - currentMin));
String msg = new ParameterizedMessage(
"failed to {} after [{}] attempts. Will attempt again.",
getName(),
currentAttempt)
.getFormattedMessage();
LOGGER.warn(() -> new ParameterizedMessage("[{}] {}", jobId, msg));
msgHandler.accept(msg);
return randBound;
}

@Override
protected long minimumDelayMillis() {
return currentMin;
// RetryableAction randomizes in the interval [currentMax/2 ; currentMax].
// It's good to have a random window along the exponentially increasing curve
// so that not all bulk requests rest for the same amount of time
return currentMax;
}

@Override
Expand Down

0 comments on commit 7dd1a22

Please sign in to comment.