diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 6ae8d94dd4..dcf50cc2c2 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -59,6 +59,7 @@
+
diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
index d795bfd696..97bcc6c3fe 100644
--- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -22,6 +22,7 @@ import com.automq.elasticstream.client.api.StreamClient;
 import kafka.log.s3.cache.DefaultS3BlockCache;
 import kafka.log.s3.cache.S3BlockCache;
+import kafka.log.s3.metadata.StreamMetadataManager;
 import kafka.log.s3.network.ControllerRequestSender;
 import kafka.log.s3.objects.ControllerObjectManager;
 import kafka.log.s3.objects.ObjectManager;
@@ -60,7 +61,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
         this.requestSender = new ControllerRequestSender(brokerServer);
         this.streamManager = new ControllerStreamManager(this.requestSender, config);
         this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
-        this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024L * 1024, objectManager, operator);
+        this.blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator);
         this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
         this.kvClient = new ControllerKVClient(this.requestSender);
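Note: the call site above drops the "* 1024L * 1024" conversion because s3CacheSize is redefined in KafkaConfig.scala further down as a LONG measured in bytes rather than an INT of MiB. A minimal sketch of the new contract, assuming a KafkaConfig built from broker properties and the TestUtils helper used in the tests below (the property accessor name is an assumption):

    // Sketch only: s3.cache.size is now specified in bytes.
    // A 1 GiB cache; previously the same setting would have been "1024" (MiB).
    Properties props = TestUtils.defaultBrokerConfig();
    props.put(KafkaConfig.S3CacheSizeProp(), "1073741824");
    KafkaConfig config = KafkaConfig.fromProps(props);
    S3BlockCache blockCache = new DefaultS3BlockCache(config.s3CacheSize(), objectManager, operator);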
diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java
deleted file mode 100644
index 4b67aabc64..0000000000
--- a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log.s3;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import kafka.server.BrokerServer;
-import kafka.server.KafkaConfig;
-import kafka.server.MetadataCache;
-import org.apache.kafka.metadata.stream.S3StreamConstant;
-import org.apache.kafka.image.BrokerS3WALMetadataImage;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.metadata.stream.InRangeObjects;
-import org.apache.kafka.metadata.stream.S3Object;
-import org.apache.kafka.metadata.stream.S3ObjectMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
-import org.apache.kafka.metadata.stream.S3WALObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamMetadataManager {
-
-    // TODO: optimize by more suitable concurrent protection
-    // TODO: use order id instead of object id
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class);
-    private final KafkaConfig config;
-    private final BrokerServer broker;
-    private final InflightWalObjects inflightWalObjects;
-    private final MetadataCache metadataCache;
-    private final CatchUpMetadataListener catchUpMetadataListener;
-
-    public StreamMetadataManager(BrokerServer broker, KafkaConfig config) {
-        this.config = config;
-        this.broker = broker;
-        this.inflightWalObjects = new InflightWalObjects();
-        this.metadataCache = broker.metadataCache();
-        this.catchUpMetadataListener = new CatchUpMetadataListener();
-        this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener);
-    }
-
-    public synchronized void append(InflightWalObject object) {
-        this.inflightWalObjects.append(object);
-    }
-
-    public synchronized void catchupTo(long objectId) {
-        // delete all wal objects which are <= objectId
-        this.inflightWalObjects.trim(objectId);
-    }
-
-    @SuppressWarnings("all")
-    public synchronized List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit) {
-        List<S3ObjectMetadata> objects = new ArrayList<>();
-        if (startOffset >= endOffset) {
-            LOGGER.warn("[GetObjects]: invalid offset range, stream: {}, startOffset: {}, endOffset: {}", streamId, startOffset, endOffset);
-            return objects;
-        }
-        OffsetRange walRange = this.inflightWalObjects.getWalRange(streamId);
-        if (walRange == null || endOffset <= walRange.startOffset()) {
-            // only search in cache
-            InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, endOffset, limit);
-            if (cachedInRangeObjects == null) {
-                LOGGER.warn(
-                    "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
-                    streamId, startOffset, endOffset, limit);
-                return objects;
-            }
-            objects.addAll(cachedInRangeObjects.objects());
-            objects.forEach(obj -> {
-                S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId());
-                if (metadata == null) {
-                    LOGGER.error("object: {} metadata not exist", obj.getObjectId());
-                    throw new RuntimeException("object: " + obj.getObjectId() + " metadata not exist");
-                }
-                obj.setObjectSize(metadata.getObjectSize());
-            });
-            LOGGER.trace(
-                "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}",
-                streamId, startOffset, endOffset, limit,
-                cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size());
-            return objects;
-        }
-        if (startOffset >= walRange.startOffset()) {
-            // only search in inflight wal
-            InRangeObjects inflightInRangeObjects = this.inflightWalObjects.getObjects(streamId, startOffset, endOffset, limit);
-            if (inflightInRangeObjects == null) {
-                LOGGER.warn(
-                    "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result",
-                    streamId, startOffset, endOffset, limit);
-                return objects;
-            }
-            objects.addAll(inflightInRangeObjects.objects());
-            LOGGER.trace(
-                "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from inflight: startOffset: {}, endOffset: {}, object count: {}",
-                streamId, startOffset, endOffset, limit,
-                inflightInRangeObjects.startOffset(), inflightInRangeObjects.endOffset(), objects.size());
-            return objects;
-        }
-        long cachedEndOffset = walRange.startOffset();
-        InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, cachedEndOffset, limit);
-        if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) {
-            LOGGER.warn("[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
-                streamId, startOffset, endOffset, limit);
-            return objects;
-        }
-        objects.addAll(cachedInRangeObjects.objects());
-        objects.forEach(obj -> {
-            S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId());
-            if (metadata == null) {
-                LOGGER.error("object: {} metadata not exist", obj.getObjectId());
-                throw new RuntimeException("object: " + obj.getObjectId() + " metadata not exist");
-            }
-            obj.setObjectSize(metadata.getObjectSize());
-        });
-        if (objects.size() >= limit) {
-            LOGGER.trace(
-                "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache: startOffset: {}, endOffset: {}, object count: {}",
-                streamId, startOffset, endOffset, limit,
-                cachedInRangeObjects.startOffset(), cachedInRangeObjects.endOffset(), objects.size());
-            return objects;
-        }
-        InRangeObjects inflightinRangeObjects = this.inflightWalObjects.getObjects(streamId, cachedEndOffset, endOffset, limit - objects.size());
-        if (inflightinRangeObjects == null || inflightinRangeObjects == InRangeObjects.INVALID) {
-            LOGGER.warn(
-                "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in inflightWalObjects failed with empty result",
-                streamId, startOffset, endOffset, limit);
-            return objects;
-        }
-        objects.addAll(inflightinRangeObjects.objects());
-        LOGGER.trace(
-            "[GetObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and return all from metadataCache and inflight: startOffset: {}, endOffset: {}, object count: {}",
-            streamId, startOffset, endOffset, limit,
-            cachedInRangeObjects.startOffset(), inflightinRangeObjects.endOffset(), objects.size());
-        return objects;
-    }
-
-    private static class OffsetRange {
-
-        private long startOffset;
-        private long endOffset;
-
-        public OffsetRange(long startOffset, long endOffset) {
-            this.startOffset = startOffset;
-            this.endOffset = endOffset;
-        }
-
-        public long startOffset() {
-            return startOffset;
-        }
-
-        public long endOffset() {
-            return endOffset;
-        }
-
-        public void setEndOffset(long endOffset) {
-            this.endOffset = endOffset;
-        }
-
-        public void setStartOffset(long startOffset) {
-            this.startOffset = startOffset;
-        }
-    }
-
-    public static class InflightWalObject extends S3WALObject {
-
-        private final long objectSize;
-
-        public InflightWalObject(long objectId, int brokerId, Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId, long objectSize) {
-            super(objectId, brokerId, streamsIndex, orderId);
-            this.objectSize = objectSize;
-        }
-
-        public long startOffset(long streamId) {
-            List<S3ObjectStreamIndex> indexes = streamsIndex().get(streamId);
-            if (indexes == null || indexes.isEmpty()) {
-                return S3StreamConstant.INVALID_OFFSET;
-            }
-            return indexes.get(0).getStartOffset();
-        }
-
-        public long endOffset(long streamId) {
-            List<S3ObjectStreamIndex> indexes = streamsIndex().get(streamId);
-            if (indexes == null || indexes.isEmpty()) {
-                return S3StreamConstant.INVALID_OFFSET;
-            }
-            return indexes.get(indexes.size() - 1).getEndOffset();
-        }
-
-        public long objectSize() {
-            return objectSize;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            return super.equals(o);
-        }
-
-        @Override
-        public int hashCode() {
-            return super.hashCode();
-        }
-    }
-
-    static class InflightWalObjects {
-
-        private final Logger log = LoggerFactory.getLogger(InflightWalObjects.class);
-
-        private final List<InflightWalObject> objects;
-        private final Map<Long, OffsetRange> streamOffsets;
-        private volatile long firstObjectId = S3StreamConstant.MAX_OBJECT_ID;
-
-        public InflightWalObjects() {
-            this.objects = new LinkedList<>();
-            this.streamOffsets = new HashMap<>();
-        }
-
-        public void append(InflightWalObject object) {
-            objects.add(object);
-            if (objects.size() == 1) {
-                firstObjectId = object.objectId();
-            }
-            log.trace("[AppendInflight]: append wal object: {}", object.objectId());
-            object.streamsIndex().forEach((stream, indexes) -> {
-                // wal object only contains one index for each stream
-                streamOffsets.putIfAbsent(stream, new OffsetRange(indexes.get(0).getStartOffset(), indexes.get(indexes.size() - 1).getEndOffset()));
-                streamOffsets.get(stream).setEndOffset(indexes.get(indexes.size() - 1).getEndOffset());
-            });
-        }
-
-        public void trim(long objectId) {
-            log.trace("[TrimInflight]: trim wal object <= {}", objectId);
-            // TODO: speed up by binary search
-            int clearEndIndex = objects.size();
-            for (int i = 0; i < objects.size(); i++) {
-                S3WALObject wal = objects.get(i);
-                if (wal.objectId() > objectId) {
-                    clearEndIndex = i;
-                    firstObjectId = wal.objectId();
-                    break;
-                }
-                wal.streamsIndex().forEach((stream, indexes) -> {
-                    streamOffsets.get(stream).setStartOffset(indexes.get(indexes.size() - 1).getEndOffset());
-                });
-            }
-            if (clearEndIndex == objects.size()) {
-                firstObjectId = S3StreamConstant.MAX_OBJECT_ID;
-            }
-            objects.subList(0, clearEndIndex).clear();
-        }
-
-        public OffsetRange getWalRange(long streamId) {
-            return streamOffsets.get(streamId);
-        }
-
-        @SuppressWarnings("all")
-        public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) {
-            OffsetRange walRange = getWalRange(streamId);
-            if (walRange == null) {
-                return InRangeObjects.INVALID;
-            }
-            if (startOffset < walRange.startOffset()) {
-                return InRangeObjects.INVALID;
-            }
-            if (endOffset > walRange.endOffset()) {
-                endOffset = walRange.endOffset();
-            }
-            if (startOffset >= endOffset) {
-                return InRangeObjects.INVALID;
-            }
-            List<S3ObjectMetadata> inRangeObjects = new LinkedList<>();
-            long nextStartOffset = startOffset;
-            for (InflightWalObject object : objects) {
-                if (limit <= 0) {
-                    break;
-                }
-                if (nextStartOffset >= endOffset) {
-                    break;
-                }
-                long objStartOffset = object.startOffset(streamId);
-                long objEndOffset = object.endOffset(streamId);
-                if (objStartOffset == S3StreamConstant.INVALID_OFFSET || objEndOffset == S3StreamConstant.INVALID_OFFSET) {
-                    continue;
-                }
-                if (objStartOffset > startOffset) {
-                    break;
-                }
-                if (objEndOffset <= startOffset) {
-                    continue;
-                }
-                limit--;
-                inRangeObjects.add(new S3ObjectMetadata(object.objectId(), object.objectSize(), object.objectType()));
-                nextStartOffset = objEndOffset;
-            }
-            return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects);
-        }
-    }
-
-    public interface StreamMetadataListener {
-
-        void onChange(MetadataDelta delta, MetadataImage image);
-    }
-
-    class CatchUpMetadataListener implements StreamMetadataListener {
-
-        @Override
-        public void onChange(MetadataDelta delta, MetadataImage newImage) {
-            BrokerS3WALMetadataImage walMetadataImage = newImage.streamsMetadata().brokerWALMetadata().get(config.brokerId());
-            if (walMetadataImage == null) {
-                return;
-            }
-            S3WALObject wal = walMetadataImage.getWalObjects().get(walMetadataImage.getWalObjects().size() - 1);
-            if (wal == null) {
-                return;
-            }
-            if (wal.objectId() < inflightWalObjects.firstObjectId) {
-                return;
-            }
-            catchupTo(wal.objectId());
-        }
-    }
-
-}
diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
index fb6f0b3a43..2bba94c283 100644
--- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -32,7 +32,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.objects.CommitCompactObjectRequest;
 import kafka.log.s3.objects.CommitStreamObjectRequest;
 import kafka.log.s3.objects.CommitWALObjectRequest;
@@ -49,7 +48,7 @@ import org.apache.kafka.common.errors.s3.StreamNotExistException;
 import org.apache.kafka.metadata.stream.S3Object;
 import org.apache.kafka.metadata.stream.S3ObjectState;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.metadata.stream.StreamState;
@@ -168,12 +167,12 @@ public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjec
             // build metadata
             MemoryBrokerWALMetadata walMetadata = this.brokerWALMetadata.computeIfAbsent(MOCK_BROKER_ID, k -> new MemoryBrokerWALMetadata(k));
-            Map<Long, List<S3ObjectStreamIndex>> index = new HashMap<>();
+            Map<Long, List<StreamOffsetRange>> index = new HashMap<>();
             streamRanges.stream().forEach(range -> {
                 long streamId = range.getStreamId();
                 long startOffset = range.getStartOffset();
                 long endOffset = range.getEndOffset();
-                index.put(streamId, List.of(new S3ObjectStreamIndex(streamId, startOffset, endOffset)));
+                index.put(streamId, List.of(new StreamOffsetRange(streamId, startOffset, endOffset)));
                 // update range endOffset
                 MemoryStreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
                 streamMetadata.endOffset = endOffset;
@@ -357,10 +356,10 @@ public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
     }
 
     @Override
-    public CompletableFuture<List<StreamOffset>> getStreamsOffset(List<Long> streamIds) {
+    public CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds) {
         return this.submitEvent(() -> {
             return streamIds.stream().filter(this.streamsMetadata::containsKey).map(id -> {
-                return new StreamOffset(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset);
+                return new StreamOffsetRange(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset);
            }).collect(Collectors.toList());
        });
    }
diff --git a/core/src/main/scala/kafka/log/s3/model/StreamOffset.java b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
similarity index 56%
rename from core/src/main/scala/kafka/log/s3/model/StreamOffset.java
rename to core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
index 0cfdf340df..5c883a2b22 100644
--- a/core/src/main/scala/kafka/log/s3/model/StreamOffset.java
+++ b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java
@@ -15,28 +15,22 @@
  * limitations under the License.
  */
 
-package kafka.log.s3.model;
+package kafka.log.s3.metadata;
 
-public class StreamOffset {
-    private final long streamId;
-    private final long startOffset;
-    private final long endOffset;
+import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.metadata.stream.InRangeObjects;
 
-    public StreamOffset(long streamId, long startOffset, long endOffset) {
-        this.streamId = streamId;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
-    }
+public interface InRangeObjectsFetcher {
 
-    public long streamId() {
-        return streamId;
-    }
+    /**
+     * fetch stream interval related objects
+     *
+     * @param streamId stream id
+     * @param startOffset start offset, inclusive, if not exist, return INVALID
+     * @param endOffset end offset, exclusive, if not exist, wait for it
+     * @param limit max object count
+     * @return {@link InRangeObjects}
+     */
+    CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit);
 
-    public long startOffset() {
-        return startOffset;
-    }
-
-    public long endOffset() {
-        return endOffset;
-    }
-}
\ No newline at end of file
+}
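The renamed interface turns the old synchronous StreamOffset value class into an asynchronous fetch contract: a startOffset outside the known range resolves immediately to INVALID, while an endOffset past the known range leaves the future pending until metadata catches up. A minimal caller sketch, assuming some InRangeObjectsFetcher implementation is supplied (the helper method itself is illustrative):

    // Sketch: block for up to the first 10 objects covering [100, 200) of stream 42.
    static List<S3ObjectMetadata> objectsFor(InRangeObjectsFetcher fetcher) throws Exception {
        InRangeObjects result = fetcher.fetch(42L, 100L, 200L, 10).get();
        return result == InRangeObjects.INVALID ? Collections.emptyList() : result.objects();
    }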
diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java
new file mode 100644
index 0000000000..862ab0d43e
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.metadata;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import kafka.log.es.FutureUtil;
+import kafka.server.BrokerServer;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.S3ObjectsImage;
+import org.apache.kafka.image.S3StreamMetadataImage;
+import org.apache.kafka.image.S3StreamsMetadataImage;
+import org.apache.kafka.metadata.stream.InRangeObjects;
+import org.apache.kafka.metadata.stream.S3Object;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamMetadataManager implements InRangeObjectsFetcher {
+
+    // TODO: optimize by more suitable concurrent protection
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class);
+    private final KafkaConfig config;
+    private final BrokerServer broker;
+    private final Map<Long, Map<Long, List<GetObjectsTask>>> pendingGetObjectsTasks;
+    private final ExecutorService pendingExecutorService;
+    // TODO: we just need the version of streams metadata, not the whole image
+    private volatile OffsetAndEpoch version;
+    private S3StreamsMetadataImage streamsImage;
+    private S3ObjectsImage objectsImage;
+
+    public StreamMetadataManager(BrokerServer broker, KafkaConfig config) {
+        this.config = config;
+        this.broker = broker;
+        MetadataImage currentImage = this.broker.metadataCache().currentImage();
+        this.streamsImage = currentImage.streamsMetadata();
+        this.objectsImage = currentImage.objectsMetadata();
+        this.version = currentImage.highestOffsetAndEpoch();
+        this.broker.metadataListener().registerStreamMetadataListener(this::onImageChanged);
+        // TODO: optimize by more suitable data structure for pending tasks
+        this.pendingGetObjectsTasks = new HashMap<>();
+        this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor"));
+    }
+
+    private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
+        if (newImage.highestOffsetAndEpoch().equals(this.version)) {
+            return;
+        }
+        synchronized (this) {
+            // update version
+            this.version = newImage.highestOffsetAndEpoch();
+            // update image
+            this.streamsImage = newImage.streamsMetadata();
+            this.objectsImage = newImage.objectsMetadata();
+            // remove all catch up pending tasks
+            List<GetObjectsTask> retryTasks = removePendingTasks();
+            // retry all pending tasks
+            this.pendingExecutorService.submit(() -> {
+                retryPendingTasks(retryTasks);
+            });
+        }
+    }
+
+    // must access thread safe
+    private List<GetObjectsTask> removePendingTasks() {
+        if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<Long> pendingStreams = pendingGetObjectsTasks.keySet();
+        List<StreamOffsetRange> pendingStreamsOffsetRange = pendingStreams
+            .stream()
+            .map(streamsImage::offsetRange)
+            .filter(offset -> offset != StreamOffsetRange.INVALID)
+            .collect(Collectors.toList());
+        if (pendingStreamsOffsetRange.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<GetObjectsTask> retryTasks = new ArrayList<>();
+        pendingStreamsOffsetRange.forEach(offsetRange -> {
+            long streamId = offsetRange.getStreamId();
+            long endOffset = offsetRange.getEndOffset();
+            Map<Long, List<GetObjectsTask>> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId);
+            if (tasks == null || tasks.isEmpty()) {
+                return;
+            }
+            Iterator<Entry<Long, List<GetObjectsTask>>> iterator =
+                tasks.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Entry<Long, List<GetObjectsTask>> entry = iterator.next();
+                long pendingEndOffset = entry.getKey();
+                if (pendingEndOffset > endOffset) {
+                    break;
+                }
+                iterator.remove();
+                List<GetObjectsTask> getObjectsTasks = entry.getValue();
+                retryTasks.addAll(getObjectsTasks);
+            }
+            if (tasks.isEmpty()) {
+                StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId);
+            }
+        });
+        return retryTasks;
+    }
+
+    @Override
+    public CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit) {
+        synchronized (StreamMetadataManager.this) {
+            S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId);
+            if (streamImage == null) {
+                LOGGER.warn(
+                    "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null",
+                    streamId, startOffset, endOffset, limit);
+                return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+            }
+            StreamOffsetRange offsetRange = streamImage.offsetRange();
+            if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) {
+                return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+            }
+            long streamStartOffset = offsetRange.getStartOffset();
+            long streamEndOffset = offsetRange.getEndOffset();
+            if (startOffset < streamStartOffset) {
+                LOGGER.warn(
+                    "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}",
+                    streamId, startOffset, endOffset, limit, streamStartOffset);
+                return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+            }
+            if (endOffset > streamEndOffset) {
+                // lag behind, need to wait for cache catch up
+                return pendingFetch(streamId, startOffset, endOffset, limit);
+            }
+            return fetch0(streamId, startOffset, endOffset, limit);
+        }
+    }
+
+    // must access thread safe
+    private CompletableFuture<InRangeObjects> pendingFetch(long streamId, long startOffset, long endOffset, int limit) {
+        GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit);
+        Map<Long, List<GetObjectsTask>> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId,
+            k -> new TreeMap<>());
+        List<GetObjectsTask> getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>());
+        getObjectsTasks.add(task);
+        LOGGER.warn("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset,
+            limit);
+        return task.cf;
+    }
+
+    // must access thread safe
+    private CompletableFuture<InRangeObjects> fetch0(long streamId, long startOffset, long endOffset, int limit) {
+        InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit);
+        if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) {
+            LOGGER.warn(
+                "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
+                streamId, startOffset, endOffset, limit);
+            return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+        }
+        // fill the objects' size
+        for (S3ObjectMetadata object : cachedInRangeObjects.objects()) {
+            S3Object objectMetadata = objectsImage.getObjectMetadata(object.getObjectId());
+            if (objectMetadata == null) {
+                // should not happen
+                LOGGER.error(
+                    "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
+                    streamId, startOffset, endOffset, limit);
+                return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+            }
+            object.setObjectSize(objectMetadata.getObjectSize());
+        }
+        LOGGER.trace(
+            "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
+            streamId, startOffset, endOffset, limit, cachedInRangeObjects);
+        return CompletableFuture.completedFuture(cachedInRangeObjects);
+    }
+
+    public void retryPendingTasks(List<GetObjectsTask> tasks) {
+        LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size());
+        tasks.forEach(task -> {
+            long streamId = task.streamId;
+            long startOffset = task.startOffset;
+            long endOffset = task.endOffset;
+            int limit = task.limit;
+            CompletableFuture<InRangeObjects> newCf = this.fetch(streamId, startOffset, endOffset, limit);
+            FutureUtil.propagate(newCf, task.cf);
+        });
+    }
+
+    static class GetObjectsTask {
+
+        private final CompletableFuture<InRangeObjects> cf;
+        private final long streamId;
+        private final long startOffset;
+        private final long endOffset;
+        private final int limit;
+
+        public static GetObjectsTask of(long streamId, long startOffset, long endOffset, int limit) {
+            CompletableFuture<InRangeObjects> cf = new CompletableFuture<>();
+            GetObjectsTask task = new GetObjectsTask(cf, streamId, startOffset, endOffset, limit);
+            return task;
+        }
+
+        private GetObjectsTask(CompletableFuture<InRangeObjects> cf, long streamId, long startOffset, long endOffset, int limit) {
+            this.cf = cf;
+            this.streamId = streamId;
+            this.startOffset = startOffset;
+            this.endOffset = endOffset;
+            this.limit = limit;
+        }
+    }
+
+    public interface StreamMetadataListener {
+
+        void onChange(MetadataDelta delta, MetadataImage image);
+    }
+
+}
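The pending-fetch path is the heart of the new manager: a fetch whose endOffset is beyond the latest image is parked as a GetObjectsTask keyed by stream id and endOffset in a TreeMap, and every metadata-image change releases the tasks whose endOffset is now covered, replaying them on the single-threaded executor. A condensed sketch of that wait-then-retry flow from the caller's perspective (comments describe the internal steps; nothing here is new API):

    // The future completes only once the image catches up to endOffset 200.
    CompletableFuture<InRangeObjects> cf = manager.fetch(streamId, 10L, 200L, 5);
    // Later, a MetadataImage whose stream end offset is >= 200 arrives:
    // onImageChanged() -> removePendingTasks() -> retryPendingTasks() -> cf completes.
    InRangeObjects objects = cf.join();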
diff --git a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java b/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java
deleted file mode 100644
index 85b1d1c4a3..0000000000
--- a/core/src/main/scala/kafka/log/s3/model/RangeMetadata.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log.s3.model;
-
-
-public class RangeMetadata {
-    private static final long NOOP_OFFSET = -1;
-    private int index;
-    private long startOffset;
-    private long endOffset;
-    private int brokerId;
-
-    public RangeMetadata(int index, long startOffset, long endOffset, int serverId) {
-        this.index = index;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
-        this.brokerId = serverId;
-    }
-
-    public int getIndex() {
-        return index;
-    }
-
-    public void setIndex(int index) {
-        this.index = index;
-    }
-
-    public long getStartOffset() {
-        return startOffset;
-    }
-
-    public void setStartOffset(long startOffset) {
-        this.startOffset = startOffset;
-    }
-
-    public long getEndOffset() {
-        return endOffset;
-    }
-
-    public void setEndOffset(long endOffset) {
-        this.endOffset = endOffset;
-    }
-
-    public int getBrokerId() {
-        return brokerId;
-    }
-
-    public void setBrokerId(int brokerId) {
-        this.brokerId = brokerId;
-    }
-}
diff --git a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java b/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java
deleted file mode 100644
index 1db1020ada..0000000000
--- a/core/src/main/scala/kafka/log/s3/model/StreamMetadata.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log.s3.model;
-
-import java.util.List;
-
-public class StreamMetadata {
-    private long streamId;
-    private long epoch;
-    private int rangeIndex;
-    private long startOffset;
-
-    private List<RangeMetadata> ranges;
-
-    public StreamMetadata(long streamId, long epoch, int rangeIndex, long startOffset, List<RangeMetadata> ranges) {
-        this.streamId = streamId;
-        this.epoch = epoch;
-        this.rangeIndex = rangeIndex;
-        this.startOffset = startOffset;
-        this.ranges = ranges;
-    }
-
-    public long getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(long streamId) {
-        this.streamId = streamId;
-    }
-
-    public long getEpoch() {
-        return epoch;
-    }
-
-    public void setEpoch(long epoch) {
-        this.epoch = epoch;
-    }
-
-    public long getStartOffset() {
-        return startOffset;
-    }
-
-    public void setStartOffset(long startOffset) {
-        this.startOffset = startOffset;
-    }
-
-    public List<RangeMetadata> getRanges() {
-        return ranges;
-    }
-
-    public void setRanges(List<RangeMetadata> ranges) {
-        this.ranges = ranges;
-    }
-
-    public int getRangeIndex() {
-        return rangeIndex;
-    }
-
-    public void setRangeIndex(int rangeIndex) {
-        this.rangeIndex = rangeIndex;
-    }
-}
diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
index 006ff50c89..434eceef39 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
@@ -20,11 +20,9 @@
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import kafka.log.s3.StreamMetadataManager;
-import kafka.log.s3.StreamMetadataManager.InflightWalObject;
+import kafka.log.s3.metadata.StreamMetadataManager;
 import kafka.log.s3.network.ControllerRequestSender;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.common.message.CommitWALObjectRequestData;
@@ -34,8 +32,8 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest;
 import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder;
+import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,16 +95,6 @@ public CompletableFuture<CommitWALObjectResponse> commitWALObject(CommitWALObjec
                 LOGGER.error("Error while committing WAL object: {}, code: {}", request, code);
                 throw code.exception();
             }
-        }).thenApply(resp -> {
-            long objectId = request.getObjectId();
-            long orderId = request.getOrderId();
-            int brokerId = config.brokerId();
-            long objectSize = request.getObjectSize();
-            Map<Long, List<S3ObjectStreamIndex>> rangeList = request.getStreamRanges().stream()
-                .map(range -> new S3ObjectStreamIndex(range.getStreamId(), range.getStartOffset(), range.getEndOffset()))
-                .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
-            this.metadataManager.append(new InflightWalObject(objectId, brokerId, rangeList, orderId, objectSize));
-            return resp;
         });
     }
@@ -123,7 +111,13 @@
     @Override
     public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit) {
         try {
-            return this.metadataManager.getObjects(streamId, startOffset, endOffset, limit);
+            return this.metadataManager.fetch(streamId, startOffset, endOffset, limit).thenApply(inRangeObjects -> {
+                if (inRangeObjects == null || inRangeObjects == InRangeObjects.INVALID) {
+                    List<S3ObjectMetadata> objects = Collections.emptyList();
+                    return objects;
+                }
+                return inRangeObjects.objects();
+            }).get();
         } catch (Exception e) {
             LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset,
                 limit, e);
diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
index d7838a824e..1afd60bce9 100644
--- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
+++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
@@ -17,7 +17,6 @@
 package kafka.log.s3.streams;
 
-import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.network.ControllerRequestSender;
 import kafka.log.s3.objects.OpenStreamMetadata;
 import kafka.server.KafkaConfig;
@@ -34,6 +33,7 @@
 import org.apache.kafka.common.requests.s3.CreateStreamRequest;
 import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest;
 import org.apache.kafka.common.requests.s3.OpenStreamRequest;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,7 +135,7 @@ public CompletableFuture<Void> deleteStream(long streamId, long epoch) {
     }
 
     @Override
-    public CompletableFuture<List<StreamOffset>> getStreamsOffset(List<Long> streamIds) {
+    public CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds) {
         GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder(
             new GetStreamsOffsetRequestData()
                 .setStreamIds(streamIds));
@@ -143,7 +143,7 @@
             switch (Errors.forCode(resp.errorCode())) {
                 case NONE:
                     return resp.streamsOffset().stream()
-                        .map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
+                        .map(streamOffset -> new StreamOffsetRange(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset()))
                        .collect(Collectors.toList());
                 default:
                     LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode()));
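getStreamsOffset now returns the shared org.apache.kafka.metadata.stream.StreamOffsetRange instead of the module-private StreamOffset, so broker and controller code agree on one range type. A sketch of the recovery-time usage described in the StreamManager javadoc below (the wiring around streamManager is assumed, not taken from the patch):

    // During startup, ask for each stream's [start, end) to pick the WAL recover point.
    List<StreamOffsetRange> offsets = streamManager.getStreamsOffset(List.of(0L, 1L)).get();
    for (StreamOffsetRange r : offsets) {
        System.out.printf("stream %d spans [%d, %d)%n", r.getStreamId(), r.getStartOffset(), r.getEndOffset());
    }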
diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java
index 175aee16cf..51e3c39de8 100644
--- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java
+++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java
@@ -19,8 +19,8 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import kafka.log.s3.model.StreamOffset;
 import kafka.log.s3.objects.OpenStreamMetadata;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 
 public interface StreamManager {
@@ -74,8 +74,8 @@ public interface StreamManager {
      * When server is starting or recovering, wal in EBS need streams offset to determine the recover point.
      *
      * @param streamIds stream ids.
-     * @return {@link StreamOffset}
+     * @return {@link StreamOffsetRange}
      */
-    CompletableFuture<List<StreamOffset>> getStreamsOffset(List<Long> streamIds);
+    CompletableFuture<List<StreamOffsetRange>> getStreamsOffset(List<Long> streamIds);
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fe9730190e..c76842a1a7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1500,7 +1500,7 @@ object KafkaConfig {
       .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
       .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
       .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
-      .define(S3CacheSizeProp, INT, 1024, MEDIUM, S3CacheSizeDoc)
+      .define(S3CacheSizeProp, LONG, 1073741824, MEDIUM, S3CacheSizeDoc)
     // Kafka on S3 inject end
   }
@@ -2043,7 +2043,7 @@
   val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
   val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
   val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
-  val s3CacheSize = getInt(KafkaConfig.S3CacheSizeProp)
+  val s3CacheSize = getLong(KafkaConfig.S3CacheSizeProp)
   // Kafka on S3 inject end
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index cef618a703..1204270ef6 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,7 +22,6 @@
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
 import org.apache.kafka.server.common.MetadataVersion
 
 import java.util
@@ -118,11 +117,6 @@
   def getRandomAliveBrokerId: Option[Int]
 
-  // Kafka on S3 inject start
-  def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects
-
-  def getObjectMetadata(objectId: Long): S3Object
-
-  // Kafka on S3 inject end
 }
 
 object MetadataCache {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 58809d6f01..d6123bc53e 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server.metadata
 
-import kafka.log.s3.StreamMetadataManager.StreamMetadataListener
+import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener
 
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 05a6a393ac..7e6ad7bfd0 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -35,7 +35,6 @@
 import java.util.concurrent.ThreadLocalRandom
 import kafka.admin.BrokerMetadata
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
-import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
 import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
 import org.apache.kafka.server.common.MetadataVersion
@@ -393,16 +392,4 @@
       features.toMap, image.highestOffsetAndEpoch().offset)
   }
-
-  // Kafka on S3 inject start
-  override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = {
-    val image = _currentImage
-    image.streamsMetadata().getObjects(streamId, startOffset, endOffset, limit)
-  }
-
-  override def getObjectMetadata(objectId: Long): S3Object = {
-    val image = _currentImage
-    image.objectsMetadata().getObjectMetadata(objectId)
-  }
-  // Kafka on S3 inject end
 }
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index efbc546f63..feaaf1c43f 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -40,7 +40,6 @@
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
 import org.apache.kafka.server.common.MetadataVersion
 
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -585,14 +584,4 @@
   def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
     featuresAndEpoch
   }
-
-  // Kafka on S3 inject start
-  override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = {
-    throw new UnsupportedOperationException("getObjects is not supported in ZkMetadataCache")
-  }
-
-  override def getObjectMetadata(objectId: Long): S3Object = {
-    throw new UnsupportedOperationException("getObjectMetadata is not supported in ZkMetadataCache")
-  }
-  // Kafka on S3 inject end
 }
diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java
index 0ef201ed48..7854ca8122 100644
--- a/core/src/test/java/kafka/log/s3/S3StorageTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -64,7 +64,7 @@ public class S3StorageTest {
     public void setup() {
         objectManager = mock(ObjectManager.class);
         S3Operator s3Operator = new MemoryS3Operator();
-        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(0, objectManager, s3Operator), s3Operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(0L, objectManager, s3Operator), s3Operator);
     }
 
     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
index 6441099416..416e28de52 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
@@ -113,7 +113,7 @@ public void setUp() {
         streamManager = manager;
         objectManager = manager;
         operator = new MemoryS3Operator();
-        blockCache = new DefaultS3BlockCache(0, objectManager, operator);
+        blockCache = new DefaultS3BlockCache(0L, objectManager, operator);
         storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         streamClient = new S3StreamClient(streamManager, storage);
     }
diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java
new file mode 100644
index 0000000000..cff1f79994
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import kafka.log.s3.metadata.StreamMetadataManager;
+import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener;
+import kafka.server.BrokerServer;
+import kafka.server.metadata.BrokerMetadataListener;
+import kafka.server.metadata.KRaftMetadataCache;
+import org.apache.kafka.image.BrokerS3WALMetadataImage;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.S3ObjectsImage;
+import org.apache.kafka.image.S3StreamMetadataImage;
+import org.apache.kafka.image.S3StreamsMetadataImage;
+import org.apache.kafka.metadata.stream.InRangeObjects;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3Object;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectState;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.metadata.stream.StreamState;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
+
+@Timeout(40)
+@Tag("S3Unit")
+public class StreamMetadataManagerTest {
+
+    private static final int BROKER0 = 0;
+    private static final int BROKER1 = 1;
+    private static final long STREAM0 = 0;
+    private static final long STREAM1 = 1;
+
+    private BrokerServer mockBroker;
+    private KRaftMetadataCache mockMetadataCache;
+    private BrokerMetadataListener mockBrokerMetadataListener;
+    private StreamMetadataListener streamMetadataListener;
+    private StreamMetadataManager manager;
+
+    @BeforeEach
+    public void setUp() {
+        this.mockBroker = Mockito.mock(BrokerServer.class);
+        this.mockMetadataCache = Mockito.mock(KRaftMetadataCache.class);
+        this.mockBrokerMetadataListener = Mockito.mock(BrokerMetadataListener.class);
+        Mockito.when(this.mockBroker.metadataCache()).thenReturn(this.mockMetadataCache);
+        Mockito.when(this.mockBroker.metadataListener()).thenReturn(this.mockBrokerMetadataListener);
+        Mockito.doAnswer(invocation -> {
+            this.streamMetadataListener = invocation.getArgument(0);
+            return null;
+        }).when(this.mockBrokerMetadataListener).registerStreamMetadataListener(any());
+        Mockito.when(this.mockMetadataCache.currentImage()).thenReturn(MetadataImage.EMPTY);
+        this.manager = new StreamMetadataManager(this.mockBroker, null);
+    }
+
+    private static MetadataImage image0;
+    private static MetadataImage image1;
+    private static MetadataImage image2;
+
+    static {
+        S3ObjectsImage objectsImage = new S3ObjectsImage(2L, Map.of(
+            0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
+            1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
+            2L, new S3Object(2L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED)
+        ));
+
+        Map<Integer, RangeMetadata> ranges = Map.of(
+            0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0)
+        );
+        Map<Long, S3StreamObject> streamObjects = Map.of(
+            0L, new S3StreamObject(0L, 128, STREAM0, 10L, 100L));
+        S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);
+        S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
+            Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY));
+        image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null);
+
+        ranges = new HashMap<>(ranges);
+        ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0));
+        streamObjects = new HashMap<>(streamObjects);
+        streamObjects.put(1L, new S3StreamObject(1L, 128, STREAM0, 100L, 150L));
+        streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects);
+        streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
+            Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY));
+        image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null);
+
+        ranges = new HashMap<>(ranges);
+        ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0));
+        streamObjects = new HashMap<>(streamObjects);
+        streamObjects.put(2L, new S3StreamObject(2L, 128, STREAM0, 150L, 200L));
+        streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects);
+        streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
+            Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY));
+        image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null);
+    }
+
+    @Test
+    public void testFetch() throws Exception {
+        S3ObjectMetadata object0 = new S3ObjectMetadata(0L, 128, S3ObjectType.STREAM);
+        S3ObjectMetadata object1 = new S3ObjectMetadata(1L, 128, S3ObjectType.STREAM);
+        S3ObjectMetadata object2 = new S3ObjectMetadata(2L, 128, S3ObjectType.STREAM);
+
+        this.streamMetadataListener.onChange(null, image0);
+
+        // 1. normal fetch
+        CompletableFuture<InRangeObjects> result = this.manager.fetch(STREAM0, 10L, 100L, 5);
+        InRangeObjects inRangeObjects = result.get();
+        assertEquals(STREAM0, inRangeObjects.streamId());
+        assertEquals(10L, inRangeObjects.startOffset());
+        assertEquals(100L, inRangeObjects.endOffset());
+        assertEquals(1, inRangeObjects.objects().size());
+        assertEquals(object0, inRangeObjects.objects().get(0));
+
+        // 2. fetch with invalid streamId
+        result = this.manager.fetch(STREAM1, 0L, 100L, 5);
+        inRangeObjects = result.get();
+        assertEquals(InRangeObjects.INVALID, inRangeObjects);
+
+        // 3. fetch with larger startOffset
+        result = this.manager.fetch(STREAM0, 20L, 100L, 5);
+        inRangeObjects = result.get();
+        assertEquals(STREAM0, inRangeObjects.streamId());
+        assertEquals(20L, inRangeObjects.startOffset());
+        assertEquals(100L, inRangeObjects.endOffset());
+        assertEquals(1, inRangeObjects.objects().size());
+        assertEquals(object0, inRangeObjects.objects().get(0));
+
+        // 4. fetch with smaller endOffset
+        result = this.manager.fetch(STREAM0, 10L, 50L, 5);
+        inRangeObjects = result.get();
+        assertEquals(STREAM0, inRangeObjects.streamId());
+        assertEquals(10L, inRangeObjects.startOffset());
+        assertEquals(100L, inRangeObjects.endOffset());
+        assertEquals(1, inRangeObjects.objects().size());
+        assertEquals(object0, inRangeObjects.objects().get(0));
+
+        // 5. fetch with smaller startOffset
+        result = this.manager.fetch(STREAM0, 5L, 100L, 5);
+        inRangeObjects = result.get();
+        assertEquals(InRangeObjects.INVALID, inRangeObjects);
+
+        // 6. fetch with larger endOffset
+        result = this.manager.fetch(STREAM0, 10L, 200L, 5);
+        CompletableFuture<InRangeObjects> finalResult = result;
+        assertThrows(TimeoutException.class, () -> {
+            finalResult.get(1, TimeUnit.SECONDS);
+        });
+
+        // 7. notify the manager that streams' end offset has been advanced
+        streamMetadataListener.onChange(null, image1);
+
+        assertThrows(TimeoutException.class, () -> {
+            finalResult.get(1, TimeUnit.SECONDS);
+        });
+
+        // 8. notify with correct end offset
+        streamMetadataListener.onChange(null, image2);
+
+        assertDoesNotThrow(() -> {
+            InRangeObjects rangeObjects = finalResult.get(1, TimeUnit.SECONDS);
+            assertEquals(STREAM0, rangeObjects.streamId());
+            assertEquals(10L, rangeObjects.startOffset());
+            assertEquals(200L, rangeObjects.endOffset());
+            assertEquals(3, rangeObjects.objects().size());
+            assertEquals(object0, rangeObjects.objects().get(0));
+            assertEquals(object1, rangeObjects.objects().get(1));
+            assertEquals(object2, rangeObjects.objects().get(2));
+        });
+
+    }
+
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 3ee287fdab..9aa2b00b66 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -56,7 +56,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.controller.ControllerResult;
 import org.apache.kafka.metadata.stream.RangeMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3StreamConstant;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3WALObject;
@@ -383,8 +383,8 @@ public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALOb
             records.addAll(destroyResult.records());
         }
-        List<S3ObjectStreamIndex> indexes = streamRanges.stream()
-            .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset()))
+        List<StreamOffsetRange> indexes = streamRanges.stream()
+            .map(range -> new StreamOffsetRange(range.streamId(), range.startOffset(), range.endOffset()))
             .collect(Collectors.toList());
         // update broker's wal object
         BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
@@ -400,7 +400,7 @@
                 .setBrokerId(brokerId)
                 .setStreamsIndex(
                     indexes.stream()
-                        .map(S3ObjectStreamIndex::toRecordStreamIndex)
+                        .map(StreamOffsetRange::toRecordStreamIndex)
                        .collect(Collectors.toList())), (short) 0));
         // generate compacted objects' remove record
         data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
@@ -507,10 +507,10 @@ public void replay(WALObjectRecord record) {
         }
 
         // create wal object
-        Map<Long, List<S3ObjectStreamIndex>> indexMap = streamIndexes
+        Map<Long, List<StreamOffsetRange>> indexMap = streamIndexes
             .stream()
-            .map(S3ObjectStreamIndex::of)
-            .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
+            .map(StreamOffsetRange::of)
+            .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
         brokerMetadata.walObjects.put(objectId, new S3WALObject(objectId, brokerId, indexMap, orderId));
 
         // update range
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index db97072b87..e6dc1c0f22 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -584,6 +584,7 @@ public String toString() {
             ", aclsDelta=" + aclsDelta +
             ", streamMetadataDelta=" + s3StreamsMetadataDelta +
             ", objectsMetadataDelta=" + s3ObjectsDelta +
+            ", kvDelta=" + kvDelta +
             ')';
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 64a71263dd..c547212e37 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.StreamState;
 
 public class S3StreamMetadataImage {
@@ -91,6 +92,11 @@ public long getStartOffset() {
         return startOffset;
     }
 
+    public long getEndOffset() {
+        RangeMetadata range = ranges.get(rangeIndex);
+        return range == null ? startOffset : range.endOffset();
+    }
+
     public long getStreamId() {
         return streamId;
     }
@@ -107,6 +113,10 @@ public StreamState state() {
         return state;
     }
 
+    public StreamOffsetRange offsetRange() {
+        return new StreamOffsetRange(streamId, startOffset, getEndOffset());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
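Editor's note: the new getEndOffset() falls back to startOffset when the stream has no active range, so offsetRange() always yields a well-formed, possibly empty [startOffset, endOffset) interval rather than an inverted one. A toy illustration of that invariant (the Range class below is a stand-in, not RangeMetadata):

```java
import java.util.HashMap;
import java.util.Map;

class EndOffsetFallbackDemo {
    // Stand-in for RangeMetadata: a half-open interval [start, end) owned by the active range.
    static final class Range {
        final long start;
        final long end;
        Range(long start, long end) { this.start = start; this.end = end; }
    }

    // Mirrors the fallback: with no active range the end offset equals the start offset,
    // so the stream's offset range is empty instead of inverted.
    static long endOffset(Map<Integer, Range> ranges, int rangeIndex, long startOffset) {
        Range range = ranges.get(rangeIndex);
        return range == null ? startOffset : range.end;
    }

    public static void main(String[] args) {
        Map<Integer, Range> ranges = new HashMap<>();
        System.out.println(endOffset(ranges, 0, 10L)); // 10 -> empty range [10, 10)
        ranges.put(0, new Range(10L, 200L));
        System.out.println(endOffset(ranges, 0, 10L)); // 200 -> [10, 200)
    }
}
```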
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index a509d73cf0..6e3dbd8951 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -20,9 +20,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
 import org.apache.kafka.image.writer.ImageWriter;
@@ -30,7 +32,7 @@
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3ObjectType;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -80,6 +82,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
         List<S3ObjectMetadata> objects = new ArrayList<>();
         long realEndOffset = startOffset;
         List<RangeSearcher> rangeSearchers = rangeSearchers(streamId, startOffset, endOffset);
+        // TODO: if one stream object in multiple ranges, we may get duplicate objects
         for (RangeSearcher rangeSearcher : rangeSearchers) {
             InRangeObjects inRangeObjects = rangeSearcher.getObjects(limit);
             if (inRangeObjects == InRangeObjects.INVALID) {
@@ -88,7 +91,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
             realEndOffset = inRangeObjects.endOffset();
             objects.addAll(inRangeObjects.objects());
             limit -= inRangeObjects.objects().size();
-            if (limit <= 0) {
+            if (limit <= 0 || realEndOffset >= endOffset) {
                 break;
             }
         }
@@ -133,19 +136,12 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int broker
             this.brokerId = brokerId;
         }
 
-        @SuppressWarnings("all")
-        public InRangeObjects getObjects(int limit) {
-            if (limit <= 0) {
-                return InRangeObjects.INVALID;
-            }
+        private Queue<ObjectStreamRange> rangeOfWalObjects() {
             BrokerS3WALMetadataImage wal = brokerWALMetadata.get(brokerId);
-            if (wal == null) {
-                return InRangeObjects.INVALID;
-            }
-            List<ObjectStreamRange> walObjects = wal.getWalObjects().list().stream()
+            return wal.getWalObjects().list().stream()
                 .filter(obj -> obj.streamsIndex().containsKey(streamId)
                     && obj.streamsIndex().get(streamId).size() != 0)
                 .flatMap(obj -> {
-                    List<S3ObjectStreamIndex> indexes = obj.streamsIndex().get(streamId);
+                    List<StreamOffsetRange> indexes = obj.streamsIndex().get(streamId);
                     // TODO: pre filter useless objects
                     return indexes.stream().filter(index -> {
                         long objectStartOffset = index.getStartOffset();
@@ -156,16 +152,15 @@ public InRangeObjects getObjects(int limit) {
                         long endOffset = index.getEndOffset();
                         return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
                     });
-                }).collect(Collectors.toList());
+                }).collect(Collectors.toCollection(LinkedList::new));
+        }
+
+        private Queue<ObjectStreamRange> rangeOfStreamObjects() {
             S3StreamMetadataImage stream = streamsMetadata.get(streamId);
-            if (stream == null) {
-                return InRangeObjects.INVALID;
-            }
-            List<ObjectStreamRange> streamObjects = new ArrayList<>();
             Map<Long, S3StreamObject> streamObjectsMetadata = stream.getStreamObjects();
             // TODO: refactor to make stream objects in order
             if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) {
-                List<S3StreamObject> streamObjs = streamObjectsMetadata.values().stream().filter(obj -> {
+                return streamObjectsMetadata.values().stream().filter(obj -> {
                     long objectStartOffset = obj.streamIndex().getStartOffset();
                     long objectEndOffset = obj.streamIndex().getEndOffset();
                     return objectStartOffset < endOffset && objectEndOffset > startOffset;
@@ -174,82 +169,48 @@ public int compare(S3StreamObject o1, S3StreamObject o2) {
                         return o1.objectId() > o2.objectId() ? 1 : -1;
                     }
-                }).collect(Collectors.toList());
-                streamObjects.addAll(
-                    streamObjs.stream().map(
-                        obj -> {
-                            long startOffset = obj.streamIndex().getStartOffset();
-                            long endOffset = obj.streamIndex().getEndOffset();
-                            return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
-                        }).collect(Collectors.toList()));
+                }).map(obj -> {
+                    long startOffset = obj.streamIndex().getStartOffset();
+                    long endOffset = obj.streamIndex().getEndOffset();
+                    return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
+                }).collect(Collectors.toCollection(LinkedList::new));
+            }
+            return new LinkedList<>();
+        }
+
+        public InRangeObjects getObjects(int limit) {
+            if (limit <= 0) {
+                return InRangeObjects.INVALID;
+            }
+            if (!brokerWALMetadata.containsKey(brokerId) || !streamsMetadata.containsKey(streamId)) {
+                return InRangeObjects.INVALID;
             }
+
+            Queue<ObjectStreamRange> streamObjects = rangeOfStreamObjects();
+            Queue<ObjectStreamRange> walObjects = rangeOfWalObjects();
             List<S3ObjectMetadata> inRangeObjects = new ArrayList<>();
-            int walIndex = 0;
-            int streamIndex = 0;
             long nextStartOffset = startOffset;
+
             while (limit > 0
                 && nextStartOffset < endOffset
-                && (walIndex < walObjects.size() || streamIndex < streamObjects.size())) {
-
-                if (walIndex >= walObjects.size()) {
-                    // only stream objects left
-                    ObjectStreamRange streamRange = streamObjects.get(streamIndex++);
-                    long objectStartOffset = streamRange.startOffset();
-                    long objectEndOffset = streamRange.endOffset();
-                    if (objectStartOffset > nextStartOffset) {
-                        break;
-                    }
-                    if (objectEndOffset <= nextStartOffset) {
-                        continue;
-                    }
-                    inRangeObjects.add(streamRange.toS3ObjectMetadata());
-                    limit--;
-                    nextStartOffset = objectEndOffset;
-                    continue;
-                }
-
-                if (streamIndex >= streamObjects.size()) {
-                    // only wal objects left
-                    ObjectStreamRange walRange = walObjects.get(walIndex++);
-                    long objectStartOffset = walRange.startOffset();
-                    long objectEndOffset = walRange.endOffset();
-                    if (objectStartOffset > nextStartOffset) {
-                        break;
-                    }
-                    if (objectEndOffset <= nextStartOffset) {
-                        continue;
-                    }
-                    inRangeObjects.add(walRange.toS3ObjectMetadata());
-                    limit--;
-                    nextStartOffset = objectEndOffset;
-                    continue;
+                && (!streamObjects.isEmpty() || !walObjects.isEmpty())) {
+                ObjectStreamRange streamRange = null;
+                if (walObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < walObjects.peek().startOffset())) {
+                    streamRange = streamObjects.poll();
+                } else {
+                    streamRange = walObjects.poll();
                 }
-
-                ObjectStreamRange walRange = walObjects.get(walIndex);
-                ObjectStreamRange streamRange = streamObjects.get(streamIndex);
-                long walObjectStartOffset = walRange.startOffset();
-                long walObjectEndOffset = walRange.endOffset();
-                long streamObjectStartOffset = streamRange.startOffset();
-                long streamObjectEndOffset = streamRange.endOffset();
-                if (walObjectStartOffset > nextStartOffset && streamObjectStartOffset > nextStartOffset) {
-                    // both start offset are greater than nextStartOffset
+                long objectStartOffset = streamRange.startOffset();
+                long objectEndOffset = streamRange.endOffset();
+                if (objectStartOffset > nextStartOffset) {
                     break;
                 }
-                if (walObjectStartOffset <= nextStartOffset && walObjectEndOffset > nextStartOffset) {
-                    // wal object contains nextStartOffset
-                    inRangeObjects.add(walRange.toS3ObjectMetadata());
-                    limit--;
-                    nextStartOffset = walObjectEndOffset;
-                    walIndex++;
+                if (objectEndOffset <= nextStartOffset) {
                     continue;
                 }
-                if (streamObjectStartOffset <= nextStartOffset && streamObjectEndOffset > nextStartOffset) {
-                    // stream object contains nextStartOffset
-                    inRangeObjects.add(streamRange.toS3ObjectMetadata());
-                    limit--;
-                    nextStartOffset = streamObjectEndOffset;
-                    streamIndex++;
-                }
+                inRangeObjects.add(streamRange.toS3ObjectMetadata());
+                limit--;
+                nextStartOffset = objectEndOffset;
             }
             return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects);
         }
@@ -315,6 +276,15 @@ public Map<Long, S3StreamMetadataImage> streamsMetadata() {
         return streamsMetadata;
     }
 
+    public StreamOffsetRange offsetRange(long streamId) {
+        S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
+        if (streamMetadata == null) {
+            return StreamOffsetRange.INVALID;
+        }
+        return streamMetadata.offsetRange();
+    }
+
+
     public long nextAssignedStreamId() {
         return nextAssignedStreamId;
     }
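Editor's note: the rewritten getObjects() above replaces four near-duplicate index-walking branches with a single merge over two queues, each already ordered by start offset: always take the earlier-starting head, skip ranges already covered, and stop at the first uncovered gap. A self-contained sketch of just that merge logic (a simplified Range stand-in, not the patch's ObjectStreamRange):

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class RangeMergeSketch {
    static final class Range {
        final long start;
        final long end; // half-open: [start, end)
        Range(long start, long end) { this.start = start; this.end = end; }
    }

    // Merges two start-offset-ordered queues, advancing a cursor over the requested range.
    static long merge(Queue<Range> streamObjects, Queue<Range> walObjects,
                      long startOffset, long endOffset, int limit) {
        long next = startOffset;
        while (limit > 0 && next < endOffset
            && (!streamObjects.isEmpty() || !walObjects.isEmpty())) {
            Queue<Range> src = walObjects.isEmpty()
                || (!streamObjects.isEmpty() && streamObjects.peek().start < walObjects.peek().start)
                ? streamObjects : walObjects;
            Range r = src.poll();
            if (r.start > next) {
                break;      // uncovered gap [next, r.start): stop and report what we have
            }
            if (r.end <= next) {
                continue;   // range entirely behind the cursor: already covered
            }
            next = r.end;   // "pick" the object and advance the cursor
            limit--;
        }
        return next;        // the real end offset actually covered
    }

    public static void main(String[] args) {
        Queue<Range> stream = new LinkedList<>(List.of(new Range(10, 40)));
        Queue<Range> wal = new LinkedList<>(List.of(new Range(40, 100), new Range(120, 140)));
        System.out.println(merge(stream, wal, 10, 140, 10)); // 100: stops at the 100..120 gap
    }
}
```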
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
index 400b18a59f..1be44e5d11 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
@@ -52,6 +52,16 @@ public List<S3ObjectMetadata> objects() {
         return objects;
     }
 
+    @Override
+    public String toString() {
+        return "InRangeObjects{" +
+            "streamId=" + streamId +
+            ", startOffset=" + startOffset +
+            ", endOffset=" + endOffset +
+            ", objects=" + objects +
+            '}';
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
index 6fc0e7125b..47d10f0967 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
@@ -21,18 +21,36 @@
 import org.apache.kafka.common.metadata.RangeRecord;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+/**
+ * RangeMetadata is the metadata of a range of the stream.
+ * <p>
+ * The range represents a continuous sequence of data [startOffset, endOffset) in the stream.
+ */
 public class RangeMetadata implements Comparable<RangeMetadata> {
+
+    /**
+     * The id of the stream that the range belongs to.
+     */
     private long streamId;
+    /**
+     * The epoch of the stream when the range is created.
+     */
     private long epoch;
+    /**
+     * The index of the range in the stream.
+     */
     private int rangeIndex;
     /**
-     * Inclusive
+     * Range start offset. (Inclusive)
      */
     private long startOffset;
     /**
-     * Exclusive
+     * Range end offset. (Exclusive)
      */
     private long endOffset;
+    /**
+     * The broker id of the broker that owns the range.
+     */
     private int brokerId;
 
     public RangeMetadata(long streamId, long epoch, int rangeIndex, long startOffset, long endOffset, int brokerId) {
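Editor's note: the restored javadoc pins down the half-open convention used throughout this patch: an offset belongs to a range iff startOffset <= offset < endOffset, so adjacent ranges such as [0, 100) and [100, 200) tile the stream without overlap. A two-line check of the boundary behavior:

```java
// Illustration of the half-open [startOffset, endOffset) convention documented above:
// consecutive ranges share a boundary offset, and each offset is covered exactly once.
class HalfOpenRangeDemo {
    static boolean contains(long startOffset, long endOffset, long offset) {
        return offset >= startOffset && offset < endOffset;
    }

    public static void main(String[] args) {
        System.out.println(contains(0, 100, 100));   // false: 100 is exclusive in [0, 100)
        System.out.println(contains(100, 200, 100)); // true: 100 is inclusive in [100, 200)
    }
}
```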
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
index 1a595b3c9a..0fe45c173a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
@@ -18,6 +18,8 @@
 
 package org.apache.kafka.metadata.stream;
 
+import java.util.Objects;
+
 public class S3ObjectMetadata {
     private final long objectId;
     private long objectSize;
@@ -48,4 +50,21 @@ public S3ObjectType getType() {
     public String key() {
         return ObjectUtils.genKey(0, "todocluster", objectId);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        S3ObjectMetadata that = (S3ObjectMetadata) o;
+        return objectId == that.objectId && objectSize == that.objectSize && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(objectId, objectSize, type);
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 640ca43e24..87b0e71cdc 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -27,15 +27,15 @@ public class S3StreamObject {
 
     private final long objectSize;
 
-    private final S3ObjectStreamIndex streamIndex;
+    private final StreamOffsetRange streamIndex;
 
     public S3StreamObject(long objectId, long objectSize, long streamId, long startOffset, long endOffset) {
         this.objectId = objectId;
         this.objectSize = objectSize;
-        this.streamIndex = new S3ObjectStreamIndex(streamId, startOffset, endOffset);
+        this.streamIndex = new StreamOffsetRange(streamId, startOffset, endOffset);
     }
 
-    public S3ObjectStreamIndex streamIndex() {
+    public StreamOffsetRange streamIndex() {
         return streamIndex;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 845100ca30..69af4385ab 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -37,11 +37,11 @@ public class S3WALObject implements Comparable<S3WALObject> {
     private final long objectId;
     private final int brokerId;
-    private final Map<Long, List<S3ObjectStreamIndex>> streamsIndex;
+    private final Map<Long, List<StreamOffsetRange>> streamsIndex;
 
     private final S3ObjectType objectType = S3ObjectType.UNKNOWN;
 
-    public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex, long orderId) {
+    public S3WALObject(long objectId, int brokerId, final Map<Long, List<StreamOffsetRange>> streamsIndex, long orderId) {
         this.orderId = orderId;
         this.objectId = objectId;
         this.brokerId = brokerId;
@@ -49,16 +49,16 @@ public S3WALObject(long objectId, int brokerId, final Map
-        List<S3ObjectStreamIndex> indexes = streamsIndex.get(streamId);
+        List<StreamOffsetRange> indexes = streamsIndex.get(streamId);
         if (indexes == null || indexes.isEmpty()) {
             return false;
         }
-        S3ObjectStreamIndex firstIndex = indexes.get(0);
-        S3ObjectStreamIndex lastIndex = indexes.get(indexes.size() - 1);
+        StreamOffsetRange firstIndex = indexes.get(0);
+        StreamOffsetRange lastIndex = indexes.get(indexes.size() - 1);
         return startOffset >= firstIndex.getStartOffset() && startOffset <= lastIndex.getEndOffset();
     }
 
-    public Map<Long, List<S3ObjectStreamIndex>> streamsIndex() {
+    public Map<Long, List<StreamOffsetRange>> streamsIndex() {
         return streamsIndex;
     }
@@ -69,14 +69,14 @@ public ApiMessageAndVersion toRecord() {
                 .setOrderId(orderId)
                 .setStreamsIndex(
                     streamsIndex.values().stream().flatMap(List::stream)
-                        .map(S3ObjectStreamIndex::toRecordStreamIndex)
+                        .map(StreamOffsetRange::toRecordStreamIndex)
                         .collect(Collectors.toList())), (short) 0);
     }
 
     public static S3WALObject of(WALObjectRecord record) {
-        Map<Long, List<S3ObjectStreamIndex>> collect = record.streamsIndex().stream()
-            .map(index -> new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset()))
-            .collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
+        Map<Long, List<StreamOffsetRange>> collect = record.streamsIndex().stream()
+            .map(index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset()))
+            .collect(Collectors.groupingBy(StreamOffsetRange::getStreamId));
         S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(), collect, record.orderId());
         return s3WalObject;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
similarity index 58%
rename from metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java
rename to metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
index 5c8fd88071..e405a9d732 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/StreamOffsetRange.java
@@ -20,38 +20,41 @@
 import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
 
 /**
- * ObjectStreamIndex is the index of a stream range in a WAL object or STREAM object.
+ * StreamOffsetRange represents [startOffset, endOffset) in the stream.
  */
-public class S3ObjectStreamIndex implements Comparable<S3ObjectStreamIndex> {
+public class StreamOffsetRange implements Comparable<StreamOffsetRange> {
 
-    private final Long streamId;
+    public static final StreamOffsetRange INVALID = new StreamOffsetRange(S3StreamConstant.INVALID_STREAM_ID,
+        S3StreamConstant.INVALID_OFFSET, S3StreamConstant.INVALID_OFFSET);
 
-    private final Long startOffset;
+    private final long streamId;
 
-    private final Long endOffset;
+    private final long startOffset;
 
-    public S3ObjectStreamIndex(Long streamId, Long startOffset, Long endOffset) {
+    private final long endOffset;
+
+    public StreamOffsetRange(long streamId, long startOffset, long endOffset) {
         this.streamId = streamId;
         this.startOffset = startOffset;
         this.endOffset = endOffset;
     }
 
-    public Long getStreamId() {
+    public long getStreamId() {
         return streamId;
     }
 
-    public Long getStartOffset() {
+    public long getStartOffset() {
         return startOffset;
    }
 
-    public Long getEndOffset() {
+    public long getEndOffset() {
         return endOffset;
     }
 
     @Override
-    public int compareTo(S3ObjectStreamIndex o) {
-        int res = this.streamId.compareTo(o.streamId);
-        return res == 0 ? this.startOffset.compareTo(o.startOffset) : res;
+    public int compareTo(StreamOffsetRange o) {
+        int res = Long.compare(this.streamId, o.streamId);
+        return res == 0 ? Long.compare(this.startOffset, o.startOffset) : res;
     }
 
     public StreamIndex toRecordStreamIndex() {
@@ -61,7 +64,7 @@ public StreamIndex toRecordStreamIndex() {
             .setEndOffset(endOffset);
     }
 
-    public static S3ObjectStreamIndex of(StreamIndex index) {
-        return new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset());
+    public static StreamOffsetRange of(StreamIndex index) {
+        return new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset());
     }
 }
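Editor's note: moving streamId/startOffset/endOffset from boxed Long to primitive long removes both the boxing overhead and the latent NullPointerException in compareTo, while Long.compare keeps the same (streamId, then startOffset) ordering. A quick standalone check of that comparator logic:

```java
import java.util.ArrayList;
import java.util.List;

class OffsetRangeOrderingDemo {
    public static void main(String[] args) {
        // Each pair is (streamId, startOffset); sort by streamId, then startOffset,
        // exactly as the new compareTo does with Long.compare on primitives.
        List<long[]> ranges = new ArrayList<>(List.of(
            new long[]{2, 0}, new long[]{1, 50}, new long[]{1, 10}));
        ranges.sort((a, b) -> {
            int res = Long.compare(a[0], b[0]);
            return res == 0 ? Long.compare(a[1], b[1]) : res;
        });
        ranges.forEach(r -> System.out.println(r[0] + ":" + r[1])); // 1:10, 1:50, 2:0
    }
}
```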
diff --git a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
index ce643ef872..c5a6be245e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/BrokerS3WALMetadataImageTest.java
@@ -28,7 +28,7 @@
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.metadata.stream.SortedWALObjectsList;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -80,10 +80,10 @@ public void testS3WALObjects() {
         // verify delta and check image's write
         BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of(
             new S3WALObject(0L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 0L, 100L)),
-                STREAM1, List.of(new S3ObjectStreamIndex(STREAM1, 0L, 200L))), 0L),
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 0L, 100L)),
+                STREAM1, List.of(new StreamOffsetRange(STREAM1, 0L, 200L))), 0L),
             new S3WALObject(1L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L))));
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 101L, 200L))), 1L))));
         assertEquals(image1, delta0.apply());
         testToImageAndBack(image1);
@@ -96,7 +96,7 @@ public void testS3WALObjects() {
         // verify delta and check image's write
         BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, new SortedWALObjectsList(List.of(
             new S3WALObject(1L, BROKER0, Map.of(
-                STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 101L, 200L))), 1L))));
+                STREAM0, List.of(new StreamOffsetRange(STREAM0, 101L, 200L))), 1L))));
         assertEquals(image2, delta1.apply());
         testToImageAndBack(image2);
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 2cfae9d27d..ecbb1fec26 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -30,7 +30,7 @@
 import org.apache.kafka.metadata.stream.InRangeObjects;
 import org.apache.kafka.metadata.stream.RangeMetadata;
 import org.apache.kafka.metadata.stream.S3ObjectMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.StreamOffsetRange;
 import org.apache.kafka.metadata.stream.S3StreamObject;
 import org.apache.kafka.metadata.stream.S3WALObject;
 import org.apache.kafka.metadata.stream.SortedWALObjectsList;
@@ -100,16 +100,16 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
     @Test
     public void testGetObjects() {
         List<S3WALObject> broker0WalObjects = List.of(
-            new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 100L, 120L))), 0L),
-            new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 120L, 140L))), 1L),
-            new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 180L, 200L))), 2L),
+            new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L))), 0L),
+            new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L))), 1L),
+            new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L))), 2L),
             new S3WALObject(3, BROKER0, Map.of(STREAM0, List.of(
-                new S3ObjectStreamIndex(STREAM0, 400L, 420L), new S3ObjectStreamIndex(STREAM0, 500L, 520L))), 3L),
-            new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 520L, 600L))), 4L));
+                new StreamOffsetRange(STREAM0, 400L, 420L), new StreamOffsetRange(STREAM0, 500L, 520L))), 3L),
+            new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L))), 4L));
         List<S3WALObject> broker1WalObjects = List.of(
-            new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 140L, 160L))), 0L),
-            new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 160L, 180L))), 1L),
-            new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 420L, 500L))), 2L));
+            new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 140L, 160L))), 0L),
+            new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 160L, 180L))), 1L),
+            new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new StreamOffsetRange(STREAM0, 420L, 500L))), 2L));
         BrokerS3WALMetadataImage broker0WALMetadataImage = new BrokerS3WALMetadataImage(BROKER0,
             new SortedWALObjectsList(broker0WalObjects));
         BrokerS3WALMetadataImage broker1WALMetadataImage = new BrokerS3WALMetadataImage(BROKER1,
             new SortedWALObjectsList(broker1WalObjects));
         Map ranges = Map.of(