diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 725866e201..4ea2bb5f89 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -56,6 +56,8 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java
similarity index 93%
rename from clients/src/main/java/org/apache/kafka/common/requests/CreateStreamRequest.java
rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java
index e40334b7db..29dd41c07c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateStreamRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamRequest.java
@@ -15,11 +15,13 @@
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common.requests.s3;
import org.apache.kafka.common.message.CreateStreamRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiError;
public class CreateStreamRequest extends AbstractRequest {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java
similarity index 94%
rename from clients/src/main/java/org/apache/kafka/common/requests/CreateStreamResponse.java
rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java
index 2636797593..0a42572af4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateStreamResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CreateStreamResponse.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common.requests.s3;
import java.util.Map;
import org.apache.kafka.common.message.CreateStreamResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
public class CreateStreamResponse extends AbstractResponse {
diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
index c3438f3398..c2ef453f31 100644
--- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
+++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala
@@ -21,7 +21,7 @@ import com.automq.elasticstream.client.api.Client
import kafka.log.{LogConfig, ProducerStateManagerConfig}
import kafka.log.es.ElasticLogManager.NAMESPACE
import kafka.log.es.client.{ClientFactoryProxy, Context}
-import kafka.server.{KafkaConfig, LogDirFailureChannel}
+import kafka.server.{BrokerServer, KafkaConfig, LogDirFailureChannel}
import kafka.utils.Scheduler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
@@ -83,7 +83,7 @@ object ElasticLogManager {
var INSTANCE: Option[ElasticLogManager] = None
var NAMESPACE = ""
- def init(config: KafkaConfig, clusterId: String): Boolean = {
+ def init(broker: BrokerServer, config: KafkaConfig, clusterId: String): Boolean = {
if (!config.elasticStreamEnabled) {
return false
}
@@ -103,6 +103,7 @@ object ElasticLogManager {
}
val context = new Context()
context.config = config
+ context.brokerServer = broker
INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context)))
val namespace = config.elasticStreamNamespace
diff --git a/core/src/main/scala/kafka/log/es/client/Context.java b/core/src/main/scala/kafka/log/es/client/Context.java
index 9cda65b588..8267f632d3 100644
--- a/core/src/main/scala/kafka/log/es/client/Context.java
+++ b/core/src/main/scala/kafka/log/es/client/Context.java
@@ -17,8 +17,10 @@
package kafka.log.es.client;
+import kafka.server.BrokerServer;
import kafka.server.KafkaConfig;
public class Context {
public KafkaConfig config;
+ public BrokerServer brokerServer;
}
diff --git a/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java
index 4896d4a7b7..a11060adc8 100644
--- a/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java
+++ b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java
@@ -20,7 +20,7 @@
import com.automq.elasticstream.client.api.Client;
import kafka.log.es.AlwaysSuccessClient;
import kafka.log.es.client.Context;
-import kafka.log.s3.S3Client;
+import kafka.log.s3.DefaultS3Client;
import kafka.log.s3.operator.DefaultS3Operator;
import kafka.log.s3.operator.S3Operator;
@@ -30,6 +30,6 @@ public static Client get(Context context) {
String region = context.config.s3Region();
String bucket = context.config.s3Bucket();
S3Operator s3Operator = new DefaultS3Operator(endpoint, region, bucket);
- return new AlwaysSuccessClient(new S3Client(s3Operator));
+ return new AlwaysSuccessClient(new DefaultS3Client(context.brokerServer, context.config, s3Operator));
}
}
diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
new file mode 100644
index 0000000000..96802bb9cf
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import com.automq.elasticstream.client.api.Client;
+import com.automq.elasticstream.client.api.KVClient;
+import com.automq.elasticstream.client.api.StreamClient;
+import kafka.log.es.MemoryClient.KVClientImpl;
+import kafka.log.s3.cache.DefaultS3BlockCache;
+import kafka.log.s3.cache.S3BlockCache;
+import kafka.log.s3.network.ControllerRequestSender;
+import kafka.log.s3.objects.ControllerObjectManager;
+import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.operator.S3Operator;
+import kafka.log.s3.streams.ControllerStreamManager;
+import kafka.log.s3.streams.StreamManager;
+import kafka.server.BrokerServer;
+import kafka.server.BrokerToControllerChannelManager;
+import kafka.server.KafkaConfig;
+
+public class DefaultS3Client implements Client {
+
+ private final KafkaConfig config;
+ private final StreamMetadataManager metadataManager;
+
+ private final BrokerToControllerChannelManager channelManager;
+
+ private final ControllerRequestSender requestSender;
+
+ private final S3Operator operator;
+
+ private final Wal wal;
+
+ private final S3BlockCache blockCache;
+
+ private final ObjectManager objectManager;
+
+ private final StreamManager streamManager;
+
+ private final StreamClient streamClient;
+
+ private final KVClient kvClient;
+
+ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator operator) {
+ this.config = config;
+ this.channelManager = brokerServer.clientToControllerChannelManager();
+ this.metadataManager = new StreamMetadataManager(brokerServer, config);
+ this.operator = operator;
+ this.requestSender = new ControllerRequestSender(channelManager);
+ this.streamManager = new ControllerStreamManager(this.requestSender, config);
+ this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
+ this.wal = new S3Wal(objectManager, operator);
+ this.blockCache = new DefaultS3BlockCache(objectManager, operator);
+ this.streamClient = new S3StreamClient(this.streamManager, this.wal, this.blockCache, this.objectManager);
+ this.kvClient = new KVClientImpl();
+ }
+
+ @Override
+ public StreamClient streamClient() {
+ return this.streamClient;
+ }
+
+ @Override
+ public KVClient kvClient() {
+ return this.kvClient;
+ }
+}
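Taken together with the ElasticLogManager and s3.ClientFactory hunks above, the broker-side wiring amounts to roughly the following Java sketch (simplified: the ClientFactoryProxy indirection and error handling are omitted, and endpoint/region/bucket stand for values read from KafkaConfig):

    Context context = new Context();
    context.config = config;                // KafkaConfig
    context.brokerServer = broker;          // field added by this patch
    S3Operator s3Operator = new DefaultS3Operator(endpoint, region, bucket);
    Client client = new AlwaysSuccessClient(
        new DefaultS3Client(context.brokerServer, context.config, s3Operator));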
diff --git a/core/src/main/scala/kafka/log/s3/KRaftKVClient.java b/core/src/main/scala/kafka/log/s3/KRaftKVClient.java
index c3cbdba41b..240bae215a 100644
--- a/core/src/main/scala/kafka/log/s3/KRaftKVClient.java
+++ b/core/src/main/scala/kafka/log/s3/KRaftKVClient.java
@@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture;
public class KRaftKVClient implements KVClient {
+
@Override
public CompletableFuture putKV(List list) {
return null;
diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java
index 3b65cf6cc6..41d9814c61 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectReader.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -19,7 +19,7 @@
import io.netty.buffer.ByteBuf;
import kafka.log.s3.model.StreamRecordBatch;
-import kafka.log.s3.objects.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.ZstdFactory;
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
index 891f94a723..81b886d6a3 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -24,7 +24,7 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.operator.Writer;
-import kafka.log.s3.utils.ObjectUtils;
+import org.apache.kafka.metadata.stream.ObjectUtils;
import org.apache.kafka.common.compress.ZstdFactory;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
diff --git a/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java
new file mode 100644
index 0000000000..9d79787a4a
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/StreamMetadataManager.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import kafka.server.BrokerServer;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import org.apache.kafka.image.BrokerS3WALMetadataImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.stream.InRangeObjects;
+import org.apache.kafka.metadata.stream.S3Object;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.S3WALObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamMetadataManager {
+
+ // TODO: optimize by more suitable concurrent protection
+ // TODO: use order id instead of object id
+ private final static Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class);
+ private final KafkaConfig config;
+ private final BrokerServer broker;
+ private final InflightWalObjects inflightWalObjects;
+ private final MetadataCache metadataCache;
+ private final CatchUpMetadataListener catchUpMetadataListener;
+
+ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) {
+ this.config = config;
+ this.broker = broker;
+ this.inflightWalObjects = new InflightWalObjects();
+ this.metadataCache = broker.metadataCache();
+ this.catchUpMetadataListener = new CatchUpMetadataListener();
+ // register listener
+ this.broker.metadataListener().registerStreamMetadataListener(this.catchUpMetadataListener);
+ }
+
+ public synchronized void catchupTo(long objectId) {
+ // delete all wal objects which are <= objectId
+ this.inflightWalObjects.trim(objectId);
+ }
+
+ @SuppressWarnings("all")
+ public synchronized List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit) {
+ List<S3ObjectMetadata> objects = new ArrayList<>();
+ if (startOffset >= endOffset) {
+ return objects;
+ }
+ OffsetRange walRange = this.inflightWalObjects.getWalRange(streamId);
+ if (walRange == null || endOffset <= walRange.startOffset()) {
+ // only search in cache
+ InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, endOffset, limit);
+ if (cachedInRangeObjects != null) {
+ objects.addAll(cachedInRangeObjects.objects());
+ }
+ return objects;
+ }
+ if (startOffset >= walRange.startOffset()) {
+ // only search in inflight wal
+ InRangeObjects inflightInRangeObjects = this.inflightWalObjects.getObjects(streamId, startOffset, endOffset, limit);
+ if (inflightInRangeObjects != null) {
+ objects.addAll(inflightInRangeObjects.objects());
+ }
+ return objects;
+ }
+ long cachedEndOffset = walRange.startOffset();
+ InRangeObjects cachedInRangeObjects = this.metadataCache.getObjects(streamId, startOffset, cachedEndOffset, limit);
+ if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) {
+ return objects;
+ }
+ objects.addAll(cachedInRangeObjects.objects());
+ if (objects.size() >= limit) {
+ return objects;
+ }
+ InRangeObjects inflightinRangeObjects = this.inflightWalObjects.getObjects(streamId, cachedEndOffset, endOffset, limit - objects.size());
+ if (inflightinRangeObjects != null) {
+ objects.addAll(inflightinRangeObjects.objects());
+ }
+ objects.forEach(obj -> {
+ S3Object metadata = metadataCache.getObjectMetadata(obj.getObjectId());
+ if (metadata == null) {
+ LOGGER.error("object: {} metadata not exist", obj.getObjectId());
+ throw new RuntimeException("object: " + obj.getObjectId() + " metadata not exist");
+ }
+ obj.setObjectSize(metadata.getObjectSize());
+ });
+ return objects;
+ }
+
+ private static class OffsetRange {
+
+ private long startOffset;
+ private long endOffset;
+
+ public OffsetRange(long startOffset, long endOffset) {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ public long startOffset() {
+ return startOffset;
+ }
+
+ public long endOffset() {
+ return endOffset;
+ }
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public void setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ }
+ }
+
+ static class InflightWalObjects {
+
+ private final List<S3WALObject> objects;
+ private final Map<Long, OffsetRange> streamOffsets;
+
+ public InflightWalObjects() {
+ this.objects = new LinkedList<>();
+ this.streamOffsets = new HashMap<>();
+ }
+
+ public void append(S3WALObject object) {
+ objects.add(object);
+ object.streamsIndex().forEach((stream, indexes) -> {
+ // wal object only contains one index for each stream
+ streamOffsets.putIfAbsent(stream, new OffsetRange(indexes.get(0).getStartOffset(), indexes.get(indexes.size() - 1).getEndOffset()));
+ streamOffsets.get(stream).setEndOffset(indexes.get(indexes.size() - 1).getEndOffset());
+ });
+ }
+
+ public void trim(long objectId) {
+ // TODO: speed up by binary search
+ int clearEndIndex = objects.size();
+ for (int i = 0; i < objects.size(); i++) {
+ S3WALObject wal = objects.get(i);
+ if (wal.objectId() > objectId) {
+ clearEndIndex = i;
+ break;
+ }
+ wal.streamsIndex().forEach((stream, indexes) -> {
+ streamOffsets.get(stream).setStartOffset(indexes.get(indexes.size() - 1).getEndOffset());
+ });
+ }
+ objects.subList(0, clearEndIndex).clear();
+ }
+
+ public OffsetRange getWalRange(long streamId) {
+ return streamOffsets.get(streamId);
+ }
+
+ @SuppressWarnings("all")
+ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) {
+ OffsetRange walRange = getWalRange(streamId);
+ if (walRange == null) {
+ return InRangeObjects.INVALID;
+ }
+ if (startOffset < walRange.startOffset()) {
+ return InRangeObjects.INVALID;
+ }
+ if (endOffset > walRange.endOffset()) {
+ endOffset = walRange.endOffset();
+ }
+ if (startOffset >= endOffset) {
+ return InRangeObjects.INVALID;
+ }
+ List<S3ObjectMetadata> inRangeObjects = new LinkedList<>();
+ long nextStartOffset = startOffset;
+ for (S3WALObject object : objects) {
+ if (limit <= 0) {
+ break;
+ }
+ if (nextStartOffset >= endOffset) {
+ break;
+ }
+ List<S3ObjectStreamIndex> indexes = object.streamsIndex().get(streamId);
+ if (indexes == null || indexes.size() != 1) {
+ LOGGER.error("invalid wal object: {}", object);
+ continue;
+ }
+ long objStartOffset = indexes.get(0).getStartOffset();
+ long objEndOffset = indexes.get(0).getEndOffset();
+ if (objStartOffset > startOffset) {
+ break;
+ }
+ if (objEndOffset <= startOffset) {
+ continue;
+ }
+ limit--;
+ inRangeObjects.add(new S3ObjectMetadata(object.objectId(), object.objectType()));
+ nextStartOffset = objEndOffset;
+ }
+ return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects);
+ }
+ }
+
+ public interface StreamMetadataListener {
+
+ void onChange(MetadataDelta delta, MetadataImage image);
+ }
+
+ class CatchUpMetadataListener implements StreamMetadataListener {
+
+ @Override
+ public void onChange(MetadataDelta delta, MetadataImage newImage) {
+ BrokerS3WALMetadataImage walMetadataImage = newImage.streamsMetadata().brokerWALMetadata().get(config.brokerId());
+ if (walMetadataImage == null) {
+ return;
+ }
+ List<S3WALObject> walObjects = walMetadataImage.getWalObjects();
+ if (walObjects.isEmpty()) {
+ return;
+ }
+ S3WALObject wal = walObjects.get(walObjects.size() - 1);
+ catchupTo(wal.objectId());
+ }
+ }
+
+}
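The branching in getObjects above splits a query between the committed metadata image and the inflight WAL objects at walRange.startOffset(); a stripped-down sketch with hypothetical offsets:

    // Suppose the metadata cache covers stream offsets [0, 100) and the inflight
    // WAL objects cover [100, 200), i.e. walRange.startOffset() == 100.
    long start = 50, end = 150;
    long inflightStart = 100;                               // walRange.startOffset()
    if (end <= inflightStart) {
        // whole range served by metadataCache.getObjects(streamId, start, end, limit)
    } else if (start >= inflightStart) {
        // whole range served by inflightWalObjects.getObjects(streamId, start, end, limit)
    } else {
        // split: the cache serves [start, inflightStart), the inflight WAL serves
        // [inflightStart, end), and the second lookup gets the remaining limit.
    }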
diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
index b18c7aecd5..61a6e43109 100644
--- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -20,7 +20,7 @@
import kafka.log.s3.ObjectReader;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
-import kafka.log.s3.objects.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.utils.CloseableIterator;
diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
index 5fc0ecc879..d7cd01304a 100644
--- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java
@@ -38,9 +38,9 @@
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.objects.OpenStreamMetadata;
-import kafka.log.s3.objects.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.streams.StreamManager;
-import kafka.log.s3.utils.ObjectUtils;
+import org.apache.kafka.metadata.stream.ObjectUtils;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.common.errors.s3.StreamNotExistException;
import org.apache.kafka.metadata.stream.S3Object;
diff --git a/core/src/main/scala/kafka/log/s3/S3Client.java b/core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java
similarity index 89%
rename from core/src/main/scala/kafka/log/s3/S3Client.java
rename to core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java
index 9a5248d56f..cc6358e2b2 100644
--- a/core/src/main/scala/kafka/log/s3/S3Client.java
+++ b/core/src/main/scala/kafka/log/s3/memory/MemoryS3Client.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package kafka.log.s3;
+package kafka.log.s3.memory;
import com.automq.elasticstream.client.api.Client;
import com.automq.elasticstream.client.api.KVClient;
import com.automq.elasticstream.client.api.StreamClient;
import kafka.log.es.MemoryClient;
+import kafka.log.s3.S3StreamClient;
+import kafka.log.s3.S3Wal;
+import kafka.log.s3.Wal;
import kafka.log.s3.cache.DefaultS3BlockCache;
import kafka.log.s3.cache.S3BlockCache;
-import kafka.log.s3.memory.MemoryMetadataManager;
import kafka.log.s3.operator.S3Operator;
-public class S3Client implements Client {
+public class MemoryS3Client implements Client {
private final StreamClient streamClient;
private final KVClient kvClient;
- public S3Client(S3Operator s3Operator) {
+ public MemoryS3Client(S3Operator s3Operator) {
MemoryMetadataManager manager = new MemoryMetadataManager();
manager.start();
Wal wal = new S3Wal(manager, s3Operator);
diff --git a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java
new file mode 100644
index 0000000000..a23d4603d0
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.network;
+
+import java.util.concurrent.CompletableFuture;
+import kafka.server.BrokerToControllerChannelManager;
+import kafka.server.ControllerRequestCompletionHandler;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerRequestSender {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class);
+
+ private final BrokerToControllerChannelManager channelManager;
+
+ public ControllerRequestSender(BrokerToControllerChannelManager channelManager) {
+ this.channelManager = channelManager;
+ }
+
+ public <R extends ApiMessage> CompletableFuture<R> send(AbstractRequest.Builder<? extends AbstractRequest> requestBuilder,
+ Class<R> responseDataType) {
+ CompletableFuture<R> cf = new CompletableFuture<>();
+ LOGGER.debug("Sending request {}", requestBuilder);
+ channelManager.sendRequest(requestBuilder, new ControllerRequestCompletionHandler() {
+ @Override
+ public void onTimeout() {
+ // TODO: add timeout retry policy
+ LOGGER.error("Timeout while creating stream");
+ cf.completeExceptionally(new TimeoutException("Timeout while creating stream"));
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ if (response.authenticationException() != null) {
+ LOGGER.error("Authentication error while sending request: {}", requestBuilder, response.authenticationException());
+ cf.completeExceptionally(response.authenticationException());
+ return;
+ }
+ if (response.versionMismatch() != null) {
+ LOGGER.error("Version mismatch while sending request: {}", requestBuilder, response.versionMismatch());
+ cf.completeExceptionally(response.versionMismatch());
+ return;
+ }
+ if (!responseDataType.isInstance(response.responseBody().data())) {
+ LOGGER.error("Unexpected response type: {} while sending request: {}",
+ response.responseBody().data().getClass().getSimpleName(), requestBuilder);
+ cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request"));
+ return;
+ }
+ cf.complete((R) response.responseBody().data());
+ }
+ });
+ return cf;
+ }
+}
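Call sites pass the concrete response data class and get back a future of that type; for example (mirroring ControllerStreamManager further down in this patch):

    CompletableFuture<CreateStreamResponseData> cf =
        requestSender.send(new CreateStreamRequest.Builder(new CreateStreamRequestData()),
            CreateStreamResponseData.class);
    cf.thenApply(resp -> Errors.forCode(resp.errorCode()));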
diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
new file mode 100644
index 0000000000..005dd492fb
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.objects;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import kafka.log.s3.StreamMetadataManager;
+import kafka.log.s3.network.ControllerRequestSender;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
+import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.PrepareS3ObjectRequest;
+import org.apache.kafka.common.requests.PrepareS3ObjectRequest.Builder;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerObjectManager implements ObjectManager {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ControllerObjectManager.class);
+
+ private final ControllerRequestSender requestSender;
+ private final StreamMetadataManager metadataManager;
+ private final KafkaConfig config;
+
+ public ControllerObjectManager(ControllerRequestSender requestSender, StreamMetadataManager metadataManager, KafkaConfig config) {
+ this.requestSender = requestSender;
+ this.metadataManager = metadataManager;
+ this.config = config;
+ }
+
+ @Override
+ public CompletableFuture<Long> prepareObject(int count, long ttl) {
+ PrepareS3ObjectRequest.Builder request = new Builder(
+ new PrepareS3ObjectRequestData()
+ .setBrokerId(config.brokerId())
+ .setPreparedCount(count)
+ .setTimeToLiveInMs(ttl)
+ );
+ return requestSender.send(request, PrepareS3ObjectResponseData.class).thenApply(resp -> {
+ Errors code = Errors.forCode(resp.errorCode());
+ switch (code) {
+ case NONE:
+ // TODO: simply response's data structure, only return first object id is enough
+ return resp.s3ObjectIds().stream().findFirst().get();
+ default:
+ LOGGER.error("Error while preparing {} object, code: {}", count, code);
+ throw code.exception();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture commitWalObject(CommitWalObjectRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture commitMinorCompactObject(CommitCompactObjectRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture commitMajorCompactObject(CommitCompactObjectRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture commitStreamObject(CommitStreamObjectRequest request) {
+ return null;
+ }
+
+ @Override
+ public List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit) {
+ try {
+ return this.metadataManager.getObjects(streamId, startOffset, endOffset, limit);
+ } catch (Exception e) {
+ LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, e);
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public List<S3ObjectMetadata> getServerObjects() {
+ return null;
+ }
+}
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
index e0829d2211..a51caa3d0d 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
/**
* Object metadata registry.
diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
new file mode 100644
index 0000000000..ea89a02f0e
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.streams;
+
+import java.util.concurrent.CompletableFuture;
+import kafka.log.s3.network.ControllerRequestSender;
+import kafka.log.s3.objects.OpenStreamMetadata;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.message.CreateStreamRequestData;
+import org.apache.kafka.common.message.CreateStreamResponseData;
+import org.apache.kafka.common.message.OpenStreamRequestData;
+import org.apache.kafka.common.message.OpenStreamResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.OpenStreamRequest;
+import org.apache.kafka.common.requests.s3.CreateStreamRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerStreamManager implements StreamManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStreamManager.class);
+ private final KafkaConfig config;
+ private final ControllerRequestSender requestSender;
+
+ public ControllerStreamManager(ControllerRequestSender requestSender, KafkaConfig config) {
+ this.config = config;
+ this.requestSender = requestSender;
+ }
+
+ @Override
+ public CompletableFuture<Long> createStream() {
+ CreateStreamRequest.Builder request = new CreateStreamRequest.Builder(
+ new CreateStreamRequestData()
+ );
+ return this.requestSender.send(request, CreateStreamResponseData.class).thenApply(resp -> {
+ switch (Errors.forCode(resp.errorCode())) {
+ case NONE:
+ return resp.streamId();
+ default:
+ LOGGER.error("Error while creating stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+ throw Errors.forCode(resp.errorCode()).exception();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<OpenStreamMetadata> openStream(long streamId, long epoch) {
+ OpenStreamRequest.Builder request = new OpenStreamRequest.Builder(
+ new OpenStreamRequestData()
+ .setStreamId(streamId)
+ .setStreamEpoch(epoch)
+ .setBrokerId(config.brokerId())
+ );
+ return this.requestSender.send(request, OpenStreamResponseData.class).thenApply(resp -> {
+ switch (Errors.forCode(resp.errorCode())) {
+ case NONE:
+ return new OpenStreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset());
+ case STREAM_FENCED:
+ LOGGER.error("Stream fenced while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+ throw Errors.forCode(resp.errorCode()).exception();
+ default:
+ LOGGER.error("Error while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
+ throw Errors.forCode(resp.errorCode()).exception();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture trimStream(long streamId, long epoch, long newStartOffset) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture deleteStream(long streamId, long epoch) {
+ return null;
+ }
+}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7e66bb83fd..bad81db467 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -199,9 +199,22 @@ class BrokerServer(
metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+ val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get()).asScala
+
+ val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
// elastic stream inject start
+ clientToControllerChannelManager = BrokerToControllerChannelManager(
+ controllerNodeProvider,
+ time,
+ metrics,
+ config,
+ channelName = "forwarding",
+ threadNamePrefix,
+ retryTimeoutMs = 60000
+ )
+
if (config.elasticStreamEnabled) {
- if (!ElasticLogManager.init(config, clusterId)) {
+ if (!ElasticLogManager.init(this, config, clusterId)) {
throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.")
}
} else {
@@ -212,25 +225,13 @@ class BrokerServer(
// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
- brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
+ brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
- val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get()).asScala
- val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
-
- clientToControllerChannelManager = BrokerToControllerChannelManager(
- controllerNodeProvider,
- time,
- metrics,
- config,
- channelName = "forwarding",
- threadNamePrefix,
- retryTimeoutMs = 60000
- )
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index e346816cb6..d717a56361 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.s3.{CreateStreamRequest, CreateStreamResponse}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 342b23cec4..cef618a703 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
+import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
import org.apache.kafka.server.common.MetadataVersion
import java.util
@@ -116,6 +117,12 @@ trait MetadataCache {
def features(): FinalizedFeaturesAndEpoch
def getRandomAliveBrokerId: Option[Int]
+
+ // Kafka on S3 inject start
+ def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects
+
+ def getObjectMetadata(objectId: Long): S3Object
+ // Kafka on S3 inject end
}
object MetadataCache {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 789ae89f04..58809d6f01 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,6 +16,8 @@
*/
package kafka.server.metadata
+import kafka.log.s3.StreamMetadataManager.StreamMetadataListener
+
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CompletableFuture
@@ -95,6 +97,12 @@ class BrokerMetadataListener(
*/
private var _publisher: Option[MetadataPublisher] = None
+ // Kafka on S3 inject start
+
+ private var _streamMetadataListener: Option[StreamMetadataListener] = None
+
+ // Kafka on S3 inject end
+
/**
* The number of bytes of records that we have read since the last snapshot we took.
* This does not include records we read from a snapshot.
@@ -352,6 +360,12 @@ class BrokerMetadataListener(
// This publish call is done with its own try-catch and fault handler
publisher.publish(delta, _image)
+ // Kafka on S3 inject start
+
+ _streamMetadataListener.foreach(_.onChange(delta, _image))
+
+ // Kafka on S3 inject end
+
// Update the metrics since the publisher handled the lastest image
brokerMetrics.updateLastAppliedImageProvenance(_image.provenance())
}
@@ -391,4 +405,12 @@ class BrokerMetadataListener(
future.complete(writer.records())
}
}
+
+ // Kafka on S3 inject start
+
+ def registerStreamMetadataListener(listener: StreamMetadataListener): Unit = {
+ _streamMetadataListener = Some(listener)
+ }
+
+ // Kafka on S3 inject end
}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 7e6ad7bfd0..05a6a393ac 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom
import kafka.admin.BrokerMetadata
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
+import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.apache.kafka.server.common.MetadataVersion
@@ -392,4 +393,16 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
features.toMap,
image.highestOffsetAndEpoch().offset)
}
+
+ // Kafka on S3 inject start
+ override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = {
+ val image = _currentImage
+ image.streamsMetadata().getObjects(streamId, startOffset, endOffset, limit)
+ }
+
+ override def getObjectMetadata(objectId: Long): S3Object = {
+ val image = _currentImage
+ image.objectsMetadata().getObjectMetadata(objectId)
+ }
+ // Kafka on S3 inject end
}
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index feaaf1c43f..efbc546f63 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.metadata.stream.{InRangeObjects, S3Object}
import org.apache.kafka.server.common.MetadataVersion
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -584,4 +585,14 @@ class ZkMetadataCache(
def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
featuresAndEpoch
}
+
+ // Kafka on S3 inject start
+ override def getObjects(streamId: Long, startOffset: Long, endOffset: Long, limit: Int): InRangeObjects = {
+ throw new UnsupportedOperationException("getObjects is not supported in ZkMetadataCache")
+ }
+
+ override def getObjectMetadata(objectId: Long): S3Object = {
+ throw new UnsupportedOperationException("getObjectMetadata is not supported in ZkMetadataCache")
+ }
+ // Kafka on S3 inject end
}
diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
index 32bce40d99..c2bc85bc56 100644
--- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
+++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -22,7 +22,7 @@
import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
-import kafka.log.s3.objects.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.operator.MemoryS3Operator;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3ObjectType;
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
index 5256ccf115..fab21084a9 100644
--- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -20,7 +20,7 @@
import com.automq.elasticstream.client.api.RecordBatch;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectStreamRange;
-import kafka.log.s3.objects.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
import kafka.log.s3.operator.MemoryS3Operator;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3ObjectType;
diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala
index 67102a869b..1f8191ddfb 100644
--- a/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/es/ElasticLogSegmentTest.scala
@@ -86,7 +86,7 @@ class ElasticLogSegmentTest {
def setup(): Unit = {
segments.clear()
logDir = TestUtils.tempDir()
- ElasticLogManager.init(kafkaConfig, "fake_cluster_id")
+ ElasticLogManager.init(null, kafkaConfig, "fake_cluster_id")
}
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala b/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala
index 92d2b71a83..925356ac53 100644
--- a/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/es/ElasticLogTest.scala
@@ -681,7 +681,7 @@ class ElasticLogTest {
topicPartition: TopicPartition = topicPartition,
logDirFailureChannel: LogDirFailureChannel = logDirFailureChannel,
clusterId: String = "test_cluster"): ElasticLog = {
- ElasticLogManager.init(kafkaConfig, clusterId)
+ ElasticLogManager.init(null, kafkaConfig, clusterId)
ElasticLogManager.getOrCreateLog(dir = dir,
config = config,
scheduler = scheduler,
diff --git a/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala
index 29be3d0ba7..27533e7b23 100755
--- a/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/es/ElasticUnifiedLogTest.scala
@@ -62,7 +62,7 @@ class ElasticUnifiedLogTest {
def setUp(): Unit = {
val props = TestUtils.createSimpleEsBrokerConfig()
config = KafkaConfig.fromProps(props)
- ElasticLogManager.init(config, clusterId)
+ ElasticLogManager.init(null, config, clusterId)
}
@AfterEach
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
index 530793c744..a27f615189 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
@@ -45,6 +45,10 @@ public S3ObjectsImage(long assignedObjectId, final Map objectsMe
this.objectsMetadata = objectsMetadata;
}
+ public S3Object getObjectMetadata(long objectId) {
+ return this.objectsMetadata.get(objectId);
+ }
+
public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 2eb3521615..54d99e5707 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -62,6 +62,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
streamObjects.values().forEach(streamObject -> writer.write(streamObject.toRecord()));
}
+
public Map getRanges() {
return ranges;
}
@@ -82,6 +83,10 @@ public long getStreamId() {
return streamId;
}
+ public long startOffset() {
+ return startOffset;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index fbfc458267..a8070dff19 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -17,12 +17,22 @@
package org.apache.kafka.image;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.stream.InRangeObjects;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;
public final class S3StreamsMetadataImage {
@@ -58,6 +68,226 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
brokerWALMetadata.values().forEach(image -> image.write(writer, options));
}
+ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) {
+ S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
+ if (streamMetadata == null) {
+ return InRangeObjects.INVALID;
+ }
+ if (startOffset < streamMetadata.startOffset()) {
+ // start offset mismatch
+ return InRangeObjects.INVALID;
+ }
+ List<S3ObjectMetadata> objects = new ArrayList<>();
+ long realEndOffset = startOffset;
+ List<RangeSearcher> rangeSearchers = rangeSearchers(streamId, startOffset, endOffset);
+ for (RangeSearcher rangeSearcher : rangeSearchers) {
+ InRangeObjects inRangeObjects = rangeSearcher.getObjects(limit);
+ if (inRangeObjects == InRangeObjects.INVALID) {
+ break;
+ }
+ realEndOffset = inRangeObjects.endOffset();
+ objects.addAll(inRangeObjects.objects());
+ limit -= inRangeObjects.objects().size();
+ if (limit <= 0) {
+ break;
+ }
+ }
+ return new InRangeObjects(streamId, startOffset, realEndOffset, objects);
+ }
+
+ private List<RangeSearcher> rangeSearchers(long streamId, long startOffset, long endOffset) {
+ S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
+ List<RangeSearcher> rangeSearchers = new ArrayList<>();
+ // TODO: refactor to make ranges in order
+ List<RangeMetadata> ranges = streamMetadata.getRanges().values().stream().sorted(new Comparator<RangeMetadata>() {
+ @Override
+ public int compare(RangeMetadata o1, RangeMetadata o2) {
+ return o1.rangeIndex() - o2.rangeIndex();
+ }
+ }).collect(Collectors.toList());
+ for (RangeMetadata range : ranges) {
+ if (range.endOffset() <= startOffset) {
+ continue;
+ }
+ if (range.startOffset() >= endOffset) {
+ break;
+ }
+ long searchEndOffset = Math.min(range.endOffset(), endOffset);
+ long searchStartOffset = Math.max(range.startOffset(), startOffset);
+ rangeSearchers.add(new RangeSearcher(searchStartOffset, searchEndOffset, streamId, range.brokerId()));
+ }
+ return rangeSearchers;
+ }
+
+ class RangeSearcher {
+
+ private final long startOffset;
+ private final long endOffset;
+ private final long streamId;
+ private final int brokerId;
+
+ public RangeSearcher(long startOffset, long endOffset, long streamId, int brokerId) {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.streamId = streamId;
+ this.brokerId = brokerId;
+ }
+
+ @SuppressWarnings("all")
+ public InRangeObjects getObjects(int limit) {
+ if (limit <= 0) {
+ return InRangeObjects.INVALID;
+ }
+ BrokerS3WALMetadataImage wal = brokerWALMetadata.get(brokerId);
+ if (wal == null) {
+ return InRangeObjects.INVALID;
+ }
+ List<ObjectStreamRange> walObjects = wal.getWalObjects().stream()
+ .filter(obj -> obj.streamsIndex().containsKey(streamId) && obj.streamsIndex().get(streamId).size() != 0)
+ .flatMap(obj -> {
+ List<S3ObjectStreamIndex> indexes = obj.streamsIndex().get(streamId);
+ // TODO: pre filter useless objects
+ return indexes.stream().filter(index -> {
+ long objectStartOffset = index.getStartOffset();
+ long objectEndOffset = index.getEndOffset();
+ return objectStartOffset < endOffset && objectEndOffset > startOffset;
+ }).map(index -> {
+ long startOffset = index.getStartOffset();
+ long endOffset = index.getEndOffset();
+ return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
+ });
+ }).collect(Collectors.toList());
+ S3StreamMetadataImage stream = streamsMetadata.get(streamId);
+ if (stream == null) {
+ return InRangeObjects.INVALID;
+ }
+ List<ObjectStreamRange> streamObjects = new ArrayList<>();
+ Map<Long, S3StreamObject> streamObjectsMetadata = stream.getStreamObjects();
+ // TODO: refactor to make stream objects in order
+ if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) {
+ List<S3StreamObject> streamObjs = streamObjectsMetadata.values().stream().filter(obj -> {
+ long objectStartOffset = obj.streamIndex().getStartOffset();
+ long objectEndOffset = obj.streamIndex().getEndOffset();
+ return objectStartOffset < endOffset && objectEndOffset > startOffset;
+ }).sorted(new Comparator<S3StreamObject>() {
+ @Override
+ public int compare(S3StreamObject o1, S3StreamObject o2) {
+ return Long.compare(o1.objectId(), o2.objectId());
+ }
+ }).collect(Collectors.toList());
+ streamObjects.addAll(
+ streamObjs.stream().map(
+ obj -> {
+ long startOffset = obj.streamIndex().getStartOffset();
+ long endOffset = obj.streamIndex().getEndOffset();
+ return new ObjectStreamRange(obj.objectId(), obj.objectType(), startOffset, endOffset);
+ }).collect(Collectors.toList()));
+ }
+ List<S3ObjectMetadata> inRangeObjects = new ArrayList<>();
+ int walIndex = 0;
+ int streamIndex = 0;
+ long nextStartOffset = startOffset;
+ while (limit > 0
+ && nextStartOffset < endOffset
+ && (walIndex < walObjects.size() || streamIndex < streamObjects.size())) {
+
+ if (walIndex >= walObjects.size()) {
+ // only stream objects left
+ ObjectStreamRange streamRange = streamObjects.get(streamIndex++);
+ long objectStartOffset = streamRange.startOffset();
+ long objectEndOffset = streamRange.endOffset();
+ if (objectStartOffset > nextStartOffset) {
+ break;
+ }
+ if (objectEndOffset <= nextStartOffset) {
+ continue;
+ }
+ inRangeObjects.add(streamRange.toS3ObjectMetadata());
+ limit--;
+ nextStartOffset = objectEndOffset;
+ continue;
+ }
+
+ if (streamIndex >= streamObjects.size()) {
+ // only wal objects left
+ ObjectStreamRange walRange = walObjects.get(walIndex++);
+ long objectStartOffset = walRange.startOffset();
+ long objectEndOffset = walRange.endOffset();
+ if (objectStartOffset > nextStartOffset) {
+ break;
+ }
+ if (objectEndOffset <= nextStartOffset) {
+ continue;
+ }
+ inRangeObjects.add(walRange.toS3ObjectMetadata());
+ limit--;
+ nextStartOffset = objectEndOffset;
+ continue;
+ }
+
+ ObjectStreamRange walRange = walObjects.get(walIndex);
+ ObjectStreamRange streamRange = streamObjects.get(streamIndex);
+ long walObjectStartOffset = walRange.startOffset();
+ long walObjectEndOffset = walRange.endOffset();
+ long streamObjectStartOffset = streamRange.startOffset();
+ long streamObjectEndOffset = streamRange.endOffset();
+ if (walObjectStartOffset > nextStartOffset && streamObjectStartOffset > nextStartOffset) {
+ // both start offset are greater than nextStartOffset
+ break;
+ }
+ if (walObjectStartOffset <= nextStartOffset && walObjectEndOffset > nextStartOffset) {
+ // wal object contains nextStartOffset
+ inRangeObjects.add(walRange.toS3ObjectMetadata());
+ limit--;
+ nextStartOffset = walObjectEndOffset;
+ walIndex++;
+ continue;
+ }
+ if (streamObjectStartOffset <= nextStartOffset && streamObjectEndOffset > nextStartOffset) {
+ // stream object contains nextStartOffset
+ inRangeObjects.add(streamRange.toS3ObjectMetadata());
+ limit--;
+ nextStartOffset = streamObjectEndOffset;
+ streamIndex++;
+ }
+ }
+ return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects);
+ }
+
+ }
+
+ static class ObjectStreamRange {
+
+ private final long objectId;
+ private final S3ObjectType objectType;
+ private final long startOffset;
+ private final long endOffset;
+
+ public ObjectStreamRange(long objectId, S3ObjectType objectType, long startOffset, long endOffset) {
+ this.objectId = objectId;
+ this.objectType = objectType;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ public long objectId() {
+ return objectId;
+ }
+
+ public long startOffset() {
+ return startOffset;
+ }
+
+ public long endOffset() {
+ return endOffset;
+ }
+
+ public S3ObjectMetadata toS3ObjectMetadata() {
+ // TODO: fill object size later
+ return new S3ObjectMetadata(objectId, -1, objectType);
+ }
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
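The loop in RangeSearcher.getObjects above advances a nextStartOffset cursor while interleaving WAL-object and stream-object ranges; a self-contained Java sketch of the same merge over plain ranges (hypothetical data; the limit and endOffset bounds are omitted):

    import java.util.ArrayList;
    import java.util.List;

    class RangeMergeSketch {
        public static void main(String[] args) {
            // Each long[] is {startOffset, endOffset}; both lists are sorted by start offset.
            List<long[]> walRanges = List.of(new long[]{0, 10}, new long[]{20, 30});
            List<long[]> streamRanges = List.of(new long[]{10, 20});
            List<long[]> picked = new ArrayList<>();
            long next = 0;                      // plays the role of nextStartOffset
            int w = 0, s = 0;
            while (w < walRanges.size() || s < streamRanges.size()) {
                long[] wr = w < walRanges.size() ? walRanges.get(w) : null;
                long[] sr = s < streamRanges.size() ? streamRanges.get(s) : null;
                if (wr != null && wr[0] <= next && wr[1] > next) {
                    picked.add(wr); next = wr[1]; w++;       // WAL object covers the cursor
                } else if (sr != null && sr[0] <= next && sr[1] > next) {
                    picked.add(sr); next = sr[1]; s++;       // stream object covers the cursor
                } else {
                    break;                                   // gap: nothing covers the cursor
                }
            }
            System.out.println("covered " + picked.size() + " ranges up to offset " + next); // 3, 30
        }
    }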
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
new file mode 100644
index 0000000000..400b18a59f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.stream;
+
+import java.util.List;
+import java.util.Objects;
+
+public class InRangeObjects {
+
+ public static final InRangeObjects INVALID = new InRangeObjects(-1, -1, -1, List.of());
+
+ private final long streamId;
+ private final long startOffset;
+ private final long endOffset;
+ private final List<S3ObjectMetadata> objects;
+
+ public InRangeObjects(long streamId, long startOffset, long endOffset, List<S3ObjectMetadata> objects) {
+ this.streamId = streamId;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.objects = objects;
+ }
+
+ public long streamId() {
+ return streamId;
+ }
+
+ public long startOffset() {
+ return startOffset;
+ }
+
+ public long endOffset() {
+ return endOffset;
+ }
+
+ public List<S3ObjectMetadata> objects() {
+ return objects;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InRangeObjects that = (InRangeObjects) o;
+ return streamId == that.streamId
+ && startOffset == that.startOffset
+ && endOffset == that.endOffset
+ && objects.equals(that.objects);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(streamId, startOffset, endOffset, objects);
+ }
+}
diff --git a/core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java
similarity index 96%
rename from core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java
rename to metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java
index 4eaef394ce..50f35b6179 100644
--- a/core/src/main/scala/kafka/log/s3/utils/ObjectUtils.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/ObjectUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.log.s3.utils;
+package org.apache.kafka.metadata.stream;
public class ObjectUtils {
diff --git a/core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
similarity index 82%
rename from core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java
rename to metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
index d8c7a1f429..67dfdade6c 100644
--- a/core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectMetadata.java
@@ -15,22 +15,29 @@
* limitations under the License.
*/
-package kafka.log.s3.objects;
+package org.apache.kafka.metadata.stream;
-import kafka.log.s3.utils.ObjectUtils;
-import org.apache.kafka.metadata.stream.S3ObjectType;
public class S3ObjectMetadata {
private final long objectId;
- private final long objectSize;
+ private long objectSize;
private final S3ObjectType type;
+ public S3ObjectMetadata(long objectId, S3ObjectType type) {
+ this.objectId = objectId;
+ this.type = type;
+ }
+
public S3ObjectMetadata(long objectId, long objectSize, S3ObjectType type) {
this.objectId = objectId;
this.objectSize = objectSize;
this.type = type;
}
+ public void setObjectSize(long objectSize) {
+ this.objectSize = objectSize;
+ }
+
public long getObjectId() {
return objectId;
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 543b5715ba..8ede6fd18f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -45,6 +45,10 @@ public long objectId() {
return objectId;
}
+ public S3ObjectType objectType() {
+ return S3ObjectType.STREAM;
+ }
+
public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3StreamObjectRecord()
.setObjectId(objectId)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 26f6c8324c..892f3bdbee 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -40,16 +40,17 @@ public S3WALObject(long objectId, int brokerId, final Map<Long, List<S3ObjectStreamIndex>> streamsIndex) {
- List<S3ObjectStreamIndex> indices = streamsIndex.get(streamId);
- if (indices == null || indices.isEmpty()) {
+ List<S3ObjectStreamIndex> indexes = streamsIndex.get(streamId);
+ if (indexes == null || indexes.isEmpty()) {
return false;
}
- S3ObjectStreamIndex firstIndex = indices.get(0);
- S3ObjectStreamIndex lastIndex = indices.get(indices.size() - 1);
- if (endOffset <= firstIndex.getStartOffset() || startOffset >= lastIndex.getEndOffset()) {
- return false;
- }
- return true;
+ S3ObjectStreamIndex firstIndex = indexes.get(0);
+ S3ObjectStreamIndex lastIndex = indexes.get(indexes.size() - 1);
+ return startOffset >= firstIndex.getStartOffset() && startOffset <= lastIndex.getEndOffset();
+ }
+
+ public Map<Long, List<S3ObjectStreamIndex>> streamsIndex() {
+ return streamsIndex;
}
public ApiMessageAndVersion toRecord() {
@@ -71,7 +72,7 @@ public static S3WALObject of(WALObjectRecord record) {
return s3WalObject;
}
- public Integer getBrokerId() {
+ public Integer brokerId() {
return brokerId;
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 5e45ff11d9..f3c9f6052a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -21,10 +21,18 @@
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.stream.InRangeObjects;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;
+import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
+import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -34,6 +42,10 @@
@Tag("S3Unit")
public class S3StreamsMetadataImageTest {
+ private static final int BROKER0 = 0;
+ private static final int BROKER1 = 1;
+ private static final long STREAM0 = 0;
+ private static final long STREAM1 = 1;
private static final long KB = 1024;
private static final long MB = 1024 * KB;
@@ -82,4 +94,87 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
S3StreamsMetadataImage newImage = delta.apply();
assertEquals(image, newImage);
}
+
+ @Test
+ public void testGetObjects() {
+ List<S3WALObject> broker0WalObjects = List.of(
+ new S3WALObject(0, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 100L, 120L)))),
+ new S3WALObject(1, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 120L, 140L)))),
+ new S3WALObject(2, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 180L, 200L)))),
+ new S3WALObject(3, BROKER0, Map.of(STREAM0, List.of(
+ new S3ObjectStreamIndex(STREAM0, 400L, 420L), new S3ObjectStreamIndex(STREAM0, 500L, 520L)))),
+ new S3WALObject(4, BROKER0, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 520L, 600L)))));
+ List<S3WALObject> broker1WalObjects = List.of(
+ new S3WALObject(5, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 140L, 160L)))),
+ new S3WALObject(6, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 160L, 180L)))),
+ new S3WALObject(7, BROKER1, Map.of(STREAM0, List.of(new S3ObjectStreamIndex(STREAM0, 420L, 500L)))));
+ BrokerS3WALMetadataImage broker0WALMetadataImage = new BrokerS3WALMetadataImage(BROKER0, broker0WalObjects);
+ BrokerS3WALMetadataImage broker1WALMetadataImage = new BrokerS3WALMetadataImage(BROKER1, broker1WalObjects);
+ Map<Integer, RangeMetadata> ranges = Map.of(
+ 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0),
+ 1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1),
+ 2, new RangeMetadata(STREAM0, 2L, 2, 180L, 420L, BROKER0),
+ 3, new RangeMetadata(STREAM0, 3L, 3, 420L, 500L, BROKER1),
+ 4, new RangeMetadata(STREAM0, 4L, 4, 500L, 600L, BROKER0));
+ Map<Long, S3StreamObject> streamObjects = Map.of(
+ 8L, new S3StreamObject(8, STREAM0, 10L, 100L),
+ 9L, new S3StreamObject(9, STREAM0, 200L, 300L),
+ 10L, new S3StreamObject(10, STREAM0, 300L, 400L));
+ S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, 10, ranges, streamObjects);
+ S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
+ Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));
+
+ // 1. search stream_1
+ InRangeObjects objects = streamsImage.getObjects(STREAM1, 10, 100, Integer.MAX_VALUE);
+ assertEquals(InRangeObjects.INVALID, objects);
+
+ // 2. search stream_0 in [0, 600)
+ // failed for trimmed startOffset
+ objects = streamsImage.getObjects(STREAM0, 0, 600, Integer.MAX_VALUE);
+ assertEquals(InRangeObjects.INVALID, objects);
+
+ // 3. search stream_0 for full range [10, 600)
+ objects = streamsImage.getObjects(STREAM0, 10, 600, Integer.MAX_VALUE);
+ assertEquals(10, objects.startOffset());
+ assertEquals(600, objects.endOffset());
+ assertEquals(12, objects.objects().size());
+ List<Long> expectedObjectIds = List.of(
+ 8L, 0L, 1L, 5L, 6L, 2L, 9L, 10L, 3L, 7L, 3L, 4L);
+ assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+
+ // 4. search stream_0 in [20, 550)
+ objects = streamsImage.getObjects(STREAM0, 20, 550, Integer.MAX_VALUE);
+ assertEquals(20, objects.startOffset());
+ assertEquals(600, objects.endOffset());
+ assertEquals(12, objects.objects().size());
+ assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+
+ // 5. search stream_0 in [20, 550) with limit 5
+ objects = streamsImage.getObjects(STREAM0, 20, 550, 5);
+ assertEquals(20, objects.startOffset());
+ assertEquals(180, objects.endOffset());
+ assertEquals(5, objects.objects().size());
+ assertEquals(expectedObjectIds.subList(0, 5), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+
+ // 6. search stream_0 in [400, 520)
+ objects = streamsImage.getObjects(STREAM0, 400, 520, Integer.MAX_VALUE);
+ assertEquals(400, objects.startOffset());
+ assertEquals(520, objects.endOffset());
+ assertEquals(3, objects.objects().size());
+ assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+
+ // 7. search stream_0 in [401, 519)
+ objects = streamsImage.getObjects(STREAM0, 401, 519, Integer.MAX_VALUE);
+ assertEquals(401, objects.startOffset());
+ assertEquals(520, objects.endOffset());
+ assertEquals(3, objects.objects().size());
+ assertEquals(expectedObjectIds.subList(8, 11), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+
+ // 8. search stream_0 in [399, 521)
+ objects = streamsImage.getObjects(STREAM0, 399, 521, Integer.MAX_VALUE);
+ assertEquals(399, objects.startOffset());
+ assertEquals(600, objects.endOffset());
+ assertEquals(5, objects.objects().size());
+ assertEquals(expectedObjectIds.subList(7, 12), objects.objects().stream().map(S3ObjectMetadata::getObjectId).collect(Collectors.toList()));
+ }
}