From a4cd72916c520230b297032008946836f8113c5e Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 11:36:00 +0800 Subject: [PATCH 1/5] feat(s3): support S3StreamControlManager handles stream's create/open operation 1. support S3StreamControlManager handles stream's create/open operation Signed-off-by: TheR1sing3un --- .../errors/s3/StreamExistException.java | 26 ++++ .../errors/s3/StreamFencedException.java | 29 +++++ .../errors/s3/StreamNotExistException.java | 28 ++++ .../apache/kafka/common/protocol/Errors.java | 14 +- .../kafka/controller/QuorumController.java | 24 ++-- .../stream/S3ObjectControlManager.java | 8 ++ .../stream/StreamControlManager.java | 122 +++++++++++++++++- .../common/metadata/S3StreamRecord.json | 6 + 8 files changed, 246 insertions(+), 11 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/s3/StreamExistException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/s3/StreamFencedException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/s3/StreamNotExistException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamExistException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamExistException.java new file mode 100644 index 0000000000..9764445864 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamExistException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class StreamExistException extends ApiException { + public StreamExistException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamFencedException.java new file mode 100644 index 0000000000..b787a395d6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamFencedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class StreamFencedException extends ApiException { + + public StreamFencedException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamNotExistException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamNotExistException.java new file mode 100644 index 0000000000..cec778ace4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/StreamNotExistException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class StreamNotExistException extends ApiException { + + public StreamNotExistException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index c220bbcde4..8d74d0d806 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -127,6 +127,9 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.s3.StreamExistException; +import org.apache.kafka.common.errors.s3.StreamFencedException; +import org.apache.kafka.common.errors.s3.StreamNotExistException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -370,7 +373,16 @@ public enum Errors { TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new), FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new), INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new), - NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new); + NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new), + + // Kafka on S3 inject start + + STREAM_EXIST(109, "The stream already exists.", StreamExistException::new), + STREAM_NOT_EXIST(110, "The stream does not exist.", StreamNotExistException::new), + STREAM_FENCED(111, "The stream is fenced.", 
StreamFencedException::new);
+
+
+    // Kafka on S3 inject end
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fba468dbb7..930373e5c1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -2186,44 +2186,52 @@ public CompletableFuture<Void> notifyS3ObjectDeleted(ControllerRequestContext co
 
     @Override
     public CompletableFuture<CreateStreamResponseData> createStream(ControllerRequestContext context, CreateStreamRequestData request) {
-        return null;
+        return appendWriteEvent("createStream", context.deadlineNs(),
+            () -> streamControlManager.createStream(request));
     }
 
     @Override
     public CompletableFuture<OpenStreamResponseData> openStream(ControllerRequestContext context, OpenStreamRequestData request) {
-        return null;
+        return appendWriteEvent("openStream", context.deadlineNs(),
+            () -> streamControlManager.openStream(request));
     }
 
     @Override
     public CompletableFuture<CloseStreamResponseData> closeStream(ControllerRequestContext context, CloseStreamRequestData response) {
-        return null;
+        return appendWriteEvent("closeStream", context.deadlineNs(),
+            () -> streamControlManager.closeStream(response));
     }
 
     @Override
     public CompletableFuture<DeleteStreamResponseData> deleteStream(ControllerRequestContext context, DeleteStreamRequestData request) {
-        return null;
+        return appendWriteEvent("deleteStream", context.deadlineNs(),
+            () -> streamControlManager.deleteStream(request));
     }
 
     @Override
     public CompletableFuture<PrepareS3ObjectResponseData> prepareObject(ControllerRequestContext context, PrepareS3ObjectRequestData request) {
-        return null;
+        return appendWriteEvent("prepareObject", context.deadlineNs(),
+            () -> s3ObjectControlManager.prepareObject(request));
     }
 
     @Override
     public CompletableFuture<CommitWALObjectResponseData> commitWALObject(ControllerRequestContext context, CommitWALObjectRequestData request) {
-        return null;
+        return appendWriteEvent("commitWALObject", context.deadlineNs(),
+            () -> streamControlManager.commitWALObject(request));
     }
 
     @Override
     public CompletableFuture<CommitCompactObjectResponseData> commitCompactObject(ControllerRequestContext context, CommitCompactObjectRequestData request) {
-        return null;
+        return appendWriteEvent("commitCompactObject", context.deadlineNs(),
+            () -> streamControlManager.commitCompactObject(request));
     }
 
     @Override
     public CompletableFuture<CommitStreamObjectResponseData> commitStreamObject(ControllerRequestContext context, CommitStreamObjectRequestData request) {
-        return null;
+        return appendWriteEvent("commitStreamObject", context.deadlineNs(),
+            () -> streamControlManager.commitStreamObject(request));
     }
 
     // Kafka on S3 inject end
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 06b78a4cf8..d00ace45d9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -28,6 +28,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.kafka.common.message.CreateStreamRequestData;
+import org.apache.kafka.common.message.CreateStreamResponseData;
+import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
+import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
 import
org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.utils.LogContext; @@ -119,6 +123,10 @@ public Long nextAssignedObjectId() { return nextAssignedObjectId; } + public ControllerResult prepareObject(PrepareS3ObjectRequestData data) { + + } + public void replay(S3ObjectRecord record) { GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId()); String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 7f0483717f..c04affea4a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -17,10 +17,32 @@ package org.apache.kafka.controller.stream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.kafka.common.message.CloseStreamRequestData; +import org.apache.kafka.common.message.CloseStreamResponseData; +import org.apache.kafka.common.message.CommitCompactObjectRequestData; +import org.apache.kafka.common.message.CommitCompactObjectResponseData; +import org.apache.kafka.common.message.CommitStreamObjectRequestData; +import org.apache.kafka.common.message.CommitStreamObjectResponseData; +import org.apache.kafka.common.message.CommitWALObjectRequestData; +import org.apache.kafka.common.message.CommitWALObjectResponseData; +import org.apache.kafka.common.message.CreateStreamRequestData; +import org.apache.kafka.common.message.CreateStreamResponseData; +import org.apache.kafka.common.message.DeleteStreamRequestData; +import org.apache.kafka.common.message.DeleteStreamResponseData; +import org.apache.kafka.common.message.OpenStreamRequestData; +import org.apache.kafka.common.message.OpenStreamResponseData; +import org.apache.kafka.common.metadata.RangeRecord; +import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; @@ -32,14 +54,19 @@ public class StreamControlManager { static class S3StreamMetadata { + private Long streamId; - private Long epoch; + // current epoch, when created but not open, use 0 represent + private Long currentEpoch; + // rangeIndex, when created but not open, there is no range, use -1 represent + private Integer currentRangeIndex = -1; private Long startOffset; - private TimelineHashSet ranges; + private TimelineHashMap ranges; private TimelineHashSet streamObjects; } static class BrokerS3WALMetadata { + private Integer brokerId; private TimelineHashSet walObjects; } @@ -65,4 +92,95 @@ public StreamControlManager( this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0); } + public ControllerResult createStream(CreateStreamRequestData data) { + long streamId = data.streamId(); + CreateStreamResponseData resp = new CreateStreamResponseData(); + if 
(this.streamsMetadata.containsKey(streamId)) { + // already exist + resp.setErrorCode(Errors.STREAM_EXIST.code()); + return ControllerResult.of(null, resp); + } + // create stream + ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord() + .setStreamId(streamId) + .setEpoch(0) + .setStartOffset(0L), (short) 0); + return ControllerResult.of(Arrays.asList(record), resp); + } + + public ControllerResult openStream(OpenStreamRequestData data) { + OpenStreamResponseData resp = new OpenStreamResponseData(); + long streamId = data.streamId(); + int brokerId = data.brokerId(); + long epoch = data.streamEpoch(); + // verify stream exist + if (!this.streamsMetadata.containsKey(streamId)) { + resp.setErrorCode(Errors.STREAM_NOT_EXIST.code()); + return ControllerResult.of(null, resp); + } + // verify epoch match + S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata.currentEpoch > epoch) { + resp.setErrorCode(Errors.STREAM_FENCED.code()); + return ControllerResult.of(null, resp); + } + if (streamMetadata.currentEpoch == epoch) { + // epoch equals, verify broker + RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); + if (rangeMetadata == null || rangeMetadata.getBrokerId() != brokerId) { + resp.setErrorCode(Errors.STREAM_FENCED.code()); + return ControllerResult.of(null, resp); + } + } + // now the request in valid, update the stream's epoch and create a new range for this broker + List records = new ArrayList<>(); + long newEpoch = streamMetadata.currentEpoch + 1; + int newRangeIndex = streamMetadata.currentRangeIndex + 1; + // stream update record + records.add(new ApiMessageAndVersion(new S3StreamRecord() + .setStreamId(streamId) + .setEpoch(newEpoch) + .setRangeIndex(newRangeIndex) + .setStartOffset(streamMetadata.startOffset), (short) 0)); + // get new range's start offset + // default regard this range is the first range in stream, use 0 as start offset + long startOffset = 0; + if (newRangeIndex > 0) { + // means that the new range is not the first range in stream, get the last range's end offset + RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); + startOffset = lastRangeMetadata.getEndOffset().get() + 1; + } + // range create record + records.add(new ApiMessageAndVersion(new RangeRecord() + .setStreamId(streamId) + .setBrokerId(brokerId) + .setStartOffset(startOffset) + .setEndOffset(startOffset) + .setEpoch(newEpoch) + .setRangeIndex(newRangeIndex), (short) 0)); + resp.setStartOffset(startOffset); + return ControllerResult.of(records, resp); + } + + public ControllerResult closeStream(CloseStreamRequestData data) { + throw new UnsupportedOperationException(); + } + + public ControllerResult deleteStream(DeleteStreamRequestData data) { + throw new UnsupportedOperationException(); + } + + public ControllerResult commitWALObject(CommitWALObjectRequestData data) { + + } + + public ControllerResult commitCompactObject(CommitCompactObjectRequestData data) { + + } + + public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { + + } + + } diff --git a/metadata/src/main/resources/common/metadata/S3StreamRecord.json b/metadata/src/main/resources/common/metadata/S3StreamRecord.json index b4d4a08f31..20c5a3a903 100644 --- a/metadata/src/main/resources/common/metadata/S3StreamRecord.json +++ b/metadata/src/main/resources/common/metadata/S3StreamRecord.json @@ -32,6 +32,12 @@ "versions": "0+", "about": "The epoch" }, + { + "name": "rangeIndex", 
+ "type": "int32", + "versions": "0+", + "about": "The range index" + }, { "name": "StartOffset", "type": "int64", From 79fc9b3638c97f1edb6476e1e204022955265c8f Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 16:38:25 +0800 Subject: [PATCH 2/5] test(s3): add S3StreamControlManager basic create/open stream test 1. add S3StreamControlManager basic create/open stream test 2. fix to pass checkstyle and spot bugs checking Signed-off-by: TheR1sing3un --- build.gradle | 5 +- .../errors/es/SlowFetchHintException.java | 16 +- .../common/requests/OpenStreamRequest.java | 1 - .../common/requests/OpenStreamResponse.java | 1 - .../kafka/log/es/AlwaysSuccessClient.java | 5 +- .../log/es/DefaultElasticStreamSlice.java | 2 +- .../kafka/log/es/ElasticLogReopenTester.scala | 18 ++ .../org/apache/kafka/message/EntityType.java | 5 +- .../apache/kafka/controller/Controller.java | 5 - .../stream/S3ObjectControlManager.java | 4 +- .../stream/S3ObjectKeyGeneratorManager.java | 12 +- .../stream/StreamControlManager.java | 167 +++++++++++-- .../kafka/image/BrokerS3WALMetadataDelta.java | 10 +- .../org/apache/kafka/image/MetadataDelta.java | 28 ++- .../apache/kafka/image/S3ObjectsDelta.java | 2 +- .../kafka/image/S3StreamMetadataDelta.java | 6 +- .../kafka/metadata/stream/RangeMetadata.java | 43 ++-- .../kafka/metadata/stream/S3Object.java | 5 +- .../kafka/metadata/stream/S3StreamObject.java | 18 ++ .../kafka/metadata/stream/S3WALObject.java | 31 +++ .../metadata/stream/SimplifiedS3Object.java | 17 ++ .../controller/StreamControlManagerTest.java | 234 ++++++++++++++++++ .../image/S3StreamMetadataImageTest.java | 9 +- 23 files changed, 545 insertions(+), 99 deletions(-) create mode 100644 metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java diff --git a/build.gradle b/build.gradle index 13b16adf35..e07c7812a1 100644 --- a/build.gradle +++ b/build.gradle @@ -224,7 +224,10 @@ if (file('.git').exists()) { 'streams/streams-scala/logs/*', 'licenses/*', '**/generated/**', - 'clients/src/test/resources/serializedData/*' + 'clients/src/test/resources/serializedData/*', + '.github/**', + 'tests/docker/es/docker-compose.yaml', + 'tests/esk_test_suite.yml' ]) } } else { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java index 6d6deb8c9f..fb51a5a900 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/es/SlowFetchHintException.java @@ -23,12 +23,20 @@ */ public class SlowFetchHintException extends RetriableException { private static final long serialVersionUID = 1L; - public SlowFetchHintException() { super();} + public SlowFetchHintException() { + super(); + } - public SlowFetchHintException(String message) { super(message); } + public SlowFetchHintException(String message) { + super(message); + } - public SlowFetchHintException(Throwable cause) { super(cause); } + public SlowFetchHintException(Throwable cause) { + super(cause); + } - public SlowFetchHintException(String message, Throwable cause) { super(message, cause); } + public SlowFetchHintException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java index 6dd9cfc03c..63f48d74f8 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; public class OpenStreamRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java index 4807e45c32..9d1b070f98 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java @@ -20,7 +20,6 @@ import java.util.Map; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; public class OpenStreamResponse extends AbstractResponse { diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index c4083e14c7..1c229ca12b 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +@SuppressWarnings("uncheck") public class AlwaysSuccessClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class); @@ -128,7 +129,7 @@ static class StreamImpl implements Stream { private final Stream stream; private volatile boolean closed = false; private final Map slowFetchingOffsetMap = new ConcurrentHashMap<>(); - private final long SLOW_FETCH_TIMEOUT_MILLIS = 10; + private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10; public StreamImpl(Stream stream) { this.stream = stream; @@ -179,7 +180,7 @@ public CompletableFuture fetch(long startOffset, long endOffset, in if (ex != null) { if (closed) { cf.completeExceptionally(new IllegalStateException("stream already closed")); - } else if (ex instanceof TimeoutException){ + } else if (ex instanceof TimeoutException) { LOGGER.info("Fetch stream[{}] [{},{}) timeout for {} ms, retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS); cf.completeExceptionally(new SlowFetchHintException("fetch data too slowly, retry with slow fetching")); slowFetchingOffsetMap.put(slowFetchKey, true); diff --git a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java index 6dc8725f29..935f0c1624 100644 --- a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java @@ -80,7 +80,7 @@ public FetchResult fetch(long startOffset, long endOffset, int maxBytesHint) thr return stream.fetch(startOffsetInStream + fixedStartOffset, startOffsetInStream + endOffset, maxBytesHint).thenApply(FetchResultWrapper::new).get(); } catch (ExecutionException e) { if (e.getCause() instanceof SlowFetchHintException) { - throw (SlowFetchHintException)(e.getCause()); + throw (SlowFetchHintException) (e.getCause()); } else { throw new RuntimeException(e.getCause()); } diff --git 
a/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala b/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala index 2855b81ad9..33a7e3d2a7 100644 --- a/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala +++ b/core/src/test/scala/kafka/log/es/ElasticLogReopenTester.scala @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package kafka.log.es import joptsimple.OptionParser diff --git a/generator/src/main/java/org/apache/kafka/message/EntityType.java b/generator/src/main/java/org/apache/kafka/message/EntityType.java index 9fd35aab44..a2eae53200 100644 --- a/generator/src/main/java/org/apache/kafka/message/EntityType.java +++ b/generator/src/main/java/org/apache/kafka/message/EntityType.java @@ -18,7 +18,6 @@ package org.apache.kafka.message; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.message.FieldType.Int64FieldType; public enum EntityType { @JsonProperty("unknown") @@ -41,10 +40,10 @@ public enum EntityType { // Kafka on S3 inject start @JsonProperty("streamId") - STREAM_ID(Int64FieldType.INSTANCE), + STREAM_ID(FieldType.Int64FieldType.INSTANCE), @JsonProperty("streamEpoch") - STREAM_EPOCH(Int64FieldType.INSTANCE); + STREAM_EPOCH(FieldType.Int64FieldType.INSTANCE); // Kafka on S3 inject end private final FieldType baseType; diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 356574b786..9ba7a6a91a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -57,11 +57,6 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.common.requests.CloseStreamRequest; -import org.apache.kafka.common.requests.CloseStreamResponse; -import org.apache.kafka.common.requests.CommitCompactObjectRequest; -import org.apache.kafka.common.requests.CommitStreamObjectRequest; -import org.apache.kafka.common.requests.PrepareS3ObjectRequest; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FinalizedControllerFeatures; diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index d00ace45d9..6f228d55b8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -28,8 +28,6 @@ import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.kafka.common.message.CreateStreamRequestData; -import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; @@ -124,7 +122,7 @@ public Long nextAssignedObjectId() { } public ControllerResult prepareObject(PrepareS3ObjectRequestData data) { - + throw new UnsupportedOperationException(); } public void replay(S3ObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java index 220ec223c4..43251bb19a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java @@ -1,13 +1,13 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index c04affea4a..c40497de0e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -19,7 +19,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.message.CommitCompactObjectRequestData; @@ -35,6 +38,8 @@ import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.RangeRecord; +import org.apache.kafka.common.metadata.RemoveRangeRecord; +import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; @@ -53,22 +58,80 @@ */ public class StreamControlManager { - static class S3StreamMetadata { - - private Long streamId; + public static class S3StreamMetadata { // current epoch, when created but not open, use 0 represent - private Long currentEpoch; + private long currentEpoch; // rangeIndex, when created but not open, there is no range, use -1 represent - private Integer currentRangeIndex = -1; - private Long startOffset; + private int currentRangeIndex = -1; + private long startOffset; private TimelineHashMap ranges; private TimelineHashSet streamObjects; - } - static class BrokerS3WALMetadata { + public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset, + SnapshotRegistry registry) { + this.currentEpoch = currentEpoch; + this.currentRangeIndex = currentRangeIndex; + this.startOffset = startOffset; + this.ranges = new TimelineHashMap<>(registry, 0); + this.streamObjects = new TimelineHashSet<>(registry, 0); + } + + public long currentEpoch() { + return currentEpoch; + } + + public int currentRangeIndex() { + return currentRangeIndex; + } + + public long startOffset() { + return startOffset; + } + + public Map ranges() { + return ranges; + } + + public Set streamObjects() { + return streamObjects; + } + + @Override + public String toString() { + return "S3StreamMetadata{" + + "currentEpoch=" + currentEpoch + + ", currentRangeIndex=" + currentRangeIndex + + ", startOffset=" + startOffset + + ", ranges=" + ranges + + ", streamObjects=" + streamObjects + + '}'; + } + } - private Integer brokerId; + public static class BrokerS3WALMetadata { + private int brokerId; private TimelineHashSet walObjects; + + public BrokerS3WALMetadata(int brokerId, SnapshotRegistry registry) { + this.brokerId = brokerId; + this.walObjects = new TimelineHashSet<>(registry, 0); + } + + public int getBrokerId() { + return brokerId; + } + + public TimelineHashSet getWalObjects() { + return walObjects; + } + + @Override + public String toString() { + return 
"BrokerS3WALMetadata{" + + "brokerId=" + brokerId + + ", walObjects=" + walObjects + + '}'; + } } private final SnapshotRegistry snapshotRegistry; @@ -98,13 +161,14 @@ public ControllerResult createStream(CreateStreamReque if (this.streamsMetadata.containsKey(streamId)) { // already exist resp.setErrorCode(Errors.STREAM_EXIST.code()); - return ControllerResult.of(null, resp); + return ControllerResult.of(Collections.emptyList(), resp); } // create stream ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(streamId) .setEpoch(0) - .setStartOffset(0L), (short) 0); + .setStartOffset(0L) + .setRangeIndex(-1), (short) 0); return ControllerResult.of(Arrays.asList(record), resp); } @@ -116,21 +180,24 @@ public ControllerResult openStream(OpenStreamRequestData // verify stream exist if (!this.streamsMetadata.containsKey(streamId)) { resp.setErrorCode(Errors.STREAM_NOT_EXIST.code()); - return ControllerResult.of(null, resp); + return ControllerResult.of(Collections.emptyList(), resp); } // verify epoch match S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); if (streamMetadata.currentEpoch > epoch) { resp.setErrorCode(Errors.STREAM_FENCED.code()); - return ControllerResult.of(null, resp); + return ControllerResult.of(Collections.emptyList(), resp); } if (streamMetadata.currentEpoch == epoch) { // epoch equals, verify broker RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); - if (rangeMetadata == null || rangeMetadata.getBrokerId() != brokerId) { + if (rangeMetadata == null || rangeMetadata.brokerId() != brokerId) { resp.setErrorCode(Errors.STREAM_FENCED.code()); - return ControllerResult.of(null, resp); + return ControllerResult.of(Collections.emptyList(), resp); } + // epoch equals, broker equals, regard it as redundant open operation, just return success + resp.setStartOffset(streamMetadata.startOffset); + return ControllerResult.of(Collections.emptyList(), resp); } // now the request in valid, update the stream's epoch and create a new range for this broker List records = new ArrayList<>(); @@ -148,14 +215,15 @@ public ControllerResult openStream(OpenStreamRequestData if (newRangeIndex > 0) { // means that the new range is not the first range in stream, get the last range's end offset RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex); - startOffset = lastRangeMetadata.getEndOffset().get() + 1; + startOffset = lastRangeMetadata.endOffset() + 1; } // range create record records.add(new ApiMessageAndVersion(new RangeRecord() .setStreamId(streamId) .setBrokerId(brokerId) .setStartOffset(startOffset) - .setEndOffset(startOffset) + // default end offset set to (startOffset - 1) + .setEndOffset(startOffset - 1) .setEpoch(newEpoch) .setRangeIndex(newRangeIndex), (short) 0)); resp.setStartOffset(startOffset); @@ -171,16 +239,77 @@ public ControllerResult deleteStream(DeleteStreamReque } public ControllerResult commitWALObject(CommitWALObjectRequestData data) { - + throw new UnsupportedOperationException(); } public ControllerResult commitCompactObject(CommitCompactObjectRequestData data) { - + throw new UnsupportedOperationException(); } public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { + throw new UnsupportedOperationException(); + } + + + public void replay(S3StreamRecord record) { + long streamId = record.streamId(); + // already exist, update the stream's self metadata + if (this.streamsMetadata.containsKey(streamId)) { + 
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + streamMetadata.startOffset = record.startOffset(); + streamMetadata.currentEpoch = record.epoch(); + streamMetadata.currentRangeIndex = record.rangeIndex(); + return; + } + // not exist, create a new stream + S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(), + record.startOffset(), this.snapshotRegistry); + this.streamsMetadata.put(streamId, streamMetadata); + } + + public void replay(RemoveS3StreamRecord record) { + long streamId = record.streamId(); + this.streamsMetadata.remove(streamId); + } + + public void replay(RangeRecord record) { + long streamId = record.streamId(); + S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata == null) { + // should not happen + log.error("stream {} not exist when replay range record {}", streamId, record); + return; + } + streamMetadata.ranges.put(record.rangeIndex(), RangeMetadata.of(record)); + } + + public void replay(RemoveRangeRecord record) { + long streamId = record.streamId(); + S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata == null) { + // should not happen + log.error("stream {} not exist when replay remove range record {}", streamId, record); + return; + } + streamMetadata.ranges.remove(record.rangeIndex()); + } + + public Map streamsMetadata() { + return streamsMetadata; } + public Map brokersMetadata() { + return brokersMetadata; + } + @Override + public String toString() { + return "StreamControlManager{" + + "snapshotRegistry=" + snapshotRegistry + + ", s3ObjectControlManager=" + s3ObjectControlManager + + ", streamsMetadata=" + streamsMetadata + + ", brokersMetadata=" + brokersMetadata + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java index 2ef674cc54..1c9c04f3f2 100644 --- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java @@ -18,8 +18,10 @@ package org.apache.kafka.image; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.kafka.common.metadata.RemoveWALObjectRecord; import org.apache.kafka.common.metadata.WALObjectRecord; @@ -28,7 +30,7 @@ public class BrokerS3WALMetadataDelta { private final BrokerS3WALMetadataImage image; - private final Set addedS3WALObjects = new HashSet<>(); + private final Map addedS3WALObjects = new HashMap<>(); private final Set removedS3WALObjects = new HashSet<>(); @@ -37,7 +39,7 @@ public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) { } public void replay(WALObjectRecord record) { - addedS3WALObjects.add(S3WALObject.of(record)); + addedS3WALObjects.put(record.objectId(), S3WALObject.of(record)); // new add or update, so remove from removedObjects removedS3WALObjects.remove(record.objectId()); } @@ -51,9 +53,9 @@ public void replay(RemoveWALObjectRecord record) { public BrokerS3WALMetadataImage apply() { List newS3WALObjects = new ArrayList<>(image.getWalObjects()); // add all changed WAL objects - newS3WALObjects.addAll(addedS3WALObjects); + newS3WALObjects.addAll(addedS3WALObjects.values()); // remove all removed WAL objects - newS3WALObjects.removeAll(removedS3WALObjects); + newS3WALObjects.removeIf(s3WALObject -> 
removedS3WALObjects.contains(s3WALObject.objectId())); return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects); } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index 8ad7612cdc..b218bc228d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -462,18 +462,8 @@ public MetadataImage apply(MetadataProvenance provenance) { } // Kafka on S3 inject start - S3StreamsMetadataImage newStreamMetadata; - if (s3StreamsMetadataDelta == null) { - newStreamMetadata = image.streamsMetadata(); - } else { - newStreamMetadata = s3StreamsMetadataDelta.apply(); - } - S3ObjectsImage newS3ObjectsMetadata; - if (s3ObjectsDelta == null) { - newS3ObjectsMetadata = image.objectsMetadata(); - } else { - newS3ObjectsMetadata = s3ObjectsDelta.apply(); - } + S3StreamsMetadataImage newStreamMetadata = getNewS3StreamsMetadataImage(); + S3ObjectsImage newS3ObjectsMetadata = getNewS3ObjectsMetadataImage(); // Kafka on S3 inject end return new MetadataImage( provenance, @@ -489,6 +479,20 @@ public MetadataImage apply(MetadataProvenance provenance) { ); } + // Kafka on S3 inject start + + private S3StreamsMetadataImage getNewS3StreamsMetadataImage() { + return s3StreamsMetadataDelta == null ? + image.streamsMetadata() : s3StreamsMetadataDelta.apply(); + } + + private S3ObjectsImage getNewS3ObjectsMetadataImage() { + return s3ObjectsDelta == null ? + image.objectsMetadata() : s3ObjectsDelta.apply(); + } + + // Kafka on S3 inject end + @Override public String toString() { return "MetadataDelta(" + diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java index 32ca01c57f..7abcda297e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java @@ -60,7 +60,7 @@ public void replay(S3ObjectRecord record) { public void replay(RemoveS3ObjectRecord record) { removedObjectIds.add(record.objectId()); // new remove, so remove from addedObjects - addedObjects.remove(record.objectId()); + addedObjects.removeIf(obj -> obj.objectId() == record.objectId()); } public S3ObjectsImage apply() { diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java index b4338d03e1..6dad5f3b2b 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java @@ -37,7 +37,7 @@ public class S3StreamMetadataDelta { private final Map changedRanges = new HashMap<>(); private final Set removedRanges = new HashSet<>(); - private final Set changedS3StreamObjects = new HashSet<>(); + private final Map changedS3StreamObjects = new HashMap<>(); private final Set removedS3StreamObjectIds = new HashSet<>(); public S3StreamMetadataDelta(S3StreamMetadataImage image) { @@ -58,7 +58,7 @@ public void replay(RemoveRangeRecord record) { } public void replay(S3StreamObjectRecord record) { - changedS3StreamObjects.add(S3StreamObject.of(record)); + changedS3StreamObjects.put(record.objectId(), S3StreamObject.of(record)); // new add or update, so remove from removedObjects removedS3StreamObjectIds.remove(record.objectId()); } @@ -77,7 +77,7 @@ public S3StreamMetadataImage apply() { 
removedRanges.forEach(newRanges::remove); List newS3StreamObjects = new ArrayList<>(image.getStreamObjects()); // add all changed stream-objects - newS3StreamObjects.addAll(changedS3StreamObjects); + newS3StreamObjects.addAll(changedS3StreamObjects.values()); // remove all removed stream-objects newS3StreamObjects.removeIf(removedS3StreamObjectIds::contains); return new S3StreamMetadataImage(image.getStreamId(), newEpoch, image.getStartOffset(), newRanges, newS3StreamObjects); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java index cc73897be7..5e4f00b587 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java @@ -17,22 +17,18 @@ package org.apache.kafka.metadata.stream; -import java.util.Optional; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; public class RangeMetadata implements Comparable { - private Long streamId; - private Long epoch; - private Integer rangeIndex; - private Long startOffset; - private Optional endOffset; - private Integer brokerId; + private long streamId; + private long epoch; + private int rangeIndex; + private long startOffset; + private long endOffset; + private int brokerId; - private RangeMetadata() { - } - - public RangeMetadata(Long streamId, Long epoch, Integer rangeIndex, Long startOffset, Optional endOffset, Integer brokerId) { + public RangeMetadata(long streamId, long epoch, int rangeIndex, long startOffset, long endOffset, int brokerId) { this.streamId = streamId; this.epoch = epoch; this.rangeIndex = rangeIndex; @@ -43,26 +39,26 @@ public RangeMetadata(Long streamId, Long epoch, Integer rangeIndex, Long startOf @Override public int compareTo(RangeMetadata o) { - return this.rangeIndex.compareTo(o.rangeIndex); + return this.rangeIndex - o.rangeIndex; } - public Long getEpoch() { + public long epoch() { return epoch; } - public Integer getRangeIndex() { + public int rangeIndex() { return rangeIndex; } - public Long getStartOffset() { + public long startOffset() { return startOffset; } - public Optional getEndOffset() { + public long endOffset() { return endOffset; } - public Integer getBrokerId() { + public int brokerId() { return brokerId; } @@ -73,17 +69,14 @@ public ApiMessageAndVersion toRecord() { .setBrokerId(brokerId) .setRangeIndex(rangeIndex) .setStartOffset(startOffset) - .setEndOffset(endOffset.get()), (short) 0); + .setEndOffset(endOffset), (short) 0); } public static RangeMetadata of(RangeRecord record) { - RangeMetadata rangeMetadata = new RangeMetadata(); - rangeMetadata.streamId = record.streamId(); - rangeMetadata.epoch = record.epoch(); - rangeMetadata.rangeIndex = record.rangeIndex(); - rangeMetadata.startOffset = record.startOffset(); - rangeMetadata.endOffset = Optional.ofNullable(record.endOffset()); - rangeMetadata.brokerId = record.brokerId(); + RangeMetadata rangeMetadata = new RangeMetadata( + record.streamId(), record.epoch(), record.rangeIndex(), + record.startOffset(), record.endOffset(), record.brokerId() + ); return rangeMetadata; } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index 96ea91ca9c..1829458473 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ 
b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -120,11 +120,10 @@ public ApiMessageAndVersion toRecord() { .setAppliedTimeInMs(appliedTimeInMs.orElse(null)) .setExpiredTimeInMs(expiredTimeInMs.orElse(null)) .setCommittedTimeInMs(committedTimeInMs.orElse(null)) - .setDestroyedTimeInMs(destroyedTimeInMs.orElse(null)) - , (short) 0); + .setDestroyedTimeInMs(destroyedTimeInMs.orElse(null)), (short) 0); } - public class S3ObjectCommitContext { + static public class S3ObjectCommitContext { private final Long committedTimeInMs; private final Long objectSize; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java index 323c68a698..543b5715ba 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java @@ -17,6 +17,7 @@ package org.apache.kafka.metadata.stream; +import java.util.Objects; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -57,4 +58,21 @@ public static S3StreamObject of(S3StreamObjectRecord record) { S3StreamObject s3StreamObject = new S3StreamObject(record.objectId(), index); return s3StreamObject; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + S3StreamObject that = (S3StreamObject) o; + return objectId == that.objectId; + } + + @Override + public int hashCode() { + return Objects.hash(objectId); + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java index 763b86205a..6611e2bc8a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -18,6 +18,7 @@ package org.apache.kafka.metadata.stream; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.kafka.common.metadata.WALObjectRecord; import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex; @@ -63,4 +64,34 @@ public Map getStreamsIndex() { return streamsIndex; } + public Long objectId() { + return objectId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + S3WALObject that = (S3WALObject) o; + return objectId == that.objectId; + } + + @Override + public int hashCode() { + return Objects.hash(objectId); + } + + @Override + public String toString() { + return "S3WALObject{" + + "objectId=" + objectId + + ", brokerId=" + brokerId + + ", streamsIndex=" + streamsIndex + + ", objectType=" + objectType + + '}'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java index 9e79d07ba8..f367f8c7b2 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.kafka.metadata.stream; import org.apache.kafka.common.metadata.S3ObjectRecord; diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java new file mode 100644 index 0000000000..8f678919e9 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.message.CreateStreamRequestData; +import org.apache.kafka.common.message.CreateStreamResponseData; +import org.apache.kafka.common.message.OpenStreamRequestData; +import org.apache.kafka.common.message.OpenStreamResponseData; +import org.apache.kafka.common.metadata.RangeRecord; +import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.stream.StreamControlManager; +import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 40) +public class StreamControlManagerTest { + + private final static long STREAM0 = 0; + private final static long STREAM1 = 1; + private final static long STREAM2 = 2; + + private final static int BROKER0 = 0; + private final static int BROKER1 = 1; + private final static int BROKER2 = 2; + + private final static long EPOCH0 = 0; + private final static long EPOCH1 = 1; + private final static long EPOCH2 = 2; + + private StreamControlManager manager; + + @BeforeEach + public void setUp() { + LogContext context = new LogContext(); + SnapshotRegistry registry = new SnapshotRegistry(context); + manager = new StreamControlManager(registry, context, null); + } + + @Test + public void testBasicCreateStream() { + // 1. create stream_0 success + CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0); + ControllerResult result0 = manager.createStream(request0); + List records0 = result0.records(); + CreateStreamResponseData response0 = result0.response(); + assertEquals(1, records0.size()); + ApiMessageAndVersion record0 = records0.get(0); + assertInstanceOf(S3StreamRecord.class, record0.message()); + S3StreamRecord streamRecord0 = (S3StreamRecord) record0.message(); + assertEquals(STREAM0, streamRecord0.streamId()); + assertEquals(0, streamRecord0.epoch()); + assertEquals(-1, streamRecord0.rangeIndex()); + assertEquals(0L, streamRecord0.startOffset()); + assertEquals(0, response0.errorCode()); + + // replay records_0 + manager.replay(streamRecord0); + // verify the stream_0 is created + Map streamsMetadata = + manager.streamsMetadata(); + assertEquals(1, streamsMetadata.size()); + verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0)); + + // 2. create stream_0 with exception + CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM0); + ControllerResult result1 = manager.createStream(request1); + List records1 = result1.records(); + CreateStreamResponseData response1 = result1.response(); + assertEquals(0, records1.size()); + assertEquals(Errors.STREAM_EXIST.code(), response1.errorCode()); + } + + @Test + public void testBasicOpenStream() { + // 1. 
create stream_0 and stream_1 + CreateStreamRequestData request0 = new CreateStreamRequestData().setStreamId(STREAM0); + ControllerResult result0 = manager.createStream(request0); + result0.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); + CreateStreamRequestData request1 = new CreateStreamRequestData().setStreamId(STREAM1); + ControllerResult result1 = manager.createStream(request1); + result1.records().stream().map(x -> (S3StreamRecord) x.message()).forEach(manager::replay); + + // verify the streams are created + Map streamsMetadata = manager.streamsMetadata(); + assertEquals(2, streamsMetadata.size()); + verifyInitializedStreamMetadata(streamsMetadata.get(STREAM0)); + verifyInitializedStreamMetadata(streamsMetadata.get(STREAM1)); + + // 2. broker_0 open stream_0 and stream_1 with epoch1 + ControllerResult result2 = manager.openStream( + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER0)); + ControllerResult result3 = manager.openStream( + new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0)); + verifyFirstTimeOpenStreamResult(result2, EPOCH1, BROKER0); + verifyFirstTimeOpenStreamResult(result3, EPOCH1, BROKER0); + S3StreamRecord streamRecord = (S3StreamRecord) result2.records().get(0).message(); + manager.replay(streamRecord); + RangeRecord rangeRecord = (RangeRecord) result2.records().get(1).message(); + manager.replay(rangeRecord); + streamRecord = (S3StreamRecord) result3.records().get(0).message(); + manager.replay(streamRecord); + rangeRecord = (RangeRecord) result3.records().get(1).message(); + manager.replay(rangeRecord); + + // verify the stream_0 and stream_1 metadata are updated, and the range_0 is created + S3StreamMetadata streamMetadata0 = manager.streamsMetadata().get(STREAM0); + verifyFirstRange(manager.streamsMetadata().get(STREAM0), EPOCH1, BROKER0); + verifyFirstRange(manager.streamsMetadata().get(STREAM1), EPOCH1, BROKER0); + + // TODO: support write range record, then roll the range and verify + // 3. broker_1 try to open stream_0 with epoch0 + ControllerResult result4 = manager.openStream( + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER1)); + assertEquals(Errors.STREAM_FENCED.code(), result4.response().errorCode()); + assertEquals(0, result4.records().size()); + + // 4. 
+        // 4. broker_1 tries to open stream_0 with epoch2
+        ControllerResult<OpenStreamResponseData> result5 = manager.openStream(
+            new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH2).setBrokerId(BROKER1));
+        assertEquals(Errors.NONE.code(), result5.response().errorCode());
+        assertEquals(2, result5.records().size());
+        streamRecord = (S3StreamRecord) result5.records().get(0).message();
+        manager.replay(streamRecord);
+        assertEquals(EPOCH2, streamRecord.epoch());
+        assertEquals(1, streamRecord.rangeIndex());
+        assertEquals(0L, streamRecord.startOffset());
+        rangeRecord = (RangeRecord) result5.records().get(1).message();
+        manager.replay(rangeRecord);
+        assertEquals(BROKER1, rangeRecord.brokerId());
+        assertEquals(EPOCH2, rangeRecord.epoch());
+        assertEquals(1, rangeRecord.rangeIndex());
+        assertEquals(0L, rangeRecord.startOffset());
+        assertEquals(-1L, rangeRecord.endOffset());
+
+        // verify that stream_0's epoch is updated to epoch2, and the range index to 1
+        streamMetadata0 = manager.streamsMetadata().get(STREAM0);
+        assertEquals(EPOCH2, streamMetadata0.currentEpoch());
+        assertEquals(1, streamMetadata0.currentRangeIndex());
+        assertEquals(0L, streamMetadata0.startOffset());
+        assertEquals(2, streamMetadata0.ranges().size());
+        RangeMetadata rangeMetadata0 = streamMetadata0.ranges().get(1);
+        assertEquals(BROKER1, rangeMetadata0.brokerId());
+        assertEquals(EPOCH2, rangeMetadata0.epoch());
+        assertEquals(1, rangeMetadata0.rangeIndex());
+        assertEquals(0L, rangeMetadata0.startOffset());
+        assertEquals(-1L, rangeMetadata0.endOffset());
+
+        // 5. broker_0 tries to open stream_1 with epoch1
+        ControllerResult<OpenStreamResponseData> result6 = manager.openStream(
+            new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER0));
+        assertEquals(Errors.NONE.code(), result6.response().errorCode());
+        assertEquals(0, result6.records().size());
+
+        // 6. broker_1 tries to open stream_1 with epoch1
+        ControllerResult<OpenStreamResponseData> result7 = manager.openStream(
+            new OpenStreamRequestData().setStreamId(STREAM1).setStreamEpoch(EPOCH1).setBrokerId(BROKER1));
+        assertEquals(Errors.STREAM_FENCED.code(), result7.response().errorCode());
+        assertEquals(0, result7.records().size());
+    }
+
+    private void verifyInitializedStreamMetadata(S3StreamMetadata metadata) {
+        assertNotNull(metadata);
+        assertEquals(0, metadata.currentEpoch());
+        assertEquals(-1, metadata.currentRangeIndex());
+        assertEquals(0L, metadata.startOffset());
+    }
+
+    private void verifyFirstTimeOpenStreamResult(ControllerResult<OpenStreamResponseData> result,
+        long expectedEpoch, int expectedBrokerId) {
+        assertEquals(0, result.response().errorCode());
+        assertEquals(0, result.response().startOffset());
+        assertEquals(2, result.records().size());
+
+        // first record must be stream update record
+        ApiMessageAndVersion record0 = result.records().get(0);
+        assertInstanceOf(S3StreamRecord.class, record0.message());
+        S3StreamRecord streamRecord0 = (S3StreamRecord) record0.message();
+        assertEquals(expectedEpoch, streamRecord0.epoch());
+        assertEquals(0, streamRecord0.rangeIndex());
+        assertEquals(0L, streamRecord0.startOffset());
+
+        // second record must be range create record
+        ApiMessageAndVersion record1 = result.records().get(1);
+        assertInstanceOf(RangeRecord.class, record1.message());
+        RangeRecord rangeRecord0 = (RangeRecord) record1.message();
+        assertEquals(expectedBrokerId, rangeRecord0.brokerId());
+        assertEquals(expectedEpoch, rangeRecord0.epoch());
+        assertEquals(0, rangeRecord0.rangeIndex());
+        assertEquals(0L, rangeRecord0.startOffset());
+        assertEquals(-1L, rangeRecord0.endOffset());
+    }
+
+    private void verifyFirstRange(S3StreamMetadata streamMetadata, long expectedEpoch, int expectedBrokerId) {
+        assertNotNull(streamMetadata);
+        assertEquals(expectedEpoch, streamMetadata.currentEpoch());
+        assertEquals(0, streamMetadata.currentRangeIndex());
+        assertEquals(0L, streamMetadata.startOffset());
+        assertEquals(1, streamMetadata.ranges().size());
+        RangeMetadata rangeMetadata0 = streamMetadata.ranges().get(0);
+        assertEquals(expectedBrokerId, rangeMetadata0.brokerId());
+        assertEquals(expectedEpoch, rangeMetadata0.epoch());
+        assertEquals(0, rangeMetadata0.rangeIndex());
+        assertEquals(0L, rangeMetadata0.startOffset());
+        assertEquals(-1L, rangeMetadata0.endOffset());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
index b8ed032853..6cf808a28a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
@@ -80,7 +79,7 @@ public void testRanges() {
         RecordTestUtils.replayAll(delta1, delta1Records);
         // verify delta and check image's write
         S3StreamMetadataImage image2 = new S3StreamMetadataImage(
-            STREAM0, 1L, 0L, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, Optional.empty(), BROKER0)), List.of());
+            STREAM0, 1L, 0L, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, -1L, BROKER0)), List.of());
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
 
@@ -108,8 +107,8 @@ public void testRanges() {
         // verify delta and check image's write
         S3StreamMetadataImage image3 = new S3StreamMetadataImage(
             STREAM0, 2L, 0L, Map.of(
-                0, new RangeMetadata(STREAM0, 1L, 0, 0L, Optional.of(100L), BROKER0),
-                1, new RangeMetadata(STREAM0, 2L, 1, 101L, Optional.empty(), BROKER1)), List.of());
+                0, new RangeMetadata(STREAM0, 1L, 0, 0L, 100, BROKER0),
+                1, new RangeMetadata(STREAM0, 2L, 1, 101L, 100, BROKER1)), List.of());
         assertEquals(image3, delta2.apply());
         testToImageAndBack(image3);
 
@@ -127,7 +126,7 @@ public void testRanges() {
         // verify delta and check image's write
         S3StreamMetadataImage image4 = new S3StreamMetadataImage(
             STREAM0, 2L, 101L, Map.of(
-                1, new RangeMetadata(STREAM0, 2L, 1, 101L, Optional.empty(), BROKER1)), List.of());
+                1, new RangeMetadata(STREAM0, 2L, 1, 101L, 100L, BROKER1)), List.of());
     }
 
     @Test

From 008af90ae87d7e5d4e78952982e6e19cc2a097d1 Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Wed, 23 Aug 2023 18:27:44 +0800
Subject: [PATCH 3/5] refactor(s3): increase S3-related error codes to avoid
 future incompatibility

1. increase the S3-related error codes to avoid future incompatibility

Signed-off-by: TheR1sing3un

---
 .../main/java/org/apache/kafka/common/protocol/Errors.java | 6 +++---
 .../kafka/controller/stream/StreamControlManager.java      | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 8d74d0d806..78dd4c5d42 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -377,9 +377,9 @@ public enum Errors {
 
     // Kafka on S3 inject start
 
-    STREAM_EXIST(109, "The stream already exists.", StreamExistException::new),
-    STREAM_NOT_EXIST(110, "The stream does not exist.", StreamNotExistException::new),
-    STREAM_FENCED(111, "The stream is fenced.", StreamFencedException::new);
+    STREAM_EXIST(501, "The stream already exists.", StreamExistException::new),
+    STREAM_NOT_EXIST(502, "The stream does not exist.", StreamNotExistException::new),
+    STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new);
 
     // Kafka on S3 inject end
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index c40497de0e..822ab10e80 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -201,7 +201,7 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData
         }
         // now the request is valid, update the stream's epoch and create a new range for this broker
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        long newEpoch = streamMetadata.currentEpoch + 1;
+        long newEpoch = epoch + 1;
         int newRangeIndex = streamMetadata.currentRangeIndex + 1;
         // stream update record
         records.add(new ApiMessageAndVersion(new S3StreamRecord()

From 7d128cd77dd28661f9699d904fd79ef355f7abc1 Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Wed, 23 Aug 2023 18:38:09 +0800
Subject: [PATCH 4/5] refactor(s3): make range's endOffset exclusive

1. make range's endOffset exclusive
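
With this change a range covers the half-open interval [startOffset, endOffset):
a freshly created range with endOffset == startOffset is empty, and a rolled
range starts exactly at the previous range's end offset. A minimal sketch of
the new bookkeeping (illustration only; lastRange stands in for the previous
range's RangeMetadata):

    // exclusive end: the next range begins at lastRange.endOffset(),
    // not lastRange.endOffset() + 1
    long startOffset = newRangeIndex == 0 ? 0L : lastRange.endOffset();
    RangeRecord range = new RangeRecord()
        .setStartOffset(startOffset)  // inclusive
        .setEndOffset(startOffset);   // exclusive, so the new range is empty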

Signed-off-by: TheR1sing3un

---
 .../kafka/controller/stream/StreamControlManager.java   | 9 +++++----
 .../org/apache/kafka/metadata/stream/RangeMetadata.java | 6 ++++++
 .../kafka/controller/StreamControlManagerTest.java      | 8 ++++----
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 822ab10e80..f14ff1d72b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -155,6 +155,8 @@ public StreamControlManager(
         this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
     }
 
+    // TODO: refactor to return next offset of stream in response
+
     public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
         long streamId = data.streamId();
         CreateStreamResponseData resp = new CreateStreamResponseData();
@@ -201,7 +203,7 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData
         }
         // now the request is valid, update the stream's epoch and create a new range for this broker
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        long newEpoch = epoch + 1;
+        long newEpoch = epoch;
         int newRangeIndex = streamMetadata.currentRangeIndex + 1;
         // stream update record
         records.add(new ApiMessageAndVersion(new S3StreamRecord()
@@ -215,15 +217,14 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData
         if (newRangeIndex > 0) {
             // means that the new range is not the first range in stream, get the last range's end offset
             RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
-            startOffset = lastRangeMetadata.endOffset() + 1;
+            startOffset = lastRangeMetadata.endOffset();
         }
         // range create record
         records.add(new ApiMessageAndVersion(new RangeRecord()
             .setStreamId(streamId)
             .setBrokerId(brokerId)
             .setStartOffset(startOffset)
-            // default end offset set to (startOffset - 1)
-            .setEndOffset(startOffset - 1)
+            .setEndOffset(startOffset)
             .setEpoch(newEpoch)
             .setRangeIndex(newRangeIndex), (short) 0));
         resp.setStartOffset(startOffset);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
index 5e4f00b587..067719fc33 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
@@ -24,7 +24,13 @@ public class RangeMetadata implements Comparable<RangeMetadata> {
     private long streamId;
     private long epoch;
     private int rangeIndex;
+    /**
+     * Inclusive
+     */
     private long startOffset;
+    /**
+     * Exclusive
+     */
     private long endOffset;
     private int brokerId;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 8f678919e9..d0c5e314a5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -158,7 +158,7 @@ public void testBasicOpenStream() {
         assertEquals(EPOCH2, rangeRecord.epoch());
         assertEquals(1, rangeRecord.rangeIndex());
         assertEquals(0L, rangeRecord.startOffset());
-        assertEquals(-1L, rangeRecord.endOffset());
+        assertEquals(0L, rangeRecord.endOffset());
 
         // verify that stream_0's epoch is updated to epoch2, and the range index to 1
         streamMetadata0 = manager.streamsMetadata().get(STREAM0);
@@ -171,7 +171,7 @@ public void testBasicOpenStream() {
         assertEquals(EPOCH2, rangeMetadata0.epoch());
         assertEquals(1, rangeMetadata0.rangeIndex());
         assertEquals(0L, rangeMetadata0.startOffset());
-        assertEquals(-1L, rangeMetadata0.endOffset());
+        assertEquals(0L, rangeMetadata0.endOffset());
 
         // 5. broker_0 tries to open stream_1 with epoch1
         ControllerResult<OpenStreamResponseData> result6 = manager.openStream(
@@ -215,7 +215,7 @@ private void verifyFirstTimeOpenStreamResult(ControllerResult<OpenStreamResponseData> result,
         assertEquals(expectedEpoch, rangeRecord0.epoch());
         assertEquals(0, rangeRecord0.rangeIndex());
         assertEquals(0L, rangeRecord0.startOffset());
-        assertEquals(-1L, rangeRecord0.endOffset());
+        assertEquals(0L, rangeRecord0.endOffset());
     }
 
     private void verifyFirstRange(S3StreamMetadata streamMetadata, long expectedEpoch, int expectedBrokerId) {
@@ -229,6 +229,6 @@ private void verifyFirstRange(S3StreamMetadata streamMetadata, long expectedEpo
         assertEquals(expectedEpoch, rangeMetadata0.epoch());
         assertEquals(0, rangeMetadata0.rangeIndex());
         assertEquals(0L, rangeMetadata0.startOffset());
-        assertEquals(-1L, rangeMetadata0.endOffset());
+        assertEquals(0L, rangeMetadata0.endOffset());
     }
 }

From: TheR1sing3un
Date: Wed, 23 Aug 2023 18:42:35 +0800
Subject: [PATCH 5/5] feat(s3): add some TODO tasks in comments

1. add some TODO tasks in comments

Signed-off-by: TheR1sing3un

---
 .../apache/kafka/controller/stream/StreamControlManager.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index f14ff1d72b..bdfc66c42c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -156,7 +156,8 @@ public StreamControlManager(
     }
 
     // TODO: refactor to return next offset of stream in response
-
+    // TODO: lazy update range's end offset
+    // TODO: controller allocates the stream id
     public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
         long streamId = data.streamId();
         CreateStreamResponseData resp = new CreateStreamResponseData();
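
For context on the "lazy update range's end offset" TODO: the idea would be to
stop rewriting a range's end offset on every commit and persist it only once,
when the range is sealed (for example, on the next successful openStream with
a higher epoch). A rough sketch under that assumption; sealCurrentRange and
confirmedNextOffset are hypothetical names, not part of this series:

    // Hypothetical: emit one final RangeRecord when sealing the active range.
    private ApiMessageAndVersion sealCurrentRange(long streamId,
        S3StreamMetadata streamMetadata, long confirmedNextOffset) {
        RangeMetadata current = streamMetadata.ranges().get(streamMetadata.currentRangeIndex());
        return new ApiMessageAndVersion(new RangeRecord()
            .setStreamId(streamId)
            .setBrokerId(current.brokerId())
            .setEpoch(current.epoch())
            .setRangeIndex(current.rangeIndex())
            .setStartOffset(current.startOffset())
            // the exclusive end offset is written once, at seal time
            .setEndOffset(confirmedNextOffset), (short) 0);
    }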