From 3836bb6f5d5015d8d1c86b7d911ccbf1c2332c05 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Mon, 28 Aug 2023 15:24:11 +0800 Subject: [PATCH] feat(s3): generate and replay assigned record in controller 1. generate and replay assigned record in controller Signed-off-by: TheR1sing3un --- checkstyle/checkstyle.xml | 2 +- .../common/message/OpenStreamResponse.json | 6 ++ .../stream/S3ObjectControlManager.java | 24 ++++-- .../stream/StreamControlManager.java | 77 +++++++++++-------- .../S3ObjectControlManagerTest.java | 13 +++- .../controller/StreamControlManagerTest.java | 76 +++++++++++------- 6 files changed, 131 insertions(+), 67 deletions(-) diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index bf2d339da8..87055cbbbe 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -129,7 +129,7 @@ - + diff --git a/clients/src/main/resources/common/message/OpenStreamResponse.json b/clients/src/main/resources/common/message/OpenStreamResponse.json index d38cae6de4..03cf51cc85 100644 --- a/clients/src/main/resources/common/message/OpenStreamResponse.json +++ b/clients/src/main/resources/common/message/OpenStreamResponse.json @@ -37,6 +37,12 @@ "type": "int64", "versions": "0+", "about": "The start offset of the opened stream" + }, + { + "name": "NextOffset", + "type": "int64", + "versions": "0+", + "about": "The next offset of the opened stream" } ] } \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index d1710bc414..439e080e7c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord; import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.utils.LogContext; @@ -44,6 +45,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineLong; import org.slf4j.Logger; /** @@ -69,7 +71,7 @@ public class S3ObjectControlManager { /** * The objectId of the next object to be prepared. (start from 0) */ - private Long nextAssignedObjectId = 0L; + private TimelineLong nextAssignedObjectId; private final Queue preparedObjects; @@ -93,6 +95,7 @@ public S3ObjectControlManager( this.log = logContext.logger(S3ObjectControlManager.class); this.clusterId = clusterId; this.config = config; + this.nextAssignedObjectId = new TimelineLong(snapshotRegistry); this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.preparedObjects = new LinkedBlockingDeque<>(); this.markDestroyedObjects = new LinkedBlockingDeque<>(); @@ -119,17 +122,23 @@ public void registerListener(S3ObjectLifeCycleListener listener) { } public Long nextAssignedObjectId() { - return nextAssignedObjectId; + return nextAssignedObjectId.get(); } public ControllerResult prepareObject(PrepareS3ObjectRequestData request) { - // TODO: support batch prepare objects + // TODO: pre assigned a batch of objectIds in controller List records = new ArrayList<>(); PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData(); int count = request.preparedCount(); List prepareObjectIds = new ArrayList<>(count); + + // update assigned stream id + long newAssignedObjectId = nextAssignedObjectId.get() + count - 1; + records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord() + .setAssignedS3ObjectId(newAssignedObjectId), (short) 0)); + for (int i = 0; i < count; i++) { - Long objectId = nextAssignedObjectId + i; + Long objectId = nextAssignedObjectId.get() + i; prepareObjectIds.add(objectId); long preparedTs = System.currentTimeMillis(); long expiredTs = preparedTs + request.timeToLiveInMs(); @@ -141,7 +150,11 @@ public ControllerResult prepareObject(PrepareS3Obje records.add(new ApiMessageAndVersion(record, (short) 0)); } response.setS3ObjectIds(prepareObjectIds); - return ControllerResult.of(records, response); + return ControllerResult.atomicOf(records, response); + } + + public void replay(AssignedS3ObjectIdRecord record) { + nextAssignedObjectId.set(record.assignedS3ObjectId() + 1); } public void replay(S3ObjectRecord record) { @@ -157,7 +170,6 @@ public void replay(S3ObjectRecord record) { } else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) { markDestroyedObjects.add(object.getObjectId()); } - nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1); } public void replay(RemoveS3ObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 4a48dff9ef..1a9ce023da 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; +import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; @@ -51,6 +52,8 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineLong; import org.slf4j.Logger; /** @@ -62,32 +65,35 @@ public class StreamControlManager { // TODO: timeline check public static class S3StreamMetadata { // current epoch, when created but not open, use 0 represent - private long currentEpoch; + private TimelineLong currentEpoch; // rangeIndex, when created but not open, there is no range, use -1 represent - private int currentRangeIndex = -1; - private long startOffset; + private TimelineInteger currentRangeIndex; + private TimelineLong startOffset; private TimelineHashMap ranges; private TimelineHashSet streamObjects; public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset, SnapshotRegistry registry) { - this.currentEpoch = currentEpoch; - this.currentRangeIndex = currentRangeIndex; - this.startOffset = startOffset; + this.currentEpoch = new TimelineLong(registry); + this.currentEpoch.set(currentEpoch); + this.currentRangeIndex = new TimelineInteger(registry); + this.currentRangeIndex.set(currentRangeIndex); + this.startOffset = new TimelineLong(registry); + this.startOffset.set(startOffset); this.ranges = new TimelineHashMap<>(registry, 0); this.streamObjects = new TimelineHashSet<>(registry, 0); } public long currentEpoch() { - return currentEpoch; + return currentEpoch.get(); } public int currentRangeIndex() { - return currentRangeIndex; + return currentRangeIndex.get(); } public long startOffset() { - return startOffset; + return startOffset.get(); } public Map ranges() { @@ -145,7 +151,7 @@ public String toString() { /** * The next stream id to be assigned. */ - private Long nextAssignedStreamId = 0L; + private final TimelineLong nextAssignedStreamId; private final TimelineHashMap streamsMetadata; @@ -158,16 +164,19 @@ public StreamControlManager( this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(StreamControlManager.class); this.s3ObjectControlManager = s3ObjectControlManager; + this.nextAssignedStreamId = new TimelineLong(snapshotRegistry); this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0); } - // TODO: refactor to return next offset of stream in response // TODO: lazy update range's end offset - // TODO: controller allocate the stream id public ControllerResult createStream(CreateStreamRequestData data) { + // TODO: pre assigned a batch of stream id in controller CreateStreamResponseData resp = new CreateStreamResponseData(); - long streamId = nextAssignedStreamId; + long streamId = nextAssignedStreamId.get(); + // update assigned id + ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord() + .setAssignedStreamId(streamId), (short) 0); // create stream ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(streamId) @@ -175,7 +184,7 @@ public ControllerResult createStream(CreateStreamReque .setStartOffset(0L) .setRangeIndex(-1), (short) 0); resp.setStreamId(streamId); - return ControllerResult.of(Arrays.asList(record), resp); + return ControllerResult.atomicOf(Arrays.asList(record0, record), resp); } public ControllerResult openStream(OpenStreamRequestData data) { @@ -190,37 +199,40 @@ public ControllerResult openStream(OpenStreamRequestData } // verify epoch match S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - if (streamMetadata.currentEpoch > epoch) { + if (streamMetadata.currentEpoch.get() > epoch) { resp.setErrorCode(Errors.STREAM_FENCED.code()); return ControllerResult.of(Collections.emptyList(), resp); } - if (streamMetadata.currentEpoch == epoch) { + if (streamMetadata.currentEpoch.get() == epoch) { // epoch equals, verify broker - RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); - if (rangeMetadata == null || rangeMetadata.brokerId() != brokerId) { - resp.setErrorCode(Errors.STREAM_FENCED.code()); + RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); + if (rangeMetadata != null) { + if (rangeMetadata.brokerId() != brokerId) { + resp.setErrorCode(Errors.STREAM_FENCED.code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + // epoch equals, broker equals, regard it as redundant open operation, just return success + resp.setStartOffset(streamMetadata.startOffset.get()); + resp.setNextOffset(rangeMetadata.endOffset()); return ControllerResult.of(Collections.emptyList(), resp); } - // epoch equals, broker equals, regard it as redundant open operation, just return success - resp.setStartOffset(streamMetadata.startOffset); - return ControllerResult.of(Collections.emptyList(), resp); } // now the request in valid, update the stream's epoch and create a new range for this broker List records = new ArrayList<>(); long newEpoch = epoch; - int newRangeIndex = streamMetadata.currentRangeIndex + 1; + int newRangeIndex = streamMetadata.currentRangeIndex.get() + 1; // stream update record records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(streamId) .setEpoch(newEpoch) .setRangeIndex(newRangeIndex) - .setStartOffset(streamMetadata.startOffset), (short) 0)); + .setStartOffset(streamMetadata.startOffset.get()), (short) 0)); // get new range's start offset // default regard this range is the first range in stream, use 0 as start offset long startOffset = 0; if (newRangeIndex > 0) { // means that the new range is not the first range in stream, get the last range's end offset - RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); + RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); startOffset = lastRangeMetadata.endOffset(); } // range create record @@ -232,7 +244,8 @@ public ControllerResult openStream(OpenStreamRequestData .setEpoch(newEpoch) .setRangeIndex(newRangeIndex), (short) 0)); resp.setStartOffset(startOffset); - return ControllerResult.of(records, resp); + resp.setNextOffset(startOffset); + return ControllerResult.atomicOf(records, resp); } public ControllerResult closeStream(CloseStreamRequestData data) { @@ -255,22 +268,24 @@ public ControllerResult commitStreamObject(Commi throw new UnsupportedOperationException(); } + public void replay(AssignedStreamIdRecord record) { + this.nextAssignedStreamId.set(record.assignedStreamId() + 1); + } public void replay(S3StreamRecord record) { long streamId = record.streamId(); // already exist, update the stream's self metadata if (this.streamsMetadata.containsKey(streamId)) { S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); - streamMetadata.startOffset = record.startOffset(); - streamMetadata.currentEpoch = record.epoch(); - streamMetadata.currentRangeIndex = record.rangeIndex(); + streamMetadata.startOffset.set(record.startOffset()); + streamMetadata.currentEpoch.set(record.epoch()); + streamMetadata.currentRangeIndex.set(record.rangeIndex()); return; } // not exist, create a new stream S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(), record.startOffset(), this.snapshotRegistry); this.streamsMetadata.put(streamId, streamMetadata); - this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1); } public void replay(RemoveS3StreamRecord record) { @@ -310,7 +325,7 @@ public Map brokersMetadata() { } public Long nextAssignedStreamId() { - return nextAssignedStreamId; + return nextAssignedStreamId.get(); } @Override diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java index 718e814b45..5affff47ac 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; @@ -40,6 +41,7 @@ @Timeout(40) @Tag("S3Unit") public class S3ObjectControlManagerTest { + private static final int BROKER0 = 0; private static final int BROKER1 = 1; private static final String CLUSTER = "kafka-on-S3_cluster"; @@ -67,11 +69,16 @@ public void testBasicPrepareObject() { .setPreparedCount(3) .setTimeToLiveInMs(1000)); assertEquals(Errors.NONE.code(), result0.response().errorCode()); - assertEquals(3, result0.records().size()); + assertEquals(4, result0.records().size()); + ApiMessage message = result0.records().get(0).message(); + assertInstanceOf(AssignedS3ObjectIdRecord.class, message); + AssignedS3ObjectIdRecord assignedRecord = (AssignedS3ObjectIdRecord) message; + assertEquals(2, assignedRecord.assignedS3ObjectId()); for (int i = 0; i < 3; i++) { - verifyPrepareObjectRecord(result0.records().get(i), i, 1000); + verifyPrepareObjectRecord(result0.records().get(i + 1), i, 1000); } - result0.records().stream().map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record)); + manager.replay(assignedRecord); + result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record)); // verify the 3 objects are prepared assertEquals(3, manager.objectsMetadata().size()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 0bfebec065..bdbd59607d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; +import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.protocol.Errors; @@ -75,16 +76,21 @@ public void testBasicCreateStream() { CreateStreamResponseData response0 = result0.response(); assertEquals(Errors.NONE.code(), response0.errorCode()); assertEquals(STREAM0, response0.streamId()); - assertEquals(1, records0.size()); + assertEquals(2, records0.size()); ApiMessageAndVersion record0 = records0.get(0); - assertInstanceOf(S3StreamRecord.class, record0.message()); - S3StreamRecord streamRecord0 = (S3StreamRecord) record0.message(); + assertInstanceOf(AssignedStreamIdRecord.class, record0.message()); + AssignedStreamIdRecord assignedRecord = (AssignedStreamIdRecord) record0.message(); + assertEquals(STREAM0, assignedRecord.assignedStreamId()); + ApiMessageAndVersion record1 = records0.get(1); + assertInstanceOf(S3StreamRecord.class, record1.message()); + S3StreamRecord streamRecord0 = (S3StreamRecord) record1.message(); assertEquals(STREAM0, streamRecord0.streamId()); assertEquals(0, streamRecord0.epoch()); assertEquals(-1, streamRecord0.rangeIndex()); assertEquals(0L, streamRecord0.startOffset()); - // replay records_0 + // replay + manager.replay(assignedRecord); manager.replay(streamRecord0); // verify the stream_0 is created Map streamsMetadata = @@ -100,8 +106,12 @@ public void testBasicCreateStream() { CreateStreamResponseData response1 = result1.response(); assertEquals(Errors.NONE.code(), response1.errorCode()); assertEquals(STREAM1, response1.streamId()); - assertEquals(1, records1.size()); - ApiMessageAndVersion record1 = records1.get(0); + assertEquals(2, records1.size()); + record0 = records1.get(0); + assertInstanceOf(AssignedStreamIdRecord.class, record0.message()); + assignedRecord = (AssignedStreamIdRecord) record0.message(); + assertEquals(STREAM1, assignedRecord.assignedStreamId()); + record1 = records1.get(1); assertInstanceOf(S3StreamRecord.class, record1.message()); S3StreamRecord streamRecord1 = (S3StreamRecord) record1.message(); assertEquals(STREAM1, streamRecord1.streamId()); @@ -110,6 +120,7 @@ public void testBasicCreateStream() { assertEquals(0L, streamRecord1.startOffset()); // replay records_1 + manager.replay(assignedRecord); manager.replay(streamRecord1); // verify the stream_2 is created streamsMetadata = @@ -124,24 +135,33 @@ public void testBasicOpenStream() { // 1. create stream_0 and stream_1 CreateStreamRequestData request0 = new CreateStreamRequestData(); ControllerResult result0 = manager.createStream(request0); - result0.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); + manager.replay((AssignedStreamIdRecord) result0.records().get(0).message()); + result0.records().stream().skip(1).map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); CreateStreamRequestData request1 = new CreateStreamRequestData(); ControllerResult result1 = manager.createStream(request1); - result1.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); + manager.replay((AssignedStreamIdRecord) result1.records().get(0).message()); + result1.records().stream().skip(1).map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); // verify the streams are created Map streamsMetadata = manager.streamsMetadata(); assertEquals(2, streamsMetadata.size()); verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0)); verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1)); + assertEquals(2, manager.nextAssignedStreamId()); - // 2. broker_0 open stream_0 and stream_1 with epoch1 + // 2. broker_0 open stream_0 and stream_1 with epoch0 ControllerResult result2 = manager.openStream( - new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER0)); + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); ControllerResult result3 = manager.openStream( - new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0)); - verifyFirstTimeOpenStreamResult(result2, EPOCH1, BROKER0); - verifyFirstTimeOpenStreamResult(result3, EPOCH1, BROKER0); + new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); + assertEquals(Errors.NONE.code(), result2.response().errorCode()); + assertEquals(Errors.NONE.code(), result3.response().errorCode()); + assertEquals(0L, result2.response().startOffset()); + assertEquals(0L, result3.response().startOffset()); + assertEquals(0L, result2.response().nextOffset()); + assertEquals(0L, result3.response().nextOffset()); + verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); + verifyFirstTimeOpenStreamResult(result3, EPOCH0, BROKER0); S3StreamRecord streamRecord = (S3StreamRecord) result2.records().get(0).message(); manager.replay(streamRecord); RangeRecord rangeRecord = (RangeRecord) result2.records().get(1).message(); @@ -153,8 +173,8 @@ public void testBasicOpenStream() { // verify the stream_0 and stream_1 metadata are updated, and the range_0 is created S3StreamMetadata streamMetadata0 = manager.streamsMetadata().get(STREAM0); - verifyFirstRange(manager.streamsMetadata().get(STREAM0), EPOCH1, BROKER0); - verifyFirstRange(manager.streamsMetadata().get(STREAM1), EPOCH1, BROKER0); + verifyFirstRange(manager.streamsMetadata().get(STREAM0), EPOCH0, BROKER0); + verifyFirstRange(manager.streamsMetadata().get(STREAM1), EPOCH0, BROKER0); // TODO: support write range record, then roll the range and verify // 3. broker_1 try to open stream_0 with epoch0 @@ -163,46 +183,50 @@ public void testBasicOpenStream() { assertEquals(Errors.STREAM_FENCED.code(), result4.response().errorCode()); assertEquals(0, result4.records().size()); - // 4. broker_1 try to open stream_0 with epoch2 + // 4. broker_1 try to open stream_0 with epoch1 ControllerResult result5 = manager.openStream( - new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH2).setBrokerId(BROKER1)); + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); assertEquals(Errors.NONE.code(), result5.response().errorCode()); + assertEquals(0L, result5.response().startOffset()); + assertEquals(0L, result5.response().nextOffset()); assertEquals(2, result5.records().size()); streamRecord = (S3StreamRecord) result5.records().get(0).message(); manager.replay(streamRecord); - assertEquals(EPOCH2, streamRecord.epoch()); + assertEquals(EPOCH1, streamRecord.epoch()); assertEquals(1, streamRecord.rangeIndex()); assertEquals(0L, streamRecord.startOffset()); rangeRecord = (RangeRecord) result5.records().get(1).message(); manager.replay(rangeRecord); assertEquals(BROKER1, rangeRecord.brokerId()); - assertEquals(EPOCH2, rangeRecord.epoch()); + assertEquals(EPOCH1, rangeRecord.epoch()); assertEquals(1, rangeRecord.rangeIndex()); assertEquals(0L, rangeRecord.startOffset()); assertEquals(0L, rangeRecord.endOffset()); - // verify that stream_0's epoch update to epoch2, and range index update to 1 + // verify that stream_0's epoch update to epoch1, and range index update to 1 streamMetadata0 = manager.streamsMetadata().get(STREAM0); - assertEquals(EPOCH2, streamMetadata0.currentEpoch()); + assertEquals(EPOCH1, streamMetadata0.currentEpoch()); assertEquals(1, streamMetadata0.currentRangeIndex()); assertEquals(0L, streamMetadata0.startOffset()); assertEquals(2, streamMetadata0.ranges().size()); RangeMetadata rangeMetadata0 = streamMetadata0.ranges().get(1); assertEquals(BROKER1, rangeMetadata0.brokerId()); - assertEquals(EPOCH2, rangeMetadata0.epoch()); + assertEquals(EPOCH1, rangeMetadata0.epoch()); assertEquals(1, rangeMetadata0.rangeIndex()); assertEquals(0L, rangeMetadata0.startOffset()); assertEquals(0L, rangeMetadata0.endOffset()); - // 5. broker_0 try to open stream_1 with epoch1 + // 5. broker_0 try to open stream_1 with epoch0 ControllerResult result6 = manager.openStream( - new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0)); + new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); assertEquals(Errors.NONE.code(), result6.response().errorCode()); + assertEquals(0L, result6.response().startOffset()); + assertEquals(0L, result6.response().nextOffset()); assertEquals(0, result6.records().size()); - // 6. broker_1 try to open stream_1 with epoch1 + // 6. broker_1 try to open stream_1 with epoch0 ControllerResult result7 = manager.openStream( - new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); + new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER1)); assertEquals(Errors.STREAM_FENCED.code(), result7.response().errorCode()); assertEquals(0, result7.records().size()); }