diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 8750eb7bb0..5ccabff41f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -954,7 +954,8 @@ public ControllerResult getOpeningStreams(GetOpen .setStreamId(e.getKey()) .setEpoch(streamMetadata.currentEpoch()) .setStartOffset(streamMetadata.startOffset()) - .setEndOffset(rangeMetadata.endOffset()); + // Fix https://github.com/AutoMQ/automq/issues/1222#issuecomment-2132812938 + .setEndOffset(Math.max(rangeMetadata.endOffset(), streamMetadata.startOffset())); }).collect(Collectors.toList()); resp.setStreamMetadataList(streamStatusList); return ControllerResult.atomicOf(records, resp); @@ -1020,16 +1021,17 @@ private Errors streamAdvanceCheck(List ranges, int nodeId) { } for (StreamOffsetRange range : ranges) { // verify stream exist - if (!this.streamsMetadata.containsKey(range.streamId())) { + S3StreamMetadata streamMetadata = this.streamsMetadata.get(range.streamId()); + if (streamMetadata == null) { log.warn("[streamAdvanceCheck]: streamId={} not exist", range.streamId()); return Errors.STREAM_NOT_EXIST; } // check if this stream open - if (this.streamsMetadata.get(range.streamId()).currentState() != StreamState.OPENED) { + if (streamMetadata.currentState() != StreamState.OPENED) { log.warn("[streamAdvanceCheck]: streamId={} not opened", range.streamId()); return Errors.STREAM_NOT_OPENED; } - RangeMetadata rangeMetadata = this.streamsMetadata.get(range.streamId()).currentRangeMetadata(); + RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); if (rangeMetadata == null) { // should not happen log.error("[streamAdvanceCheck]: streamId={}'s current range={} not exist when stream has been ", @@ -1041,10 +1043,11 @@ private Errors streamAdvanceCheck(List ranges, int nodeId) { range.streamId(), rangeMetadata.nodeId(), nodeId); return Errors.STREAM_INNER_ERROR; } - if (rangeMetadata.endOffset() != range.startOffset()) { - log.warn("[streamAdvanceCheck]: streamId={}'s current range={}'s end offset {} is not equal to request start offset {}", + if (!(rangeMetadata.endOffset() == range.startOffset() || range.startOffset() <= streamMetadata.startOffset())) { + // Fix https://github.com/AutoMQ/automq/issues/1222#issuecomment-2132812938 + log.warn("[streamAdvanceCheck]: streamId={}'s current range={}'s end offset {} is not equal to request start offset {}, stream's startOffset={}", range.streamId(), this.streamsMetadata.get(range.streamId()).currentRangeIndex(), - rangeMetadata.endOffset(), range.startOffset()); + rangeMetadata.endOffset(), range.startOffset(), streamMetadata.startOffset()); return Errors.OFFSET_NOT_MATCHED; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index a2ebe5def4..a9a4a534ff 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -601,6 +601,41 @@ public void testCommitStreamSetObject_theSameStreamObject() { assertTrue(result4.records().isEmpty()); } + @Test + public void testTrimBeyondCommit() { + Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).then(args -> { + long objectId = args.getArgument(0); + return ControllerResult.of( + List.of( + new ApiMessageAndVersion( + new S3ObjectRecord().setObjectId(objectId).setObjectState(S3ObjectState.COMMITTED.toByte()), + (short) 0 + ) + ), + true); + }); + registerAlwaysSuccessEpoch(BROKER0); + CreateStreamRequest request0 = new CreateStreamRequest(); + ControllerResult createStreamRst = manager.createStream(BROKER0, BROKER_EPOCH0, request0); + replay(manager, createStreamRst.records()); + ControllerResult openStreamRst = manager.openStream(BROKER0, 0, + new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); + replay(manager, openStreamRst.records()); + + ControllerResult trimRst = manager.trimStream(BROKER0, EPOCH0, new TrimStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setNewStartOffset(100L)); + replay(manager, trimRst.records()); + assertEquals(100L, manager.streamsMetadata().get(STREAM0).startOffset()); + + ControllerResult commitRst = manager.commitStreamSetObject(new CommitStreamSetObjectRequestData().setNodeId(BROKER0).setNodeEpoch(EPOCH0).setObjectStreamRanges( + List.of( + new ObjectStreamRange().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setStartOffset(0).setEndOffset(10L) + ) + )); + replay(manager, commitRst.records()); + assertEquals(10L, manager.streamsMetadata().get(STREAM0).ranges().get(0).endOffset()); + assertEquals((short) 0, commitRst.response().errorCode()); + } + private long createStream() { CreateStreamRequest request0 = new CreateStreamRequest(); ControllerResult result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0);