diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e2842238fb..883321eee8 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -320,6 +320,12 @@ class Partition(val topicPartition: TopicPartition, // AutoMQ for Kafka inject start private var closed: Boolean = false + /** + * Same with the `_confirmOffset` in `ElasticLog` + * Updated by [[ElasticUnifiedLog.confirmOffsetChangeListener]] + * Used to return fast when fetching messages with `fetchOffset` equals to `confirmOffset` in [[readRecordsAsync]] + */ + private var confirmOffset: Option[Long] = None // AutoMQ for Kafka inject end def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs)) @@ -1063,7 +1069,9 @@ class Partition(val topicPartition: TopicPartition, // move high watermark based on log confirm offset to prevent ack inflight record. leaderLog match { case elasticLog: ElasticUnifiedLog => - newHighWatermark = elasticLog.confirmOffset() + val confirmOffset = elasticLog.confirmOffset() + newHighWatermark = confirmOffset + this.confirmOffset = Some(confirmOffset.messageOffset) case _ => } // AutoMQ for Kafka inject end @@ -1575,6 +1583,17 @@ class Partition(val topicPartition: TopicPartition, logEndOffset = initialLogEndOffset, lastStableOffset = initialLastStableOffset)) } + + if (confirmOffset.contains(fetchOffset)) { + // The fetch offset equals to the confirmed offset, return empty response directly + return CompletableFuture.completedFuture(LogReadInfo( + fetchedData = FetchDataInfo.empty(fetchOffset), + divergingEpoch = None, + highWatermark = initialHighWatermark, + logStartOffset = initialLogStartOffset, + logEndOffset = initialLogEndOffset, + lastStableOffset = initialLastStableOffset)) + } } localLog.readAsync(