Skip to content

Commit

Permalink
Reduce code duplication
Browse files Browse the repository at this point in the history
* Extract ExponentialBackOff into separate class and reuse it in both
  locations

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Sep 6, 2021
1 parent f4ae05d commit 27fedfb
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.config.supervision;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

/**
* Responsible to calculate a restart delay with using exponential back off.
*/
public final class ExponentialBackOff {

private final Instant lastRestart = Instant.now();
private final ExponentialBackOffConfig exponentialBackOffConfig;
private final Duration restartDelay;
private final Duration resetToMinThreshold;

private ExponentialBackOff(final ExponentialBackOffConfig exponentialBackOffConfig) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = exponentialBackOffConfig.getMin();
}

private ExponentialBackOff(final ExponentialBackOffConfig exponentialBackOffConfig, final Duration restartDelay) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = restartDelay;
}

/**
* Creates the first instance of {@link ExponentialBackOff} with a {@link #getRestartDelay() restart delay}
* which is equal to {@link ExponentialBackOffConfig#getMin() the minimum duration} specified by the given
* {@code exponentialBackOffConfig}.
*
* @param exponentialBackOffConfig the config to use.
* @return the first back off.
*/
public static ExponentialBackOff initial(final ExponentialBackOffConfig exponentialBackOffConfig) {
return new ExponentialBackOff(exponentialBackOffConfig);
}

/**
* Use this method at the time when the failure occurs. It will consider the last failure timestamp
* when calculating the {@link #getRestartDelay() restart delay}.
*
* @return the new back off holding the current timestamp as last failure state and with a calculated restart delay.
*/
public ExponentialBackOff calculateNextBackOff() {
return new ExponentialBackOff(exponentialBackOffConfig, calculateRestartDelay());
}

/**
* @return the restart delay.
*/
public Duration getRestartDelay() {
return restartDelay;
}

private Duration calculateRestartDelay() {
final Duration minBackOff = exponentialBackOffConfig.getMin();
final Duration maxBackOff = exponentialBackOffConfig.getMax();
final Instant now = Instant.now();
final Duration sinceLastError = Duration.between(lastRestart, now);
if (resetToMinThreshold.compareTo(sinceLastError) <= 0) {
// no restart if time since last error exceed backoff threshold; reset to minBackOff.
return minBackOff;
} else {
// increase delay.
final double randomFactor = exponentialBackOffConfig.getRandomFactor();
return calculateNextBackOffDuration(minBackOff, restartDelay, maxBackOff, randomFactor);
}
}

private static Duration calculateNextBackOffDuration(final Duration minBackOff, final Duration restartDelay,
final Duration maxBackOff, final double randomFactor) {
final Duration nextBackoff = restartDelay.plus(randomize(restartDelay, randomFactor));
return boundDuration(minBackOff, nextBackoff, maxBackOff);
}

/**
* Return a random duration between the base duration and {@code (1 + randomFactor)} times the base duration.
*
* @param base the base duration.
* @param randomFactor the random factor.
* @return the random duration.
*/
private static Duration randomize(final Duration base, final double randomFactor) {
final double multiplier = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
return Duration.ofMillis((long) (base.toMillis() * multiplier));
}

private static Duration boundDuration(final Duration min, final Duration duration, final Duration max) {
if (duration.minus(min).isNegative()) {
return min;
} else if (max.minus(duration).isNegative()) {
return max;
} else {
return duration;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.config.supervision;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.Mockito;


public final class ExponentialBackOffTest {

@Test
public void initialIsMinDelay() {
final ExponentialBackOffConfig exponentialBackOffConfig = Mockito.mock(ExponentialBackOffConfig.class);
final Duration expected = Duration.ofSeconds(1234);
when(exponentialBackOffConfig.getMin()).thenReturn(expected);
when(exponentialBackOffConfig.getMax()).thenReturn(expected.multipliedBy(20));
when(exponentialBackOffConfig.getRandomFactor()).thenReturn(1.0);
final ExponentialBackOff backOff = ExponentialBackOff.initial(exponentialBackOffConfig);

assertThat(backOff.getRestartDelay()).isEqualTo(expected);
}

@Test
public void nextBackOffIsAtLeastTwiceOfMin() {
final ExponentialBackOffConfig exponentialBackOffConfig = Mockito.mock(ExponentialBackOffConfig.class);
final Duration min = Duration.ofSeconds(1234);
when(exponentialBackOffConfig.getMin()).thenReturn(min);
when(exponentialBackOffConfig.getMax()).thenReturn(min.multipliedBy(20));
when(exponentialBackOffConfig.getRandomFactor()).thenReturn(1.0);
final ExponentialBackOff backOff = ExponentialBackOff.initial(exponentialBackOffConfig);

final ExponentialBackOff nextBackOff = backOff.calculateNextBackOff();

final Duration expected = min.multipliedBy(2);
assertThat(nextBackOff.getRestartDelay()).isGreaterThanOrEqualTo(expected);
}

@Test
public void nextBackOffIsCappedToMax() {
final ExponentialBackOffConfig exponentialBackOffConfig = Mockito.mock(ExponentialBackOffConfig.class);
final Duration min = Duration.ofSeconds(1234);
final Duration max = min.plus(Duration.ofSeconds(1));
when(exponentialBackOffConfig.getMin()).thenReturn(min);
when(exponentialBackOffConfig.getMax()).thenReturn(max);
when(exponentialBackOffConfig.getRandomFactor()).thenReturn(1.0);
final ExponentialBackOff backOff = ExponentialBackOff.initial(exponentialBackOffConfig);

final ExponentialBackOff nextBackOff = backOff.calculateNextBackOff();

assertThat(nextBackOff.getRestartDelay()).isEqualTo(max);
}

@Test
public void nextBackOffIsAlsoMinIfWasStableForTwiceOfMax() throws InterruptedException {
final ExponentialBackOffConfig exponentialBackOffConfig = Mockito.mock(ExponentialBackOffConfig.class);
final Duration min = Duration.ofSeconds(1);
final Duration max = Duration.ofSeconds(3);
when(exponentialBackOffConfig.getMin()).thenReturn(min);
when(exponentialBackOffConfig.getMax()).thenReturn(max);
when(exponentialBackOffConfig.getRandomFactor()).thenReturn(1.0);
final ExponentialBackOff backOff = ExponentialBackOff.initial(exponentialBackOffConfig);

TimeUnit.SECONDS.sleep(max.multipliedBy(2).toSeconds());

final ExponentialBackOff nextBackOff = backOff.calculateNextBackOff();

assertThat(nextBackOff.getRestartDelay()).isEqualTo(min);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOff;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;

import akka.Done;
Expand All @@ -30,22 +29,22 @@
*/
final class RestartableKafkaConsumerStream implements KafkaConsumerStream {

private final Backoff backoff;
private final ExponentialBackOff backOff;
private final KafkaConsumerStream kafkaConsumerStream;
private final Supplier<KafkaConsumerStream> consumerStreamStarter;

RestartableKafkaConsumerStream(final Supplier<KafkaConsumerStream> consumerStreamStarter,
final ExponentialBackOffConfig backOffConfig) {

this.backoff = new Backoff(backOffConfig);
this.backOff = ExponentialBackOff.initial(backOffConfig);
this.consumerStreamStarter = consumerStreamStarter;
this.kafkaConsumerStream = consumerStreamStarter.get();
}

private RestartableKafkaConsumerStream(final Supplier<KafkaConsumerStream> consumerStreamStarter,
final Backoff backoff) {
final ExponentialBackOff backOff) {

this.backoff = backoff;
this.backOff = backOff;
this.consumerStreamStarter = consumerStreamStarter;
this.kafkaConsumerStream = consumerStreamStarter.get();
}
Expand All @@ -70,85 +69,13 @@ CompletionStage<RestartableKafkaConsumerStream> restart() {
//Ignore errors from last stream to ensure a new stream is started
.exceptionally(error -> Done.getInstance())
.thenCompose(done -> {
final CompletableFuture<Backoff> delayFuture = new CompletableFuture<>();
final Duration restartDelay = backoff.getRestartDelay();
final Backoff nextBackoff = this.backoff.calculateNextBackoff();
delayFuture.completeOnTimeout(nextBackoff, restartDelay.toMillis(), TimeUnit.MILLISECONDS);
final CompletableFuture<ExponentialBackOff> delayFuture = new CompletableFuture<>();
final ExponentialBackOff calculatedBackOff = this.backOff.calculateNextBackOff();
final Duration restartDelay = calculatedBackOff.getRestartDelay();
delayFuture.completeOnTimeout(calculatedBackOff, restartDelay.toMillis(), TimeUnit.MILLISECONDS);
return delayFuture;
})
.thenApply(nextBackoff -> new RestartableKafkaConsumerStream(consumerStreamStarter, nextBackoff));
}

private static final class Backoff {

private final Instant lastRestart = Instant.now();
private final ExponentialBackOffConfig exponentialBackOffConfig;
private final Duration restartDelay;
private final Duration resetToMinThreshold;

private Backoff(final ExponentialBackOffConfig exponentialBackOffConfig) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = exponentialBackOffConfig.getMin();
}

private Backoff(final ExponentialBackOffConfig exponentialBackOffConfig, final Duration restartDelay) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = restartDelay;
}

public Backoff calculateNextBackoff() {
return new Backoff(exponentialBackOffConfig, calculateRestartDelay());
}

public Duration getRestartDelay() {
return restartDelay;
}

private Duration calculateRestartDelay() {
final Duration minBackOff = exponentialBackOffConfig.getMin();
final Duration maxBackOff = exponentialBackOffConfig.getMax();
final Instant now = Instant.now();
final Duration sinceLastError = Duration.between(lastRestart, now);
if (resetToMinThreshold.compareTo(sinceLastError) <= 0) {
// no restart if time since last error exceed backoff threshold; reset to minBackOff.
return minBackOff;
} else {
// increase delay.
final double randomFactor = exponentialBackOffConfig.getRandomFactor();
return calculateNextBackOffDuration(minBackOff, restartDelay, maxBackOff, randomFactor);
}
}

private static Duration calculateNextBackOffDuration(final Duration minBackOff, final Duration restartDelay,
final Duration maxBackOff, final double randomFactor) {
final Duration nextBackoff = restartDelay.plus(randomize(restartDelay, randomFactor));
return boundDuration(minBackOff, nextBackoff, maxBackOff);
}

/**
* Return a random duration between the base duration and {@code (1 + randomFactor)} times the base duration.
*
* @param base the base duration.
* @param randomFactor the random factor.
* @return the random duration.
*/
private static Duration randomize(final Duration base, final double randomFactor) {
final double multiplier = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
return Duration.ofMillis((long) (base.toMillis() * multiplier));
}

private static Duration boundDuration(final Duration min, final Duration duration, final Duration max) {
if (duration.minus(min).isNegative()) {
return min;
} else if (max.minus(duration).isNegative()) {
return max;
} else {
return duration;
}
}

}

}
Loading

0 comments on commit 27fedfb

Please sign in to comment.