diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index bf2d339da8..87055cbbbe 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -129,7 +129,7 @@
-
+
diff --git a/clients/src/main/resources/common/message/OpenStreamResponse.json b/clients/src/main/resources/common/message/OpenStreamResponse.json
index d38cae6de4..03cf51cc85 100644
--- a/clients/src/main/resources/common/message/OpenStreamResponse.json
+++ b/clients/src/main/resources/common/message/OpenStreamResponse.json
@@ -37,6 +37,12 @@
"type": "int64",
"versions": "0+",
"about": "The start offset of the opened stream"
+ },
+ {
+ "name": "NextOffset",
+ "type": "int64",
+ "versions": "0+",
+ "about": "The next offset of the opened stream"
}
]
}
\ No newline at end of file
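The added NextOffset field pairs with StartOffset so a broker can resume appending without scanning the stream. Below is a minimal sketch of how a caller might consume the two offsets; the helper class is hypothetical, and only the startOffset()/nextOffset() accessor names follow from Kafka's generated-message naming for this schema.

import org.apache.kafka.common.message.OpenStreamResponseData;

// Hypothetical consumer of the response; only the accessor names come from
// the generated OpenStreamResponseData class.
public final class OpenStreamResult {
    public static void onOpened(OpenStreamResponseData resp) {
        long startOffset = resp.startOffset(); // first readable offset of the stream
        long nextOffset = resp.nextOffset();   // offset at which the broker appends next
        // Readers can fetch [startOffset, nextOffset); writers continue at nextOffset.
        System.out.printf("readable range [%d, %d), append at %d%n",
                startOffset, nextOffset, nextOffset);
    }
}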
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index d1710bc414..439e080e7c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
@@ -44,6 +45,7 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;
/**
@@ -69,7 +71,7 @@ public class S3ObjectControlManager {
/**
* The objectId of the next object to be prepared. (starts from 0)
*/
- private Long nextAssignedObjectId = 0L;
+ private TimelineLong nextAssignedObjectId;
private final Queue<Long> preparedObjects;
@@ -93,6 +95,7 @@ public S3ObjectControlManager(
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
+ this.nextAssignedObjectId = new TimelineLong(snapshotRegistry);
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.preparedObjects = new LinkedBlockingDeque<>();
this.markDestroyedObjects = new LinkedBlockingDeque<>();
@@ -119,17 +122,23 @@ public void registerListener(S3ObjectLifeCycleListener listener) {
}
public Long nextAssignedObjectId() {
- return nextAssignedObjectId;
+ return nextAssignedObjectId.get();
}
public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData request) {
- // TODO: support batch prepare objects
+ // TODO: pre-assign a batch of objectIds in the controller
List<ApiMessageAndVersion> records = new ArrayList<>();
PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
int count = request.preparedCount();
List prepareObjectIds = new ArrayList<>(count);
+
+ // update the assigned object id
+ long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
+ records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
+ .setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
+
for (int i = 0; i < count; i++) {
- Long objectId = nextAssignedObjectId + i;
+ Long objectId = nextAssignedObjectId.get() + i;
prepareObjectIds.add(objectId);
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
@@ -141,7 +150,11 @@ public ControllerResult prepareObject(PrepareS3Obje
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setS3ObjectIds(prepareObjectIds);
- return ControllerResult.of(records, response);
+ return ControllerResult.atomicOf(records, response);
+ }
+
+ public void replay(AssignedS3ObjectIdRecord record) {
+ nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
}
public void replay(S3ObjectRecord record) {
@@ -157,7 +170,6 @@ public void replay(S3ObjectRecord record) {
} else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
markDestroyedObjects.add(object.getObjectId());
}
- nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1);
}
public void replay(RemoveS3ObjectRecord record) {
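Replacing the plain Long counter with a TimelineLong registers it with the controller's SnapshotRegistry, so reverting to a snapshot also rolls the counter back and an uncommitted AssignedS3ObjectIdRecord cannot leak ids. A standalone sketch of that rollback behavior, using only the timeline classes already imported above:

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;

public final class TimelineLongSketch {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        TimelineLong nextId = new TimelineLong(registry);

        nextId.set(3L);                     // ids 0..2 assigned and committed
        registry.getOrCreateSnapshot(10L);  // snapshot taken at, say, epoch 10

        nextId.set(6L);                     // tentatively assign ids 3..5
        registry.revertToSnapshot(10L);     // the batch did not commit

        System.out.println(nextId.get());   // prints 3: the tentative ids are gone
    }
}

The switch from ControllerResult.of to ControllerResult.atomicOf is the other half of the same fix: the AssignedS3ObjectIdRecord and the S3ObjectRecords must land in one atomic batch, or a crash in between could replay a partial assignment.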
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 4a48dff9ef..1a9ce023da 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -37,6 +37,7 @@
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
@@ -51,6 +52,8 @@
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;
/**
@@ -62,32 +65,35 @@ public class StreamControlManager {
// TODO: timeline check
public static class S3StreamMetadata {
// current epoch; 0 represents a stream that has been created but not yet opened
- private long currentEpoch;
+ private TimelineLong currentEpoch;
// rangeIndex; -1 represents a stream with no range yet (created but not yet opened)
- private int currentRangeIndex = -1;
- private long startOffset;
+ private TimelineInteger currentRangeIndex;
+ private TimelineLong startOffset;
private TimelineHashMap<Integer, RangeMetadata> ranges;
private TimelineHashSet<Long> streamObjects;
public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset,
SnapshotRegistry registry) {
- this.currentEpoch = currentEpoch;
- this.currentRangeIndex = currentRangeIndex;
- this.startOffset = startOffset;
+ this.currentEpoch = new TimelineLong(registry);
+ this.currentEpoch.set(currentEpoch);
+ this.currentRangeIndex = new TimelineInteger(registry);
+ this.currentRangeIndex.set(currentRangeIndex);
+ this.startOffset = new TimelineLong(registry);
+ this.startOffset.set(startOffset);
this.ranges = new TimelineHashMap<>(registry, 0);
this.streamObjects = new TimelineHashSet<>(registry, 0);
}
public long currentEpoch() {
- return currentEpoch;
+ return currentEpoch.get();
}
public int currentRangeIndex() {
- return currentRangeIndex;
+ return currentRangeIndex.get();
}
public long startOffset() {
- return startOffset;
+ return startOffset.get();
}
public Map<Integer, RangeMetadata> ranges() {
@@ -145,7 +151,7 @@ public String toString() {
/**
* The next stream id to be assigned.
*/
- private Long nextAssignedStreamId = 0L;
+ private final TimelineLong nextAssignedStreamId;
private final TimelineHashMap<Long, S3StreamMetadata> streamsMetadata;
@@ -158,16 +164,19 @@ public StreamControlManager(
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(StreamControlManager.class);
this.s3ObjectControlManager = s3ObjectControlManager;
+ this.nextAssignedStreamId = new TimelineLong(snapshotRegistry);
this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
}
- // TODO: refactor to return next offset of stream in response
// TODO: lazy update range's end offset
- // TODO: controller allocate the stream id
public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
+ // TODO: pre-assign a batch of stream ids in the controller
CreateStreamResponseData resp = new CreateStreamResponseData();
- long streamId = nextAssignedStreamId;
+ long streamId = nextAssignedStreamId.get();
+ // update the assigned stream id
+ ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord()
+ .setAssignedStreamId(streamId), (short) 0);
// create stream
ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(streamId)
@@ -175,7 +184,7 @@ public ControllerResult createStream(CreateStreamReque
.setStartOffset(0L)
.setRangeIndex(-1), (short) 0);
resp.setStreamId(streamId);
- return ControllerResult.of(Arrays.asList(record), resp);
+ return ControllerResult.atomicOf(Arrays.asList(record0, record), resp);
}
public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData data) {
@@ -190,37 +199,40 @@ public ControllerResult openStream(OpenStreamRequestData
}
// verify epoch match
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
- if (streamMetadata.currentEpoch > epoch) {
+ if (streamMetadata.currentEpoch.get() > epoch) {
resp.setErrorCode(Errors.STREAM_FENCED.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
- if (streamMetadata.currentEpoch == epoch) {
+ if (streamMetadata.currentEpoch.get() == epoch) {
// epochs match, verify the broker
- RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
- if (rangeMetadata == null || rangeMetadata.brokerId() != brokerId) {
- resp.setErrorCode(Errors.STREAM_FENCED.code());
+ RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
+ if (rangeMetadata != null) {
+ if (rangeMetadata.brokerId() != brokerId) {
+ resp.setErrorCode(Errors.STREAM_FENCED.code());
+ return ControllerResult.of(Collections.emptyList(), resp);
+ }
+ // epoch and broker both match: treat it as a redundant open operation and just return success
+ resp.setStartOffset(streamMetadata.startOffset.get());
+ resp.setNextOffset(rangeMetadata.endOffset());
return ControllerResult.of(Collections.emptyList(), resp);
}
- // epoch equals, broker equals, regard it as redundant open operation, just return success
- resp.setStartOffset(streamMetadata.startOffset);
- return ControllerResult.of(Collections.emptyList(), resp);
}
// now the request is valid; update the stream's epoch and create a new range for this broker
List<ApiMessageAndVersion> records = new ArrayList<>();
long newEpoch = epoch;
- int newRangeIndex = streamMetadata.currentRangeIndex + 1;
+ int newRangeIndex = streamMetadata.currentRangeIndex.get() + 1;
// stream update record
records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(streamId)
.setEpoch(newEpoch)
.setRangeIndex(newRangeIndex)
- .setStartOffset(streamMetadata.startOffset), (short) 0));
+ .setStartOffset(streamMetadata.startOffset.get()), (short) 0));
// get new range's start offset
// by default, regard this range as the first range in the stream and use 0 as the start offset
long startOffset = 0;
if (newRangeIndex > 0) {
// the new range is not the first range in the stream, so use the last range's end offset
- RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
+ RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
startOffset = lastRangeMetadata.endOffset();
}
// range create record
@@ -232,7 +244,8 @@ public ControllerResult openStream(OpenStreamRequestData
.setEpoch(newEpoch)
.setRangeIndex(newRangeIndex), (short) 0));
resp.setStartOffset(startOffset);
- return ControllerResult.of(records, resp);
+ resp.setNextOffset(startOffset);
+ return ControllerResult.atomicOf(records, resp);
}
public ControllerResult<CloseStreamResponseData> closeStream(CloseStreamRequestData data) {
@@ -255,22 +268,24 @@ public ControllerResult commitStreamObject(Commi
throw new UnsupportedOperationException();
}
+ public void replay(AssignedStreamIdRecord record) {
+ this.nextAssignedStreamId.set(record.assignedStreamId() + 1);
+ }
public void replay(S3StreamRecord record) {
long streamId = record.streamId();
// already exists, update the stream's own metadata
if (this.streamsMetadata.containsKey(streamId)) {
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
- streamMetadata.startOffset = record.startOffset();
- streamMetadata.currentEpoch = record.epoch();
- streamMetadata.currentRangeIndex = record.rangeIndex();
+ streamMetadata.startOffset.set(record.startOffset());
+ streamMetadata.currentEpoch.set(record.epoch());
+ streamMetadata.currentRangeIndex.set(record.rangeIndex());
return;
}
// does not exist, create a new stream
S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(),
record.startOffset(), this.snapshotRegistry);
this.streamsMetadata.put(streamId, streamMetadata);
- this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1);
}
public void replay(RemoveS3StreamRecord record) {
@@ -310,7 +325,7 @@ public Map brokersMetadata() {
}
public Long nextAssignedStreamId() {
- return nextAssignedStreamId;
+ return nextAssignedStreamId.get();
}
@Override
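For readers tracing the new openStream branches, here is a hypothetical condensation of the fencing rules (illustrative only; the real checks live inline above, and the method and parameter names here are invented):

import org.apache.kafka.common.protocol.Errors;

final class OpenStreamFencing {
    // currentRangeBroker is null when the current range has not been created yet.
    static Errors decide(long currentEpoch, long requestEpoch,
                         Integer currentRangeBroker, int requestBroker) {
        if (currentEpoch > requestEpoch) {
            return Errors.STREAM_FENCED;      // stale epoch: reject
        }
        if (currentEpoch == requestEpoch && currentRangeBroker != null) {
            return currentRangeBroker == requestBroker
                    ? Errors.NONE             // redundant open: reply with start/next offsets
                    : Errors.STREAM_FENCED;   // same epoch held by another broker
        }
        return Errors.NONE;                   // valid: roll a new range for this broker
    }
}

Note the two success paths differ: the redundant open emits no records, while the roll path emits an atomic batch (ControllerResult.atomicOf) containing the stream update record and the new range record.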
diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
index 718e814b45..5affff47ac 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
+import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
@@ -40,6 +41,7 @@
@Timeout(40)
@Tag("S3Unit")
public class S3ObjectControlManagerTest {
+
private static final int BROKER0 = 0;
private static final int BROKER1 = 1;
private static final String CLUSTER = "kafka-on-S3_cluster";
@@ -67,11 +69,16 @@ public void testBasicPrepareObject() {
.setPreparedCount(3)
.setTimeToLiveInMs(1000));
assertEquals(Errors.NONE.code(), result0.response().errorCode());
- assertEquals(3, result0.records().size());
+ assertEquals(4, result0.records().size());
+ ApiMessage message = result0.records().get(0).message();
+ assertInstanceOf(AssignedS3ObjectIdRecord.class, message);
+ AssignedS3ObjectIdRecord assignedRecord = (AssignedS3ObjectIdRecord) message;
+ assertEquals(2, assignedRecord.assignedS3ObjectId());
for (int i = 0; i < 3; i++) {
- verifyPrepareObjectRecord(result0.records().get(i), i, 1000);
+ verifyPrepareObjectRecord(result0.records().get(i + 1), i, 1000);
}
- result0.records().stream().map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));
+ manager.replay(assignedRecord);
+ result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));
// verify the 3 objects are prepared
assertEquals(3, manager.objectsMetadata().size());
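With the AssignedS3ObjectIdRecord now leading each batch, the tests replay records positionally (replay the first record, then skip(1) over the rest). A hypothetical test utility that dispatches on record type instead, assuming only the replay overloads this patch adds:

import java.util.List;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
import org.apache.kafka.server.common.ApiMessageAndVersion;

final class ReplayHelper {
    // Replay a mixed record batch without hard-coding which index
    // holds the AssignedS3ObjectIdRecord.
    static void replayAll(S3ObjectControlManager manager, List<ApiMessageAndVersion> records) {
        for (ApiMessageAndVersion r : records) {
            ApiMessage msg = r.message();
            if (msg instanceof AssignedS3ObjectIdRecord) {
                manager.replay((AssignedS3ObjectIdRecord) msg);
            } else if (msg instanceof S3ObjectRecord) {
                manager.replay((S3ObjectRecord) msg);
            }
        }
    }
}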
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 0bfebec065..bdbd59607d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
+import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.protocol.Errors;
@@ -75,16 +76,21 @@ public void testBasicCreateStream() {
CreateStreamResponseData response0 = result0.response();
assertEquals(Errors.NONE.code(), response0.errorCode());
assertEquals(STREAM0, response0.streamId());
- assertEquals(1, records0.size());
+ assertEquals(2, records0.size());
ApiMessageAndVersion record0 = records0.get(0);
- assertInstanceOf(S3StreamRecord.class, record0.message());
- S3StreamRecord streamRecord0 = (S3StreamRecord) record0.message();
+ assertInstanceOf(AssignedStreamIdRecord.class, record0.message());
+ AssignedStreamIdRecord assignedRecord = (AssignedStreamIdRecord) record0.message();
+ assertEquals(STREAM0, assignedRecord.assignedStreamId());
+ ApiMessageAndVersion record1 = records0.get(1);
+ assertInstanceOf(S3StreamRecord.class, record1.message());
+ S3StreamRecord streamRecord0 = (S3StreamRecord) record1.message();
assertEquals(STREAM0, streamRecord0.streamId());
assertEquals(0, streamRecord0.epoch());
assertEquals(-1, streamRecord0.rangeIndex());
assertEquals(0L, streamRecord0.startOffset());
- // replay records_0
+ // replay
+ manager.replay(assignedRecord);
manager.replay(streamRecord0);
// verify the stream_0 is created
Map<Long, S3StreamMetadata> streamsMetadata =
@@ -100,8 +106,12 @@ public void testBasicCreateStream() {
CreateStreamResponseData response1 = result1.response();
assertEquals(Errors.NONE.code(), response1.errorCode());
assertEquals(STREAM1, response1.streamId());
- assertEquals(1, records1.size());
- ApiMessageAndVersion record1 = records1.get(0);
+ assertEquals(2, records1.size());
+ record0 = records1.get(0);
+ assertInstanceOf(AssignedStreamIdRecord.class, record0.message());
+ assignedRecord = (AssignedStreamIdRecord) record0.message();
+ assertEquals(STREAM1, assignedRecord.assignedStreamId());
+ record1 = records1.get(1);
assertInstanceOf(S3StreamRecord.class, record1.message());
S3StreamRecord streamRecord1 = (S3StreamRecord) record1.message();
assertEquals(STREAM1, streamRecord1.streamId());
@@ -110,6 +120,7 @@ public void testBasicCreateStream() {
assertEquals(0L, streamRecord1.startOffset());
// replay records_1
+ manager.replay(assignedRecord);
manager.replay(streamRecord1);
// verify that stream_1 is created
streamsMetadata =
@@ -124,24 +135,33 @@ public void testBasicOpenStream() {
// 1. create stream_0 and stream_1
CreateStreamRequestData request0 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result0 = manager.createStream(request0);
- result0.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);
+ manager.replay((AssignedStreamIdRecord) result0.records().get(0).message());
+ result0.records().stream().skip(1).map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);
CreateStreamRequestData request1 = new CreateStreamRequestData();
ControllerResult<CreateStreamResponseData> result1 = manager.createStream(request1);
- result1.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);
+ manager.replay((AssignedStreamIdRecord) result1.records().get(0).message());
+ result1.records().stream().skip(1).map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);
// verify the streams are created
Map<Long, S3StreamMetadata> streamsMetadata = manager.streamsMetadata();
assertEquals(2, streamsMetadata.size());
verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0));
verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1));
+ assertEquals(2, manager.nextAssignedStreamId());
- // 2. broker_0 open stream_0 and stream_1 with epoch1
+ // 2. broker_0 opens stream_0 and stream_1 with epoch0
ControllerResult<OpenStreamResponseData> result2 = manager.openStream(
- new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER0));
+ new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER0));
ControllerResult<OpenStreamResponseData> result3 = manager.openStream(
- new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0));
- verifyFirstTimeOpenStreamResult(result2, EPOCH1, BROKER0);
- verifyFirstTimeOpenStreamResult(result3, EPOCH1, BROKER0);
+ new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER0));
+ assertEquals(Errors.NONE.code(), result2.response().errorCode());
+ assertEquals(Errors.NONE.code(), result3.response().errorCode());
+ assertEquals(0L, result2.response().startOffset());
+ assertEquals(0L, result3.response().startOffset());
+ assertEquals(0L, result2.response().nextOffset());
+ assertEquals(0L, result3.response().nextOffset());
+ verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0);
+ verifyFirstTimeOpenStreamResult(result3, EPOCH0, BROKER0);
S3StreamRecord streamRecord = (S3StreamRecord) result2.records().get(0).message();
manager.replay(streamRecord);
RangeRecord rangeRecord = (RangeRecord) result2.records().get(1).message();
@@ -153,8 +173,8 @@ public void testBasicOpenStream() {
// verify the stream_0 and stream_1 metadata are updated, and the range_0 is created
S3StreamMetadata streamMetadata0 = manager.streamsMetadata().get(STREAM0);
- verifyFirstRange(manager.streamsMetadata().get(STREAM0), EPOCH1, BROKER0);
- verifyFirstRange(manager.streamsMetadata().get(STREAM1), EPOCH1, BROKER0);
+ verifyFirstRange(manager.streamsMetadata().get(STREAM0), EPOCH0, BROKER0);
+ verifyFirstRange(manager.streamsMetadata().get(STREAM1), EPOCH0, BROKER0);
// TODO: support writing the range record, then roll the range and verify
// 3. broker_1 tries to open stream_0 with epoch0
@@ -163,46 +183,50 @@ public void testBasicOpenStream() {
assertEquals(Errors.STREAM_FENCED.code(), result4.response().errorCode());
assertEquals(0, result4.records().size());
- // 4. broker_1 try to open stream_0 with epoch2
+ // 4. broker_1 tries to open stream_0 with epoch1
ControllerResult<OpenStreamResponseData> result5 = manager.openStream(
- new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH2).setBrokerId(BROKER1));
+ new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1));
assertEquals(Errors.NONE.code(), result5.response().errorCode());
+ assertEquals(0L, result5.response().startOffset());
+ assertEquals(0L, result5.response().nextOffset());
assertEquals(2, result5.records().size());
streamRecord = (S3StreamRecord) result5.records().get(0).message();
manager.replay(streamRecord);
- assertEquals(EPOCH2, streamRecord.epoch());
+ assertEquals(EPOCH1, streamRecord.epoch());
assertEquals(1, streamRecord.rangeIndex());
assertEquals(0L, streamRecord.startOffset());
rangeRecord = (RangeRecord) result5.records().get(1).message();
manager.replay(rangeRecord);
assertEquals(BROKER1, rangeRecord.brokerId());
- assertEquals(EPOCH2, rangeRecord.epoch());
+ assertEquals(EPOCH1, rangeRecord.epoch());
assertEquals(1, rangeRecord.rangeIndex());
assertEquals(0L, rangeRecord.startOffset());
assertEquals(0L, rangeRecord.endOffset());
- // verify that stream_0's epoch update to epoch2, and range index update to 1
+ // verify that stream_0's epoch is updated to epoch1 and its range index to 1
streamMetadata0 = manager.streamsMetadata().get(STREAM0);
- assertEquals(EPOCH2, streamMetadata0.currentEpoch());
+ assertEquals(EPOCH1, streamMetadata0.currentEpoch());
assertEquals(1, streamMetadata0.currentRangeIndex());
assertEquals(0L, streamMetadata0.startOffset());
assertEquals(2, streamMetadata0.ranges().size());
RangeMetadata rangeMetadata0 = streamMetadata0.ranges().get(1);
assertEquals(BROKER1, rangeMetadata0.brokerId());
- assertEquals(EPOCH2, rangeMetadata0.epoch());
+ assertEquals(EPOCH1, rangeMetadata0.epoch());
assertEquals(1, rangeMetadata0.rangeIndex());
assertEquals(0L, rangeMetadata0.startOffset());
assertEquals(0L, rangeMetadata0.endOffset());
- // 5. broker_0 try to open stream_1 with epoch1
+ // 5. broker_0 tries to open stream_1 with epoch0
ControllerResult<OpenStreamResponseData> result6 = manager.openStream(
- new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0));
+ new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER0));
assertEquals(Errors.NONE.code(), result6.response().errorCode());
+ assertEquals(0L, result6.response().startOffset());
+ assertEquals(0L, result6.response().nextOffset());
assertEquals(0, result6.records().size());
- // 6. broker_1 try to open stream_1 with epoch1
+ // 6. broker_1 tries to open stream_1 with epoch0
ControllerResult<OpenStreamResponseData> result7 = manager.openStream(
- new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER1));
+ new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH0).setBrokerId(BROKER1));
assertEquals(Errors.STREAM_FENCED.code(), result7.response().errorCode());
assertEquals(0, result7.records().size());
}