diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f446a5a8a7..050c13f3c9 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -415,12 +415,16 @@ class Partition(val topicPartition: TopicPartition, try { val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId, newLeaderEpoch) maybeLog = Some(log) - updateHighWatermark(log) // AutoMQ for Kafka inject start log match { case elasticUnifiedLog: ElasticUnifiedLog => elasticUnifiedLog.confirmOffsetChangeListener = Some(() => handleLeaderConfirmOffsetMove()) + // just update LEO to HW since we only have one replica + val initialHighWatermark = log.logEndOffset + log.updateHighWatermark(log.logEndOffset) + info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark") case _ => + updateHighWatermark(log) } // AutoMQ for Kafka inject end log diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala index 9b071467d0..c41f37d17e 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala @@ -125,7 +125,7 @@ class ElasticLogManager(val client: Client) extends Logging { object ElasticLogManager { var INSTANCE: Option[ElasticLogManager] = None var NAMESPACE = "" - private var turnedOn = false + private var isEnabled = false def init(config: KafkaConfig, clusterId: String, broker: BrokerServer = null): Boolean = { if (!config.elasticStreamEnabled) { @@ -153,11 +153,11 @@ object ElasticLogManager { true } - def mark(shouldEnable: Boolean): Unit = { - turnedOn = shouldEnable + def enable(shouldEnable: Boolean): Unit = { + isEnabled = shouldEnable } - def enabled(): Boolean = turnedOn + def enabled(): Boolean = isEnabled def removeLog(topicPartition: TopicPartition): Unit = { INSTANCE.get.removeLog(topicPartition) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 47273335c6..4dd249d8a2 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -204,7 +204,7 @@ class BrokerServer( // AutoMQ for Kafka inject start // ElasticLogManager should be marked before LogManager is created. - ElasticLogManager.mark(config.elasticStreamEnabled) + ElasticLogManager.enable(config.elasticStreamEnabled) // AutoMQ for Kafka inject end // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery