Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ class Partition(val topicPartition: TopicPartition,

// AutoMQ for Kafka inject start
private var closed: Boolean = false
/**
* Same with the `_confirmOffset` in `ElasticLog`
* Updated by [[ElasticUnifiedLog.confirmOffsetChangeListener]]
* Used to return fast when fetching messages with `fetchOffset` equals to `confirmOffset` in [[readRecordsAsync]]
*/
private var confirmOffset: Option[Long] = None
// AutoMQ for Kafka inject end

def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))
Expand Down Expand Up @@ -1063,7 +1069,9 @@ class Partition(val topicPartition: TopicPartition,
// move high watermark based on log confirm offset to prevent ack inflight record.
leaderLog match {
case elasticLog: ElasticUnifiedLog =>
newHighWatermark = elasticLog.confirmOffset()
val confirmOffset = elasticLog.confirmOffset()
newHighWatermark = confirmOffset
this.confirmOffset = Some(confirmOffset.messageOffset)
case _ =>
}
// AutoMQ for Kafka inject end
Expand Down Expand Up @@ -1575,6 +1583,17 @@ class Partition(val topicPartition: TopicPartition,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset))
}

if (confirmOffset.contains(fetchOffset)) {
// The fetch offset equals to the confirmed offset, return empty response directly
return CompletableFuture.completedFuture(LogReadInfo(
fetchedData = FetchDataInfo.empty(fetchOffset),
divergingEpoch = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset))
}
}

localLog.readAsync(
Expand Down