diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 2b965a02f2..73b3636864 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -365,7 +365,7 @@ public ControllerResult commitWALObject(CommitWALOb // commit object ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize); if (!commitResult.response()) { - log.error("object {} not exist when commit wal object", objectId); + log.error("[CommitWALObject]: object {} not exist when commit wal object", objectId); resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -374,7 +374,7 @@ public ControllerResult commitWALObject(CommitWALOb if (data.compactedObjectIds() != null && !data.compactedObjectIds().isEmpty()) { ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(data.compactedObjectIds()); if (!destroyResult.response()) { - log.error("Mark destroy compacted objects {} failed", String.join(",", data.compactedObjectIds().toArray(new String[0]))); + log.error("[CommitWALObject]: Mark destroy compacted objects: {} failed", data.compactedObjectIds()); resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -411,12 +411,54 @@ public ControllerResult commitWALObject(CommitWALOb // generate compacted objects' remove record data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord() .setObjectId(id), (short) 0))); - log.info("[CommitWALObject]: broker: {} commit wal object {} success", brokerId, objectId); + log.info("[CommitWALObject]: broker: {} commit wal object: {} success, compacted objects: {}", brokerId, objectId, data.compactedObjectIds()); return ControllerResult.atomicOf(records, resp); } public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { - throw new UnsupportedOperationException(); + long streamObjectId = data.objectId(); + long streamId = data.streamId(); + long startOffset = data.startOffset(); + long endOffset = data.endOffset(); + long objectSize = data.objectSize(); + List sourceObjectIds = data.sourceObjectIds(); + List records = new ArrayList<>(); + CommitStreamObjectResponseData resp = new CommitStreamObjectResponseData(); + + // commit object + ControllerResult commitResult = this.s3ObjectControlManager.commitObject(streamObjectId, objectSize); + if (!commitResult.response()) { + log.error("[CommitStreamObject]: object {} not exist when commit stream object", streamObjectId); + resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + + // mark destroy compacted object + if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) { + ControllerResult destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds); + if (!destroyResult.response()) { + log.error("[CommitStreamObject]: Mark destroy compacted objects: {} failed", sourceObjectIds); + resp.setErrorCode(Errors.STREAM_INNER_ERROR.code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + } + + // generate stream object record + records.add(new ApiMessageAndVersion(new S3StreamObjectRecord() + .setObjectId(streamObjectId) + .setStreamId(streamId) + .setObjectSize(objectSize) + .setStartOffset(startOffset) + .setEndOffset(endOffset), (short) 0)); + + // generate compacted objects' remove record + if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) { + sourceObjectIds.forEach(id -> records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord() + .setObjectId(id) + .setStreamId(streamId), (short) 0))); + } + log.info("[CommitStreamObject]: stream object: {} commit success, compacted objects: {}", streamObjectId, sourceObjectIds); + return ControllerResult.atomicOf(records, resp); } public GetStreamsOffsetResponseData getStreamsOffset(GetStreamsOffsetRequestData data) { @@ -541,6 +583,7 @@ public void replay(RemoveWALObjectRecord record) { } walMetadata.walObjects.remove(objectId); } + public void replay(S3StreamObjectRecord record) { long objectId = record.objectId(); long streamId = record.streamId(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 7bf57e3f3d..0e66c0f306 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -29,6 +29,8 @@ import java.util.stream.Collectors; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; +import org.apache.kafka.common.message.CommitStreamObjectRequestData; +import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange; import org.apache.kafka.common.message.CommitWALObjectRequestData.StreamObject; @@ -536,6 +538,96 @@ public void testCommitWalWithStreamObject() { assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size()); } + @Test + public void testCommitStreamObject() { + Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); + Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); + + // 1. create and open stream_0 and stream_1 + createAndOpenStream(BROKER0, EPOCH0); + createAndOpenStream(BROKER0, EPOCH0); + + // 2. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal + List streamRanges0 = List.of( + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L)); + CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData() + .setObjectId(0L) + .setOrderId(0L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(1L) + .setObjectSize(999) + .setStartOffset(0L) + .setEndOffset(200L) + )); + ControllerResult result0 = manager.commitWALObject(commitRequest0); + assertEquals(Errors.NONE.code(), result0.response().errorCode()); + replay(manager, result0.records()); + + // 3. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal + List streamRanges1 = List.of( + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100L) + .setEndOffset(200L)); + CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData() + .setObjectId(2L) + .setOrderId(1L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(3L) + .setObjectSize(999) + .setStartOffset(200L) + .setEndOffset(400L) + )); + ControllerResult result1 = manager.commitWALObject(commitRequest1); + assertEquals(Errors.NONE.code(), result1.response().errorCode()); + replay(manager, result1.records()); + + // 4. compact these two stream objects + CommitStreamObjectRequestData streamObjectRequest = new CommitStreamObjectRequestData() + .setObjectId(4L) + .setStreamId(STREAM1) + .setStartOffset(0L) + .setEndOffset(400L) + .setObjectSize(999) + .setSourceObjectIds(List.of(1L, 3L)); + ControllerResult result2 = manager.commitStreamObject(streamObjectRequest); + assertEquals(Errors.NONE.code(), result2.response().errorCode()); + replay(manager, result2.records()); + + // 5. fetch stream offset range + GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() + .setStreamIds(List.of(STREAM0, STREAM1)); + GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + assertEquals(2, streamsOffset.streamsOffset().size()); + assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); + assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); + assertEquals(200L, streamsOffset.streamsOffset().get(0).endOffset()); + assertEquals(STREAM1, streamsOffset.streamsOffset().get(1).streamId()); + assertEquals(0L, streamsOffset.streamsOffset().get(1).startOffset()); + assertEquals(400L, streamsOffset.streamsOffset().get(1).endOffset()); + + // 6. verify stream objects + assertEquals(1, manager.streamsMetadata().get(STREAM1).streamObjects().size()); + assertEquals(4L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).objectId()); + assertEquals(0L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamIndex().getStartOffset()); + assertEquals(400L, manager.streamsMetadata().get(STREAM1).streamObjects().get(4L).streamIndex().getEndOffset()); + } + private void commitFirstLevelWalObject(long objectId, long orderId, long streamId, long startOffset, long endOffset, long epoch, int brokerId) { List streamRanges0 = List.of(new ObjectStreamRange() .setStreamId(streamId) @@ -586,7 +678,7 @@ private void replay(StreamControlManager manager, List rec case S3_STREAM_OBJECT_RECORD: manager.replay((S3StreamObjectRecord) message); break; - case REMOVE_S3_OBJECT_RECORD: + case REMOVE_S3_STREAM_OBJECT_RECORD: manager.replay((RemoveS3StreamObjectRecord) message); break; default: