From 6013114471ff0e9c17ceadd83bc3e0fba3c52376 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 6 Sep 2023 00:13:06 +0800 Subject: [PATCH 1/4] refactor(s3): remove inflight wal objects 1. remove inflight wal objects 2. delete redundant classes Signed-off-by: TheR1sing3un --- .../scala/kafka/log/s3/DefaultS3Client.java | 1 + .../kafka/log/s3/StreamMetadataManager.java | 340 ------------------ .../log/s3/memory/MemoryMetadataManager.java | 11 +- .../InRangeObjectsFetcher.java} | 36 +- .../s3/metadata/StreamMetadataManager.java | 165 +++++++++ .../kafka/log/s3/model/RangeMetadata.java | 66 ---- .../kafka/log/s3/model/StreamMetadata.java | 77 ---- .../s3/objects/ControllerObjectManager.java | 24 +- .../s3/streams/ControllerStreamManager.java | 6 +- .../kafka/log/s3/streams/StreamManager.java | 6 +- .../scala/kafka/server/MetadataCache.scala | 4 +- .../metadata/BrokerMetadataListener.scala | 2 +- .../server/metadata/KRaftMetadataCache.scala | 21 +- .../server/metadata/ZkMetadataCache.scala | 6 +- .../stream/StreamControlManager.java | 14 +- .../kafka/image/S3StreamMetadataImage.java | 9 + .../kafka/image/S3StreamsMetadataImage.java | 4 +- .../kafka/metadata/stream/RangeMetadata.java | 22 +- .../kafka/metadata/stream/S3StreamObject.java | 6 +- .../kafka/metadata/stream/S3WALObject.java | 20 +- ...treamIndex.java => StreamOffsetRange.java} | 31 +- .../image/BrokerS3WALMetadataImageTest.java | 10 +- .../image/S3StreamsMetadataImageTest.java | 18 +- 23 files changed, 312 insertions(+), 587 deletions(-) delete mode 100644 core/src/main/scala/kafka/log/s3/StreamMetadataManager.java rename core/src/main/scala/kafka/log/s3/{model/StreamOffset.java => metadata/InRangeObjectsFetcher.java} (56%) create mode 100644 core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java delete mode 100644 core/src/main/scala/kafka/log/s3/model/RangeMetadata.java delete mode 100644 core/src/main/scala/kafka/log/s3/model/StreamMetadata.java rename metadata/src/main/java/org/apache/kafka/metadata/stream/{S3ObjectStreamIndex.java => StreamOffsetRange.java} (58%) diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index d795bfd696..664f03a3b0 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -22,6 +22,7 @@ import com.automq.elasticstream.client.api.StreamClient; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; +import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; import kafka.log.s3.objects.ControllerObjectManager; import kafka.log.s3.objects.ObjectManager; diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java deleted file mode 100644 index 4b67aabc64..0000000000 --- a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import kafka.server.BrokerServer; -import kafka.server.KafkaConfig; -import kafka.server.MetadataCache; -import org.apache.kafka.metadata.stream.S3StreamConstant; -import org.apache.kafka.image.BrokerS3WALMetadataImage; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.metadata.stream.InRangeObjects; -import org.apache.kafka.metadata.stream.S3Object; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; -import org.apache.kafka.metadata.stream.S3WALObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class StreamMetadataManager { - - // TODO: optimize by more suitable concurrent protection - // TODO: use order id instead of object id - private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); - private final KafkaConfig config; - private final BrokerServer broker; - private final InflightWalObjects inflightWalObjects; - private final MetadataCache metadataCache; - private final CatchUpMetadataListener catchUpMetadataListener; - - public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { - this.config = config; - this.broker = broker; - this.inflightWalObjects = new InflightWalObjects(); - this.metadataCache = broker.metadataCache(); - this.catchUpMetadataListener = new CatchUpMetadataListener(); - this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener); - } - - public synchronized void append(InflightWalObject object) { - this.inflightWalObjects.append(object); - } - - public synchronized void catchupTo(long objectId) { - // delete all wal objects which are <= objectId - this.inflightWalObjects.trim(objectId); - } - - @SuppressWarnings("all") - public synchronized List getObjects(long streamId, long startOffset, long endOffset, int limit) { - List objects = new ArrayList<>(); - if (startOffset >= endOffset) { - LOGGER.warn("[GetObjects]: invalid offset range, stream: {}, startOffset: {}, endOffset: {}", streamId, startOffset, endOffset); - return objects; - } - OffsetRange walRange = this.inflightWalObjects.getWalRange(streamId); - if (walRange == null || endOffset <= walRange.startOffset()) { - // only search in cache - InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, endOffset, limit); - if (cachedInRangeObjects == null) { - LOGGER.warn( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); - return objects; - } - objects.addAll(cachedInRangeObjects.objects()); - objects.forEach(obj -> { - S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId()); - if (metadata == null) { - LOGGER.error("object: {} metadata not exist", obj.getObjectId()); - throw new RuntimeException("object: " + obj.getObjectId() + " 
metadata not exist"); - } - obj.setObjectSize(metadata.getObjectSize()); - }); - LOGGER.trace( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}", - streamId, startOffset, endOffset, limit, - cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); - return objects; - } - if (startOffset >= walRange.startOffset()) { - // only search in inflight wal - InRangeObjects inflightInRangeObjects = this.inflightWalObjects.getObjects(streamId, startOffset, endOffset, limit); - if (inflightInRangeObjects == null) { - LOGGER.warn( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result", - streamId, startOffset, endOffset, limit); - return objects; - } - objects.addAll(inflightInRangeObjects.objects()); - LOGGER.trace( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from inflight: startOffset: {}, endOffset: {}, object count: {}", - streamId, startOffset, endOffset, limit, - inflightInRangeObjects.startOffset(), inflightInRangeObjects.endOffset(), objects.size()); - return objects; - } - long cachedEndOffset = walRange.startOffset(); - InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, cachedEndOffset, limit); - if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { - LOGGER.warn("[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); - return objects; - } - objects.addAll(cachedInRangeObjects.objects()); - objects.forEach(obj -> { - S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId()); - if (metadata == null) { - LOGGER.error("object: {} metadata not exist", obj.getObjectId()); - throw new RuntimeException("object: " + obj.getObjectId() + " metadata not exist"); - } - obj.setObjectSize(metadata.getObjectSize()); - }); - if (objects.size() >= limit) { - LOGGER.trace( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}", - streamId, startOffset, endOffset, limit, - cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size()); - return objects; - } - InRangeObjects inflightinRangeObjects = this.inflightWalObjects.getObjects(streamId, cachedEndOffset, endOffset, limit - objects.size()); - if (inflightinRangeObjects == null || inflightinRangeObjects == InRangeObjects.INVALID) { - LOGGER.warn( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result", - streamId, startOffset, endOffset, limit); - return objects; - } - objects.addAll(inflightinRangeObjects.objects()); - LOGGER.trace( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache and inflight: startOffset: {}, endOffset: {}, object count: {}", - streamId, startOffset, endOffset, limit, - cachedInRangeObjects.startOffset(), inflightinRangeObjects.endOffset(), objects.size()); - return objects; - } - - private static class OffsetRange { - - private long startOffset; - private long endOffset; - - public OffsetRange(long startOffset, long endOffset) { - this.startOffset = startOffset; - this.endOffset = endOffset; - } - - public long startOffset() { - return 
startOffset; - } - - public long endOffset() { - return endOffset; - } - - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; - } - - public void setStartOffset(long startOffset) { - this.startOffset = startOffset; - } - } - - public static class InflightWalObject extends S3WALObject { - - private final long objectSize; - - public InflightWalObject(long objectId, int brokerId, Map> streamsIndex, long orderId, long objectSize) { - super(objectId, brokerId, streamsIndex, orderId); - this.objectSize = objectSize; - } - - public long startOffset(long streamId) { - List indexes = streamsIndex().get(streamId); - if (indexes == null || indexes.isEmpty()) { - return S3StreamConstant.INVALID_OFFSET; - } - return indexes.get(0).getStartOffset(); - } - - public long endOffset(long streamId) { - List indexes = streamsIndex().get(streamId); - if (indexes == null || indexes.isEmpty()) { - return S3StreamConstant.INVALID_OFFSET; - } - return indexes.get(indexes.size() - 1).getEndOffset(); - } - - public long objectSize() { - return objectSize; - } - - @Override - public boolean equals(Object o) { - return super.equals(o); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - } - - static class InflightWalObjects { - - private final Logger log = LoggerFactory.getLogger(InflightWalObjects.class); - - private final List objects; - private final Map streamOffsets; - private volatile long firstObjectId = S3StreamConstant.MAX_OBJECT_ID; - - public InflightWalObjects() { - this.objects = new LinkedList<>(); - this.streamOffsets = new HashMap<>(); - } - - public void append(InflightWalObject object) { - objects.add(object); - if (objects.size() == 1) { - firstObjectId = object.objectId(); - } - log.trace("[AppendInflight]: append wal object: {}", object.objectId()); - object.streamsIndex().forEach((stream, indexes) -> { - // wal object only contains one index for each stream - streamOffsets.putIfAbsent(stream, new OffsetRange(indexes.get(0).getStartOffset(), indexes.get(indexes.size() - 1).getEndOffset())); - streamOffsets.get(stream).setEndOffset(indexes.get(indexes.size() - 1).getEndOffset()); - }); - } - - public void trim(long objectId) { - log.trace("[TrimInflight]: trim wal object <= {}", objectId); - // TODO: speed up by binary search - int clearEndIndex = objects.size(); - for (int i = 0; i < objects.size(); i++) { - S3WALObject wal = objects.get(i); - if (wal.objectId() > objectId) { - clearEndIndex = i; - firstObjectId = wal.objectId(); - break; - } - wal.streamsIndex().forEach((stream, indexes) -> { - streamOffsets.get(stream).setStartOffset(indexes.get(indexes.size() - 1).getEndOffset()); - }); - } - if (clearEndIndex == objects.size()) { - firstObjectId = S3StreamConstant.MAX_OBJECT_ID; - } - objects.subList(0, clearEndIndex).clear(); - } - - public OffsetRange getWalRange(long streamId) { - return streamOffsets.get(streamId); - } - - @SuppressWarnings("all") - public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { - OffsetRange walRange = getWalRange(streamId); - if (walRange == null) { - return InRangeObjects.INVALID; - } - if (startOffset < walRange.startOffset()) { - return InRangeObjects.INVALID; - } - if (endOffset > walRange.endOffset()) { - endOffset = walRange.endOffset(); - } - if (startOffset >= endOffset) { - return InRangeObjects.INVALID; - } - List inRangeObjects = new LinkedList<>(); - long nextStartOffset = startOffset; - for (InflightWalObject object : objects) { - if (limit <= 0) { - break; - } - 
if (nextStartOffset >= endOffset) { - break; - } - long objStartOffset = object.startOffset(streamId); - long objEndOffset = object.endOffset(streamId); - if (objStartOffset == S3StreamConstant.INVALID_OFFSET || objEndOffset == S3StreamConstant.INVALID_OFFSET) { - continue; - } - if (objStartOffset > startOffset) { - break; - } - if (objEndOffset <= startOffset) { - continue; - } - limit--; - inRangeObjects.add(new S3ObjectMetadata(object.objectId(), object.objectSize(), object.objectType())); - nextStartOffset = objEndOffset; - } - return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects); - } - } - - public interface StreamMetadataListener { - - void onChange(MetadataDelta delta, MetadataImage image); - } - - class CatchUpMetadataListener implements StreamMetadataListener { - - @Override - public void onChange(MetadataDelta delta, MetadataImage newImage) { - BrokerS3WALMetadataImage walMetadataImage = newImage.streamsMetadata().brokerWALMetadata().get(config.brokerId()); - if (walMetadataImage == null) { - return; - } - S3WALObject wal = walMetadataImage.getWalObjects().get(walMetadataImage.getWalObjects().size() - 1); - if (wal == null) { - return; - } - if (wal.objectId() < inflightWalObjects.firstObjectId) { - return; - } - catchupTo(wal.objectId()); - } - } - -} diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index fb6f0b3a43..2bba94c283 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -32,7 +32,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import kafka.log.s3.model.StreamOffset; import kafka.log.s3.objects.CommitCompactObjectRequest; import kafka.log.s3.objects.CommitStreamObjectRequest; import kafka.log.s3.objects.CommitWALObjectRequest; @@ -49,7 +48,7 @@ import org.apache.kafka.common.errors.s3.StreamNotExistException; import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectState; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; import org.apache.kafka.metadata.stream.StreamState; @@ -168,12 +167,12 @@ public CompletableFuture commitWALObject(CommitWALObjec // build metadata MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, k -> new MemoryBrokerWALMetadata(k)); - Map> index = new HashMap<>(); + Map> index = new HashMap<>(); streamRanges.stream().forEach(range -> { long streamId = range.getStreamId(); long startOffset = range.getStartOffset(); long endOffset = range.getEndOffset(); - index.put(streamId, List.of(new S3ObjectStreamIndex(streamId, startOffset, endOffset))); + index.put(streamId, List.of(new StreamOffsetRange(streamId, startOffset, endOffset))); // update range endOffset MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId); streamMetadata.endOffset = endOffset; @@ -357,10 +356,10 @@ public CompletableFuture deleteStream(long streamId, long epoch) { } @Override - public CompletableFuture> getStreamsOffset(List streamIds) { + public CompletableFuture> getStreamsOffset(List streamIds) { return this.submitEvent(() -> { return streamIds.stream().filter(this.streamsMetadata::containsKey).map(id -> { - return new StreamOffset(id, 
this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset);
+            return new StreamOffsetRange(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset);
         }).collect(Collectors.toList());
         });
     }
diff --git a/core/src/main/scala/kafka/log/s3/model/StreamOffset.java b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
similarity index 56%
rename from core/src/main/scala/kafka/log/s3/model/StreamOffset.java
rename to core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
index 0cfdf340df..5c883a2b22 100644
--- a/core/src/main/scala/kafka/log/s3/model/StreamOffset.java
+++ b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
@@ -15,28 +15,22 @@
  * limitations under the License.
  */
 
-package kafka.log.s3.model;
+package kafka.log.s3.metadata;
 
-public class StreamOffset {
-    private final long streamId;
-    private final long startOffset;
-    private final long endOffset;
+import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.metadata.stream.InRangeObjects;
 
-    public StreamOffset(long streamId, long startOffset, long endOffset) {
-        this.streamId = streamId;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
-    }
+public interface InRangeObjectsFetcher {
 
-    public long streamId() {
-        return streamId;
-    }
+    /**
+     * Fetch the objects that cover the given offset range of a stream.
+     *
+     * @param streamId    stream id
+     * @param startOffset start offset, inclusive; if it does not exist, INVALID is returned
+     * @param endOffset   end offset, exclusive; if it does not exist yet, the fetch waits for it
+     * @param limit       max object count
+     * @return {@link InRangeObjects}
+     */
+    CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit);
 
-    public long startOffset() {
-        return startOffset;
-    }
-
-    public long endOffset() {
-        return endOffset;
-    }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java
new file mode 100644
index 0000000000..2f4254d8f6
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package kafka.log.s3.metadata; + +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import kafka.server.BrokerServer; +import kafka.server.KafkaConfig; +import kafka.server.MetadataCache; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StreamMetadataManager implements InRangeObjectsFetcher { + + // TODO: optimize by more suitable concurrent protection + private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); + private final KafkaConfig config; + private final BrokerServer broker; + private final MetadataCache metadataCache; + private final CatchUpMetadataListener catchUpMetadataListener; + private final Map>> pendingGetObjectsTasks; + private final ExecutorService pendingExecutorService; + + private final ReplayedWalObjects replayedWalObjects; + + public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { + this.config = config; + this.broker = broker; + this.metadataCache = broker.metadataCache(); + this.catchUpMetadataListener = new CatchUpMetadataListener(); + this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener); + this.replayedWalObjects = new ReplayedWalObjects(); + // TODO: optimize by more suitable data structure for pending tasks + this.pendingGetObjectsTasks = new HashMap<>(); + this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor")); + } + + @Override + public CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { + return this.replayedWalObjects.fetch(streamId, startOffset, endOffset, limit); + } + + + static class GetObjectsTask { + + private final CompletableFuture cf; + private final long streamId; + private final long startOffset; + private final long endOffset; + private final int limit; + + public static GetObjectsTask of(long streamId, long startOffset, long endOffset, int limit) { + CompletableFuture cf = new CompletableFuture<>(); + GetObjectsTask task = new GetObjectsTask(cf, streamId, startOffset, endOffset, limit); + return task; + } + + private GetObjectsTask(CompletableFuture cf, long streamId, long startOffset, long endOffset, int limit) { + this.cf = cf; + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.limit = limit; + } + + void complete(InRangeObjects objects) { + cf.complete(objects); + } + + void completeExceptionally(Throwable ex) { + cf.completeExceptionally(ex); + } + } + + class ReplayedWalObjects implements InRangeObjectsFetcher { + + @Override + public CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { + StreamOffsetRange offsetRange = StreamMetadataManager.this.metadataCache.getStreamOffsetRange(streamId); + if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) { + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + long streamStartOffset = offsetRange.getStartOffset(); + long streamEndOffset = offsetRange.getEndOffset(); + if 
(startOffset < streamStartOffset) { + LOGGER.warn("[ReplayedWalObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}", + streamId, startOffset, endOffset, limit, streamStartOffset); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + if (endOffset > streamEndOffset) { + // lag behind, need to wait for cache catch up + return pendingFetch(streamId, startOffset, endOffset, limit); + } + return fetch0(streamId, startOffset, endOffset, limit); + } + + private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { + GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); + synchronized (StreamMetadataManager.this) { + Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, + k -> new TreeMap<>()); + List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); + getObjectsTasks.add(task); + } + LOGGER.info("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, limit); + return task.cf; + } + + private CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { + // only search in cache + InRangeObjects cachedInRangeObjects = StreamMetadataManager.this.metadataCache.getObjects(streamId, startOffset, endOffset, limit); + if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { + LOGGER.warn( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + LOGGER.info( + "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", + streamId, startOffset, endOffset, limit, cachedInRangeObjects); + return CompletableFuture.completedFuture(cachedInRangeObjects); + } + + + } + + public interface StreamMetadataListener { + + void onChange(MetadataDelta delta, MetadataImage image); + } + + class CatchUpMetadataListener implements StreamMetadataListener { + + @Override + public void onChange(MetadataDelta delta, MetadataImage newImage) { + } + } + +} diff --git a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java deleted file mode 100644 index 85b1d1c4a3..0000000000 --- a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log.s3.model; - - -public class RangeMetadata { - private static final long NOOP_OFFSET = -1; - private int index; - private long startOffset; - private long endOffset; - private int brokerId; - - public RangeMetadata(int index, long startOffset, long endOffset, int serverId) { - this.index = index; - this.startOffset = startOffset; - this.endOffset = endOffset; - this.brokerId = serverId; - } - - public int getIndex() { - return index; - } - - public void setIndex(int index) { - this.index = index; - } - - public long getStartOffset() { - return startOffset; - } - - public void setStartOffset(long startOffset) { - this.startOffset = startOffset; - } - - public long getEndOffset() { - return endOffset; - } - - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; - } - - public int getBrokerId() { - return brokerId; - } - - public void setBrokerId(int brokerId) { - this.brokerId = brokerId; - } -} diff --git a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java deleted file mode 100644 index 1db1020ada..0000000000 --- a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log.s3.model; - -import java.util.List; - -public class StreamMetadata { - private long streamId; - private long epoch; - private int rangeIndex; - private long startOffset; - - private List ranges; - - public StreamMetadata(long streamId, long epoch, int rangeIndex, long startOffset, List ranges) { - this.streamId = streamId; - this.epoch = epoch; - this.rangeIndex = rangeIndex; - this.startOffset = startOffset; - this.ranges = ranges; - } - - public long getStreamId() { - return streamId; - } - - public void setStreamId(long streamId) { - this.streamId = streamId; - } - - public long getEpoch() { - return epoch; - } - - public void setEpoch(long epoch) { - this.epoch = epoch; - } - - public long getStartOffset() { - return startOffset; - } - - public void setStartOffset(long startOffset) { - this.startOffset = startOffset; - } - - public List getRanges() { - return ranges; - } - - public void setRanges(List ranges) { - this.ranges = ranges; - } - - public int getRangeIndex() { - return rangeIndex; - } - - public void setRangeIndex(int rangeIndex) { - this.rangeIndex = rangeIndex; - } -} diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 006ff50c89..942d0ce96c 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -23,8 +23,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import kafka.log.s3.StreamMetadataManager; -import kafka.log.s3.StreamMetadataManager.InflightWalObject; +import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; import kafka.server.KafkaConfig; import org.apache.kafka.common.message.CommitWALObjectRequestData; @@ -34,8 +33,9 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder; +import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,16 +97,6 @@ public CompletableFuture commitWALObject(CommitWALObjec LOGGER.error("Error while committing WAL object: {}, code: {}", request, code); throw code.exception(); } - }).thenApply(resp -> { - long objectId = request.getObjectId(); - long orderId = request.getOrderId(); - int brokerId = config.brokerId(); - long objectSize = request.getObjectSize(); - Map> rangeList = request.getStreamRanges().stream() - .map(range -> new S3ObjectStreamIndex(range.getStreamId(), range.getStartOffset(), range.getEndOffset())) - .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); - this.metadataManager.append(new InflightWalObject(objectId, brokerId, rangeList, orderId, objectSize)); - return resp; }); } @@ -123,7 +113,13 @@ public CompletableFuture commitStreamObject(CommitStreamObjectRequest requ @Override public List getObjects(long streamId, long startOffset, long endOffset, int limit) { try { - return this.metadataManager.getObjects(streamId, startOffset, endOffset, limit); + return this.metadataManager.fetch(streamId, startOffset, endOffset, limit).thenApply(inRangeObjects -> { + if (inRangeObjects == null || 
inRangeObjects == InRangeObjects.INVALID) { + List objects = Collections.emptyList(); + return objects; + } + return inRangeObjects.objects(); + }).get(); } catch (Exception e) { LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, e); diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java index d7838a824e..1afd60bce9 100644 --- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java @@ -17,7 +17,6 @@ package kafka.log.s3.streams; -import kafka.log.s3.model.StreamOffset; import kafka.log.s3.network.ControllerRequestSender; import kafka.log.s3.objects.OpenStreamMetadata; import kafka.server.KafkaConfig; @@ -34,6 +33,7 @@ import org.apache.kafka.common.requests.s3.CreateStreamRequest; import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; import org.apache.kafka.common.requests.s3.OpenStreamRequest; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,7 +135,7 @@ public CompletableFuture deleteStream(long streamId, long epoch) { } @Override - public CompletableFuture> getStreamsOffset(List streamIds) { + public CompletableFuture> getStreamsOffset(List streamIds) { GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder( new GetStreamsOffsetRequestData() .setStreamIds(streamIds)); @@ -143,7 +143,7 @@ public CompletableFuture> getStreamsOffset(List streamI switch (Errors.forCode(resp.errorCode())) { case NONE: return resp.streamsOffset().stream() - .map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) + .map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) .collect(Collectors.toList()); default: LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode())); diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java index 175aee16cf..51e3c39de8 100644 --- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -19,8 +19,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import kafka.log.s3.model.StreamOffset; import kafka.log.s3.objects.OpenStreamMetadata; +import org.apache.kafka.metadata.stream.StreamOffsetRange; public interface StreamManager { @@ -74,8 +74,8 @@ public interface StreamManager { * When server is starting or recovering, wal in EBS need streams offset to determine the recover point. * * @param streamIds stream ids. 
- * @return {@link StreamOffset} + * @return {@link StreamOffsetRange} */ - CompletableFuture> getStreamsOffset(List streamIds); + CompletableFuture> getStreamsOffset(List streamIds); } diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index cef618a703..a10bcad865 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -22,7 +22,7 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} -import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object} +import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.server.common.MetadataVersion import java.util @@ -122,6 +122,8 @@ trait MetadataCache { def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects def getObjectMetadata(objectId: Long): S3Object + + def getStreamOffsetRange(streamId: Long): StreamOffsetRange // Kafka on S3 inject end } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 58809d6f01..d6123bc53e 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -16,7 +16,7 @@ */ package kafka.server.metadata -import kafka.log.s3.StreamMetadataManager.StreamMetadataListener +import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener import java.util import java.util.concurrent.atomic.AtomicBoolean diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 05a6a393ac..3a4cfc2daa 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -35,7 +35,7 @@ import java.util.concurrent.ThreadLocalRandom import kafka.admin.BrokerMetadata import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData} -import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object} +import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.metadata.{PartitionRegistration, Replicas} import org.apache.kafka.server.common.MetadataVersion @@ -397,12 +397,29 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w // Kafka on S3 inject start override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = { val image = _currentImage - image.streamsMetadata().getObjects(streamId, startOffset, endOffset, limit) + val inRangeObjects = image.streamsMetadata().getObjects(streamId, startOffset, endOffset, limit) + inRangeObjects.objects().forEach(obj => { + val objMetadata = image.objectsMetadata().getObjectMetadata(obj.getObjectId) + Option(objMetadata) match { + case None => throw new RuntimeException(s"Object metadata not found for object ${obj.getObjectId}") + case Some(metadata) => obj.setObjectSize(metadata.getObjectSize) + } + }) + inRangeObjects } override def 
getObjectMetadata(objectId: Long): S3Object = { val image = _currentImage image.objectsMetadata().getObjectMetadata(objectId) } + + override def getStreamOffsetRange(streamId: Long): StreamOffsetRange = { + val image = _currentImage + Option(image.streamsMetadata().streamsMetadata().get(streamId)) match { + case None => StreamOffsetRange.INVALID + case Some(metadata) => + metadata.offsetRange() + } + } // Kafka on S3 inject end } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index efbc546f63..d7e6f0ac2e 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object} +import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.server.common.MetadataVersion import java.util.concurrent.{ThreadLocalRandom, TimeUnit} @@ -594,5 +594,9 @@ class ZkMetadataCache( override def getObjectMetadata(objectId: Long): S3Object = { throw new UnsupportedOperationException("getObjectMetadata is not supported in ZkMetadataCache") } + + override def getStreamOffsetRange(streamId: Long): StreamOffsetRange = { + throw new UnsupportedOperationException("getStreamOffsetRange is not supported in ZkMetadataCache") + } // Kafka on S3 inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 3ee287fdab..9aa2b00b66 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -56,7 +56,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; import org.apache.kafka.metadata.stream.RangeMetadata; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; @@ -383,8 +383,8 @@ public ControllerResult commitWALObject(CommitWALOb records.addAll(destroyResult.records()); } - List indexes = streamRanges.stream() - .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset())) + List indexes = streamRanges.stream() + .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); // update broker's wal object BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId); @@ -400,7 +400,7 @@ public ControllerResult commitWALObject(CommitWALOb .setBrokerId(brokerId) .setStreamsIndex( indexes.stream() - .map(S3ObjectStreamIndex::toRecordStreamIndex) + .map(StreamOffsetRange::toRecordStreamIndex) .collect(Collectors.toList())), (short) 0)); // generate compacted objects' remove record data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord() @@ -507,10 +507,10 @@ public 
void replay(WALObjectRecord record) { } // create wal object - Map> indexMap = streamIndexes + Map> indexMap = streamIndexes .stream() - .map(S3ObjectStreamIndex::of) - .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId)); + .map(StreamOffsetRange::of) + .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId)); brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId)); // update range diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 64a71263dd..8f69264a61 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -26,6 +26,7 @@ import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.StreamState; public class S3StreamMetadataImage { @@ -107,6 +108,14 @@ public StreamState state() { return state; } + public StreamOffsetRange offsetRange() { + long endOffset = startOffset; + if (ranges.containsKey(rangeIndex)) { + endOffset = ranges.get(rangeIndex).endOffset(); + } + return new StreamOffsetRange(streamId, startOffset, endOffset); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index a509d73cf0..9dea157fac 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -30,7 +30,7 @@ import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.S3ObjectType; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -145,7 +145,7 @@ public InRangeObjects getObjects(int limit) { List walObjects = wal.getWalObjects().list().stream() .filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0) .flatMap(obj -> { - List indexes = obj.streamsIndex().get(streamId); + List indexes = obj.streamsIndex().get(streamId); // TODO: pre filter useless objects return indexes.stream().filter(index -> { long objectStartOffset = index.getStartOffset(); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java index 6fc0e7125b..620f8c2709 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java @@ -21,18 +21,36 @@ import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; +/** + * RangeMetadata is the metadata of a range of the stream. + *

+ * The range represents a continuous sequence of data [startOffset, endOffset) in the stream.
+ */
 public class RangeMetadata implements Comparable<RangeMetadata> {
+
+    /**
+     * The id of the stream that the range belongs to.
+     */
     private long streamId;
+    /**
+     * The epoch of the stream when the range is created.
+     */
     private long epoch;
+    /**
+     * The index of the range in the stream.
+     */
     private int rangeIndex;
     /**
-     * Inclusive
+     * Range start offset. (Inclusive)
      */
     private long startOffset;
     /**
-     * Exclusive
+     * Range end offset. (Exclusive)
      */
     private long endOffset;
+    /**
+     * The broker id of the broker that owns the range.
+     */
     private int brokerId;
 
     public RangeMetadata(long streamId, long epoch, int rangeIndex, long startOffset, long endOffset, int brokerId) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 640ca43e24..87b0e71cdc 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -27,15 +27,15 @@ public class S3StreamObject {
 
     private final long objectSize;
 
-    private final S3ObjectStreamIndex streamIndex;
+    private final StreamOffsetRange streamIndex;
 
     public S3StreamObject(long objectId, long objectSize, long streamId, long startOffset, long endOffset) {
         this.objectId = objectId;
         this.objectSize = objectSize;
-        this.streamIndex = new S3ObjectStreamIndex(streamId, startOffset, endOffset);
+        this.streamIndex = new StreamOffsetRange(streamId, startOffset, endOffset);
     }
 
-    public S3ObjectStreamIndex streamIndex() {
+    public StreamOffsetRange streamIndex() {
         return streamIndex;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 845100ca30..69af4385ab 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -37,11 +37,11 @@ public class S3WALObject implements Comparable<S3WALObject> {
 
     private final long objectId;
     private final int brokerId;
-    private final Map<Long, List<S3ObjectStreamIndex>> streamsIndex;
+    private final Map<Long, List<StreamOffsetRange>> streamsIndex;
 
     private final S3ObjectType objectType = S3ObjectType.UNKNOWN;
 
-    public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId) {
+    public S3WALObject(long objectId, int brokerId, final Map<Long, List<StreamOffsetRange>> streamsIndex, long orderId) {
         this.orderId = orderId;
         this.objectId = objectId;
         this.brokerId = brokerId;
@@ -49,16 +49,16 @@ public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId) {
-        List<S3ObjectStreamIndex> indexes = streamsIndex.get(streamId);
+        List<StreamOffsetRange> indexes = streamsIndex.get(streamId);
         if (indexes == null || indexes.isEmpty()) {
             return false;
         }
-        S3ObjectStreamIndex firstIndex = indexes.get(0);
-        S3ObjectStreamIndex lastIndex = indexes.get(indexes.size() - 1);
+        StreamOffsetRange firstIndex = indexes.get(0);
+        StreamOffsetRange lastIndex = indexes.get(indexes.size() - 1);
         return startOffset >= firstIndex.getStartOffset() && startOffset <= lastIndex.getEndOffset();
     }
 
-    public Map<Long, List<S3ObjectStreamIndex>> streamsIndex() {
+    public Map<Long, List<StreamOffsetRange>> streamsIndex() {
         return streamsIndex;
     }
 
@@ -69,14 +69,14 @@ public ApiMessageAndVersion toRecord() {
             .setOrderId(orderId)
             .setStreamsIndex(
                 streamsIndex.values().stream().flatMap(List::stream)
-                    .map(S3ObjectStreamIndex::toRecordStreamIndex)
+                    .map(StreamOffsetRange::toRecordStreamIndex)
                     .collect(Collectors.toList())), (short) 0);
     }
 
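     // A minimal sketch of how the renamed StreamOffsetRange type serves as a
     // WAL object's per-stream index; the ids and offsets below are hypothetical:
     //
     //   Map<Long, List<StreamOffsetRange>> index = Map.of(
     //       1L, List.of(new StreamOffsetRange(1L, 0L, 100L)),    // stream 1 covers [0, 100)
     //       2L, List.of(new StreamOffsetRange(2L, 50L, 150L)));  // stream 2 covers [50, 150)
     //   S3WALObject wal = new S3WALObject(/* objectId */ 10L, /* brokerId */ 0, index, /* orderId */ 10L);
     //   wal.streamsIndex().get(1L).get(0).getEndOffset();        // -> 100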
     public static S3WALObject of(WALObjectRecord record) {
-        Map<Long, List<S3ObjectStreamIndex>> collect = record.streamsIndex().stream()
-            .map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset()))
-            .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
+        Map<Long, List<StreamOffsetRange>> collect = record.streamsIndex().stream()
+            .map(index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset()))
+            .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
         S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(), collect, record.orderId());
         return s3WalObject;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
similarity index 58%
rename from metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java
rename to metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
index 5c8fd88071..e405a9d732 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
@@ -20,38 +20,41 @@ import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
 
 /**
- * ObjectStreamIndex is the index of a stream range in a WAL object or STREAM object.
+ * StreamOffsetRange represents [startOffset, endOffset) in the stream.
  */
-public class S3ObjectStreamIndex implements Comparable<S3ObjectStreamIndex> {
+public class StreamOffsetRange implements Comparable<StreamOffsetRange> {
 
-    private final Long streamId;
+    public static final StreamOffsetRange INVALID = new StreamOffsetRange(S3StreamConstant.INVALID_STREAM_ID,
+        S3StreamConstant.INVALID_OFFSET, S3StreamConstant.INVALID_OFFSET);
 
-    private final Long startOffset;
+    private final long streamId;
 
-    private final Long endOffset;
+    private final long startOffset;
 
-    public S3ObjectStreamIndex(Long streamId, Long startOffset, Long endOffset) {
+    private final long endOffset;
+
+    public StreamOffsetRange(long streamId, long startOffset, long endOffset) {
         this.streamId = streamId;
         this.startOffset = startOffset;
         this.endOffset = endOffset;
     }
 
-    public Long getStreamId() {
+    public long getStreamId() {
         return streamId;
     }
 
-    public Long getStartOffset() {
+    public long getStartOffset() {
         return startOffset;
     }
 
-    public Long getEndOffset() {
+    public long getEndOffset() {
         return endOffset;
     }
 
     @Override
-    public int compareTo(S3ObjectStreamIndex o) {
-        int res = this.streamId.compareTo(o.streamId);
-        return res == 0 ? this.startOffset.compareTo(o.startOffset) : res;
+    public int compareTo(StreamOffsetRange o) {
+        int res = Long.compare(this.streamId, o.streamId);
+        return res == 0 ? Long.compare(this.startOffset, o.startOffset) : res;
     }
 
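     // A minimal sketch of the ordering this comparator yields (values are
     // hypothetical): ranges sort by streamId first, then by startOffset:
     //
     //   new StreamOffsetRange(1L, 50L, 60L).compareTo(new StreamOffsetRange(1L, 10L, 20L)); // > 0
     //   new StreamOffsetRange(1L, 50L, 60L).compareTo(new StreamOffsetRange(2L, 0L, 10L));  // < 0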
     public StreamIndex toRecordStreamIndex() {
@@ -61,7 +64,7 @@ public StreamIndex toRecordStreamIndex() {
             .setEndOffset(endOffset);
     }
 
-    public static S3ObjectStreamIndex of(StreamIndex index) {
-        return new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset());
+    public static StreamOffsetRange of(StreamIndex index) {
+        return new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset());
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
index ce643ef872..c5a6be245e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
@@ -28,7 +28,7 @@
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.metadata.stream.SortedWALObjectsList;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -80,10 +80,10 @@ public void testS3WALObjects() {
 
         // verify delta and check image's write
         BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of(
             new S3WALObject(0L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 0L, 100L)),
-                STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L))), 0L),
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 0L, 100L)),
+                STREAM1, List.of(new StreamOffsetRange(STREAM1, 0L, 200L))), 0L),
             new S3WALObject(1L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L))));
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 101L, 200L))), 1L))));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
 
@@ -96,7 +96,7 @@ public void testS3WALObjects() {
         // verify delta and check image's write
         BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of(
             new S3WALObject(1L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L))));
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 101L, 200L))), 1L))));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 2cfae9d27d..ecbb1fec26 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -30,7 +30,7 @@
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.metadata.stream.SortedWALObjectsList;
@@ -100,16 +100,16 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
 
     @Test
     public void testGetObjects() {
         List<S3WALObject> broker0WalObjects = 
List.of( - new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 100L, 120L))), 0L), - new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 120L, 140L))), 1L), - new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 180L, 200L))), 2L), + new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L))), 0L), + new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L))), 1L), + new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L))), 2L), new S3WALObject(3, BROKER0, Map.of(STREAM0, List.of( - new S3ObjectStreamIndex(STREAM0, 400L, 420L), new S3ObjectStreamIndex(STREAM0, 500L, 520L))), 3L), - new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 520L, 600L))), 4L)); + new StreamOffsetRange(STREAM0, 400L, 420L), new StreamOffsetRange(STREAM0, 500L, 520L))), 3L), + new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L))), 4L)); List broker1WalObjects = List.of( - new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 140L, 160L))), 0L), - new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 160L, 180L))), 1L), - new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 420L, 500L))), 2L)); + new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 140L, 160L))), 0L), + new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 160L, 180L))), 1L), + new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 420L, 500L))), 2L)); BrokerS3WALMetadataImage broker0WALMetadataImage = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(broker0WalObjects)); BrokerS3WALMetadataImage broker1WALMetadataImage = new BrokerS3WALMetadataImage(BROKER1, new SortedWALObjectsList(broker1WalObjects)); Map ranges = Map.of( From 5aebbd4f652122b90428864556e416187603577d Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 6 Sep 2023 14:18:39 +0800 Subject: [PATCH 2/4] feat(s3): support blocking `getObjects` in `StreamMetadataManager` 1. support blocking `getObjects` in `StreamMetadataManager` 2. refactor `getObjects` 3. 
change the unit of `s3.cache.size` from `MB` to `B` Signed-off-by: TheR1sing3un --- .../scala/kafka/log/s3/DefaultS3Client.java | 2 +- .../s3/metadata/StreamMetadataManager.java | 74 ++++++++-- .../s3/objects/ControllerObjectManager.java | 2 - .../main/scala/kafka/server/KafkaConfig.scala | 4 +- .../test/java/kafka/log/s3/S3StorageTest.java | 2 +- .../java/kafka/log/s3/S3StreamMemoryTest.java | 2 +- .../log/s3/StreamMetadataManagerTest.java | 137 ++++++++++++++++++ .../org/apache/kafka/image/MetadataDelta.java | 1 + .../kafka/image/S3StreamMetadataImage.java | 11 +- .../kafka/image/S3StreamsMetadataImage.java | 122 ++++++---------- 10 files changed, 253 insertions(+), 104 deletions(-) create mode 100644 core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 664f03a3b0..97bcc6c3fe 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -61,7 +61,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator this.requestSender = new ControllerRequestSender(brokerServer); this.streamManager = new ControllerStreamManager(this.requestSender, config); this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config); - this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024L * 1024, objectManager, operator); + this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator); this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator); this.streamClient = new S3StreamClient(this.streamManager, this.storage); this.kvClient = new ControllerKVClient(this.requestSender); diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index 2f4254d8f6..e141814bb2 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -19,13 +19,18 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import kafka.log.es.FutureUtil; import kafka.server.BrokerServer; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; @@ -46,7 +51,6 @@ public class StreamMetadataManager implements InRangeObjectsFetcher { private final CatchUpMetadataListener catchUpMetadataListener; private final Map>> pendingGetObjectsTasks; private final ExecutorService pendingExecutorService; - private final ReplayedWalObjects replayedWalObjects; public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { @@ -57,7 +61,7 @@ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener); this.replayedWalObjects = new ReplayedWalObjects(); // TODO: optimize by more suitable data structure for pending tasks - 
this.pendingGetObjectsTasks = new HashMap<>(); + this.pendingGetObjectsTasks = new ConcurrentHashMap<>(); this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor")); } @@ -66,6 +70,18 @@ public CompletableFuture fetch(long streamId, long startOffset, return this.replayedWalObjects.fetch(streamId, startOffset, endOffset, limit); } + public void retryPendingTasks(List tasks) { + LOGGER.warn("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); + tasks.forEach(task -> { + long streamId = task.streamId; + long startOffset = task.startOffset; + long endOffset = task.endOffset; + int limit = task.limit; + CompletableFuture newCf = this.fetch(streamId, startOffset, endOffset, limit); + FutureUtil.propagate(newCf, task.cf); + }); + } + static class GetObjectsTask { @@ -122,13 +138,12 @@ public CompletableFuture fetch(long streamId, long startOffset, private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); - synchronized (StreamMetadataManager.this) { - Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, - k -> new TreeMap<>()); - List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); - getObjectsTasks.add(task); - } - LOGGER.info("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, limit); + Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, + k -> new ConcurrentSkipListMap<>()); + List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); + getObjectsTasks.add(task); + LOGGER.info("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, + limit); return task.cf; } @@ -159,6 +174,43 @@ class CatchUpMetadataListener implements StreamMetadataListener { @Override public void onChange(MetadataDelta delta, MetadataImage newImage) { + // TODO: pre filter unnecessary situations + Set pendingStreams = pendingGetObjectsTasks.keySet(); + List pendingStreamsOffsetRange = pendingStreams + .stream() + .map(metadataCache::getStreamOffsetRange) + .filter(offset -> offset != StreamOffsetRange.INVALID) + .collect(Collectors.toList()); + if (pendingStreamsOffsetRange.isEmpty()) { + return; + } + List retryTasks = new ArrayList<>(); + pendingStreamsOffsetRange.forEach(offsetRange -> { + long streamId = offsetRange.getStreamId(); + long endOffset = offsetRange.getEndOffset(); + Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId); + if (tasks == null) { + return; + } + Iterator>> iterator = + tasks.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + long pendingEndOffset = entry.getKey(); + if (pendingEndOffset > endOffset) { + break; + } + iterator.remove(); + List getObjectsTasks = entry.getValue(); + retryTasks.addAll(getObjectsTasks); + } + if (tasks.isEmpty()) { + StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId); + } + }); + pendingExecutorService.submit(() -> { + retryPendingTasks(retryTasks); + }); } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 942d0ce96c..434eceef39 100644 --- 
a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import kafka.log.s3.metadata.StreamMetadataManager; @@ -35,7 +34,6 @@ import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder; import org.apache.kafka.metadata.stream.InRangeObjects; import org.apache.kafka.metadata.stream.S3ObjectMetadata; -import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index fe9730190e..c76842a1a7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1500,7 +1500,7 @@ object KafkaConfig { .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc) .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc) .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc) - .define(S3CacheSizeProp, INT, 1024, MEDIUM, S3CacheSizeDoc) + .define(S3CacheSizeProp, LONG, 1073741824, MEDIUM, S3CacheSizeDoc) // Kafka on S3 inject end } @@ -2043,7 +2043,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp) val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp) val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp) - val s3CacheSize = getInt(KafkaConfig.S3CacheSizeProp) + val s3CacheSize = getLong(KafkaConfig.S3CacheSizeProp) // Kafka on S3 inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = { diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java index 0ef201ed48..7854ca8122 100644 --- a/core/src/test/java/kafka/log/s3/S3StorageTest.java +++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java @@ -64,7 +64,7 @@ public class S3StorageTest { public void setup() { objectManager = mock(ObjectManager.class); S3Operator s3Operator = new MemoryS3Operator(); - storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(0, objectManager, s3Operator), s3Operator); + storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(0L, objectManager, s3Operator), s3Operator); } @Test diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java index 6441099416..416e28de52 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java @@ -113,7 +113,7 @@ public void setUp() { streamManager = manager; objectManager = manager; operator = new MemoryS3Operator(); - blockCache = new DefaultS3BlockCache(0, objectManager, operator); + blockCache = new DefaultS3BlockCache(0L, objectManager, operator); storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, blockCache, operator); streamClient = new S3StreamClient(streamManager, storage); } diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java 
b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java new file mode 100644 index 0000000000..859058c877 --- /dev/null +++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import kafka.log.s3.metadata.StreamMetadataManager; +import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener; +import kafka.server.BrokerServer; +import kafka.server.metadata.BrokerMetadataListener; +import kafka.server.metadata.KRaftMetadataCache; +import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; + +@Timeout(40) +@Tag("S3Unit") +public class StreamMetadataManagerTest { + + private BrokerServer mockBroker; + private KRaftMetadataCache mockMetadataCache; + private BrokerMetadataListener mockBrokerMetadataListener; + private StreamMetadataListener streamMetadataListener; + private StreamMetadataManager manager; + + @BeforeEach + public void setUp() { + this.mockBroker = Mockito.mock(BrokerServer.class); + this.mockMetadataCache = Mockito.mock(KRaftMetadataCache.class); + this.mockBrokerMetadataListener = Mockito.mock(BrokerMetadataListener.class); + Mockito.when(this.mockBroker.metadataCache()).thenReturn(this.mockMetadataCache); + Mockito.when(this.mockBroker.metadataListener()).thenReturn(this.mockBrokerMetadataListener); + Mockito.doAnswer(invocation -> { + this.streamMetadataListener = invocation.getArgument(0); + return null; + }).when(this.mockBrokerMetadataListener).registerStreamMetadataListener(any()); + this.manager = new StreamMetadataManager(this.mockBroker, null); + } + + @Test + public void testFetch() throws Exception { + Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 100L)); + S3ObjectMetadata object0 = new S3ObjectMetadata(1L, 128, S3ObjectType.WAL_LOOSE); + Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 100L, 5)) + .thenReturn(new InRangeObjects(1L, 10L, 100L, List.of(object0))); + + // 1. 
normal fetch + CompletableFuture result = this.manager.fetch(1L, 10L, 100L, 5); + Mockito.verify(this.mockMetadataCache).getStreamOffsetRange(1L); + Mockito.verify(this.mockMetadataCache).getObjects(1L, 10L, 100L, 5); + Mockito.verifyNoMoreInteractions(this.mockMetadataCache); + InRangeObjects inRangeObjects = result.get(); + assertEquals(1L, inRangeObjects.streamId()); + assertEquals(10L, inRangeObjects.startOffset()); + assertEquals(100L, inRangeObjects.endOffset()); + assertEquals(1, inRangeObjects.objects().size()); + assertEquals(object0, inRangeObjects.objects().get(0)); + + // 2. fetch with invalid streamId + result = this.manager.fetch(2L, 0L, 100L, 5); + inRangeObjects = result.get(); + assertEquals(InRangeObjects.INVALID, inRangeObjects); + + // 5. fetch with smaller startOffset + result = this.manager.fetch(1L, 5L, 100L, 5); + inRangeObjects = result.get(); + assertEquals(InRangeObjects.INVALID, inRangeObjects); + + // 6. fetch with larger endOffset + result = this.manager.fetch(1L, 10L, 200L, 5); + CompletableFuture finalResult = result; + assertThrows(TimeoutException.class, () -> { + finalResult.get(1, TimeUnit.SECONDS); + }); + + // 7. notify the manager that streams' end offset has been advanced + Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 150L)); + Mockito.when(this.mockMetadataCache.getStreamOffsetRange(2L)).thenReturn(new StreamOffsetRange(2L, 0L, 150L)); + S3ObjectMetadata object1 = new S3ObjectMetadata(2L, 128, S3ObjectType.WAL_LOOSE); + Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 150L, 5)) + .thenReturn(new InRangeObjects(1L, 10L, 200L, List.of(object0, object1))); + + streamMetadataListener.onChange(null, null); + + assertThrows(TimeoutException.class, () -> { + finalResult.get(1, TimeUnit.SECONDS); + }); + + // 8. 
notify with correct end offset
+        Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 200L));
+        S3ObjectMetadata object2 = new S3ObjectMetadata(3L, 128, S3ObjectType.WAL_LOOSE);
+        Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 200L, 5))
+            .thenReturn(new InRangeObjects(1L, 10L, 200L, List.of(object0, object1, object2)));
+
+        streamMetadataListener.onChange(null, null);
+
+        assertDoesNotThrow(() -> {
+            InRangeObjects rangeObjects = finalResult.get(1, TimeUnit.SECONDS);
+            assertEquals(1L, rangeObjects.streamId());
+            assertEquals(10L, rangeObjects.startOffset());
+            assertEquals(200L, rangeObjects.endOffset());
+            assertEquals(3, rangeObjects.objects().size());
+            assertEquals(object0, rangeObjects.objects().get(0));
+            assertEquals(object1, rangeObjects.objects().get(1));
+            assertEquals(object2, rangeObjects.objects().get(2));
+        });
+
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index db97072b87..e6dc1c0f22 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -584,6 +584,7 @@ public String toString() {
             ", aclsDelta=" + aclsDelta +
             ", streamMetadataDelta=" + s3StreamsMetadataDelta +
             ", objectsMetadataDelta=" + s3ObjectsDelta +
+            ", kvDelta=" + kvDelta +
             ')';
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 8f69264a61..c547212e37 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -92,6 +92,11 @@ public long getStartOffset() {
         return startOffset;
     }
 
+    public long getEndOffset() {
+        RangeMetadata range = ranges.get(rangeIndex);
+        return range == null ? startOffset : range.endOffset();
+    }
+
     public long getStreamId() {
         return streamId;
     }
@@ -109,11 +114,7 @@ public StreamState state() {
     }
 
     public StreamOffsetRange offsetRange() {
-        long endOffset = startOffset;
-        if (ranges.containsKey(rangeIndex)) {
-            endOffset = ranges.get(rangeIndex).endOffset();
-        }
-        return new StreamOffsetRange(streamId, startOffset, endOffset);
+        return new StreamOffsetRange(streamId, startOffset, getEndOffset());
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 9dea157fac..11843559b1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -20,9 +20,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.image.writer.ImageWriter;
@@ -133,16 +135,9 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int broker
             this.brokerId = brokerId;
         }
 
-        @SuppressWarnings("all")
-        public InRangeObjects getObjects(int limit) {
-            if (limit <= 0) {
-                return InRangeObjects.INVALID;
-            }
+        private Queue<ObjectStreamRange> rangeOfWalObjects() {
             BrokerS3WALMetadataImage wal = brokerWALMetadata.get(brokerId);
-            if (wal == null) {
-                return InRangeObjects.INVALID;
-            }
-            List<ObjectStreamRange> walObjects = wal.getWalObjects().list().stream()
+            return wal.getWalObjects().list().stream()
                 .filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0)
                 .flatMap(obj -> {
                     List<StreamOffsetRange> indexes = obj.streamsIndex().get(streamId);
@@ -156,16 +151,15 @@ public InRangeObjects getObjects(int limit) {
                         long endOffset = index.getEndOffset();
                         return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
                     });
-                }).collect(Collectors.toList());
+                }).collect(Collectors.toCollection(LinkedList::new));
+        }
+
+        private Queue<ObjectStreamRange> rangeOfStreamObjects() {
             S3StreamMetadataImage stream = streamsMetadata.get(streamId);
-            if (stream == null) {
-                return InRangeObjects.INVALID;
-            }
-            List<ObjectStreamRange> streamObjects = new ArrayList<>();
             Map<Long, S3StreamObject> streamObjectsMetadata = stream.getStreamObjects();
             // TODO: refactor to make stream objects in order
             if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) {
-                List<S3StreamObject> streamObjs = streamObjectsMetadata.values().stream().filter(obj -> {
+                return streamObjectsMetadata.values().stream().filter(obj -> {
                     long objectStartOffset = obj.streamIndex().getStartOffset();
                     long objectEndOffset = obj.streamIndex().getEndOffset();
                     return objectStartOffset < endOffset && objectEndOffset > startOffset;
@@ -174,82 +168,48 @@ public InRangeObjects getObjects(int limit) {
                 public int compare(S3StreamObject o1, S3StreamObject o2) {
                     return o1.objectId() > o2.objectId() ?
1 : -1; } - }).collect(Collectors.toList()); - streamObjects.addAll( - streamObjs.stream().map( - obj -> { - long startOffset = obj.streamIndex().getStartOffset(); - long endOffset = obj.streamIndex().getEndOffset(); - return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset); - }).collect(Collectors.toList())); + }).map(obj -> { + long startOffset = obj.streamIndex().getStartOffset(); + long endOffset = obj.streamIndex().getEndOffset(); + return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset); + }).collect(Collectors.toCollection(LinkedList::new)); + } + return new LinkedList<>(); + } + + public InRangeObjects getObjects(int limit) { + if (limit <= 0) { + return InRangeObjects.INVALID; + } + if (!brokerWALMetadata.containsKey(brokerId) || !streamsMetadata.containsKey(streamId)) { + return InRangeObjects.INVALID; } + + Queue streamObjects = rangeOfStreamObjects(); + Queue walObjects = rangeOfWalObjects(); List inRangeObjects = new ArrayList<>(); - int walIndex = 0; - int streamIndex = 0; long nextStartOffset = startOffset; + while (limit > 0 && nextStartOffset < endOffset - && (walIndex < walObjects.size() || streamIndex < streamObjects.size())) { - - if (walIndex >= walObjects.size()) { - // only stream objects left - ObjectStreamRange streamRange = streamObjects.get(streamIndex++); - long objectStartOffset = streamRange.startOffset(); - long objectEndOffset = streamRange.endOffset(); - if (objectStartOffset > nextStartOffset) { - break; - } - if (objectEndOffset <= nextStartOffset) { - continue; - } - inRangeObjects.add(streamRange.toS3ObjectMetadata()); - limit--; - nextStartOffset = objectEndOffset; - continue; + && (!streamObjects.isEmpty() || !walObjects.isEmpty())) { + ObjectStreamRange streamRange = null; + if (walObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) { + streamRange = streamObjects.poll(); + } else { + streamRange = walObjects.poll(); } - - if (streamIndex >= streamObjects.size()) { - // only wal objects left - ObjectStreamRange walRange = walObjects.get(walIndex++); - long objectStartOffset = walRange.startOffset(); - long objectEndOffset = walRange.endOffset(); - if (objectStartOffset > nextStartOffset) { - break; - } - if (objectEndOffset <= nextStartOffset) { - continue; - } - inRangeObjects.add(walRange.toS3ObjectMetadata()); - limit--; - nextStartOffset = objectEndOffset; - continue; - } - - ObjectStreamRange walRange = walObjects.get(walIndex); - ObjectStreamRange streamRange = streamObjects.get(streamIndex); - long walObjectStartOffset = walRange.startOffset(); - long walObjectEndOffset = walRange.endOffset(); - long streamObjectStartOffset = streamRange.startOffset(); - long streamObjectEndOffset = streamRange.endOffset(); - if (walObjectStartOffset > nextStartOffset && streamObjectStartOffset > nextStartOffset) { - // both start offset are greater than nextStartOffset + long objectStartOffset = streamRange.startOffset(); + long objectEndOffset = streamRange.endOffset(); + if (objectStartOffset > nextStartOffset) { break; } - if (walObjectStartOffset <= nextStartOffset && walObjectEndOffset > nextStartOffset) { - // wal object contains nextStartOffset - inRangeObjects.add(walRange.toS3ObjectMetadata()); - limit--; - nextStartOffset = walObjectEndOffset; - walIndex++; + if (objectEndOffset <= nextStartOffset) { continue; } - if (streamObjectStartOffset <= nextStartOffset && streamObjectEndOffset > nextStartOffset) { - // stream 
object contains nextStartOffset - inRangeObjects.add(streamRange.toS3ObjectMetadata()); - limit--; - nextStartOffset = streamObjectEndOffset; - streamIndex++; - } + inRangeObjects.add(streamRange.toS3ObjectMetadata()); + limit--; + nextStartOffset = objectEndOffset; } return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects); } From 9533b2a05b76a4bd6e7dadad75c053577624e3c0 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 6 Sep 2023 14:33:59 +0800 Subject: [PATCH 3/4] feat(s3): more suitable log level 1. more suitable log level Signed-off-by: TheR1sing3un --- .../log/s3/metadata/StreamMetadataManager.java | 14 +++----------- .../kafka/metadata/stream/InRangeObjects.java | 10 ++++++++++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index e141814bb2..5fb8f90d04 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -71,7 +71,7 @@ public CompletableFuture fetch(long streamId, long startOffset, } public void retryPendingTasks(List tasks) { - LOGGER.warn("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); + LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); tasks.forEach(task -> { long streamId = task.streamId; long startOffset = task.startOffset; @@ -104,14 +104,6 @@ private GetObjectsTask(CompletableFuture cf, long streamId, long this.endOffset = endOffset; this.limit = limit; } - - void complete(InRangeObjects objects) { - cf.complete(objects); - } - - void completeExceptionally(Throwable ex) { - cf.completeExceptionally(ex); - } } class ReplayedWalObjects implements InRangeObjectsFetcher { @@ -142,7 +134,7 @@ private CompletableFuture pendingFetch(long streamId, long start k -> new ConcurrentSkipListMap<>()); List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); getObjectsTasks.add(task); - LOGGER.info("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, + LOGGER.warn("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, limit); return task.cf; } @@ -156,7 +148,7 @@ private CompletableFuture fetch0(long streamId, long startOffset streamId, startOffset, endOffset, limit); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } - LOGGER.info( + LOGGER.trace( "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", streamId, startOffset, endOffset, limit, cachedInRangeObjects); return CompletableFuture.completedFuture(cachedInRangeObjects); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java index 400b18a59f..1be44e5d11 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java @@ -52,6 +52,16 @@ public List objects() { return objects; } + @Override + public String toString() { + return "InRangeObjects{" + + "streamId=" + streamId + + ", startOffset=" + startOffset + + ", endOffset=" + endOffset + + ", objects=" + objects + + '}'; + } + @Override public boolean equals(Object o) { if (this == o) { From 
8ef325825f2639026d6b916df095a58f77482c97 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 6 Sep 2023 22:36:17 +0800 Subject: [PATCH 4/4] fix(s3): add more concurrent protection 1. add more concurrent protection Signed-off-by: TheR1sing3un --- checkstyle/import-control-core.xml | 1 + .../s3/metadata/StreamMetadataManager.java | 252 ++++++++++-------- .../scala/kafka/server/MetadataCache.scala | 8 - .../server/metadata/KRaftMetadataCache.scala | 30 --- .../server/metadata/ZkMetadataCache.scala | 15 -- .../log/s3/StreamMetadataManagerTest.java | 119 +++++++-- .../kafka/image/S3StreamsMetadataImage.java | 12 +- .../kafka/metadata/stream/RangeMetadata.java | 4 +- .../metadata/stream/S3ObjectMetadata.java | 19 ++ 9 files changed, 268 insertions(+), 192 deletions(-) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 6ae8d94dd4..dcf50cc2c2 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -59,6 +59,7 @@ + diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index 5fb8f90d04..862ab0d43e 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -19,25 +19,31 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; import kafka.log.es.FutureUtil; import kafka.server.BrokerServer; import kafka.server.KafkaConfig; -import kafka.server.MetadataCache; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.S3ObjectsImage; +import org.apache.kafka.image.S3StreamMetadataImage; +import org.apache.kafka.image.S3StreamsMetadataImage; import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.apache.kafka.raft.OffsetAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,27 +53,153 @@ public class StreamMetadataManager implements InRangeObjectsFetcher { private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); private final KafkaConfig config; private final BrokerServer broker; - private final MetadataCache metadataCache; - private final CatchUpMetadataListener catchUpMetadataListener; private final Map>> pendingGetObjectsTasks; private final ExecutorService pendingExecutorService; - private final ReplayedWalObjects replayedWalObjects; + // TODO: we just need the version of streams metadata, not the whole image + private volatile OffsetAndEpoch version; + private S3StreamsMetadataImage streamsImage; + private S3ObjectsImage objectsImage; public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { this.config = config; this.broker = broker; - this.metadataCache = broker.metadataCache(); - this.catchUpMetadataListener = new 
CatchUpMetadataListener(); - this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener); - this.replayedWalObjects = new ReplayedWalObjects(); + MetadataImage currentImage = this.broker.metadataCache().currentImage(); + this.streamsImage = currentImage.streamsMetadata(); + this.objectsImage = currentImage.objectsMetadata(); + this.version = currentImage.highestOffsetAndEpoch(); + this.broker.metadataListener().registerStreamMetadataListener(this::onImageChanged); // TODO: optimize by more suitable data structure for pending tasks - this.pendingGetObjectsTasks = new ConcurrentHashMap<>(); + this.pendingGetObjectsTasks = new HashMap<>(); this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor")); } + private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { + if (newImage.highestOffsetAndEpoch().equals(this.version)) { + return; + } + synchronized (this) { + // update version + this.version = newImage.highestOffsetAndEpoch(); + // update image + this.streamsImage = newImage.streamsMetadata(); + this.objectsImage = newImage.objectsMetadata(); + // remove all catch up pending tasks + List retryTasks = removePendingTasks(); + // retry all pending tasks + this.pendingExecutorService.submit(() -> { + retryPendingTasks(retryTasks); + }); + } + } + + // must access thread safe + private List removePendingTasks() { + if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) { + return Collections.emptyList(); + } + Set pendingStreams = pendingGetObjectsTasks.keySet(); + List pendingStreamsOffsetRange = pendingStreams + .stream() + .map(streamsImage::offsetRange) + .filter(offset -> offset != StreamOffsetRange.INVALID) + .collect(Collectors.toList()); + if (pendingStreamsOffsetRange.isEmpty()) { + return Collections.emptyList(); + } + List retryTasks = new ArrayList<>(); + pendingStreamsOffsetRange.forEach(offsetRange -> { + long streamId = offsetRange.getStreamId(); + long endOffset = offsetRange.getEndOffset(); + Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId); + if (tasks == null || tasks.isEmpty()) { + return; + } + Iterator>> iterator = + tasks.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + long pendingEndOffset = entry.getKey(); + if (pendingEndOffset > endOffset) { + break; + } + iterator.remove(); + List getObjectsTasks = entry.getValue(); + retryTasks.addAll(getObjectsTasks); + } + if (tasks.isEmpty()) { + StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId); + } + }); + return retryTasks; + } + @Override public CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { - return this.replayedWalObjects.fetch(streamId, startOffset, endOffset, limit); + synchronized (StreamMetadataManager.this) { + S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId); + if (streamImage == null) { + LOGGER.warn( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null", + streamId, startOffset, endOffset, limit); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + StreamOffsetRange offsetRange = streamImage.offsetRange(); + if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) { + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + long streamStartOffset = offsetRange.getStartOffset(); + long streamEndOffset = 
offsetRange.getEndOffset(); + if (startOffset < streamStartOffset) { + LOGGER.warn( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}", + streamId, startOffset, endOffset, limit, streamStartOffset); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + if (endOffset > streamEndOffset) { + // lag behind, need to wait for cache catch up + return pendingFetch(streamId, startOffset, endOffset, limit); + } + return fetch0(streamId, startOffset, endOffset, limit); + } + } + + // must access thread safe + private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { + GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); + Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, + k -> new TreeMap<>()); + List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); + getObjectsTasks.add(task); + LOGGER.warn("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, + limit); + return task.cf; + } + + // must access thread safe + private CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { + InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit); + if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { + LOGGER.warn( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + // fill the objects' size + for (S3ObjectMetadata object : cachedInRangeObjects.objects()) { + S3Object objectMetadata = objectsImage.getObjectMetadata(object.getObjectId()); + if (objectMetadata == null) { + // should not happen + LOGGER.error( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); + return CompletableFuture.completedFuture(InRangeObjects.INVALID); + } + object.setObjectSize(objectMetadata.getObjectSize()); + } + LOGGER.trace( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", + streamId, startOffset, endOffset, limit, cachedInRangeObjects); + return CompletableFuture.completedFuture(cachedInRangeObjects); } public void retryPendingTasks(List tasks) { @@ -82,7 +214,6 @@ public void retryPendingTasks(List tasks) { }); } - static class GetObjectsTask { private final CompletableFuture cf; @@ -106,104 +237,9 @@ private GetObjectsTask(CompletableFuture cf, long streamId, long } } - class ReplayedWalObjects implements InRangeObjectsFetcher { - - @Override - public CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { - StreamOffsetRange offsetRange = StreamMetadataManager.this.metadataCache.getStreamOffsetRange(streamId); - if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) { - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - long streamStartOffset = offsetRange.getStartOffset(); - long streamEndOffset = offsetRange.getEndOffset(); - if (startOffset < streamStartOffset) { - LOGGER.warn("[ReplayedWalObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < 
streamStartOffset: {}", - streamId, startOffset, endOffset, limit, streamStartOffset); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - if (endOffset > streamEndOffset) { - // lag behind, need to wait for cache catch up - return pendingFetch(streamId, startOffset, endOffset, limit); - } - return fetch0(streamId, startOffset, endOffset, limit); - } - - private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { - GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); - Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, - k -> new ConcurrentSkipListMap<>()); - List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); - getObjectsTasks.add(task); - LOGGER.warn("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, - limit); - return task.cf; - } - - private CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { - // only search in cache - InRangeObjects cachedInRangeObjects = StreamMetadataManager.this.metadataCache.getObjects(streamId, startOffset, endOffset, limit); - if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { - LOGGER.warn( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - LOGGER.trace( - "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", - streamId, startOffset, endOffset, limit, cachedInRangeObjects); - return CompletableFuture.completedFuture(cachedInRangeObjects); - } - - - } - public interface StreamMetadataListener { void onChange(MetadataDelta delta, MetadataImage image); } - class CatchUpMetadataListener implements StreamMetadataListener { - - @Override - public void onChange(MetadataDelta delta, MetadataImage newImage) { - // TODO: pre filter unnecessary situations - Set pendingStreams = pendingGetObjectsTasks.keySet(); - List pendingStreamsOffsetRange = pendingStreams - .stream() - .map(metadataCache::getStreamOffsetRange) - .filter(offset -> offset != StreamOffsetRange.INVALID) - .collect(Collectors.toList()); - if (pendingStreamsOffsetRange.isEmpty()) { - return; - } - List retryTasks = new ArrayList<>(); - pendingStreamsOffsetRange.forEach(offsetRange -> { - long streamId = offsetRange.getStreamId(); - long endOffset = offsetRange.getEndOffset(); - Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId); - if (tasks == null) { - return; - } - Iterator>> iterator = - tasks.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - long pendingEndOffset = entry.getKey(); - if (pendingEndOffset > endOffset) { - break; - } - iterator.remove(); - List getObjectsTasks = entry.getValue(); - retryTasks.addAll(getObjectsTasks); - } - if (tasks.isEmpty()) { - StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId); - } - }); - pendingExecutorService.submit(() -> { - retryPendingTasks(retryTasks); - }); - } - } - } diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index a10bcad865..1204270ef6 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ 
b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -22,7 +22,6 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} -import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.server.common.MetadataVersion import java.util @@ -118,13 +117,6 @@ trait MetadataCache { def getRandomAliveBrokerId: Option[Int] - // Kafka on S3 inject start - def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects - - def getObjectMetadata(objectId: Long): S3Object - - def getStreamOffsetRange(streamId: Long): StreamOffsetRange - // Kafka on S3 inject end } object MetadataCache { diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 3a4cfc2daa..7e6ad7bfd0 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -35,7 +35,6 @@ import java.util.concurrent.ThreadLocalRandom import kafka.admin.BrokerMetadata import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData} -import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.metadata.{PartitionRegistration, Replicas} import org.apache.kafka.server.common.MetadataVersion @@ -393,33 +392,4 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w features.toMap, image.highestOffsetAndEpoch().offset) } - - // Kafka on S3 inject start - override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = { - val image = _currentImage - val inRangeObjects = image.streamsMetadata().getObjects(streamId, startOffset, endOffset, limit) - inRangeObjects.objects().forEach(obj => { - val objMetadata = image.objectsMetadata().getObjectMetadata(obj.getObjectId) - Option(objMetadata) match { - case None => throw new RuntimeException(s"Object metadata not found for object ${obj.getObjectId}") - case Some(metadata) => obj.setObjectSize(metadata.getObjectSize) - } - }) - inRangeObjects - } - - override def getObjectMetadata(objectId: Long): S3Object = { - val image = _currentImage - image.objectsMetadata().getObjectMetadata(objectId) - } - - override def getStreamOffsetRange(streamId: Long): StreamOffsetRange = { - val image = _currentImage - Option(image.streamsMetadata().streamsMetadata().get(streamId)) match { - case None => StreamOffsetRange.INVALID - case Some(metadata) => - metadata.offsetRange() - } - } - // Kafka on S3 inject end } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d7e6f0ac2e..feaaf1c43f 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -40,7 +40,6 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol -import 
org.apache.kafka.metadata.stream.{InRangeObjects, S3Object, StreamOffsetRange} import org.apache.kafka.server.common.MetadataVersion import java.util.concurrent.{ThreadLocalRandom, TimeUnit} @@ -585,18 +584,4 @@ class ZkMetadataCache( def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = { featuresAndEpoch } - - // Kafka on S3 inject start - override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = { - throw new UnsupportedOperationException("getObjects is not supported in ZkMetadataCache") - } - - override def getObjectMetadata(objectId: Long): S3Object = { - throw new UnsupportedOperationException("getObjectMetadata is not supported in ZkMetadataCache") - } - - override def getStreamOffsetRange(streamId: Long): StreamOffsetRange = { - throw new UnsupportedOperationException("getStreamOffsetRange is not supported in ZkMetadataCache") - } - // Kafka on S3 inject end } diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java index 859058c877..cff1f79994 100644 --- a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java @@ -22,7 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -31,10 +32,20 @@ import kafka.server.BrokerServer; import kafka.server.metadata.BrokerMetadataListener; import kafka.server.metadata.KRaftMetadataCache; +import org.apache.kafka.image.BrokerS3WALMetadataImage; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.S3ObjectsImage; +import org.apache.kafka.image.S3StreamMetadataImage; +import org.apache.kafka.image.S3StreamsMetadataImage; import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectMetadata; +import org.apache.kafka.metadata.stream.S3ObjectState; import org.apache.kafka.metadata.stream.S3ObjectType; -import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.StreamState; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -45,6 +56,11 @@ @Tag("S3Unit") public class StreamMetadataManagerTest { + private static final int BROKER0 = 0; + private static final int BROKER1 = 1; + private static final long STREAM0 = 0; + private static final long STREAM1 = 1; + private BrokerServer mockBroker; private KRaftMetadataCache mockMetadataCache; private BrokerMetadataListener mockBrokerMetadataListener; @@ -62,69 +78,115 @@ public void setUp() { this.streamMetadataListener = invocation.getArgument(0); return null; }).when(this.mockBrokerMetadataListener).registerStreamMetadataListener(any()); + Mockito.when(this.mockMetadataCache.currentImage()).thenReturn(MetadataImage.EMPTY); this.manager = new StreamMetadataManager(this.mockBroker, null); } + private static MetadataImage image0; + private static MetadataImage image1; + private static MetadataImage image2; + + static { + S3ObjectsImage objectsImage = new 
S3ObjectsImage(2L, Map.of( + 0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), + 1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), + 2L, new S3Object(2L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED) + )); + + Map ranges = Map.of( + 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0) + ); + Map streamObjects = Map.of( + 0L, new S3StreamObject(0L, 128, STREAM0, 10L, 100L)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + + ranges = new HashMap<>(ranges); + ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0)); + streamObjects = new HashMap<>(streamObjects); + streamObjects.put(1L, new S3StreamObject(1L, 128, STREAM0, 100L, 150L)); + streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); + streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + + ranges = new HashMap<>(ranges); + ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0)); + streamObjects = new HashMap<>(streamObjects); + streamObjects.put(2L, new S3StreamObject(2L, 128, STREAM0, 150L, 200L)); + streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); + streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null); + } + @Test public void testFetch() throws Exception { - Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 100L)); - S3ObjectMetadata object0 = new S3ObjectMetadata(1L, 128, S3ObjectType.WAL_LOOSE); - Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 100L, 5)) - .thenReturn(new InRangeObjects(1L, 10L, 100L, List.of(object0))); + S3ObjectMetadata object0 = new S3ObjectMetadata(0L, 128, S3ObjectType.STREAM); + S3ObjectMetadata object1 = new S3ObjectMetadata(1L, 128, S3ObjectType.STREAM); + S3ObjectMetadata object2 = new S3ObjectMetadata(2L, 128, S3ObjectType.STREAM); + + this.streamMetadataListener.onChange(null, image0); // 1. normal fetch - CompletableFuture result = this.manager.fetch(1L, 10L, 100L, 5); - Mockito.verify(this.mockMetadataCache).getStreamOffsetRange(1L); - Mockito.verify(this.mockMetadataCache).getObjects(1L, 10L, 100L, 5); - Mockito.verifyNoMoreInteractions(this.mockMetadataCache); + CompletableFuture result = this.manager.fetch(STREAM0, 10L, 100L, 5); InRangeObjects inRangeObjects = result.get(); - assertEquals(1L, inRangeObjects.streamId()); + assertEquals(STREAM0, inRangeObjects.streamId()); assertEquals(10L, inRangeObjects.startOffset()); assertEquals(100L, inRangeObjects.endOffset()); assertEquals(1, inRangeObjects.objects().size()); assertEquals(object0, inRangeObjects.objects().get(0)); // 2. 
fetch with invalid streamId - result = this.manager.fetch(2L, 0L, 100L, 5); + result = this.manager.fetch(STREAM1, 0L, 100L, 5); inRangeObjects = result.get(); assertEquals(InRangeObjects.INVALID, inRangeObjects); + // 3. fetch with larger startOffset + result = this.manager.fetch(STREAM0, 20L, 100L, 5); + inRangeObjects = result.get(); + assertEquals(STREAM0, inRangeObjects.streamId()); + assertEquals(20L, inRangeObjects.startOffset()); + assertEquals(100L, inRangeObjects.endOffset()); + assertEquals(1, inRangeObjects.objects().size()); + assertEquals(object0, inRangeObjects.objects().get(0)); + + // 4. fetch with smaller endOffset + result = this.manager.fetch(STREAM0, 10L, 50L, 5); + inRangeObjects = result.get(); + assertEquals(STREAM0, inRangeObjects.streamId()); + assertEquals(10L, inRangeObjects.startOffset()); + assertEquals(100L, inRangeObjects.endOffset()); + assertEquals(1, inRangeObjects.objects().size()); + assertEquals(object0, inRangeObjects.objects().get(0)); + // 5. fetch with smaller startOffset - result = this.manager.fetch(1L, 5L, 100L, 5); + result = this.manager.fetch(STREAM0, 5L, 100L, 5); inRangeObjects = result.get(); assertEquals(InRangeObjects.INVALID, inRangeObjects); // 6. fetch with larger endOffset - result = this.manager.fetch(1L, 10L, 200L, 5); + result = this.manager.fetch(STREAM0, 10L, 200L, 5); CompletableFuture finalResult = result; assertThrows(TimeoutException.class, () -> { finalResult.get(1, TimeUnit.SECONDS); }); // 7. notify the manager that streams' end offset has been advanced - Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 150L)); - Mockito.when(this.mockMetadataCache.getStreamOffsetRange(2L)).thenReturn(new StreamOffsetRange(2L, 0L, 150L)); - S3ObjectMetadata object1 = new S3ObjectMetadata(2L, 128, S3ObjectType.WAL_LOOSE); - Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 150L, 5)) - .thenReturn(new InRangeObjects(1L, 10L, 200L, List.of(object0, object1))); - - streamMetadataListener.onChange(null, null); + streamMetadataListener.onChange(null, image1); assertThrows(TimeoutException.class, () -> { finalResult.get(1, TimeUnit.SECONDS); }); // 8. 
notify with correct end offset
-        Mockito.when(this.mockMetadataCache.getStreamOffsetRange(1L)).thenReturn(new StreamOffsetRange(1L, 0L, 200L));
-        S3ObjectMetadata object2 = new S3ObjectMetadata(3L, 128, S3ObjectType.WAL_LOOSE);
-        Mockito.when(this.mockMetadataCache.getObjects(1L, 10L, 200L, 5))
-            .thenReturn(new InRangeObjects(1L, 10L, 200L, List.of(object0, object1, object2)));
-
-        streamMetadataListener.onChange(null, null);
+        streamMetadataListener.onChange(null, image2);
 
         assertDoesNotThrow(() -> {
             InRangeObjects rangeObjects = finalResult.get(1, TimeUnit.SECONDS);
-            assertEquals(1L, rangeObjects.streamId());
+            assertEquals(STREAM0, rangeObjects.streamId());
             assertEquals(10L, rangeObjects.startOffset());
             assertEquals(200L, rangeObjects.endOffset());
             assertEquals(3, rangeObjects.objects().size());
@@ -134,4 +196,5 @@ public void testFetch() throws Exception {
         });
 
     }
+
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 11843559b1..6e3dbd8951 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -82,6 +82,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
         List<S3ObjectMetadata> objects = new ArrayList<>();
         long realEndOffset = startOffset;
         List<RangeSearcher> rangeSearchers = rangeSearchers(streamId, startOffset, endOffset);
+        // TODO: if one stream object in multiple ranges, we may get duplicate objects
         for (RangeSearcher rangeSearcher : rangeSearchers) {
             InRangeObjects inRangeObjects = rangeSearcher.getObjects(limit);
             if (inRangeObjects == InRangeObjects.INVALID) {
@@ -90,7 +91,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
             realEndOffset = inRangeObjects.endOffset();
             objects.addAll(inRangeObjects.objects());
             limit -= inRangeObjects.objects().size();
-            if (limit <= 0) {
+            if (limit <= 0 || realEndOffset >= endOffset) {
                 break;
             }
         }
@@ -275,6 +276,15 @@ public Map<Long, S3StreamMetadataImage> streamsMetadata() {
         return streamsMetadata;
     }
 
+    public StreamOffsetRange offsetRange(long streamId) {
+        S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
+        if (streamMetadata == null) {
+            return StreamOffsetRange.INVALID;
+        }
+        return streamMetadata.offsetRange();
+    }
+
+
     public long nextAssignedStreamId() {
         return nextAssignedStreamId;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
index 620f8c2709..47d10f0967 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
@@ -41,11 +41,11 @@ public class RangeMetadata implements Comparable<RangeMetadata> {
      */
     private int rangeIndex;
     /**
-     * Range start offer. (Inclusive)
+     * Range start offset. (Inclusive)
      */
     private long startOffset;
     /**
-     * Range end offer. (Exclusive)
+     * Range end offset. (Exclusive)
      */
     private long endOffset;
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
index 1a595b3c9a..0fe45c173a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.metadata.stream;
 
+import java.util.Objects;
+
 public class S3ObjectMetadata {
     private final long objectId;
     private long objectSize;
@@ -48,4 +50,21 @@ public S3ObjectType getType() {
     public String key() {
         return ObjectUtils.genKey(0, "todocluster", objectId);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        S3ObjectMetadata that = (S3ObjectMetadata) o;
+        return objectId == that.objectId && objectSize == that.objectSize && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(objectId, objectSize, type);
+    }
 }
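
A note on the pending-fetch mechanism these patches converge on (PATCH 2 introduces it, PATCH 4 folds it under a single lock): a fetch whose end offset is ahead of the replayed metadata is parked in a TreeMap keyed by end offset, and each image replay completes exactly the parked requests the new image can satisfy. Below is a minimal sketch of that pattern for a single stream; the Task class, the fetch0 stub, and the inline completion (the real code resubmits retries on a single-thread executor) are simplified stand-ins, not the actual kafka.log.s3 classes.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    public class PendingFetchSketch {

        private static final class Task {
            final long startOffset;
            final long endOffset;
            final CompletableFuture<List<Long>> cf = new CompletableFuture<>();

            Task(long startOffset, long endOffset) {
                this.startOffset = startOffset;
                this.endOffset = endOffset;
            }
        }

        // pending end offset -> tasks waiting for the image to reach that offset;
        // all state is guarded by `this`, mirroring PATCH 4's synchronized blocks
        private final TreeMap<Long, List<Task>> pendingTasks = new TreeMap<>();
        private long streamEndOffset; // stand-in for the replayed image's end offset

        public synchronized CompletableFuture<List<Long>> fetch(long startOffset, long endOffset) {
            if (endOffset <= streamEndOffset) {
                return CompletableFuture.completedFuture(fetch0(startOffset, endOffset));
            }
            // the image lags behind the requested range: park the request
            Task task = new Task(startOffset, endOffset);
            pendingTasks.computeIfAbsent(endOffset, k -> new ArrayList<>()).add(task);
            return task.cf;
        }

        // called whenever a new metadata image has been replayed
        public synchronized void onImageChanged(long newStreamEndOffset) {
            streamEndOffset = newStreamEndOffset;
            // headMap(x, true) selects every parked request with endOffset <= x
            NavigableMap<Long, List<Task>> ready = pendingTasks.headMap(newStreamEndOffset, true);
            List<Task> retryTasks = new ArrayList<>();
            ready.values().forEach(retryTasks::addAll);
            ready.clear();
            retryTasks.forEach(t -> t.cf.complete(fetch0(t.startOffset, t.endOffset)));
        }

        private List<Long> fetch0(long startOffset, long endOffset) {
            return List.of(); // stand-in for the actual metadata image lookup
        }
    }

Keeping the map sorted by end offset is what makes the wake-up cheap: each replay pays one headMap call per stream with parked requests instead of rescanning every pending task.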
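
PATCH 2's rewrite of RangeSearcher.getObjects collapses the two index-based scans into one merge over a pair of offset-sorted queues (WAL objects and stream objects). A standalone sketch of that merge logic, assuming a simplified Range record in place of ObjectStreamRange (the record, the class name, and the main method are illustrative only):

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    public final class RangeMergeSketch {

        // simplified stand-in for ObjectStreamRange
        public record Range(long objectId, long startOffset, long endOffset) {}

        // Merge two offset-sorted queues into a contiguous cover of
        // [startOffset, endOffset), stopping at the first gap or after `limit` objects.
        public static List<Range> merge(Queue<Range> walObjects, Queue<Range> streamObjects,
                                        long startOffset, long endOffset, int limit) {
            List<Range> result = new ArrayList<>();
            long nextStartOffset = startOffset;
            while (limit > 0 && nextStartOffset < endOffset
                    && (!walObjects.isEmpty() || !streamObjects.isEmpty())) {
                // take whichever queue's head starts first, as in the patch
                Queue<Range> source;
                if (walObjects.isEmpty()
                        || (!streamObjects.isEmpty()
                            && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) {
                    source = streamObjects;
                } else {
                    source = walObjects;
                }
                Range head = source.poll();
                if (head.startOffset() > nextStartOffset) {
                    break; // gap in the offsets: the cover is no longer contiguous
                }
                if (head.endOffset() <= nextStartOffset) {
                    continue; // entirely behind the cursor: skip it
                }
                result.add(head);
                limit--;
                nextStartOffset = head.endOffset();
            }
            return result;
        }

        public static void main(String[] args) {
            Queue<Range> wal = new LinkedList<>(List.of(new Range(0, 100, 120), new Range(2, 180, 200)));
            Queue<Range> stream = new LinkedList<>(List.of(new Range(1, 120, 180)));
            System.out.println(merge(wal, stream, 100, 200, 5)); // objects 0, 1, 2
        }
    }

Polling from queue heads rather than indexing into two lists lets one loop body handle both sources uniformly; the contiguity rules (break on a gap, skip fully-consumed ranges) are unchanged from the original implementation.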