Skip to content

Commit

Permalink
Restart KafkaConsumerStream with exponential backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Sep 3, 2021
1 parent 29110d3 commit d6a7fb8
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.supervision.DefaultExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

Expand All @@ -27,12 +30,16 @@ final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {

private static final String CONFIG_PATH = "consumer";
private static final String ALPAKKA_PATH = "alpakka";
private static final String RESTART_PATH = "restart";

private final ConnectionThrottlingConfig throttlingConfig;
private final ExponentialBackOffConfig consumerRestartBackOffConfig;
private final Config alpakkaConfig;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ConnectionThrottlingConfig.of(kafkaConsumerScopedConfig);
consumerRestartBackOffConfig =
DefaultExponentialBackOffConfig.of(getConfigOrEmpty(kafkaConsumerScopedConfig, RESTART_PATH));
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
}

Expand All @@ -56,6 +63,11 @@ public ConnectionThrottlingConfig getThrottlingConfig() {
return throttlingConfig;
}

@Override
public ExponentialBackOffConfig getConsumerRestartBackOffConfig() {
return consumerRestartBackOffConfig;
}

@Override
public Config getAlpakkaConfig() {
return alpakkaConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;

import com.typesafe.config.Config;

Expand All @@ -31,6 +31,14 @@ public interface KafkaConsumerConfig {
*/
ConnectionThrottlingConfig getThrottlingConfig();

/**
* Returns the config to configure the backOff for restarting the qos 1 consumer stream after a failed
* acknowledgement.
*
* @return the config.
*/
ExponentialBackOffConfig getConsumerRestartBackOffConfig();

/**
* Returns the Config for consumers needed by the Kafka client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand All @@ -34,6 +35,7 @@
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.config.KafkaConfig;
import org.eclipse.ditto.connectivity.service.config.KafkaConsumerConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientData;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
Expand Down Expand Up @@ -226,13 +228,15 @@ private Stream<ConsumerData> consumerDataFromSource(final Source source) {
}

private void startKafkaConsumer(final ConsumerData consumerData, final boolean dryRun) {
final ConnectionThrottlingConfig throttlingConfig = kafkaConfig.getConsumerConfig().getThrottlingConfig();
final KafkaConsumerConfig consumerConfig = kafkaConfig.getConsumerConfig();
final ConnectionThrottlingConfig throttlingConfig = consumerConfig.getThrottlingConfig();
final ExponentialBackOffConfig consumerRestartBackOffConfig = consumerConfig.getConsumerRestartBackOffConfig();
final KafkaConsumerStreamFactory streamFactory =
new KafkaConsumerStreamFactory(throttlingConfig, propertiesFactory, consumerData, dryRun);
final Props consumerActorProps =
KafkaConsumerActor.props(connection(), streamFactory,
consumerData.getAddress(), consumerData.getSource(), getInboundMappingSink(),
connectivityStatusResolver);
connectivityStatusResolver, consumerRestartBackOffConfig);
final ActorRef consumerActor =
startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
kafkaConsumerActors.add(consumerActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
Expand All @@ -46,8 +46,8 @@
*/
final class KafkaConsumerActor extends BaseConsumerActor {

private static final Duration MAX_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
static final String ACTOR_NAME_PREFIX = "kafkaConsumer-";
private static final Duration MAX_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
private static final int DEFAULT_CONSUMPTION_QOS = 0;

private final ThreadSafeDittoLoggingAdapter log;
Expand All @@ -59,7 +59,8 @@ private KafkaConsumerActor(final Connection connection,
final String sourceAddress,
final Source source,
final Sink<Object, NotUsed> inboundMappingSink,
final ConnectivityStatusResolver connectivityStatusResolver) {
final ConnectivityStatusResolver connectivityStatusResolver,
final ExponentialBackOffConfig exponentialBackOffConfig) {
super(connection, sourceAddress, inboundMappingSink, source, connectivityStatusResolver);

log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
Expand All @@ -73,7 +74,7 @@ private KafkaConsumerActor(final Connection connection,
getMessageMappingSink(), getDittoRuntimeExceptionSink());
kafkaConsumerStream.whenComplete(this::handleStreamCompletion);
return kafkaConsumerStream;
});
}, exponentialBackOffConfig);
} else {
kafkaStream = new RestartableKafkaConsumerStream(
() -> {
Expand All @@ -82,7 +83,7 @@ private KafkaConsumerActor(final Connection connection,
getMessageMappingSink(), getDittoRuntimeExceptionSink());
kafkaConsumerStream.whenComplete(this::handleStreamCompletion);
return kafkaConsumerStream;
});
}, exponentialBackOffConfig);
}
}

Expand All @@ -91,9 +92,10 @@ static Props props(final Connection connection,
final String sourceAddress,
final Source source,
final Sink<Object, NotUsed> inboundMappingSink,
final ConnectivityStatusResolver connectivityStatusResolver) {
final ConnectivityStatusResolver connectivityStatusResolver,
final ExponentialBackOffConfig exponentialBackOffConfig) {
return Props.create(KafkaConsumerActor.class, connection, streamFactory, sourceAddress, source,
inboundMappingSink, connectivityStatusResolver);
inboundMappingSink, connectivityStatusResolver, exponentialBackOffConfig);
}

@Override
Expand All @@ -109,6 +111,7 @@ public Receive createReceive() {
.match(RetrieveAddressStatus.class, ram -> getSender().tell(getCurrentSourceStatus(), getSelf()))
.match(GracefulStop.class, stop -> this.shutdown())
.match(MessageRejectedException.class, this::restartStream)
.match(RestartableKafkaConsumerStream.class, this::setStream)
.matchAny(unhandled -> {
log.info("Unhandled message: {}", unhandled);
unhandled(unhandled);
Expand Down Expand Up @@ -160,7 +163,7 @@ private void handleStreamCompletion(@Nullable final Done done, @Nullable final T
InstanceIdentifierSupplier.getInstance().get(),
status,
sourceAddress,
"Consumer closed", now);
"Restarting because of rejected message.", now);
} else {
log.debug("Consumer failed with error! <{}: {}>", throwable.getClass().getSimpleName(),
throwable.getMessage());
Expand All @@ -176,8 +179,14 @@ private void handleStreamCompletion(@Nullable final Done done, @Nullable final T
handleAddressStatus(statusUpdate);
}

private void restartStream(final MessageRejectedException ex) throws ExecutionException, InterruptedException {
kafkaStream = kafkaStream.restart().toCompletableFuture().get();
private void restartStream(final MessageRejectedException ex) {
kafkaStream.restart().toCompletableFuture()
.thenAccept(newKafkaStream -> self().tell(newKafkaStream, self()));
}

private void setStream(final RestartableKafkaConsumerStream newKafkaStream) {
kafkaStream = newKafkaStream;
resetResourceStatus();
}

private void escalate(final Throwable throwable, final String description) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
@Immutable
interface KafkaConsumerStream {

/**
* Allows registering a handler for stream completion.
*
* @param handleCompletion the handler
* @return the chained completion stage. Completes when the Stream completes and the handler finished.
*/
CompletionStage<Done> whenComplete(BiConsumer<? super Done, ? super Throwable> handleCompletion);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,40 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;

import akka.Done;

/**
* Responsible to wrap a {@link KafkaConsumerStream} and restart it on demand.
*/
final class RestartableKafkaConsumerStream implements KafkaConsumerStream {

private final Backoff backoff;
private final KafkaConsumerStream kafkaConsumerStream;
private final Supplier<KafkaConsumerStream> consumerStreamStarter;

RestartableKafkaConsumerStream(final Supplier<KafkaConsumerStream> consumerStreamStarter) {
RestartableKafkaConsumerStream(final Supplier<KafkaConsumerStream> consumerStreamStarter,
final ExponentialBackOffConfig backOffConfig) {

this.backoff = new Backoff(backOffConfig);
this.consumerStreamStarter = consumerStreamStarter;
this.kafkaConsumerStream = consumerStreamStarter.get();
}

private RestartableKafkaConsumerStream(final Supplier<KafkaConsumerStream> consumerStreamStarter,
final Backoff backoff) {

this.backoff = backoff;
this.consumerStreamStarter = consumerStreamStarter;
this.kafkaConsumerStream = consumerStreamStarter.get();
}
Expand All @@ -50,7 +69,86 @@ CompletionStage<RestartableKafkaConsumerStream> restart() {
return kafkaConsumerStream.stop()
//Ignore errors from last stream to ensure a new stream is started
.exceptionally(error -> Done.getInstance())
.thenApply(done -> new RestartableKafkaConsumerStream(consumerStreamStarter));
.thenCompose(done -> {
final CompletableFuture<Backoff> delayFuture = new CompletableFuture<>();
final Duration restartDelay = backoff.getRestartDelay();
final Backoff nextBackoff = this.backoff.calculateNextBackoff();
delayFuture.completeOnTimeout(nextBackoff, restartDelay.toMillis(), TimeUnit.MILLISECONDS);
return delayFuture;
})
.thenApply(nextBackoff -> new RestartableKafkaConsumerStream(consumerStreamStarter, nextBackoff));
}

private static final class Backoff {

private final Instant lastRestart = Instant.now();
private final ExponentialBackOffConfig exponentialBackOffConfig;
private final Duration restartDelay;
private final Duration resetToMinThreshold;

private Backoff(final ExponentialBackOffConfig exponentialBackOffConfig) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = exponentialBackOffConfig.getMin();
}

private Backoff(final ExponentialBackOffConfig exponentialBackOffConfig, final Duration restartDelay) {
resetToMinThreshold = exponentialBackOffConfig.getMax().multipliedBy(2);
this.exponentialBackOffConfig = exponentialBackOffConfig;
this.restartDelay = restartDelay;
}

public Backoff calculateNextBackoff() {
return new Backoff(exponentialBackOffConfig, calculateRestartDelay());
}

public Duration getRestartDelay() {
return restartDelay;
}

private Duration calculateRestartDelay() {
final Duration minBackOff = exponentialBackOffConfig.getMin();
final Duration maxBackOff = exponentialBackOffConfig.getMax();
final Instant now = Instant.now();
final Duration sinceLastError = Duration.between(lastRestart, now);
if (resetToMinThreshold.compareTo(sinceLastError) <= 0) {
// no restart if time since last error exceed backoff threshold; reset to minBackOff.
return minBackOff;
} else {
// increase delay.
final double randomFactor = exponentialBackOffConfig.getRandomFactor();
return calculateNextBackOffDuration(minBackOff, restartDelay, maxBackOff, randomFactor);
}
}

private static Duration calculateNextBackOffDuration(final Duration minBackOff, final Duration restartDelay,
final Duration maxBackOff, final double randomFactor) {
final Duration nextBackoff = restartDelay.plus(randomize(restartDelay, randomFactor));
return boundDuration(minBackOff, nextBackoff, maxBackOff);
}

/**
* Return a random duration between the base duration and {@code (1 + randomFactor)} times the base duration.
*
* @param base the base duration.
* @param randomFactor the random factor.
* @return the random duration.
*/
private static Duration randomize(final Duration base, final double randomFactor) {
final double multiplier = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
return Duration.ofMillis((long) (base.toMillis() * multiplier));
}

private static Duration boundDuration(final Duration min, final Duration duration, final Duration max) {
if (duration.minus(min).isNegative()) {
return min;
} else if (max.minus(duration).isNegative()) {
return max;
} else {
return duration;
}
}

}

}
11 changes: 11 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,17 @@ ditto {
max-in-flight-factor = ${?KAFKA_CONSUMER_THROTTLING_MAX_IN_FLIGHT_FACTOR}
}

restart {
exponential-backoff {
min = 100ms
min = ${?KAFKA_CONSUMER_RESTART_BACK_OFF_MIN}
max = 60s
min = ${?KAFKA_CONSUMER_RESTART_BACK_OFF_MAX}
random-factor = 1.0
random-factor = ${?KAFKA_CONSUMER_RESTART_BACK_OFF_RANDOM_FACTOR}
}
}

alpakka = ${akka.kafka.consumer} # resolve defaults from reference.conf
alpakka = {
use-dispatcher = "kafka-consumer-dispatcher"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.service.config.supervision.DefaultExponentialBackOffConfig;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HeaderMapping;
Expand All @@ -38,6 +39,8 @@
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.junit.Before;

import com.typesafe.config.ConfigFactory;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
Expand Down Expand Up @@ -125,13 +128,14 @@ protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMapping
final ConsumerData consumerData = new ConsumerData(connectionSource, address, "xy");
final KafkaConsumerStreamFactory consumerStreamFactory =
new KafkaConsumerStreamFactory(sourceSupplier, null, consumerData, false);

final DefaultExponentialBackOffConfig backOffConfig = DefaultExponentialBackOffConfig.of(ConfigFactory.empty());
return KafkaConsumerActor.props(CONNECTION,
consumerStreamFactory,
address,
connectionSource,
inboundMappingSink,
mock(ConnectivityStatusResolver.class));
mock(ConnectivityStatusResolver.class),
backOffConfig);
}

@Override
Expand Down

0 comments on commit d6a7fb8

Please sign in to comment.