Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,8 @@ public ControllerResult<GetOpeningStreamsResponseData> getOpeningStreams(GetOpen
.setStreamId(e.getKey())
.setEpoch(streamMetadata.currentEpoch())
.setStartOffset(streamMetadata.startOffset())
.setEndOffset(rangeMetadata.endOffset());
// Fix https://github.com/AutoMQ/automq/issues/1222#issuecomment-2132812938
.setEndOffset(Math.max(rangeMetadata.endOffset(), streamMetadata.startOffset()));
}).collect(Collectors.toList());
resp.setStreamMetadataList(streamStatusList);
return ControllerResult.atomicOf(records, resp);
Expand Down Expand Up @@ -1020,16 +1021,17 @@ private Errors streamAdvanceCheck(List<StreamOffsetRange> ranges, int nodeId) {
}
for (StreamOffsetRange range : ranges) {
// verify stream exist
if (!this.streamsMetadata.containsKey(range.streamId())) {
S3StreamMetadata streamMetadata = this.streamsMetadata.get(range.streamId());
if (streamMetadata == null) {
log.warn("[streamAdvanceCheck]: streamId={} not exist", range.streamId());
return Errors.STREAM_NOT_EXIST;
}
// check if this stream open
if (this.streamsMetadata.get(range.streamId()).currentState() != StreamState.OPENED) {
if (streamMetadata.currentState() != StreamState.OPENED) {
log.warn("[streamAdvanceCheck]: streamId={} not opened", range.streamId());
return Errors.STREAM_NOT_OPENED;
}
RangeMetadata rangeMetadata = this.streamsMetadata.get(range.streamId()).currentRangeMetadata();
RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata();
if (rangeMetadata == null) {
// should not happen
log.error("[streamAdvanceCheck]: streamId={}'s current range={} not exist when stream has been ",
Expand All @@ -1041,10 +1043,11 @@ private Errors streamAdvanceCheck(List<StreamOffsetRange> ranges, int nodeId) {
range.streamId(), rangeMetadata.nodeId(), nodeId);
return Errors.STREAM_INNER_ERROR;
}
if (rangeMetadata.endOffset() != range.startOffset()) {
log.warn("[streamAdvanceCheck]: streamId={}'s current range={}'s end offset {} is not equal to request start offset {}",
if (!(rangeMetadata.endOffset() == range.startOffset() || range.startOffset() <= streamMetadata.startOffset())) {
// Fix https://github.com/AutoMQ/automq/issues/1222#issuecomment-2132812938
log.warn("[streamAdvanceCheck]: streamId={}'s current range={}'s end offset {} is not equal to request start offset {}, stream's startOffset={}",
range.streamId(), this.streamsMetadata.get(range.streamId()).currentRangeIndex(),
rangeMetadata.endOffset(), range.startOffset());
rangeMetadata.endOffset(), range.startOffset(), streamMetadata.startOffset());
return Errors.OFFSET_NOT_MATCHED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,41 @@ public void testCommitStreamSetObject_theSameStreamObject() {
assertTrue(result4.records().isEmpty());
}

@Test
public void testTrimBeyondCommit() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).then(args -> {
long objectId = args.getArgument(0);
return ControllerResult.of(
List.of(
new ApiMessageAndVersion(
new S3ObjectRecord().setObjectId(objectId).setObjectState(S3ObjectState.COMMITTED.toByte()),
(short) 0
)
),
true);
});
registerAlwaysSuccessEpoch(BROKER0);
CreateStreamRequest request0 = new CreateStreamRequest();
ControllerResult<CreateStreamResponse> createStreamRst = manager.createStream(BROKER0, BROKER_EPOCH0, request0);
replay(manager, createStreamRst.records());
ControllerResult<OpenStreamResponse> openStreamRst = manager.openStream(BROKER0, 0,
new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0));
replay(manager, openStreamRst.records());

ControllerResult<TrimStreamResponse> trimRst = manager.trimStream(BROKER0, EPOCH0, new TrimStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setNewStartOffset(100L));
replay(manager, trimRst.records());
assertEquals(100L, manager.streamsMetadata().get(STREAM0).startOffset());

ControllerResult<CommitStreamSetObjectResponseData> commitRst = manager.commitStreamSetObject(new CommitStreamSetObjectRequestData().setNodeId(BROKER0).setNodeEpoch(EPOCH0).setObjectStreamRanges(
List.of(
new ObjectStreamRange().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setStartOffset(0).setEndOffset(10L)
)
));
replay(manager, commitRst.records());
assertEquals(10L, manager.streamsMetadata().get(STREAM0).ranges().get(0).endOffset());
assertEquals((short) 0, commitRst.response().errorCode());
}

private long createStream() {
CreateStreamRequest request0 = new CreateStreamRequest();
ControllerResult<CreateStreamResponse> result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0);
Expand Down