From 4ba6204dad43db0ad459ea7b3e22d5b0638c2c09 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 5 Sep 2023 11:39:29 +0800 Subject: [PATCH] feat(s3): replace object-id with order-id for WAL 1. replace object-id with order-id for WAL Signed-off-by: TheR1sing3un --- .../kafka/log/s3/StreamMetadataManager.java | 2 +- .../log/s3/memory/MemoryMetadataManager.java | 2 +- .../stream/StreamControlManager.java | 1 + .../kafka/image/BrokerS3WALMetadataDelta.java | 6 +- .../kafka/image/BrokerS3WALMetadataImage.java | 26 ++-- .../kafka/image/S3StreamMetadataImage.java | 2 +- .../kafka/image/S3StreamsMetadataImage.java | 2 +- .../stream/S3StreamConstant.java | 4 +- .../kafka/metadata/stream/S3WALObject.java | 8 +- .../metadata/stream/SortedWALObjects.java | 54 ++++++++ .../metadata/stream/SortedWALObjectsList.java | 126 ++++++++++++++++++ .../controller/StreamControlManagerTest.java | 2 +- .../image/BrokerS3WALMetadataImageTest.java | 15 ++- .../image/S3StreamMetadataImageTest.java | 2 +- .../image/S3StreamsMetadataImageTest.java | 21 +-- .../stream/SortedWALObjectsListTest.java | 70 ++++++++++ 16 files changed, 300 insertions(+), 43 deletions(-) rename metadata/src/main/java/org/apache/kafka/{controller => metadata}/stream/S3StreamConstant.java (93%) create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java index b5e8983235..3c63d647e7 100644 --- a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java @@ -25,7 +25,7 @@ import kafka.server.BrokerServer; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; -import org.apache.kafka.controller.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.image.BrokerS3WALMetadataImage; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 041a69c06d..fb6f0b3a43 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -41,7 +41,7 @@ import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.objects.OpenStreamMetadata; import org.apache.kafka.common.errors.s3.StreamNotClosedException; -import org.apache.kafka.controller.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import kafka.log.s3.streams.StreamManager; import org.apache.kafka.metadata.stream.ObjectUtils; diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 0175bf5e51..3ee287fdab 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -57,6 +57,7 @@ import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.metadata.stream.StreamState; diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java index 2853a68986..281dce4e54 100644 --- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java @@ -17,16 +17,16 @@ package org.apache.kafka.image; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; import org.apache.kafka.common.metadata.RemoveWALObjectRecord; import org.apache.kafka.common.metadata.WALObjectRecord; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.SortedWALObjects; +import org.apache.kafka.metadata.stream.SortedWALObjectsList; public class BrokerS3WALMetadataDelta { @@ -58,7 +58,7 @@ public void replay(RemoveWALObjectRecord record) { } public BrokerS3WALMetadataImage apply() { - List newS3WALObjects = new ArrayList<>(image.getWalObjects()); + SortedWALObjects newS3WALObjects = new SortedWALObjectsList(image.getWalObjects()); // add all changed WAL objects newS3WALObjects.addAll(addedS3WALObjects.values()); // remove all removed WAL objects diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java index 6308c754c8..6e0a9eba14 100644 --- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java @@ -18,24 +18,26 @@ package org.apache.kafka.image; -import java.util.List; +import java.util.Iterator; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.kafka.common.metadata.BrokerWALMetadataRecord; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.metadata.stream.SortedWALObjects; +import org.apache.kafka.metadata.stream.SortedWALObjectsList; import org.apache.kafka.server.common.ApiMessageAndVersion; public class BrokerS3WALMetadataImage { - public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of()); + public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(S3StreamConstant.INVALID_BROKER_ID, new SortedWALObjectsList()); private final int brokerId; - private final List s3WalObjects; + private final SortedWALObjects s3WalObjects; - public BrokerS3WALMetadataImage(int brokerId, List s3WalObjects) { + public BrokerS3WALMetadataImage(int brokerId, SortedWALObjects sourceWALObjects) { this.brokerId = brokerId; - this.s3WalObjects = s3WalObjects; + this.s3WalObjects = new SortedWALObjectsList(sourceWALObjects); } @Override @@ -58,10 +60,14 @@ public int hashCode() { public void write(ImageWriter writer, ImageWriterOptions options) { writer.write(new ApiMessageAndVersion(new BrokerWALMetadataRecord() .setBrokerId(brokerId), (short) 0)); - s3WalObjects.forEach(walObject -> writer.write(walObject.toRecord())); + Iterator iterator = s3WalObjects.iterator(); + while (iterator.hasNext()) { + S3WALObject s3WALObject = iterator.next(); + writer.write(s3WALObject.toRecord()); + } } - public List getWalObjects() { + public SortedWALObjects getWalObjects() { return s3WalObjects; } @@ -73,9 +79,7 @@ public int getBrokerId() { public String toString() { return "BrokerS3WALMetadataImage{" + "brokerId=" + brokerId + - ", s3WalObjects=" + s3WalObjects.stream() - .map(wal -> wal.toString()) - .collect(Collectors.joining(", ")) + + ", s3WalObjects=" + s3WalObjects + '}'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 67b7310bb3..64a71263dd 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -21,7 +21,7 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.kafka.common.metadata.S3StreamRecord; -import org.apache.kafka.controller.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.image.writer.ImageWriter; diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 430002fb92..a509d73cf0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -142,7 +142,7 @@ public InRangeObjects getObjects(int limit) { if (wal == null) { return InRangeObjects.INVALID; } - List walObjects = wal.getWalObjects().stream() + List walObjects = wal.getWalObjects().list().stream() .filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0) .flatMap(obj -> { List indexes = obj.streamsIndex().get(streamId); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java similarity index 93% rename from metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java rename to metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java index 10da5563ff..1c9e9103c1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamConstant.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.controller.stream; +package org.apache.kafka.metadata.stream; public class S3StreamConstant { @@ -33,6 +33,8 @@ public class S3StreamConstant { public static final long INVALID_OFFSET = -1L; + public static final int INVALID_BROKER_ID = -1; + public static final long MAX_OBJECT_ID = Long.MAX_VALUE; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java index c68a35a69c..845100ca30 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -41,11 +41,6 @@ public class S3WALObject implements Comparable { private final S3ObjectType objectType = S3ObjectType.UNKNOWN; - public S3WALObject(long objectId, int brokerId, final Map> streamsIndex) { - // default orderId is equal to objectId - this(objectId, brokerId, streamsIndex, objectId); - } - public S3WALObject(long objectId, int brokerId, final Map> streamsIndex, long orderId) { this.orderId = orderId; this.objectId = objectId; @@ -71,6 +66,7 @@ public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new WALObjectRecord() .setObjectId(objectId) .setBrokerId(brokerId) + .setOrderId(orderId) .setStreamsIndex( streamsIndex.values().stream().flatMap(List::stream) .map(S3ObjectStreamIndex::toRecordStreamIndex) @@ -82,7 +78,7 @@ public static S3WALObject of(WALObjectRecord record) { .map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset())) .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(), - collect); + collect, record.orderId()); return s3WalObject; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java new file mode 100644 index 0000000000..bc61009d78 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjects.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.function.Predicate; + +public interface SortedWALObjects { + + int size(); + + boolean isEmpty(); + + Iterator iterator(); + + List list(); + + boolean contains(Object o); + + boolean add(S3WALObject s3WALObject); + + default boolean addAll(Collection walObjects) { + walObjects.forEach(this::add); + return true; + } + + boolean remove(Object o); + + default boolean removeIf(Predicate filter) { + return this.list().removeIf(filter); + } + + S3WALObject get(int index); + + void clear(); + +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java new file mode 100644 index 0000000000..b45e2e0ad9 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SortedWALObjectsList.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class SortedWALObjectsList implements SortedWALObjects { + + private final List list; + + public SortedWALObjectsList(SortedWALObjects source) { + this.list = new LinkedList<>(source.list()); + } + + public SortedWALObjectsList() { + this.list = new LinkedList<>(); + } + + /** + * Construct a SortedWALObjectsList from a list of S3WALObjects. + * @param list the list of S3WALObjects, must guarantee that the list is sorted + */ + public SortedWALObjectsList(List list) { + this.list = list; + } + + @Override + public int size() { + return this.list.size(); + } + + @Override + public boolean isEmpty() { + return this.list.isEmpty(); + } + + @Override + public Iterator iterator() { + return this.list.iterator(); + } + + @Override + public List list() { + return list; + } + + @Override + public boolean contains(Object o) { + return this.list.contains(o); + } + + @Override + public boolean add(S3WALObject s3WALObject) { + // TODO: optimize by binary search + for (int index = 0; index < this.list.size(); index++) { + S3WALObject current = this.list.get(index); + if (s3WALObject.compareTo(current) <= 0) { + this.list.add(index, s3WALObject); + return true; + } + } + this.list.add(s3WALObject); + return true; + } + + @Override + public boolean remove(Object o) { + // TODO: optimize by binary search + return this.list.remove(o); + } + + + + @Override + public S3WALObject get(int index) { + return this.list.get(index); + } + + @Override + public void clear() { + this.list.clear(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SortedWALObjectsList that = (SortedWALObjectsList) o; + return Objects.equals(list, that.list); + } + + @Override + public int hashCode() { + return Objects.hash(list); + } + + @Override + public String toString() { + return "SortedWALObjectsList{" + + "list=" + list.stream().map(S3WALObject::toString).collect(Collectors.joining(",")) + + '}'; + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index bf86c11ec0..7bf57e3f3d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -54,7 +54,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; -import org.apache.kafka.controller.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.controller.stream.StreamControlManager; import org.apache.kafka.controller.stream.StreamControlManager.S3StreamMetadata; import org.apache.kafka.metadata.stream.RangeMetadata; diff --git a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java index e8b4b0fb92..ce643ef872 100644 --- a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.SortedWALObjectsList; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -47,7 +48,7 @@ public class BrokerS3WALMetadataImageTest { @Test public void testS3WALObjects() { - BrokerS3WALMetadataImage image0 = new BrokerS3WALMetadataImage(BROKER0, List.of()); + BrokerS3WALMetadataImage image0 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList()); List delta0Records = new ArrayList<>(); BrokerS3WALMetadataDelta delta0 = new BrokerS3WALMetadataDelta(image0); // 1. create WALObject0 and WALObject1 @@ -56,6 +57,7 @@ public void testS3WALObjects() { delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord() .setObjectId(0L) .setBrokerId(BROKER0) + .setOrderId(0L) .setStreamsIndex(List.of( new WALObjectRecord.StreamIndex() .setStreamId(STREAM0) @@ -68,6 +70,7 @@ public void testS3WALObjects() { delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord() .setObjectId(1L) .setBrokerId(BROKER0) + .setOrderId(1L) .setStreamsIndex(List.of( new WALObjectRecord.StreamIndex() .setStreamId(STREAM0) @@ -75,12 +78,12 @@ public void testS3WALObjects() { .setEndOffset(200L))), (short) 0)); RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write - BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, List.of( + BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of( new S3WALObject(0L, BROKER0, Map.of( STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 0L, 100L)), - STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L)))), + STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L))), 0L), new S3WALObject(1L, BROKER0, Map.of( - STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L)))))); + STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L)))); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -91,9 +94,9 @@ public void testS3WALObjects() { .setObjectId(0L), (short) 0)); RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write - BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, List.of( + BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of( new S3WALObject(1L, BROKER0, Map.of( - STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L)))))); + STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L)))); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java index f0ba6e929a..ec2daa29a5 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; -import org.apache.kafka.controller.stream.S3StreamConstant; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 3896c2396d..2cfae9d27d 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.SortedWALObjectsList; import org.apache.kafka.metadata.stream.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Tag; @@ -99,18 +100,18 @@ private void testToImageAndBack(S3StreamsMetadataImage image) { @Test public void testGetObjects() { List broker0WalObjects = List.of( - new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 100L, 120L)))), - new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 120L, 140L)))), - new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 180L, 200L)))), + new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 100L, 120L))), 0L), + new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 120L, 140L))), 1L), + new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 180L, 200L))), 2L), new S3WALObject(3, BROKER0, Map.of(STREAM0, List.of( - new S3ObjectStreamIndex(STREAM0, 400L, 420L), new S3ObjectStreamIndex(STREAM0, 500L, 520L)))), - new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 520L, 600L))))); + new S3ObjectStreamIndex(STREAM0, 400L, 420L), new S3ObjectStreamIndex(STREAM0, 500L, 520L))), 3L), + new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 520L, 600L))), 4L)); List broker1WalObjects = List.of( - new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 140L, 160L)))), - new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 160L, 180L)))), - new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 420L, 500L))))); - BrokerS3WALMetadataImage broker0WALMetadataImage = new BrokerS3WALMetadataImage(BROKER0, broker0WalObjects); - BrokerS3WALMetadataImage broker1WALMetadataImage = new BrokerS3WALMetadataImage(BROKER1, broker1WalObjects); + new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 140L, 160L))), 0L), + new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 160L, 180L))), 1L), + new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 420L, 500L))), 2L)); + BrokerS3WALMetadataImage broker0WALMetadataImage = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(broker0WalObjects)); + BrokerS3WALMetadataImage broker1WALMetadataImage = new BrokerS3WALMetadataImage(BROKER1, new SortedWALObjectsList(broker1WalObjects)); Map ranges = Map.of( 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0), 1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java new file mode 100644 index 0000000000..fa478ac02f --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedWALObjectsListTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("S3Unit") +public class SortedWALObjectsListTest { + + + @Test + public void testSorted() { + SortedWALObjects objects = new SortedWALObjectsList(); + objects.add(new S3WALObject(0, -1, null, 2)); + objects.add(new S3WALObject(1, -1, null, 1)); + objects.add(new S3WALObject(2, -1, null, 3)); + objects.add(new S3WALObject(3, -1, null, 0)); + objects.add(new S3WALObject(4, -1, null, 4)); + + assertEquals(5, objects.size()); + List expectedOrderIds = List.of(0L, 1L, 2L, 3L, 4L); + assertEquals(expectedOrderIds, objects.list() + .stream() + .map(S3WALObject::orderId) + .collect(Collectors.toList())); + + List expectedObjectIds = List.of(3L, 1L, 0L, 2L, 4L); + assertEquals(expectedObjectIds, objects.list() + .stream() + .map(S3WALObject::objectId) + .collect(Collectors.toList())); + + objects.removeIf(obj -> obj.objectId() == 2 || obj.objectId() == 3); + + assertEquals(3, objects.size()); + expectedOrderIds = List.of(1L, 2L, 4L); + assertEquals(expectedOrderIds, objects.list() + .stream() + .map(S3WALObject::orderId) + .collect(Collectors.toList())); + + expectedObjectIds = List.of(1L, 0L, 4L); + assertEquals(expectedObjectIds, objects.list() + .stream() + .map(S3WALObject::objectId) + .collect(Collectors.toList())); + } + + +}