core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala (15 changes: 11 additions & 4 deletions)
@@ -97,6 +97,7 @@ class ElasticLog(val metaStream: MetaStream,
 
   private val appendAckQueue = new LinkedBlockingQueue[Long]()
   private val appendAckThread = APPEND_CALLBACK_EXECUTOR(math.abs(logIdent.hashCode % APPEND_CALLBACK_EXECUTOR.length))
+  private val readAsyncThread = READ_ASYNC_EXECUTOR(math.abs(logIdent.hashCode % READ_ASYNC_EXECUTOR.length))
 
   // persist the log meta when the lazy stream is actually created
   streamManager.setListener((_, event) => {
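Note: the new readAsyncThread follows the same striping pattern as the existing appendAckThread: hash the log identity into a fixed array of single-thread executors, so all async reads for a given log run on one dedicated thread (and stay ordered) while different logs spread across the stripes. A minimal sketch of the pattern, with hypothetical names:

    import java.util.concurrent.{ExecutorService, Executors}

    // Stripe work across a fixed set of single-thread executors. The same
    // key always hashes to the same stripe, so tasks for one key execute
    // sequentially while tasks for different keys can run in parallel.
    object ExecutorStripes {
      private val stripes: Array[ExecutorService] =
        Array.fill(8)(Executors.newSingleThreadExecutor())

      def forKey(key: String): ExecutorService =
        stripes(math.abs(key.hashCode % stripes.length))
    }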
@@ -346,12 +347,12 @@ class ElasticLog(val metaStream: MetaStream,
     // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
-    readFromSegment(segmentOpt).thenApply(fetchDataInfo => {
+    readFromSegment(segmentOpt).thenCompose(fetchDataInfo => {
       if (fetchDataInfo == null) {
         // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
         // this can happen when all messages with offset larger than start offsets have been deleted.
         // In this case, we will return the empty set with log end offset metadata
-        FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+        CompletableFuture.completedFuture(FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY))
       } else {
         if (includeAbortedTxns) {
           val upperBoundOpt = fetchDataInfo.records match {
@@ -362,9 +363,11 @@ class ElasticLog(val metaStream: MetaStream,
             case _ =>
               None
           }
-          addAbortedTransactions(startOffset, finalSegmentOpt.get, fetchDataInfo, upperBoundOpt)
+          CompletableFuture.supplyAsync(() => {
+            addAbortedTransactions(startOffset, finalSegmentOpt.get, fetchDataInfo, upperBoundOpt)
+          }, readAsyncThread)
         } else {
-          fetchDataInfo
+          CompletableFuture.completedFuture(fetchDataInfo)
         }
       }
     })
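Note: the switch from thenApply to thenCompose is forced by the body now returning a CompletableFuture in every branch; with thenApply the result would be a nested CompletableFuture[CompletableFuture[FetchDataInfo]], while thenCompose flattens the chain. A toy illustration, with hypothetical names:

    import java.util.concurrent.CompletableFuture

    def lookup(id: Int): CompletableFuture[String] =
      CompletableFuture.completedFuture(s"record-$id")

    val id = CompletableFuture.completedFuture(42)

    // thenApply wraps the inner future in another future...
    val nested: CompletableFuture[CompletableFuture[String]] = id.thenApply(i => lookup(i))

    // ...while thenCompose chains it and flattens the result.
    val flat: CompletableFuture[String] = id.thenCompose(i => lookup(i))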
@@ -439,10 +442,14 @@ object ElasticLog extends Logging {
   private val APPEND_CALLBACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendCallbackTimeNanos")
   private val APPEND_ACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendAckTimeNanos")
   private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
+  private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8)
 
   for (i <- APPEND_CALLBACK_EXECUTOR.indices) {
     APPEND_CALLBACK_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-append-callback-executor-" + i, true))
   }
+  for (i <- READ_ASYNC_EXECUTOR.indices) {
+    READ_ASYNC_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-read-async-executor-" + i, true))
+  }
 
   private val META_SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("log-meta-schedule-executor", true))

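Note: wrapping addAbortedTransactions in CompletableFuture.supplyAsync moves the aborted-transaction lookup onto the per-log read executor instead of the thread that completed the segment read. A minimal sketch of the offloading pattern, with hypothetical names:

    import java.util.concurrent.{CompletableFuture, Executors}

    val readAsync = Executors.newSingleThreadExecutor()

    // Run a potentially slow enrichment step on a dedicated executor and
    // keep composing on the returned future instead of blocking the caller.
    def enrich(info: String): CompletableFuture[String] =
      CompletableFuture.supplyAsync(() => {
        info + " (enriched)" // placeholder for the real lookup
      }, readAsync)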