diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 5359fea45d..e78d0f633f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -621,11 +621,13 @@ public boolean isActive(int brokerId) { return !registration.inControlledShutdown() && !registration.fenced(); } + // AutoMQ for kafka inject start public List getActiveBrokers() { return brokerRegistrations.values().stream() .filter(broker -> isActive(broker.id())) .collect(Collectors.toList()); } + // AutoMQ for kafka inject end BrokerHeartbeatManager heartbeatManager() { if (heartbeatManager == null) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 1a0711fcbf..d89bc6a657 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1930,7 +1930,8 @@ private QuorumController( } this.s3ObjectControlManager = new S3ObjectControlManager( this, snapshotRegistry, logContext, clusterId, streamConfig, s3Operator); - this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); + this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext, + this.s3ObjectControlManager, clusterControl); this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); // AutoMQ for Kafka inject end updateWriteOffset(-1); @@ -2308,8 +2309,7 @@ public void close() throws InterruptedException { // AutoMQ for Kafka inject start @Override public CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext context) { - return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(), - () -> s3ObjectControlManager.checkS3ObjectsLifecycle()); + return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(), s3ObjectControlManager::checkS3ObjectsLifecycle); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 1b35166232..29751c16c0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -52,7 +52,10 @@ import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.ThreadUtils; +import org.apache.kafka.controller.ClusterControlManager; import org.apache.kafka.controller.ControllerResult; +import org.apache.kafka.controller.QuorumController; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3StreamSetObject; @@ -65,9 +68,16 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -79,12 +89,9 @@ @SuppressWarnings("all") public class StreamControlManager { private static final Logger LOGGER = LoggerFactory.getLogger(StreamControlManager.class); - private final SnapshotRegistry snapshotRegistry; private final Logger log; - private final S3ObjectControlManager s3ObjectControlManager; - /** * The next stream id to be assigned. */ @@ -94,16 +101,36 @@ public class StreamControlManager { private final TimelineHashMap nodesMetadata; + private final QuorumController quorumController; + + private final SnapshotRegistry snapshotRegistry; + + private final S3ObjectControlManager s3ObjectControlManager; + + private final ClusterControlManager clusterControlManager; + + private final ScheduledExecutorService cleanupScheduler; + public StreamControlManager( + QuorumController quorumController, SnapshotRegistry snapshotRegistry, LogContext logContext, - S3ObjectControlManager s3ObjectControlManager) { + S3ObjectControlManager s3ObjectControlManager, + ClusterControlManager clusterControlManager) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(StreamControlManager.class); - this.s3ObjectControlManager = s3ObjectControlManager; this.nextAssignedStreamId = new TimelineLong(snapshotRegistry); this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.nodesMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + + this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("stream-cleanup-scheduler", true)); + + this.quorumController = quorumController; + this.s3ObjectControlManager = s3ObjectControlManager; + this.clusterControlManager = clusterControlManager; + + this.cleanupScheduler.scheduleWithFixedDelay(this::triggerCleanupScaleInNodes, 30, 30, TimeUnit.MINUTES); } public ControllerResult createStream(int nodeId, long nodeEpoch, CreateStreamRequest request) { @@ -898,6 +925,55 @@ private Errors nodeEpochCheck(int nodeId, long nodeEpoch, boolean checkFailover) return Errors.NONE; } + public void triggerCleanupScaleInNodes() { + if (!quorumController.isActive()) { + return; + } + quorumController.appendWriteEvent("cleanupScaleInNodes", OptionalLong.empty(), () -> { + return cleanupScaleInNodes(); + }); + } + + public ControllerResult cleanupScaleInNodes() { + List records = new LinkedList<>(); + List cleanupObjects = new LinkedList<>(); + nodesMetadata.forEach((nodeId, nodeMetadata) -> { + if (!clusterControlManager.isActive(nodeId)) { + Collection objects = nodeMetadata.streamSetObjects().values(); + boolean alive = false; + for (S3StreamSetObject object: objects) { + if (alive) { + // if the last object is not expired, the likelihood of the subsequent object expiring is also quite low. + break; + } + long objectId = object.objectId(); + AtomicBoolean expired = new AtomicBoolean(true); + List streamOffsetRanges = object.offsetRangeList(); + for (StreamOffsetRange streamOffsetRange : streamOffsetRanges) { + S3StreamMetadata stream = streamsMetadata.get(streamOffsetRange.streamId()); + if (stream != null && stream.startOffset() < streamOffsetRange.endOffset()) { + expired.set(false); + alive = true; + } + } + if (expired.get()) { + cleanupObjects.add(object); + records.add(new ApiMessageAndVersion(new RemoveStreamSetObjectRecord() + .setNodeId(nodeId) + .setObjectId(objectId), (short) 0)); + records.addAll(this.s3ObjectControlManager.markDestroyObjects(List.of(objectId)).records()); + } + } + } + }); + if (!cleanupObjects.isEmpty()) { + LOGGER.info("clean up scaled-in nodes objects: {}", cleanupObjects); + } else { + LOGGER.debug("clean up scaled-in nodes objects: []"); + } + return ControllerResult.of(records, null); + } + public void replay(AssignedStreamIdRecord record) { this.nextAssignedStreamId.set(record.assignedStreamId() + 1); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index e8fc86c910..11c412678c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.RemoveStreamSetObjectRecord; +import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; @@ -56,6 +57,7 @@ import org.apache.kafka.controller.stream.S3StreamMetadata; import org.apache.kafka.controller.stream.StreamControlManager; import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; @@ -76,6 +78,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @Timeout(value = 40) @Tag("S3Unit") @@ -95,15 +100,20 @@ public class StreamControlManagerTest { private final static long BROKER_EPOCH0 = 0; + private QuorumController quorumController; private StreamControlManager manager; private S3ObjectControlManager objectControlManager; + private ClusterControlManager clusterControlManager; @BeforeEach public void setUp() { LogContext context = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(context); - objectControlManager = Mockito.mock(S3ObjectControlManager.class); - manager = new StreamControlManager(registry, context, objectControlManager); + quorumController = mock(QuorumController.class); + objectControlManager = mock(S3ObjectControlManager.class); + clusterControlManager = mock(ClusterControlManager.class); + + manager = new StreamControlManager(quorumController, registry, context, objectControlManager, clusterControlManager); } @Test @@ -113,7 +123,7 @@ public void testBasicCreateStream() { // 1. create stream_0 success CreateStreamRequest request0 = new CreateStreamRequest() - .setNodeId(BROKER0); + .setNodeId(BROKER0); ControllerResult result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0); List records0 = result0.records(); CreateStreamResponse response0 = result0.response(); @@ -137,7 +147,7 @@ public void testBasicCreateStream() { manager.replay(streamRecord0); // verify the stream_0 is created Map streamsMetadata = - manager.streamsMetadata(); + manager.streamsMetadata(); assertEquals(1, streamsMetadata.size()); verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0)); assertEquals(1, manager.nextAssignedStreamId()); @@ -167,7 +177,7 @@ public void testBasicCreateStream() { manager.replay(streamRecord1); // verify the stream_2 is created streamsMetadata = - manager.streamsMetadata(); + manager.streamsMetadata(); assertEquals(2, streamsMetadata.size()); verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1)); assertEquals(2, manager.nextAssignedStreamId()); @@ -195,9 +205,9 @@ public void testBasicOpenCloseStream() { // 2. node_0 open stream_0 and stream_1 with epoch0 ControllerResult result2 = manager.openStream(BROKER0, EPOCH0, - new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); ControllerResult result3 = manager.openStream(BROKER0, EPOCH0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.NONE.code(), result2.response().errorCode()); assertEquals(Errors.NONE.code(), result3.response().errorCode()); assertEquals(0L, result2.response().startOffset()); @@ -223,13 +233,13 @@ public void testBasicOpenCloseStream() { // TODO: support write range record, then roll the range and verify // 3. node_1 try to open stream_0 with epoch0 ControllerResult result4 = manager.openStream(BROKER1, 0, - new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); assertEquals(Errors.STREAM_FENCED.code(), result4.response().errorCode()); assertEquals(0, result4.records().size()); // 4. node_0 try to open stream_1 with epoch0 ControllerResult result6 = manager.openStream(BROKER0, 0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.NONE.code(), result6.response().errorCode()); assertEquals(0L, result6.response().startOffset()); assertEquals(0L, result6.response().nextOffset()); @@ -237,44 +247,44 @@ public void testBasicOpenCloseStream() { // 5. node_0 try to open stream_1 with epoch1 ControllerResult result7 = manager.openStream(BROKER0, 0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); assertEquals(Errors.STREAM_NOT_CLOSED.code(), result7.response().errorCode()); // 6. node_1 try to open stream_1 with epoch0 ControllerResult result8 = manager.openStream(BROKER1, 0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.STREAM_FENCED.code(), result8.response().errorCode()); assertEquals(0, result8.records().size()); // 7. node_1 try to open stream_1 with epoch1 ControllerResult result9 = manager.openStream(BROKER1, 0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); assertEquals(Errors.STREAM_NOT_CLOSED.code(), result9.response().errorCode()); // 8. node_1 try to close stream_1 with epoch0 ControllerResult result10 = manager.closeStream(BROKER1, BROKER_EPOCH0, - new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.STREAM_FENCED.code(), result10.response().errorCode()); // 9. node_0 try to close stream_1 with epoch1 ControllerResult result11 = manager.closeStream(BROKER0, BROKER_EPOCH0, - new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); + new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); assertEquals(Errors.STREAM_INNER_ERROR.code(), result11.response().errorCode()); // 10. node_0 try to close stream_1 with epoch0 ControllerResult result12 = manager.closeStream(BROKER0, BROKER_EPOCH0, - new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.NONE.code(), result12.response().errorCode()); replay(manager, result12.records()); // 11. node_0 try to close stream_1 with epoch0 again ControllerResult result13 = manager.closeStream(BROKER0, BROKER_EPOCH0, - new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); + new CloseStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH0)); assertEquals(Errors.NONE.code(), result13.response().errorCode()); // 12. node_1 try to open stream_1 with epoch1 ControllerResult result14 = manager.openStream(BROKER1, 0, - new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); + new OpenStreamRequest().setStreamId(STREAM1).setStreamEpoch(EPOCH1)); assertEquals(Errors.NONE.code(), result14.response().errorCode()); replay(manager, result14.records()); @@ -303,20 +313,20 @@ public void testCommitWalBasic() { ControllerResult result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0); replay(manager, result0.records()); ControllerResult result2 = manager.openStream(BROKER0, 0, - new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); + new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); replay(manager, result2.records()); // 2. commit valid stream set object List streamRanges0 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(100L)); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L)); CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData() - .setObjectId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges0); + .setObjectId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0); ControllerResult result3 = manager.commitStreamSetObject(commitRequest0); assertEquals(Errors.NONE.code(), result3.response().errorCode()); replay(manager, result3.records()); @@ -329,39 +339,39 @@ public void testCommitWalBasic() { assertEquals(1, manager.nodesMetadata().get(BROKER0).streamSetObjects().size()); // 3. commit a stream set object that doesn't exist List streamRanges1 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(100) - .setEndOffset(200)); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100) + .setEndOffset(200)); CommitStreamSetObjectRequestData commitRequest1 = new CommitStreamSetObjectRequestData() - .setObjectId(1L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges1); + .setObjectId(1L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1); ControllerResult result4 = manager.commitStreamSetObject(commitRequest1); assertEquals(Errors.OBJECT_NOT_EXIST.code(), result4.response().errorCode()); // 4. node_0 close stream_0 with epoch_0 and node_1 open stream_0 with epoch_1 ControllerResult result7 = manager.closeStream(BROKER0, BROKER_EPOCH0, - new CloseStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); + new CloseStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH0)); assertEquals(Errors.NONE.code(), result7.response().errorCode()); replay(manager, result7.records()); ControllerResult result8 = manager.openStream(BROKER1, 0, - new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH1)); + new OpenStreamRequest().setStreamId(STREAM0).setStreamEpoch(EPOCH1)); assertEquals(Errors.NONE.code(), result8.response().errorCode()); assertEquals(0L, result8.response().startOffset()); assertEquals(100L, result8.response().nextOffset()); replay(manager, result8.records()); // 5. node_1 successfully commit stream set object which contains stream_0's data List streamRanges6 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1) - .setStartOffset(100) - .setEndOffset(300)); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1) + .setStartOffset(100) + .setEndOffset(300)); CommitStreamSetObjectRequestData commitRequest6 = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER1) - .setObjectId(6L) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges6); + .setNodeId(BROKER1) + .setObjectId(6L) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges6); ControllerResult result10 = manager.commitStreamSetObject(commitRequest6); assertEquals(Errors.NONE.code(), result10.response().errorCode()); replay(manager, result10.records()); @@ -377,7 +387,7 @@ public void testCommitWalBasic() { // 6. get stream's offset GetOpeningStreamsRequestData request = new GetOpeningStreamsRequestData() - .setNodeId(BROKER1).setNodeEpoch(0L); + .setNodeId(BROKER1).setNodeEpoch(0L); GetOpeningStreamsResponseData response = manager.getOpeningStreams(request).response(); assertEquals(1, response.streamMetadataList().size()); assertEquals(STREAM0, response.streamMetadataList().get(0).streamId()); @@ -385,7 +395,7 @@ public void testCommitWalBasic() { assertEquals(300L, response.streamMetadataList().get(0).endOffset()); request = new GetOpeningStreamsRequestData() - .setNodeId(BROKER0).setNodeEpoch(0L); + .setNodeId(BROKER0).setNodeEpoch(0L); assertEquals(0, manager.getOpeningStreams(request).response().streamMetadataList().size()); } @@ -398,14 +408,14 @@ private long createStream() { private void openStream(int nodeId, long epoch, long streamId) { ControllerResult result1 = manager.openStream(nodeId, 0, - new OpenStreamRequest().setStreamId(streamId).setStreamEpoch(epoch)); + new OpenStreamRequest().setStreamId(streamId).setStreamEpoch(epoch)); replay(manager, result1.records()); } private void closeStream(int nodeId, long epoch, long streamId) { ControllerResult result = manager.closeStream(nodeId, BROKER_EPOCH0, new CloseStreamRequest() - .setStreamId(streamId) - .setStreamEpoch(epoch)); + .setStreamId(streamId) + .setStreamEpoch(epoch)); replay(manager, result.records()); } @@ -417,7 +427,7 @@ private void createAndOpenStream(int nodeId, long epoch) { @Test public void testCommitWalCompacted() { Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())) - .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); + .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); registerAlwaysSuccessEpoch(BROKER0); @@ -427,22 +437,22 @@ public void testCommitWalCompacted() { // 2. commit first level stream set object of stream_0 and stream_1 List streamRanges0 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(100L), - new ObjectStreamRange() - .setStreamId(STREAM1) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(200L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L), + new ObjectStreamRange() + .setStreamId(STREAM1) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(200L)); CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData() - .setObjectId(0L) - .setOrderId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges0); + .setObjectId(0L) + .setOrderId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0); ControllerResult result4 = manager.commitStreamSetObject(commitRequest0); assertEquals(Errors.NONE.code(), result4.response().errorCode()); replay(manager, result4.records()); @@ -461,22 +471,22 @@ public void testCommitWalCompacted() { // 4. keep committing first level object of stream_0 and stream_1 List streamRanges1 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(100L) - .setEndOffset(200L), - new ObjectStreamRange() - .setStreamId(STREAM1) - .setStreamEpoch(EPOCH0) - .setStartOffset(200L) - .setEndOffset(300L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100L) + .setEndOffset(200L), + new ObjectStreamRange() + .setStreamId(STREAM1) + .setStreamEpoch(EPOCH0) + .setStartOffset(200L) + .setEndOffset(300L)); CommitStreamSetObjectRequestData commitRequest1 = new CommitStreamSetObjectRequestData() - .setObjectId(1L) - .setOrderId(1L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges1); + .setObjectId(1L) + .setOrderId(1L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1); ControllerResult result5 = manager.commitStreamSetObject(commitRequest1); assertEquals(Errors.NONE.code(), result5.response().errorCode()); replay(manager, result5.records()); @@ -495,23 +505,23 @@ public void testCommitWalCompacted() { // 6. commit an invalid stream set object which contains the destroyed or not exist stream set object Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false)); List streamRanges2 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(200L), - new ObjectStreamRange() - .setStreamId(STREAM1) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(300L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(200L), + new ObjectStreamRange() + .setStreamId(STREAM1) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(300L)); CommitStreamSetObjectRequestData commitRequest2 = new CommitStreamSetObjectRequestData() - .setObjectId(2L) - .setOrderId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges2) - .setCompactedObjectIds(List.of(0L, 1L, 10L)); + .setObjectId(2L) + .setOrderId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges2) + .setCompactedObjectIds(List.of(0L, 1L, 10L)); ControllerResult result6 = manager.commitStreamSetObject(commitRequest2); assertEquals(Errors.COMPACTED_OBJECTS_NOT_FOUND.code(), result6.response().errorCode()); assertEquals(0, result6.records().size()); @@ -519,12 +529,12 @@ public void testCommitWalCompacted() { // 7. commit a second level stream set object which compact wal_0 and wal_1 commitRequest2 = new CommitStreamSetObjectRequestData() - .setObjectId(2L) - .setOrderId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges2) - .setCompactedObjectIds(List.of(0L, 1L)); + .setObjectId(2L) + .setOrderId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges2) + .setCompactedObjectIds(List.of(0L, 1L)); result6 = manager.commitStreamSetObject(commitRequest2); assertEquals(Errors.NONE.code(), result6.response().errorCode()); replay(manager, result6.records()); @@ -550,7 +560,7 @@ public void testCommitWalCompacted() { @Test public void testCommitWalWithStreamObject() { Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())) - .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); + .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); registerAlwaysSuccessEpoch(BROKER0); @@ -560,25 +570,25 @@ public void testCommitWalWithStreamObject() { // 2. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal List streamRanges0 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(100L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L)); CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData() - .setObjectId(0L) - .setOrderId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges0) - .setStreamObjects(List.of( - new StreamObject() - .setStreamId(STREAM1) - .setObjectId(1L) - .setObjectSize(999) - .setStartOffset(0L) - .setEndOffset(200L) - )); + .setObjectId(0L) + .setOrderId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(1L) + .setObjectSize(999) + .setStartOffset(0L) + .setEndOffset(200L) + )); ControllerResult result4 = manager.commitStreamSetObject(commitRequest0); assertEquals(Errors.NONE.code(), result4.response().errorCode()); replay(manager, result4.records()); @@ -599,25 +609,25 @@ public void testCommitWalWithStreamObject() { // 5. commit stream set object with not continuous stream List streamRanges1 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(99L) - .setEndOffset(200L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(99L) + .setEndOffset(200L)); CommitStreamSetObjectRequestData commitRequest1 = new CommitStreamSetObjectRequestData() - .setObjectId(1L) - .setOrderId(1L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges1) - .setStreamObjects(List.of( - new StreamObject() - .setStreamId(STREAM1) - .setObjectId(2L) - .setObjectSize(999) - .setStartOffset(200L) - .setEndOffset(400L) - )); + .setObjectId(1L) + .setOrderId(1L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(2L) + .setObjectSize(999) + .setStartOffset(200L) + .setEndOffset(400L) + )); ControllerResult result5 = manager.commitStreamSetObject(commitRequest1); assertEquals(Errors.OFFSET_NOT_MATCHED.code(), result5.response().errorCode()); } @@ -625,7 +635,7 @@ public void testCommitWalWithStreamObject() { @Test public void testCommitStreamObject() { Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())) - .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); + .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); registerAlwaysSuccessEpoch(BROKER0); @@ -635,25 +645,25 @@ public void testCommitStreamObject() { // 2. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal List streamRanges0 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0L) - .setEndOffset(100L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L)); CommitStreamSetObjectRequestData commitRequest0 = new CommitStreamSetObjectRequestData() - .setObjectId(0L) - .setOrderId(0L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges0) - .setStreamObjects(List.of( - new StreamObject() - .setStreamId(STREAM1) - .setObjectId(1L) - .setObjectSize(999) - .setStartOffset(0L) - .setEndOffset(200L) - )); + .setObjectId(0L) + .setOrderId(0L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(1L) + .setObjectSize(999) + .setStartOffset(0L) + .setEndOffset(200L) + )); ControllerResult result0 = manager.commitStreamSetObject(commitRequest0); assertEquals(Errors.NONE.code(), result0.response().errorCode()); replay(manager, result0.records()); @@ -661,25 +671,25 @@ public void testCommitStreamObject() { // 3. commit a wal with stream_0 and a stream object with stream_1 that is split out from wal List streamRanges1 = List.of( - new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(100L) - .setEndOffset(200L)); + new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100L) + .setEndOffset(200L)); CommitStreamSetObjectRequestData commitRequest1 = new CommitStreamSetObjectRequestData() - .setObjectId(2L) - .setOrderId(1L) - .setNodeId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges1) - .setStreamObjects(List.of( - new StreamObject() - .setStreamId(STREAM1) - .setObjectId(3L) - .setObjectSize(999) - .setStartOffset(200L) - .setEndOffset(400L) - )); + .setObjectId(2L) + .setOrderId(1L) + .setNodeId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1) + .setStreamObjects(List.of( + new StreamObject() + .setStreamId(STREAM1) + .setObjectId(3L) + .setObjectSize(999) + .setStartOffset(200L) + .setEndOffset(400L) + )); ControllerResult result1 = manager.commitStreamSetObject(commitRequest1); assertEquals(Errors.NONE.code(), result1.response().errorCode()); replay(manager, result1.records()); @@ -687,12 +697,12 @@ public void testCommitStreamObject() { // 4. compact these two stream objects CommitStreamObjectRequestData streamObjectRequest = new CommitStreamObjectRequestData() - .setObjectId(4L) - .setStreamId(STREAM1) - .setStartOffset(0L) - .setEndOffset(400L) - .setObjectSize(999) - .setSourceObjectIds(List.of(1L, 3L)); + .setObjectId(4L) + .setStreamId(STREAM1) + .setStartOffset(0L) + .setEndOffset(400L) + .setObjectSize(999) + .setSourceObjectIds(List.of(1L, 3L)); ControllerResult result2 = manager.commitStreamObject(streamObjectRequest); assertEquals(Errors.NONE.code(), result2.response().errorCode()); replay(manager, result2.records()); @@ -711,12 +721,12 @@ public void testCommitStreamObject() { // 6. compact a stream object from invalid source object Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false)); streamObjectRequest = new CommitStreamObjectRequestData() - .setObjectId(5L) - .setStreamId(STREAM1) - .setStartOffset(400L) - .setEndOffset(1000L) - .setObjectSize(999) - .setSourceObjectIds(List.of(10L)); + .setObjectId(5L) + .setStreamId(STREAM1) + .setStartOffset(400L) + .setEndOffset(1000L) + .setObjectSize(999) + .setSourceObjectIds(List.of(10L)); result2 = manager.commitStreamObject(streamObjectRequest); assertEquals(Errors.COMPACTED_OBJECTS_NOT_FOUND.code(), result2.response().errorCode()); replay(manager, result2.records()); @@ -731,7 +741,7 @@ public void testCommitStreamObject() { private void mockData0() { Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())) - .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); + .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE)); Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true)); registerAlwaysSuccessEpoch(BROKER0); registerAlwaysSuccessEpoch(BROKER1); @@ -741,46 +751,46 @@ private void mockData0() { createAndOpenStream(BROKER0, EPOCH0); // 2. commit stream set object with stream0-[0, 10) CommitStreamSetObjectRequestData requestData = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER0) - .setObjectSize(999) - .setOrderId(0) - .setObjectId(0) - .setObjectStreamRanges(List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(0) - .setEndOffset(10))); + .setNodeId(BROKER0) + .setObjectSize(999) + .setOrderId(0) + .setObjectId(0) + .setObjectStreamRanges(List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0) + .setEndOffset(10))); ControllerResult result = manager.commitStreamSetObject(requestData); replay(manager, result.records()); // 3. commit stream set object with stream0-[10, 20), and stream1-[0, 10) requestData = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER0) - .setObjectSize(999) - .setOrderId(1) - .setObjectId(1) - .setObjectStreamRanges(List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(10) - .setEndOffset(20), new ObjectStreamRange() - .setStreamId(STREAM1) - .setStreamEpoch(EPOCH0) - .setStartOffset(0) - .setEndOffset(10))); + .setNodeId(BROKER0) + .setObjectSize(999) + .setOrderId(1) + .setObjectId(1) + .setObjectStreamRanges(List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(10) + .setEndOffset(20), new ObjectStreamRange() + .setStreamId(STREAM1) + .setStreamEpoch(EPOCH0) + .setStartOffset(0) + .setEndOffset(10))); result = manager.commitStreamSetObject(requestData); replay(manager, result.records()); // 4. commit with a stream object with stream0-[20, 40) requestData = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER0) - .setObjectSize(999) - .setOrderId(S3StreamConstant.INVALID_ORDER_ID) - .setObjectId(ObjectUtils.NOOP_OBJECT_ID) - .setStreamObjects(List.of(new StreamObject() - .setStreamId(STREAM0) + .setNodeId(BROKER0) .setObjectSize(999) - .setObjectId(2) - .setStartOffset(20) - .setEndOffset(40))); + .setOrderId(S3StreamConstant.INVALID_ORDER_ID) + .setObjectId(ObjectUtils.NOOP_OBJECT_ID) + .setStreamObjects(List.of(new StreamObject() + .setStreamId(STREAM0) + .setObjectSize(999) + .setObjectId(2) + .setStartOffset(20) + .setEndOffset(40))); result = manager.commitStreamSetObject(requestData); replay(manager, result.records()); // 5. node0 close stream0 and node1 open stream0 @@ -788,15 +798,15 @@ private void mockData0() { openStream(BROKER1, EPOCH1, STREAM0); // 6. commit stream set object with stream0-[40, 70) requestData = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER1) - .setObjectSize(999) - .setObjectId(3) - .setOrderId(3) - .setObjectStreamRanges(List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1) - .setStartOffset(40) - .setEndOffset(70))); + .setNodeId(BROKER1) + .setObjectSize(999) + .setObjectId(3) + .setOrderId(3) + .setObjectStreamRanges(List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1) + .setStartOffset(40) + .setEndOffset(70))); result = manager.commitStreamSetObject(requestData); replay(manager, result.records()); } @@ -807,9 +817,9 @@ public void testTrim() { // 1. trim stream0 to [60, ..) TrimStreamRequest trimRequest = new TrimStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1) - .setNewStartOffset(60); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1) + .setNewStartOffset(60); ControllerResult result1 = manager.trimStream(BROKER1, BROKER_EPOCH0, trimRequest); assertEquals(Errors.NONE.code(), result1.response().errorCode()); replay(manager, result1.records()); @@ -826,9 +836,9 @@ public void testTrim() { // 3. trim stream0 to [100, ..) trimRequest = new TrimStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1) - .setNewStartOffset(100); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1) + .setNewStartOffset(100); result1 = manager.trimStream(BROKER1, BROKER_EPOCH0, trimRequest); assertEquals(Errors.NONE.code(), result1.response().errorCode()); replay(manager, result1.records()); @@ -845,15 +855,15 @@ public void testTrim() { // 5. commit stream set object with stream0-[70, 100) CommitStreamSetObjectRequestData requestData = new CommitStreamSetObjectRequestData() - .setNodeId(BROKER1) - .setObjectSize(999) - .setObjectId(4) - .setOrderId(4) - .setObjectStreamRanges(List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(70) - .setEndOffset(100))); + .setNodeId(BROKER1) + .setObjectSize(999) + .setObjectId(4) + .setOrderId(4) + .setObjectStreamRanges(List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(70) + .setEndOffset(100))); ControllerResult result = manager.commitStreamSetObject(requestData); replay(manager, result.records()); @@ -873,24 +883,24 @@ public void testDelete() { // 1. delete with invalid stream owner DeleteStreamRequest req = new DeleteStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1); ControllerResult result = manager.deleteStream(BROKER0, BROKER_EPOCH0, req); assertEquals(Errors.STREAM_FENCED.code(), result.response().errorCode()); assertTrue(result.records().isEmpty()); // 2. delete with invalid stream epoch req = new DeleteStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0); result = manager.deleteStream(BROKER1, BROKER_EPOCH0, req); assertEquals(Errors.STREAM_FENCED.code(), result.response().errorCode()); replay(manager, result.records()); // 3. delete with valid request req = new DeleteStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1); result = manager.deleteStream(BROKER1, BROKER_EPOCH0, req); assertEquals(Errors.NONE.code(), result.response().errorCode()); replay(manager, result.records()); @@ -902,8 +912,8 @@ public void testDelete() { // 5. delete again req = new DeleteStreamRequest() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH1); + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1); result = manager.deleteStream(BROKER1, BROKER_EPOCH0, req); assertEquals(Errors.STREAM_NOT_EXIST.code(), result.response().errorCode()); } @@ -912,14 +922,14 @@ public void testDelete() { public void testGetOpeningStreams() { // 1. create stream without register CreateStreamRequest request0 = new CreateStreamRequest() - .setNodeId(BROKER0); + .setNodeId(BROKER0); ControllerResult result0 = manager.createStream(BROKER0, BROKER_EPOCH0, request0); assertEquals(Errors.NODE_EPOCH_NOT_EXIST, Errors.forCode(result0.response().errorCode())); // 2. register GetOpeningStreamsRequestData request1 = new GetOpeningStreamsRequestData() - .setNodeId(BROKER0) - .setNodeEpoch(1); + .setNodeId(BROKER0) + .setNodeEpoch(1); ControllerResult result1 = manager.getOpeningStreams(request1); assertEquals(Errors.NONE, Errors.forCode(result1.response().errorCode())); assertEquals(0, result1.response().streamMetadataList().size()); @@ -927,15 +937,15 @@ public void testGetOpeningStreams() { // 3. register with lower epoch again request1 = new GetOpeningStreamsRequestData() - .setNodeId(BROKER0) - .setNodeEpoch(0); + .setNodeId(BROKER0) + .setNodeEpoch(0); result1 = manager.getOpeningStreams(request1); assertEquals(Errors.NODE_EPOCH_EXPIRED, Errors.forCode(result1.response().errorCode())); // 4. register with higher epoch request1 = new GetOpeningStreamsRequestData() - .setNodeId(BROKER0) - .setNodeEpoch(2); + .setNodeId(BROKER0) + .setNodeEpoch(2); result1 = manager.getOpeningStreams(request1); assertEquals(Errors.NONE, Errors.forCode(result1.response().errorCode())); assertEquals(0, result1.response().streamMetadataList().size()); @@ -946,30 +956,79 @@ public void testGetOpeningStreams() { // 6. create stream with lower epoch CreateStreamRequest request2 = new CreateStreamRequest() - .setNodeId(BROKER0); + .setNodeId(BROKER0); ControllerResult result2 = manager.createStream(BROKER0, BROKER_EPOCH0, request2); assertEquals(Errors.NODE_EPOCH_EXPIRED, Errors.forCode(result2.response().errorCode())); // 7. create stream with matched epoch CreateStreamRequest request3 = new CreateStreamRequest() - .setNodeId(BROKER0); + .setNodeId(BROKER0); ControllerResult result3 = manager.createStream(BROKER0, 2, request3); assertEquals(Errors.NONE, Errors.forCode(result3.response().errorCode())); replay(manager, result3.records()); } + @Test + public void testCleanupScaleInNodes() { + when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong())).thenReturn(ControllerResult.of(Collections.emptyList(), null)); + + registerAlwaysSuccessEpoch(BROKER0); + registerAlwaysSuccessEpoch(BROKER1); + createAndOpenStream(BROKER0, 0); + createAndOpenStream(BROKER0, 0); + ControllerResult rst = manager.commitStreamSetObject(new CommitStreamSetObjectRequestData().setNodeId(BROKER0).setObjectId(1L) + .setStreamObjects(Collections.emptyList()) + .setObjectStreamRanges(List.of( + new ObjectStreamRange().setStreamId(STREAM0).setStartOffset(0).setEndOffset(100), + new ObjectStreamRange().setStreamId(STREAM1).setStartOffset(0).setEndOffset(100) + ))); + replay(manager, rst.records()); + + closeStream(BROKER0, 0, STREAM0); + openStream(BROKER1, 1, STREAM0); + rst = manager.commitStreamSetObject(new CommitStreamSetObjectRequestData().setNodeId(BROKER1).setObjectId(2L) + .setStreamObjects(Collections.emptyList()) + .setObjectStreamRanges(List.of( + new ObjectStreamRange().setStreamId(STREAM0).setStartOffset(100).setEndOffset(200) + ))); + replay(manager, rst.records()); + rst = manager.commitStreamSetObject(new CommitStreamSetObjectRequestData().setNodeId(BROKER1).setObjectId(3L) + .setStreamObjects(Collections.emptyList()) + .setObjectStreamRanges(List.of( + new ObjectStreamRange().setStreamId(STREAM0).setStartOffset(200).setEndOffset(300) + ))); + replay(manager, rst.records()); + + rst = manager.trimStream(BROKER1, 0, new TrimStreamRequest().setStreamId(STREAM0).setNewStartOffset(200L).setStreamEpoch(1)); + replay(manager, rst.records()); + + when(clusterControlManager.isActive(eq(1))).thenReturn(false); + when(clusterControlManager.isActive(eq(2))).thenReturn(true); + + ApiMessageAndVersion record = new ApiMessageAndVersion(new S3ObjectRecord().setObjectId(2L).setObjectState(S3ObjectState.MARK_DESTROYED.toByte()), (short) 0); + when(objectControlManager.markDestroyObjects(eq(List.of(2L)))).thenReturn(ControllerResult.of(List.of(record), null)); + + rst = manager.cleanupScaleInNodes(); + List records = rst.records(); + assertEquals(2, records.size()); + RemoveStreamSetObjectRecord r0 = (RemoveStreamSetObjectRecord) records.get(0).message(); + assertEquals(2, r0.objectId()); + S3ObjectRecord r1 = (S3ObjectRecord) records.get(1).message(); + assertEquals(2, r1.objectId()); + } + private void registerAlwaysSuccessEpoch(int nodeId) { GetOpeningStreamsRequestData req = new GetOpeningStreamsRequestData() - .setNodeId(nodeId) - .setNodeEpoch(-1); + .setNodeId(nodeId) + .setNodeEpoch(-1); ControllerResult result = manager.getOpeningStreams(req); replay(manager, result.records()); } private void replay(StreamControlManager manager, List records) { List messages = records.stream().map(x -> x.message()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); for (ApiMessage message : messages) { MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); switch (type) { @@ -1020,7 +1079,7 @@ private void verifyInitializedStreamMetadata(S3StreamMetadata metadata) { } private void verifyFirstTimeOpenStreamResult(ControllerResult result, - long expectedEpoch, int expectedNodeId) { + long expectedEpoch, int expectedNodeId) { assertEquals(0, result.response().errorCode()); assertEquals(0, result.response().startOffset()); assertEquals(2, result.records().size());