KAFKA-14902: KafkaStatusBackingStore retries on a dedicated background thread to avoid stack overflows (#13557)

KafkaStatusBackingStore uses infinite retry logic on producer send, which can lead to a stack overflow.
To avoid the problem, a dedicated background thread was added, and the producer callback now submits the retry onto that thread instead of retrying inline.
urbandan committed Apr 18, 2023
1 parent e27926f commit 454b721
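To illustrate the fix described above, here is a minimal, self-contained sketch of the two callback shapes (illustrative only; FlakySender, Callback, and the sendRecursive/sendResubmitting names are hypothetical and not part of this patch). The first retries inline from the completion callback, so a long streak of retriable failures keeps deepening the call stack; the second only hands the retry to a single-threaded daemon executor, so each attempt starts from a fresh stack frame.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RetrySketch {
    interface Callback { void onCompletion(Exception error); }

    // Hypothetical sender that fails and invokes the callback on the caller's stack.
    static class FlakySender {
        void send(String value, Callback cb) {
            cb.onCompletion(new RuntimeException("retriable failure"));
        }
    }

    private final FlakySender sender = new FlakySender();
    private final ExecutorService sendRetryExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "send-retry");
        t.setDaemon(true);
        return t;
    });

    // Problematic shape: retrying inside the callback recurses on the same stack.
    void sendRecursive(String value) {
        sender.send(value, error -> {
            if (error != null) sendRecursive(value); // can overflow the stack
        });
    }

    // Fixed shape: the callback only schedules the retry on the background thread.
    void sendResubmitting(String value) {
        sender.send(value, error -> {
            if (error != null) sendRetryExecutor.submit(() -> sendResubmitting(value));
        });
    }
}

In the actual diff below, the equivalent change is that onCompletion now calls sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this)) instead of calling kafkaLog.send(key, value, this) directly.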
Showing 2 changed files with 34 additions and 2 deletions.
ThreadUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.kafka.common.utils;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -52,4 +54,22 @@ public Thread newThread(Runnable r) {
             }
         };
     }
+
+    /**
+     * Shuts down an executor service with a timeout. After the timeout/on interrupt, the service is forcefully closed.
+     * @param executorService The service to shut down.
+     * @param timeout The timeout of the shutdown.
+     * @param timeUnit The time unit of the shutdown timeout.
+     */
+    public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
+                                                      long timeout, TimeUnit timeUnit) {
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(timeout, timeUnit)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executorService.shutdownNow();
+        }
+    }
 }
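For reference, a hedged usage sketch of the new helper together with the existing createThreadFactory utility (the class name, thread name pattern, and submitted task below are arbitrary placeholders, not code from this patch); KafkaStatusBackingStore.stop() in the next file makes the same call to drain its retry executor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.ThreadUtils;

public class ShutdownExample {
    public static void main(String[] args) {
        // Single daemon worker created via the existing factory helper in ThreadUtils.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                ThreadUtils.createThreadFactory("example-worker", true));
        executor.submit(() -> System.out.println("work"));
        // Waits up to 10 seconds for queued tasks, then forces shutdown; an
        // interrupt while waiting also triggers shutdownNow().
        ThreadUtils.shutdownExecutorServiceQuietly(executor, 10, TimeUnit.SECONDS);
    }
}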
KafkaStatusBackingStore.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
@@ -63,6 +64,9 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 /**
@@ -138,6 +142,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
     private SharedTopicAdmin ownTopicAdmin;
+    private ExecutorService sendRetryExecutor;
 
     @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
@@ -159,6 +164,8 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdm
         this(time, converter);
         this.kafkaLog = kafkaLog;
         this.statusTopic = statusTopic;
+        sendRetryExecutor = Executors.newSingleThreadExecutor(
+                ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
     }
 
     @Override
@@ -167,6 +174,9 @@ public void configure(final WorkerConfig config) {
         if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector status.");
 
+        sendRetryExecutor = Executors.newSingleThreadExecutor(
+                ThreadUtils.createThreadFactory("status-store-retry-" + statusTopic, true));
+
         String clusterId = config.kafkaClusterId();
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
@@ -246,6 +256,7 @@ public void stop() {
         try {
             kafkaLog.stop();
         } finally {
+            ThreadUtils.shutdownExecutorServiceQuietly(sendRetryExecutor, 10, TimeUnit.SECONDS);
             if (ownTopicAdmin != null) {
                 ownTopicAdmin.close();
             }
@@ -307,7 +318,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 if (exception == null) return;
                 // TODO: retry more gracefully and not forever
                 if (exception instanceof RetriableException) {
-                    kafkaLog.send(key, value, this);
+                    sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this));
                 } else {
                     log.error("Failed to write status update", exception);
                 }
@@ -340,7 +351,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                             || (safeWrite && !entry.canWriteSafely(status, sequence)))
                         return;
                 }
-                kafkaLog.send(key, value, this);
+
+                sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this));
             } else {
                 log.error("Failed to write status update", exception);
             }