From 908e0eac344c4a725d3e4b3f81bfd7570e54daec Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 24 Aug 2023 10:05:45 +0800 Subject: [PATCH] feat(s3): allocate stream id in controller 1. allocate stream id in controller Signed-off-by: TheR1sing3un --- .../common/message/CreateStreamRequest.json | 7 ---- .../common/message/CreateStreamResponse.json | 7 ++++ .../java/kafka/log/s3/ObjectWriterTest.java | 2 + .../test/java/kafka/log/s3/S3StreamTest.java | 2 + .../src/test/java/kafka/log/s3/S3WalTest.java | 2 + .../stream/StreamControlManager.java | 18 ++++++--- .../S3ObjectControlManagerTest.java | 2 + .../controller/StreamControlManagerTest.java | 37 +++++++++++++++---- 8 files changed, 56 insertions(+), 21 deletions(-) diff --git a/clients/src/main/resources/common/message/CreateStreamRequest.json b/clients/src/main/resources/common/message/CreateStreamRequest.json index 15bfec351b..09deb54b57 100644 --- a/clients/src/main/resources/common/message/CreateStreamRequest.json +++ b/clients/src/main/resources/common/message/CreateStreamRequest.json @@ -24,12 +24,5 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { - "name": "StreamId", - "type": "int64", - "versions": "0+", - "entityType": "streamId", - "about": "The id of the requesting stream" - } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CreateStreamResponse.json b/clients/src/main/resources/common/message/CreateStreamResponse.json index 32865611f3..9c1743c01e 100644 --- a/clients/src/main/resources/common/message/CreateStreamResponse.json +++ b/clients/src/main/resources/common/message/CreateStreamResponse.json @@ -31,6 +31,13 @@ "type": "int32", "versions": "0+", "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "entityType": "streamId", + "about": "The id of the created stream" } ] } \ No newline at end of file diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java index e9887343fd..892191f18e 100644 --- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java +++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java @@ -24,6 +24,7 @@ import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; import org.apache.kafka.metadata.stream.S3ObjectType; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.Iterator; @@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +@Tag("S3Unit") public class ObjectWriterTest { @Test diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 807b6457f7..ad76be3f2e 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -26,6 +26,7 @@ import kafka.log.s3.model.StreamMetadata; import kafka.log.s3.streams.StreamManager; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.List; @@ -37,6 +38,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +@Tag("S3Unit") public class S3StreamTest { Wal wal; S3BlockCache blockCache; diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java index 6795c59a80..648566237e 100644 --- a/core/src/test/java/kafka/log/s3/S3WalTest.java +++ b/core/src/test/java/kafka/log/s3/S3WalTest.java @@ -24,6 +24,7 @@ import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.operator.MemoryS3Operator; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -39,6 +40,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@Tag("S3Unit") public class S3WalTest { ObjectManager objectManager; S3Wal s3Wal; diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index bdfc66c42c..f65d6a02dd 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -140,6 +140,11 @@ public String toString() { private final S3ObjectControlManager s3ObjectControlManager; + /** + * The next stream id to be assigned. + */ + private Long nextAssignedStreamId = 0L; + private final TimelineHashMap streamsMetadata; private final TimelineHashMap brokersMetadata; @@ -159,19 +164,15 @@ public StreamControlManager( // TODO: lazy update range's end offset // TODO: controller allocate the stream id public ControllerResult createStream(CreateStreamRequestData data) { - long streamId = data.streamId(); CreateStreamResponseData resp = new CreateStreamResponseData(); - if (this.streamsMetadata.containsKey(streamId)) { - // already exist - resp.setErrorCode(Errors.STREAM_EXIST.code()); - return ControllerResult.of(Collections.emptyList(), resp); - } + long streamId = nextAssignedStreamId; // create stream ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(streamId) .setEpoch(0) .setStartOffset(0L) .setRangeIndex(-1), (short) 0); + resp.setStreamId(streamId); return ControllerResult.of(Arrays.asList(record), resp); } @@ -267,6 +268,7 @@ public void replay(S3StreamRecord record) { S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(), record.startOffset(), this.snapshotRegistry); this.streamsMetadata.put(streamId, streamMetadata); + this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1); } public void replay(RemoveS3StreamRecord record) { @@ -305,6 +307,10 @@ public Map brokersMetadata() { return brokersMetadata; } + public Long nextAssignedStreamId() { + return nextAssignedStreamId; + } + @Override public String toString() { return "StreamControlManager{" + diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java index 4a32f62492..718e814b45 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -32,11 +32,13 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; @Timeout(40) +@Tag("S3Unit") public class S3ObjectControlManagerTest { private static final int BROKER0 = 0; private static final int BROKER1 = 1; diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index d0c5e314a5..0bfebec065 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -37,10 +37,12 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @Timeout(value = 40) +@Tag("S3Unit") public class StreamControlManagerTest { private final static long STREAM0 = 0; @@ -67,10 +69,12 @@ public void setUp() { @Test public void testBasicCreateStream() { // 1. create stream_0 success - CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0); + CreateStreamRequestData request0 = new CreateStreamRequestData(); ControllerResult result0 = manager.createStream(request0); List records0 = result0.records(); CreateStreamResponseData response0 = result0.response(); + assertEquals(Errors.NONE.code(), response0.errorCode()); + assertEquals(STREAM0, response0.streamId()); assertEquals(1, records0.size()); ApiMessageAndVersion record0 = records0.get(0); assertInstanceOf(S3StreamRecord.class, record0.message()); @@ -79,7 +83,6 @@ public void testBasicCreateStream() { assertEquals(0, streamRecord0.epoch()); assertEquals(-1, streamRecord0.rangeIndex()); assertEquals(0L, streamRecord0.startOffset()); - assertEquals(0, response0.errorCode()); // replay records_0 manager.replay(streamRecord0); @@ -88,23 +91,41 @@ public void testBasicCreateStream() { manager.streamsMetadata(); assertEquals(1, streamsMetadata.size()); verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0)); + assertEquals(1, manager.nextAssignedStreamId()); - // 2. create stream_0 with exception - CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM0); + // 2. create stream_1 + CreateStreamRequestData request1 = new CreateStreamRequestData(); ControllerResult result1 = manager.createStream(request1); List records1 = result1.records(); CreateStreamResponseData response1 = result1.response(); - assertEquals(0, records1.size()); - assertEquals(Errors.STREAM_EXIST.code(), response1.errorCode()); + assertEquals(Errors.NONE.code(), response1.errorCode()); + assertEquals(STREAM1, response1.streamId()); + assertEquals(1, records1.size()); + ApiMessageAndVersion record1 = records1.get(0); + assertInstanceOf(S3StreamRecord.class, record1.message()); + S3StreamRecord streamRecord1 = (S3StreamRecord) record1.message(); + assertEquals(STREAM1, streamRecord1.streamId()); + assertEquals(0, streamRecord1.epoch()); + assertEquals(-1, streamRecord1.rangeIndex()); + assertEquals(0L, streamRecord1.startOffset()); + + // replay records_1 + manager.replay(streamRecord1); + // verify the stream_2 is created + streamsMetadata = + manager.streamsMetadata(); + assertEquals(2, streamsMetadata.size()); + verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1)); + assertEquals(2, manager.nextAssignedStreamId()); } @Test public void testBasicOpenStream() { // 1. create stream_0 and stream_1 - CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0); + CreateStreamRequestData request0 = new CreateStreamRequestData(); ControllerResult result0 = manager.createStream(request0); result0.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); - CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM1); + CreateStreamRequestData request1 = new CreateStreamRequestData(); ControllerResult result1 = manager.createStream(request1); result1.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay);