diff --git a/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
index 61d6f1f020..aa8cf64837 100644
--- a/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
+++ b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
@@ -13,8 +13,11 @@
 import com.automq.stream.s3.metadata.StreamState;
 import kafka.server.metadata.KRaftMetadataCache;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.s3.StreamFencedException;
 import org.apache.kafka.image.S3StreamMetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.PartitionRegistration;
 
 public class DefaultOpenStreamChecker implements OpenStreamChecker {
     private final KRaftMetadataCache metadataCache;
@@ -24,13 +27,30 @@ public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
     }
 
     @Override
-    public boolean check(long streamId, long epoch) throws StreamFencedException {
+    public boolean check(Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException {
+        // When an ABA reassignment happens:
+        // 1. Assign P0 to broker0 with epoch=0, broker0 opens the partition
+        // 2. Assign P0 to broker1 with epoch=1, broker1 waits for the partition to be closed
+        // 3. Quickly reassign P0 back to broker0 with epoch=2, broker0 merges the step2/3 images and keeps the stream opened with epoch=0
+        // 4. So broker1 must check the partition leader epoch to fail the waiting
+        TopicImage topicImage = metadataCache.currentImage().topics().getTopic(topicId);
+        if (topicImage == null) {
+            throw new StreamFencedException(String.format("topicId=%s cannot be found, it may be deleted or not created yet", topicId));
+        }
+        PartitionRegistration partitionImage = topicImage.partitions().get(partition);
+        if (partitionImage == null) {
+            throw new StreamFencedException(String.format("partition=%d cannot be found, it may be deleted or not created yet", partition));
+        }
+        int currentEpoch = partitionImage.leaderEpoch;
+        if (currentEpoch > epoch) {
+            throw new StreamFencedException(String.format("partition=%d with epoch=%d is fenced by new leader epoch=%d", partition, epoch, currentEpoch));
+        }
         S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
         if (stream == null) {
             throw new StreamFencedException(String.format("streamId=%d cannot be found, it may be deleted or not created yet", streamId));
         }
         if (stream.getEpoch() > epoch)
-            throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new epoch=%d", streamId, epoch, stream.getEpoch()));
+            throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new stream epoch=%d", streamId, epoch, stream.getEpoch()));
         return StreamState.CLOSED.equals(stream.state());
     }
 }
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
index ea3efac4f7..5226f62d6e 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -644,7 +644,7 @@ object ElasticLog extends Logging {
           stream
         } else {
           val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
-          awaitStreamReadyForOpen(openStreamChecker, metaStreamId, leaderEpoch, logIdent = logIdent)
+          awaitStreamReadyForOpen(openStreamChecker, topicId.get, topicPartition.partition(), metaStreamId, leaderEpoch, logIdent = logIdent)
           // open partition meta stream
           val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
             .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
@@ -881,10 +881,10 @@
     resultCf
   }
 
-  private def awaitStreamReadyForOpen(checker: OpenStreamChecker, streamId: Long, epoch: Long, logIdent: String): Unit = {
+  private def awaitStreamReadyForOpen(checker: OpenStreamChecker, topicId: Uuid, partition: Int, streamId: Long, epoch: Long, logIdent: String): Unit = {
     var round = 0
     while(true) {
-      if (checker.check(streamId, epoch)) {
+      if (checker.check(topicId, partition, streamId, epoch)) {
         return
       }
       round += 1
diff --git a/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java b/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
index e5bb4a8794..7c54af326b 100644
--- a/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
+++ b/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
@@ -11,17 +11,18 @@
 
 package kafka.log.streamaspect;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.s3.StreamFencedException;
 
 /**
  * Check whether a stream is ready for open.
  */
 public interface OpenStreamChecker {
 
-    OpenStreamChecker NOOP = (streamId, epoch) -> true;
+    OpenStreamChecker NOOP = (topicId, partition, streamId, epoch) -> true;
 
     /**
      * Check whether a stream is ready for open.
      */
-    boolean check(long streamId, long epoch) throws StreamFencedException;
+    boolean check(Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException;
 }