From 09f828ae47143adbb5a9d7dc9f3aef777ec1618b Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Mon, 28 Aug 2023 19:06:31 +0800 Subject: [PATCH 1/4] feat(s3): support commit wal object 1. support commit wal object Signed-off-by: TheR1sing3un --- .../errors/s3/ObjectNotExistException.java | 28 ++++++ .../apache/kafka/common/protocol/Errors.java | 5 +- .../stream/S3ObjectControlManager.java | 22 +++++ .../stream/StreamControlManager.java | 93 ++++++++++++++++++- 4 files changed, 143 insertions(+), 5 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/s3/ObjectNotExistException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/s3/ObjectNotExistException.java b/clients/src/main/java/org/apache/kafka/common/errors/s3/ObjectNotExistException.java new file mode 100644 index 0000000000..7190276887 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/s3/ObjectNotExistException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors.s3; + +import org.apache.kafka.common.errors.ApiException; + +public class ObjectNotExistException extends ApiException { + + public ObjectNotExistException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 78dd4c5d42..098d7bcfa1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -127,6 +127,7 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.errors.s3.ObjectNotExistException; import org.apache.kafka.common.errors.s3.StreamExistException; import org.apache.kafka.common.errors.s3.StreamFencedException; import org.apache.kafka.common.errors.s3.StreamNotExistException; @@ -379,8 +380,8 @@ public enum Errors { STREAM_EXIST(501, "The stream already exists.", StreamExistException::new), STREAM_NOT_EXIST(502, "The stream does not exist.", StreamNotExistException::new), - STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new); - + STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new), + OBJECT_NOT_EXIST(504, "The object does not exist.", ObjectNotExistException::new); // Kafka on S3 inject end diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 439e080e7c..f81428cd37 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -153,6 +154,23 @@ public ControllerResult prepareObject(PrepareS3Obje return ControllerResult.atomicOf(records, response); } + public ControllerResult commitObject(long objectId, long objectSize) { + S3Object object = this.objectsMetadata.get(objectId); + if (object == null) { + log.error("object {} not exist when commit wal object", objectId); + return ControllerResult.of(Collections.emptyList(), false); + } + S3ObjectRecord record = new S3ObjectRecord() + .setObjectId(objectId) + .setObjectSize(objectSize) + .setObjectState(S3ObjectState.COMMITTED.toByte()) + .setPreparedTimeInMs(object.getPreparedTimeInMs()) + .setExpiredTimeInMs(object.getExpiredTimeInMs()) + .setCommittedTimeInMs(System.currentTimeMillis()); + return ControllerResult.of(Collections.singletonList( + new ApiMessageAndVersion(record, (short) 0)), true); + } + public void replay(AssignedS3ObjectIdRecord record) { nextAssignedObjectId.set(record.assignedS3ObjectId() + 1); } @@ -255,6 +273,10 @@ public Map objectsMetadata() { return objectsMetadata; } + public S3Object getObject(Long objectId) { + return this.objectsMetadata.get(objectId); + } + /** * S3Object's lifecycle listener. */ diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 1a9ce023da..2045bba1c8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.message.CommitCompactObjectRequestData; @@ -30,6 +32,7 @@ import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; +import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange; import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; @@ -38,14 +41,19 @@ import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; +import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -61,9 +69,8 @@ */ public class StreamControlManager { - // TODO: assigned record - // TODO: timeline check public static class S3StreamMetadata { + // current epoch, when created but not open, use 0 represent private TimelineLong currentEpoch; // rangeIndex, when created but not open, there is no range, use -1 represent @@ -117,6 +124,7 @@ public String toString() { } public static class BrokerS3WALMetadata { + private int brokerId; private TimelineHashSet walObjects; @@ -257,7 +265,67 @@ public ControllerResult deleteStream(DeleteStreamReque } public ControllerResult commitWALObject(CommitWALObjectRequestData data) { - throw new UnsupportedOperationException(); + CommitWALObjectResponseData resp = new CommitWALObjectResponseData(); + List records = new ArrayList<>(); + List failedStreamIds = new ArrayList<>(); + resp.setFailedStreamIds(failedStreamIds); + long objectId = data.objectId(); + int brokerId = data.brokerId(); + long objectSize = data.objectSize(); + List streamRanges = data.objectStreamRanges(); + // verify stream epoch + streamRanges.stream().filter(range -> !verifyWalStreamRanges(range)) + .mapToLong(ObjectStreamRange::streamId).forEach(failedStreamIds::add); + if (!failedStreamIds.isEmpty()) { + StringBuilder failedIds = new StringBuilder(); + Stream.of(failedStreamIds).forEach(id -> failedIds.append(id).append(",")); + log.error("stream epoch not match when commit wal object, failed stream ids {}", failedIds); + resp.setErrorCode(Errors.STREAM_FENCED.code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + // commit object + ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize); + if (!commitResult.response()) { + log.error("object {} not exist when commit wal object", objectId); + resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code()); + return ControllerResult.of(Collections.emptyList(), resp); + } + records.addAll(commitResult.records()); + List indexes = new ArrayList<>(streamRanges.size()); + streamRanges.stream().forEach(range -> { + // build WAL object + long streamId = range.streamId(); + long startOffset = range.startOffset(); + long endOffset = range.endOffset(); + indexes.add(new S3ObjectStreamIndex(objectId, startOffset, endOffset)); + // TODO: support lazy flush range's end offset + // update range's offset + S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + RangeRecord record = new RangeRecord() + .setStreamId(streamId) + .setBrokerId(brokerId) + .setEpoch(streamMetadata.currentEpoch()) + .setRangeIndex(streamMetadata.currentRangeIndex()) + .setStartOffset(startOffset) + .setEndOffset(endOffset); + records.add(new ApiMessageAndVersion(record, (short) 0)); + }); + // update broker's wal object + BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); + if (brokerMetadata == null) { + // first time commit wal object, create broker's metadata + records.add(new ApiMessageAndVersion(new BrokerWALMetadataRecord() + .setBrokerId(brokerId), (short) 0)); + } + // create broker's wal object + records.add(new ApiMessageAndVersion(new WALObjectRecord() + .setObjectId(objectId) + .setBrokerId(brokerId) + .setStreamsIndex( + indexes.stream() + .map(S3ObjectStreamIndex::toRecordStreamIndex) + .collect(Collectors.toList())), (short) 0)); + return ControllerResult.atomicOf(records, resp); } public ControllerResult commitCompactObject(CommitCompactObjectRequestData data) { @@ -328,6 +396,25 @@ public Long nextAssignedStreamId() { return nextAssignedStreamId.get(); } + private boolean verifyWalStreamRanges(ObjectStreamRange range) { + long streamId = range.streamId(); + long epoch = range.streamEpoch(); + // verify + S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + if (streamMetadata == null) { + return false; + } + // compare epoch + if (streamMetadata.currentEpoch() > epoch) { + return false; + } + if (streamMetadata.currentEpoch() < epoch) { + return false; + } + return true; + } + + @Override public String toString() { return "StreamControlManager{" + From e7d4656709d3bb815b23c4acfe2be1261703a530 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 29 Aug 2023 11:05:25 +0800 Subject: [PATCH 2/4] test(s3): add commit wal object test 1. add commit wal object test Signed-off-by: TheR1sing3un --- .../message/CommitCompactObjectRequest.json | 2 +- .../message/CommitWALObjectRequest.json | 2 +- .../kafka/controller/QuorumController.java | 4 +- .../stream/S3ObjectControlManager.java | 12 +- .../stream/StreamControlManager.java | 56 ++++- .../kafka/metadata/stream/S3Object.java | 2 +- .../S3ObjectControlManagerTest.java | 94 +++++++- .../controller/StreamControlManagerTest.java | 215 +++++++++++++++++- 8 files changed, 360 insertions(+), 27 deletions(-) diff --git a/clients/src/main/resources/common/message/CommitCompactObjectRequest.json b/clients/src/main/resources/common/message/CommitCompactObjectRequest.json index caf49ca9ae..8528694449 100644 --- a/clients/src/main/resources/common/message/CommitCompactObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitCompactObjectRequest.json @@ -51,7 +51,7 @@ "fields": [ { "name": "StreamId", - "type": "int32", + "type": "int64", "versions": "0+", "about": "The ID of the stream" }, diff --git a/clients/src/main/resources/common/message/CommitWALObjectRequest.json b/clients/src/main/resources/common/message/CommitWALObjectRequest.json index 77ab3b83de..972e84ace3 100644 --- a/clients/src/main/resources/common/message/CommitWALObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitWALObjectRequest.json @@ -51,7 +51,7 @@ "fields": [ { "name": "StreamId", - "type": "int32", + "type": "int64", "versions": "0+", "about": "The ID of the stream" }, diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 930373e5c1..4f3b5e7239 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -89,6 +89,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.controller.stream.DefaultS3Operator; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; import org.apache.kafka.metadata.BrokerHeartbeatReply; @@ -1801,7 +1802,8 @@ private QuorumController( // Kafka on S3 inject start this.s3Config = s3Config; - this.s3ObjectControlManager = new S3ObjectControlManager(this, snapshotRegistry, logContext, clusterId, s3Config); + this.s3ObjectControlManager = new S3ObjectControlManager( + this, snapshotRegistry, logContext, clusterId, s3Config, new DefaultS3Operator()); this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); // Kafka on S3 inject end updateWriteOffset(-1); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index f81428cd37..3ea1b86e3c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -90,7 +90,8 @@ public S3ObjectControlManager( SnapshotRegistry snapshotRegistry, LogContext logContext, String clusterId, - S3Config config) { + S3Config config, + S3Operator operator) { this.quorumController = quorumController; this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(S3ObjectControlManager.class); @@ -100,7 +101,7 @@ public S3ObjectControlManager( this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.preparedObjects = new LinkedBlockingDeque<>(); this.markDestroyedObjects = new LinkedBlockingDeque<>(); - this.operator = new DefaultS3Operator(); + this.operator = operator; this.lifecycleListeners = new ArrayList<>(); this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor(); this.lifecycleCheckTimer.scheduleWithFixedDelay(() -> { @@ -167,7 +168,7 @@ public ControllerResult commitObject(long objectId, long objectSize) { .setPreparedTimeInMs(object.getPreparedTimeInMs()) .setExpiredTimeInMs(object.getExpiredTimeInMs()) .setCommittedTimeInMs(System.currentTimeMillis()); - return ControllerResult.of(Collections.singletonList( + return ControllerResult.of(List.of( new ApiMessageAndVersion(record, (short) 0)), true); } @@ -209,7 +210,7 @@ public ControllerResult checkS3ObjectsLifecycle() { forEach(obj -> { S3ObjectRecord record = new S3ObjectRecord() .setObjectId(obj.getObjectId()) - .setObjectState((byte) S3ObjectState.DESTROYED.ordinal()) + .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()) .setObjectSize(obj.getObjectSize()) .setPreparedTimeInMs(obj.getPreparedTimeInMs()) .setExpiredTimeInMs(obj.getExpiredTimeInMs()) @@ -229,6 +230,9 @@ public ControllerResult checkS3ObjectsLifecycle() { .map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey())) .toArray(ObjectPair[]::new); String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new); + if (destroyedObjectKeys == null || destroyedObjectKeys.length == 0) { + return ControllerResult.of(records, null); + } Set destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet()); // TODO: deal with failed objects in batch object deletion request this.operator.delele(destroyedObjectKeys).whenCompleteAsync((success, e) -> { diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 2045bba1c8..18993094bc 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.message.CommitCompactObjectRequestData; @@ -52,7 +51,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; -import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; @@ -137,7 +135,7 @@ public int getBrokerId() { return brokerId; } - public TimelineHashSet getWalObjects() { + public TimelineHashSet walObjects() { return walObjects; } @@ -251,7 +249,7 @@ public ControllerResult openStream(OpenStreamRequestData .setEndOffset(startOffset) .setEpoch(newEpoch) .setRangeIndex(newRangeIndex), (short) 0)); - resp.setStartOffset(startOffset); + resp.setStartOffset(streamMetadata.startOffset()); resp.setNextOffset(startOffset); return ControllerResult.atomicOf(records, resp); } @@ -274,12 +272,11 @@ public ControllerResult commitWALObject(CommitWALOb long objectSize = data.objectSize(); List streamRanges = data.objectStreamRanges(); // verify stream epoch - streamRanges.stream().filter(range -> !verifyWalStreamRanges(range)) + streamRanges.stream().filter(range -> !verifyWalStreamRanges(range, brokerId)) .mapToLong(ObjectStreamRange::streamId).forEach(failedStreamIds::add); if (!failedStreamIds.isEmpty()) { - StringBuilder failedIds = new StringBuilder(); - Stream.of(failedStreamIds).forEach(id -> failedIds.append(id).append(",")); - log.error("stream epoch not match when commit wal object, failed stream ids {}", failedIds); + log.error("stream is invalid when commit wal object, failed stream ids [{}]", + String.join(",", failedStreamIds.stream().map(String::valueOf).collect(Collectors.toList()))); resp.setErrorCode(Errors.STREAM_FENCED.code()); return ControllerResult.of(Collections.emptyList(), resp); } @@ -301,12 +298,13 @@ public ControllerResult commitWALObject(CommitWALOb // TODO: support lazy flush range's end offset // update range's offset S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId); + RangeMetadata oldRange = streamMetadata.ranges.get(streamMetadata.currentRangeIndex()); RangeRecord record = new RangeRecord() .setStreamId(streamId) .setBrokerId(brokerId) - .setEpoch(streamMetadata.currentEpoch()) - .setRangeIndex(streamMetadata.currentRangeIndex()) - .setStartOffset(startOffset) + .setEpoch(oldRange.epoch()) + .setRangeIndex(oldRange.rangeIndex()) + .setStartOffset(oldRange.startOffset()) .setEndOffset(endOffset); records.add(new ApiMessageAndVersion(record, (short) 0)); }); @@ -383,6 +381,28 @@ public void replay(RemoveRangeRecord record) { streamMetadata.ranges.remove(record.rangeIndex()); } + public void replay(BrokerWALMetadataRecord record) { + int brokerId = record.brokerId(); + this.brokersMetadata.computeIfAbsent(brokerId, id -> new BrokerS3WALMetadata(id, this.snapshotRegistry)); + } + + public void replay(WALObjectRecord record) { + long objectId = record.objectId(); + int brokerId = record.brokerId(); + List streamIndexes = record.streamsIndex(); + BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); + if (brokerMetadata == null) { + // should not happen + log.error("broker {} not exist when replay wal object record {}", brokerId, record); + return; + } + Map> indexMap = streamIndexes + .stream() + .map(S3ObjectStreamIndex::of) + .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); + brokerMetadata.walObjects.add(new S3WALObject(objectId, brokerId, indexMap)); + } + public Map streamsMetadata() { return streamsMetadata; @@ -396,7 +416,7 @@ public Long nextAssignedStreamId() { return nextAssignedStreamId.get(); } - private boolean verifyWalStreamRanges(ObjectStreamRange range) { + private boolean verifyWalStreamRanges(ObjectStreamRange range, long brokerId) { long streamId = range.streamId(); long epoch = range.streamEpoch(); // verify @@ -411,6 +431,18 @@ private boolean verifyWalStreamRanges(ObjectStreamRange range) { if (streamMetadata.currentEpoch() < epoch) { return false; } + RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get()); + if (rangeMetadata == null) { + return false; + } + // compare broker + if (rangeMetadata.brokerId() != brokerId) { + return false; + } + // compare offset + if (rangeMetadata.endOffset() != range.startOffset()) { + return false; + } return true; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index 9445b97de3..060975121e 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -150,6 +150,6 @@ public S3ObjectState getS3ObjectState() { } public boolean isExpired() { - return System.currentTimeMillis() > expiredTimeInMs; + return this.s3ObjectState == S3ObjectState.PREPARED && System.currentTimeMillis() > expiredTimeInMs; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java index 5affff47ac..369769781a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -18,17 +18,27 @@ package org.apache.kafka.controller; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord; +import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; +import org.apache.kafka.controller.stream.S3Operator; import org.apache.kafka.metadata.stream.S3Config; +import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -50,24 +60,25 @@ public class S3ObjectControlManagerTest { private static final S3Config S3_CONFIG = new S3Config(S3_REGION, S3_BUCKET); private S3ObjectControlManager manager; - private QuorumController controller; + private S3Operator operator; @BeforeEach public void setUp() { controller = Mockito.mock(QuorumController.class); + operator = Mockito.mock(S3Operator.class); LogContext logContext = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(logContext); - manager = new S3ObjectControlManager(controller, registry, logContext, CLUSTER, S3_CONFIG); + manager = new S3ObjectControlManager(controller, registry, logContext, CLUSTER, S3_CONFIG, operator); } @Test - public void testBasicPrepareObject() { + public void testPrepareObject() { // 1. prepare 3 objects ControllerResult result0 = manager.prepareObject(new PrepareS3ObjectRequestData() .setBrokerId(BROKER0) .setPreparedCount(3) - .setTimeToLiveInMs(1000)); + .setTimeToLiveInMs(60 * 1000)); assertEquals(Errors.NONE.code(), result0.response().errorCode()); assertEquals(4, result0.records().size()); ApiMessage message = result0.records().get(0).message(); @@ -75,7 +86,7 @@ public void testBasicPrepareObject() { AssignedS3ObjectIdRecord assignedRecord = (AssignedS3ObjectIdRecord) message; assertEquals(2, assignedRecord.assignedS3ObjectId()); for (int i = 0; i < 3; i++) { - verifyPrepareObjectRecord(result0.records().get(i + 1), i, 1000); + verifyPrepareObjectRecord(result0.records().get(i + 1), i, 60 * 1000); } manager.replay(assignedRecord); result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record)); @@ -85,7 +96,7 @@ public void testBasicPrepareObject() { manager.objectsMetadata().forEach((id, s3Object) -> { assertEquals(S3ObjectState.PREPARED, s3Object.getS3ObjectState()); assertEquals(id, s3Object.getObjectId()); - assertEquals(1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs()); + assertEquals(60 * 1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs()); }); assertEquals(3, manager.nextAssignedObjectId()); } @@ -99,4 +110,75 @@ private void verifyPrepareObjectRecord(ApiMessageAndVersion result, long expecte assertEquals((byte) S3ObjectState.PREPARED.ordinal(), record.objectState()); } + @Test + public void testCommitObject() { + // 1. prepare 1 object + prepareOneObject(60 * 1000); + + // 2. commit an object which not exist in controller + ControllerResult result1 = manager.commitObject(1, 1024); + assertFalse(result1.response()); + assertEquals(0, result1.records().size()); + + // 3. commit an valid object + ControllerResult result2 = manager.commitObject(0, 1024); + assertTrue(result2.response()); + assertEquals(1, result2.records().size()); + S3ObjectRecord record = (S3ObjectRecord) result2.records().get(0).message(); + manager.replay(record); + + // 4. verify the object is committed + assertEquals(1, manager.objectsMetadata().size()); + S3Object object = manager.objectsMetadata().get(0L); + assertEquals(S3ObjectState.COMMITTED, object.getS3ObjectState()); + } + + @Test + public void testExpiredCheck() throws InterruptedException { + AtomicBoolean hit = new AtomicBoolean(false); + Mockito.when(operator.delele(any(String[].class))) + .then(ink -> { + List keys = List.of((String[]) ink.getArgument(0)); + if (keys.size() > 0) { + hit.set(true); + } + return CompletableFuture.completedFuture(true); + }); + Mockito.when(controller.checkS3ObjectsLifecycle(any(ControllerRequestContext.class))) + .then(inv -> { + ControllerResult result = manager.checkS3ObjectsLifecycle(); + result.records().stream().map(record -> (S3ObjectRecord) record.message()).forEach(manager::replay); + return CompletableFuture.completedFuture(null); + }); + Mockito.when(controller.notifyS3ObjectDeleted(any(ControllerRequestContext.class), anySet())) + .then(inv -> { + ControllerResult result = manager.notifyS3ObjectDeleted(inv.getArgument(1)); + result.records().stream().map(record -> (RemoveS3ObjectRecord) record.message()).forEach(manager::replay); + return CompletableFuture.completedFuture(null); + }); + // 1. prepare 1 object + prepareOneObject(3 * 1000); + assertEquals(1, manager.objectsMetadata().size()); + // 3. wait for expired + Thread.sleep(11 * 1000); + assertEquals(0, manager.objectsMetadata().size()); + assertTrue(hit.get()); + } + + private void prepareOneObject(long ttl) { + ControllerResult result0 = manager.prepareObject(new PrepareS3ObjectRequestData() + .setBrokerId(BROKER0) + .setPreparedCount(1) + .setTimeToLiveInMs(ttl)); + assertEquals(Errors.NONE.code(), result0.response().errorCode()); + assertEquals(2, result0.records().size()); + ApiMessage message = result0.records().get(0).message(); + assertInstanceOf(AssignedS3ObjectIdRecord.class, message); + AssignedS3ObjectIdRecord assignedRecord = (AssignedS3ObjectIdRecord) message; + assertEquals(0, assignedRecord.assignedS3ObjectId()); + verifyPrepareObjectRecord(result0.records().get(1), 0, ttl); + manager.replay(assignedRecord); + manager.replay((S3ObjectRecord) result0.records().get(1).message()); + } + } diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index bdbd59607d..d632da56df 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -20,18 +20,33 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.CommitWALObjectRequestData; +import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange; +import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; +import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.RangeRecord; +import org.apache.kafka.common.metadata.RemoveRangeRecord; +import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.controller.stream.StreamControlManager; import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata; import org.apache.kafka.metadata.stream.RangeMetadata; @@ -41,6 +56,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; @Timeout(value = 40) @Tag("S3Unit") @@ -59,12 +75,14 @@ public class StreamControlManagerTest { private final static long EPOCH2 = 2; private StreamControlManager manager; + private S3ObjectControlManager objectControlManager; @BeforeEach public void setUp() { LogContext context = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(context); - manager = new StreamControlManager(registry, context, null); + objectControlManager = Mockito.mock(S3ObjectControlManager.class); + manager = new StreamControlManager(registry, context, objectControlManager); } @Test @@ -231,6 +249,201 @@ public void testBasicOpenStream() { assertEquals(0, result7.records().size()); } + @Test + public void testCommitWal() { + Mockito.when(objectControlManager.commitObject(anyLong(), anyLong())).then(ink -> { + long objectId = ink.getArgument(0); + if (objectId == 1) { + return ControllerResult.of(Collections.emptyList(), false); + } + return ControllerResult.of(Collections.emptyList(), true); + }); + // 1. create and open stream_0 + CreateStreamRequestData request0 = new CreateStreamRequestData(); + ControllerResult result0 = manager.createStream(request0); + replay(manager, result0.records()); + + ControllerResult result2 = manager.openStream( + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); + verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); + replay(manager, result2.records()); + + // 2. commit valid wal object + List streamRanges0 = new ArrayList<>(); + streamRanges0.add(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(0L) + .setEndOffset(100L)); + CommitWALObjectRequestData commitRequest0 = new CommitWALObjectRequestData() + .setObjectId(0L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges0); + ControllerResult result3 = manager.commitWALObject(commitRequest0); + assertEquals(Errors.NONE.code(), result3.response().errorCode()); + replay(manager, result3.records()); + // verify range's end offset advanced and wal object is added + S3StreamMetadata streamMetadata0 = manager.streamsMetadata().get(STREAM0); + assertEquals(1, streamMetadata0.ranges().size()); + RangeMetadata rangeMetadata0 = streamMetadata0.ranges().get(0); + assertEquals(0L, rangeMetadata0.startOffset()); + assertEquals(100L, rangeMetadata0.endOffset()); + assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size()); + + // 3. commit a wal object that doesn't exist + List streamRanges1 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100) + .setEndOffset(200)); + CommitWALObjectRequestData commitRequest1 = new CommitWALObjectRequestData() + .setObjectId(1L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges1); + ControllerResult result4 = manager.commitWALObject(commitRequest1); + assertEquals(Errors.OBJECT_NOT_EXIST.code(), result4.response().errorCode()); + + // 4. commit a wal object that doesn't match the next offset + List streamRanges2 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(99) + .setEndOffset(200)); + CommitWALObjectRequestData commitRequest2 = new CommitWALObjectRequestData() + .setObjectId(2L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges2); + ControllerResult result5 = manager.commitWALObject(commitRequest2); + assertEquals(Errors.STREAM_FENCED.code(), result5.response().errorCode()); + assertEquals(1, result5.response().failedStreamIds().size()); + assertEquals(STREAM0, result5.response().failedStreamIds().get(0).longValue()); + + // 5. commit a wal object that contains a stream which doesn't exist + List streamRanges3 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100) + .setEndOffset(200), + new ObjectStreamRange() + .setStreamId(STREAM1) + .setStreamEpoch(EPOCH0) + .setStartOffset(0) + .setEndOffset(100)); + CommitWALObjectRequestData commitRequest3 = new CommitWALObjectRequestData() + .setObjectId(3L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges3); + ControllerResult result6 = manager.commitWALObject(commitRequest3); + assertEquals(Errors.STREAM_FENCED.code(), result6.response().errorCode()); + + // 6. commit a valid wal object + List streamRanges4 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(100) + .setEndOffset(200)); + CommitWALObjectRequestData commitRequest4 = new CommitWALObjectRequestData() + .setObjectId(4L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges4); + ControllerResult result7 = manager.commitWALObject(commitRequest4); + assertEquals(Errors.NONE.code(), result7.response().errorCode()); + replay(manager, result7.records()); + // verify range's end offset advanced and wal object is added + streamMetadata0 = manager.streamsMetadata().get(STREAM0); + assertEquals(1, streamMetadata0.ranges().size()); + rangeMetadata0 = streamMetadata0.ranges().get(0); + assertEquals(0L, rangeMetadata0.startOffset()); + assertEquals(200L, rangeMetadata0.endOffset()); + assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().size()); + + // 7. broker_1 open stream_0 with epoch_1 + ControllerResult result8 = manager.openStream( + new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); + assertEquals(Errors.NONE.code(), result8.response().errorCode()); + assertEquals(0L, result8.response().startOffset()); + assertEquals(200L, result8.response().nextOffset()); + replay(manager, result8.records()); + + // 8. broker_0 try to keep committing wal object which contains stream_0's data + List streamRanges5 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH0) + .setStartOffset(200) + .setEndOffset(300)); + CommitWALObjectRequestData commitRequest5 = new CommitWALObjectRequestData() + .setObjectId(5L) + .setBrokerId(BROKER0) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges5); + ControllerResult result9 = manager.commitWALObject(commitRequest5); + assertEquals(Errors.STREAM_FENCED.code(), result9.response().errorCode()); + + // 9. broker_1 successfully commit wal object which contains stream_0's data + List streamRanges6 = List.of(new ObjectStreamRange() + .setStreamId(STREAM0) + .setStreamEpoch(EPOCH1) + .setStartOffset(200) + .setEndOffset(300)); + CommitWALObjectRequestData commitRequest6 = new CommitWALObjectRequestData() + .setBrokerId(BROKER1) + .setObjectId(6L) + .setObjectSize(999) + .setObjectStreamRanges(streamRanges6); + ControllerResult result10 = manager.commitWALObject(commitRequest6); + assertEquals(Errors.NONE.code(), result10.response().errorCode()); + replay(manager, result10.records()); + // verify range's end offset advanced and wal object is added + streamMetadata0 = manager.streamsMetadata().get(STREAM0); + assertEquals(2, streamMetadata0.ranges().size()); + rangeMetadata0 = streamMetadata0.ranges().get(0); + assertEquals(0L, rangeMetadata0.startOffset()); + assertEquals(200L, rangeMetadata0.endOffset()); + RangeMetadata rangeMetadata1 = streamMetadata0.ranges().get(1); + assertEquals(200L, rangeMetadata1.startOffset()); + assertEquals(300L, rangeMetadata1.endOffset()); + assertTrue(manager.brokersMetadata().containsKey(BROKER1)); + assertEquals(1, manager.brokersMetadata().get(BROKER1).walObjects().size()); + } + + private void replay(StreamControlManager manager, List records) { + List messages = records.stream().map(x -> x.message()) + .collect(Collectors.toList()); + for (ApiMessage message : messages) { + MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); + switch (type) { + case ASSIGNED_STREAM_ID_RECORD: + manager.replay((AssignedStreamIdRecord) message); + break; + case S3_STREAM_RECORD: + manager.replay((S3StreamRecord) message); + break; + case REMOVE_S3_STREAM_RECORD: + manager.replay((RemoveS3StreamRecord) message); + break; + case RANGE_RECORD: + manager.replay((RangeRecord) message); + break; + case REMOVE_RANGE_RECORD: + manager.replay((RemoveRangeRecord) message); + break; + case BROKER_WALMETADATA_RECORD: + manager.replay((BrokerWALMetadataRecord) message); + break; + case WALOBJECT_RECORD: + manager.replay((WALObjectRecord) message); + break; + default: + throw new IllegalStateException("Unknown metadata record type " + type); + } + } + } + private void verifyInitializedStreamMetadata(S3StreamMetadata metadata) { assertNotNull(metadata); assertEquals(0, metadata.currentEpoch()); From 5a411ec1deed0af6dff0eb96ddba6b590370a95e Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 29 Aug 2023 14:18:18 +0800 Subject: [PATCH 3/4] style(s3): fix code style 1. fix code style Signed-off-by: TheR1sing3un --- .../kafka/controller/StreamControlManagerTest.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index d632da56df..76474f0cd7 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyLong; import java.util.ArrayList; @@ -262,12 +261,10 @@ public void testCommitWal() { CreateStreamRequestData request0 = new CreateStreamRequestData(); ControllerResult result0 = manager.createStream(request0); replay(manager, result0.records()); - ControllerResult result2 = manager.openStream( new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); replay(manager, result2.records()); - // 2. commit valid wal object List streamRanges0 = new ArrayList<>(); streamRanges0.add(new ObjectStreamRange() @@ -290,7 +287,6 @@ public void testCommitWal() { assertEquals(0L, rangeMetadata0.startOffset()); assertEquals(100L, rangeMetadata0.endOffset()); assertEquals(1, manager.brokersMetadata().get(BROKER0).walObjects().size()); - // 3. commit a wal object that doesn't exist List streamRanges1 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -304,7 +300,6 @@ public void testCommitWal() { .setObjectStreamRanges(streamRanges1); ControllerResult result4 = manager.commitWALObject(commitRequest1); assertEquals(Errors.OBJECT_NOT_EXIST.code(), result4.response().errorCode()); - // 4. commit a wal object that doesn't match the next offset List streamRanges2 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -320,7 +315,6 @@ public void testCommitWal() { assertEquals(Errors.STREAM_FENCED.code(), result5.response().errorCode()); assertEquals(1, result5.response().failedStreamIds().size()); assertEquals(STREAM0, result5.response().failedStreamIds().get(0).longValue()); - // 5. commit a wal object that contains a stream which doesn't exist List streamRanges3 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -339,7 +333,6 @@ public void testCommitWal() { .setObjectStreamRanges(streamRanges3); ControllerResult result6 = manager.commitWALObject(commitRequest3); assertEquals(Errors.STREAM_FENCED.code(), result6.response().errorCode()); - // 6. commit a valid wal object List streamRanges4 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -361,7 +354,6 @@ public void testCommitWal() { assertEquals(0L, rangeMetadata0.startOffset()); assertEquals(200L, rangeMetadata0.endOffset()); assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().size()); - // 7. broker_1 open stream_0 with epoch_1 ControllerResult result8 = manager.openStream( new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); @@ -369,7 +361,6 @@ public void testCommitWal() { assertEquals(0L, result8.response().startOffset()); assertEquals(200L, result8.response().nextOffset()); replay(manager, result8.records()); - // 8. broker_0 try to keep committing wal object which contains stream_0's data List streamRanges5 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -383,7 +374,6 @@ public void testCommitWal() { .setObjectStreamRanges(streamRanges5); ControllerResult result9 = manager.commitWALObject(commitRequest5); assertEquals(Errors.STREAM_FENCED.code(), result9.response().errorCode()); - // 9. broker_1 successfully commit wal object which contains stream_0's data List streamRanges6 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) @@ -407,7 +397,6 @@ public void testCommitWal() { RangeMetadata rangeMetadata1 = streamMetadata0.ranges().get(1); assertEquals(200L, rangeMetadata1.startOffset()); assertEquals(300L, rangeMetadata1.endOffset()); - assertTrue(manager.brokersMetadata().containsKey(BROKER1)); assertEquals(1, manager.brokersMetadata().get(BROKER1).walObjects().size()); } From c606e582dac1702c6a601cad9aec363043bb7f77 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 29 Aug 2023 14:39:57 +0800 Subject: [PATCH 4/4] fix(s3): still commit valid stream in wal object 1. still commit valid stream in wal object Signed-off-by: TheR1sing3un --- .../stream/StreamControlManager.java | 4 +- .../controller/StreamControlManagerTest.java | 45 +++++++------------ 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 18993094bc..18820a554b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -277,8 +277,6 @@ public ControllerResult commitWALObject(CommitWALOb if (!failedStreamIds.isEmpty()) { log.error("stream is invalid when commit wal object, failed stream ids [{}]", String.join(",", failedStreamIds.stream().map(String::valueOf).collect(Collectors.toList()))); - resp.setErrorCode(Errors.STREAM_FENCED.code()); - return ControllerResult.of(Collections.emptyList(), resp); } // commit object ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize); @@ -289,7 +287,7 @@ public ControllerResult commitWALObject(CommitWALOb } records.addAll(commitResult.records()); List indexes = new ArrayList<>(streamRanges.size()); - streamRanges.stream().forEach(range -> { + streamRanges.stream().filter(range -> !failedStreamIds.contains(range.streamId())).forEach(range -> { // build WAL object long streamId = range.streamId(); long startOffset = range.startOffset(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 76474f0cd7..9bcd44cada 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.anyLong; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -266,8 +265,7 @@ public void testCommitWal() { verifyFirstTimeOpenStreamResult(result2, EPOCH0, BROKER0); replay(manager, result2.records()); // 2. commit valid wal object - List streamRanges0 = new ArrayList<>(); - streamRanges0.add(new ObjectStreamRange() + List streamRanges0 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH0) .setStartOffset(0L) @@ -312,7 +310,7 @@ public void testCommitWal() { .setObjectSize(999) .setObjectStreamRanges(streamRanges2); ControllerResult result5 = manager.commitWALObject(commitRequest2); - assertEquals(Errors.STREAM_FENCED.code(), result5.response().errorCode()); + assertEquals(Errors.NONE.code(), result5.response().errorCode()); assertEquals(1, result5.response().failedStreamIds().size()); assertEquals(STREAM0, result5.response().failedStreamIds().get(0).longValue()); // 5. commit a wal object that contains a stream which doesn't exist @@ -332,36 +330,24 @@ public void testCommitWal() { .setObjectSize(999) .setObjectStreamRanges(streamRanges3); ControllerResult result6 = manager.commitWALObject(commitRequest3); - assertEquals(Errors.STREAM_FENCED.code(), result6.response().errorCode()); - // 6. commit a valid wal object - List streamRanges4 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(100) - .setEndOffset(200)); - CommitWALObjectRequestData commitRequest4 = new CommitWALObjectRequestData() - .setObjectId(4L) - .setBrokerId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges4); - ControllerResult result7 = manager.commitWALObject(commitRequest4); - assertEquals(Errors.NONE.code(), result7.response().errorCode()); - replay(manager, result7.records()); + assertEquals(Errors.NONE.code(), result6.response().errorCode()); + assertEquals(1, result6.response().failedStreamIds().size()); + assertEquals(STREAM1, result6.response().failedStreamIds().get(0).longValue()); + replay(manager, result6.records()); // verify range's end offset advanced and wal object is added streamMetadata0 = manager.streamsMetadata().get(STREAM0); assertEquals(1, streamMetadata0.ranges().size()); - rangeMetadata0 = streamMetadata0.ranges().get(0); - assertEquals(0L, rangeMetadata0.startOffset()); - assertEquals(200L, rangeMetadata0.endOffset()); + assertEquals(0L, streamMetadata0.ranges().get(0).startOffset()); + assertEquals(200L, streamMetadata0.ranges().get(0).endOffset()); assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().size()); - // 7. broker_1 open stream_0 with epoch_1 + // 6. broker_1 open stream_0 with epoch_1 ControllerResult result8 = manager.openStream( new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); assertEquals(Errors.NONE.code(), result8.response().errorCode()); assertEquals(0L, result8.response().startOffset()); assertEquals(200L, result8.response().nextOffset()); replay(manager, result8.records()); - // 8. broker_0 try to keep committing wal object which contains stream_0's data + // 7. broker_0 try to keep committing wal object which contains stream_0's data List streamRanges5 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH0) @@ -373,8 +359,10 @@ public void testCommitWal() { .setObjectSize(999) .setObjectStreamRanges(streamRanges5); ControllerResult result9 = manager.commitWALObject(commitRequest5); - assertEquals(Errors.STREAM_FENCED.code(), result9.response().errorCode()); - // 9. broker_1 successfully commit wal object which contains stream_0's data + assertEquals(Errors.NONE.code(), result9.response().errorCode()); + assertEquals(1, result9.response().failedStreamIds().size()); + assertEquals(STREAM0, result9.response().failedStreamIds().get(0).longValue()); + // 8. broker_1 successfully commit wal object which contains stream_0's data List streamRanges6 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH1) @@ -391,9 +379,8 @@ public void testCommitWal() { // verify range's end offset advanced and wal object is added streamMetadata0 = manager.streamsMetadata().get(STREAM0); assertEquals(2, streamMetadata0.ranges().size()); - rangeMetadata0 = streamMetadata0.ranges().get(0); - assertEquals(0L, rangeMetadata0.startOffset()); - assertEquals(200L, rangeMetadata0.endOffset()); + assertEquals(0L, streamMetadata0.ranges().get(0).startOffset()); + assertEquals(200L, streamMetadata0.ranges().get(0).endOffset()); RangeMetadata rangeMetadata1 = streamMetadata0.ranges().get(1); assertEquals(200L, rangeMetadata1.startOffset()); assertEquals(300L, rangeMetadata1.endOffset());