Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

import com.automq.stream.s3.metadata.StreamState;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.image.S3StreamMetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;

public class DefaultOpenStreamChecker implements OpenStreamChecker {
private final KRaftMetadataCache metadataCache;
Expand All @@ -24,13 +27,30 @@ public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
}

@Override
public boolean check(long streamId, long epoch) throws StreamFencedException {
public boolean check(Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException {
// When ABA reassign happens:
// 1. Assign P0 to broker0 with epoch=0, broker0 opens the partition
// 2. Assign P0 to broker1 with epoch=1, broker1 waits for the partition to be closed
// 3. Quick reassign P0 to broker0 with epoch=2, broker0 merge step2/3 image and keep stream opened with epoch=0
// 4. So broker1 should check partition leader epoch to fail the waiting
TopicImage topicImage = metadataCache.currentImage().topics().getTopic(topicId);
if (topicImage == null) {
throw new StreamFencedException(String.format("topicId=%s cannot be found, it may be deleted or not created yet", topicId));
}
PartitionRegistration partitionImage = topicImage.partitions().get(partition);
if (partitionImage == null) {
throw new StreamFencedException(String.format("partition=%d cannot be found, it may be deleted or not created yet", partition));
}
int currentEpoch = partitionImage.leaderEpoch;
if (currentEpoch > epoch) {
throw new StreamFencedException(String.format("partition=%d with epoch=%d is fenced by new leader epoch=%d", partition, epoch, currentEpoch));
}
S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
if (stream == null) {
throw new StreamFencedException(String.format("streamId=%d cannot be found, it may be deleted or not created yet", streamId));
}
if (stream.getEpoch() > epoch)
throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new epoch=%d", streamId, epoch, stream.getEpoch()));
throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new leader epoch=%d", streamId, epoch, stream.getEpoch()));
return StreamState.CLOSED.equals(stream.state());
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ object ElasticLog extends Logging {
stream
} else {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
awaitStreamReadyForOpen(openStreamChecker, metaStreamId, leaderEpoch, logIdent = logIdent)
awaitStreamReadyForOpen(openStreamChecker, topicId.get, topicPartition.partition(), metaStreamId, leaderEpoch, logIdent = logIdent)
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
Expand Down Expand Up @@ -881,10 +881,10 @@ object ElasticLog extends Logging {
resultCf
}

private def awaitStreamReadyForOpen(checker: OpenStreamChecker, streamId: Long, epoch: Long, logIdent: String): Unit = {
private def awaitStreamReadyForOpen(checker: OpenStreamChecker, topicId: Uuid, partition: Int, streamId: Long, epoch: Long, logIdent: String): Unit = {
var round = 0
while(true) {
if (checker.check(streamId, epoch)) {
if (checker.check(topicId, partition, streamId, epoch)) {
return
}
round += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@

package kafka.log.streamaspect;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.s3.StreamFencedException;

/**
* Check whether a stream is ready for open.
*/
public interface OpenStreamChecker {

    /** A checker that performs no validation and always reports the stream as ready. */
    OpenStreamChecker NOOP = (topicId, partition, streamId, epoch) -> true;

    /**
     * Check whether a stream is ready for open.
     *
     * @param topicId   id of the topic owning the stream
     * @param partition partition index within the topic
     * @param streamId  id of the stream to open
     * @param epoch     the opener's leader epoch
     * @return true if the stream is ready for open, false if the caller should retry later
     * @throws StreamFencedException if the topic/partition/stream is missing or a newer epoch fences the caller
     */
    boolean check(Uuid topicId, int partition, long streamId, long epoch) throws StreamFencedException;

}