From c3d149cf3bc2063e5d14b69c78bba777300ae73a Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Wed, 24 Jan 2024 10:23:58 +0800
Subject: [PATCH] fix(log): make txn read async to avoid deadlock

Signed-off-by: Robin Han
---
 .../scala/kafka/log/streamaspect/ElasticLog.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
index bda469b0e7..b02f83c4bb 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -97,6 +97,7 @@ class ElasticLog(val metaStream: MetaStream,
 
   private val appendAckQueue = new LinkedBlockingQueue[Long]()
   private val appendAckThread = APPEND_CALLBACK_EXECUTOR(math.abs(logIdent.hashCode % APPEND_CALLBACK_EXECUTOR.length))
+  private val readAsyncThread = READ_ASYNC_EXECUTOR(math.abs(logIdent.hashCode % READ_ASYNC_EXECUTOR.length))
 
   // persist log meta when lazy stream real create
   streamManager.setListener((_, event) => {
@@ -346,12 +347,12 @@ class ElasticLog(val metaStream: MetaStream,
       // Do the read on the segment with a base offset less than the target offset
       // but if that segment doesn't contain any messages with an offset greater than that
       // continue to read from successive segments until we get some messages or we reach the end of the log
-      readFromSegment(segmentOpt).thenApply(fetchDataInfo => {
+      readFromSegment(segmentOpt).thenCompose(fetchDataInfo => {
         if (fetchDataInfo == null) {
           // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
           // this can happen when all messages with offset larger than start offsets have been deleted.
           // In this case, we will return the empty set with log end offset metadata
-          FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+          CompletableFuture.completedFuture(FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY))
         } else {
           if (includeAbortedTxns) {
             val upperBoundOpt = fetchDataInfo.records match {
@@ -362,9 +363,11 @@ class ElasticLog(val metaStream: MetaStream,
               case _ =>
                 None
             }
-            addAbortedTransactions(startOffset, finalSegmentOpt.get, fetchDataInfo, upperBoundOpt)
+            CompletableFuture.supplyAsync(() => {
+              addAbortedTransactions(startOffset, finalSegmentOpt.get, fetchDataInfo, upperBoundOpt)
+            }, readAsyncThread)
           } else {
-            fetchDataInfo
+            CompletableFuture.completedFuture(fetchDataInfo)
           }
         }
       })
@@ -439,10 +442,14 @@ object ElasticLog extends Logging {
   private val APPEND_CALLBACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendCallbackTimeNanos")
   private val APPEND_ACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendAckTimeNanos")
   private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
+  private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
 
   for (i <- APPEND_CALLBACK_EXECUTOR.indices) {
     APPEND_CALLBACK_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-append-callback-executor-" + i, true))
   }
+  for (i <- READ_ASYNC_EXECUTOR.indices) {
+    READ_ASYNC_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-read-async-executor-" + i, true))
+  }
 
   private val META_SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1,
     ThreadUtils.createThreadFactory("log-meta-schedule-executor", true))
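
Note on the pattern (reviewer sketch, not part of the patch): the fix replaces `thenApply` with `thenCompose` so that the potentially blocking `addAbortedTransactions` lookup is hopped onto a dedicated single-thread executor via `CompletableFuture.supplyAsync`, instead of running inline on whatever I/O callback thread completed `readFromSegment`; running it inline could deadlock that callback thread against the very stream I/O the lookup waits on. The standalone Scala sketch below illustrates the executor hop under simplified, hypothetical names — `readFromSegment`, `addAbortedTransactions`, and the executor pool here are placeholders, not the project's actual API:

import java.util.concurrent.{CompletableFuture, ExecutorService, Executors}

object TxnReadAsyncSketch {
  // Pool of single-thread executors; one is picked per log by hashing its
  // identifier, mirroring the patch's READ_ASYNC_EXECUTOR scheme (hypothetical here).
  private val readExecutors: Array[ExecutorService] =
    Array.fill(8)(Executors.newSingleThreadExecutor())

  private def readAsyncThread(logIdent: String): ExecutorService =
    readExecutors(math.abs(logIdent.hashCode % readExecutors.length))

  // Placeholder for a segment read that completes on an I/O callback thread.
  private def readFromSegment(offset: Long): CompletableFuture[String] =
    CompletableFuture.supplyAsync(() => s"records@$offset")

  // Placeholder for the aborted-transaction lookup; in the patched code this
  // may itself block on stream I/O, so it must not run on the thread that
  // completed readFromSegment.
  private def addAbortedTransactions(records: String): String =
    records + "+abortedTxns"

  def read(logIdent: String, offset: Long, includeAbortedTxns: Boolean): CompletableFuture[String] =
    readFromSegment(offset).thenCompose { records =>
      if (includeAbortedTxns) {
        // Hop onto the dedicated executor. With thenApply the blocking lookup
        // would run inline on the completing callback thread and could
        // deadlock it against the I/O it waits for.
        CompletableFuture.supplyAsync(() => addAbortedTransactions(records), readAsyncThread(logIdent))
      } else {
        CompletableFuture.completedFuture(records)
      }
    }

  def main(args: Array[String]): Unit = {
    println(read("topic-0", 42L, includeAbortedTxns = true).join()) // records@42+abortedTxns
    readExecutors.foreach(_.shutdown())
  }
}

Keying the executor choice on the log identifier's hash keeps all async reads for one log on the same single-thread executor (preserving per-log ordering) while spreading different logs across the pool, the same scheme the patch already uses for append callbacks.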