diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 2f4b2f3a5d..e491b58a00 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -52,7 +52,9 @@
+
+
diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java
new file mode 100644
index 0000000000..f7a9a6bf1f
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import io.netty.buffer.ByteBuf;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.S3ObjectMetadata;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.compress.ZstdFactory;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
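+/**
+ * Reads an S3 object written by ObjectWriter: the index block is loaded from the object tail first,
+ * then data blocks covering a requested stream range can be located with find() and fetched with read().
+ */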
+public class ObjectReader {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ObjectReader.class);
+ private final S3ObjectMetadata metadata;
+ private final String objectKey;
+ private final S3Operator s3Operator;
+ private final CompletableFuture<IndexBlock> indexBlockCf;
+
+ public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
+ this.metadata = metadata;
+ this.objectKey = metadata.key();
+ this.s3Operator = s3Operator;
+ this.indexBlockCf = new CompletableFuture<>();
+ asyncGetIndexBlock();
+ }
+
+ public CompletableFuture<List<DataBlockIndex>> find(long streamId, long startOffset, long endOffset) {
+ return indexBlockCf.thenApply(indexBlock -> indexBlock.find(streamId, startOffset, endOffset));
+ }
+
+ public CompletableFuture<DataBlock> read(DataBlockIndex block) {
+ CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition());
+ return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
+ }
+
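+ // Fetch the index block by reading the last 1MiB of the object, which is expected to contain the
+ // index block and the footer. If the index block actually starts earlier, parsing fails with the
+ // exact index position taken from the footer and the read is retried from there.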
+ private void asyncGetIndexBlock() {
+ asyncGetIndexBlock0(Math.max(0, metadata.getObjectSize() - 1024 * 1024));
+ }
+
+ private void asyncGetIndexBlock0(long startPosition) {
+ CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.getObjectSize());
+ cf.thenAccept(buf -> {
+ try {
+ IndexBlock indexBlock = IndexBlock.parse(buf, metadata.getObjectSize());
+ indexBlockCf.complete(indexBlock);
+ } catch (IndexBlockParseException ex) {
+ asyncGetIndexBlock0(ex.indexBlockPosition);
+ }
+ }).exceptionally(ex -> {
+ LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.getObjectSize(), ex);
+ // TODO: delay retry.
+ asyncGetIndexBlock0(startPosition);
+ return null;
+ });
+ }
+
+ static class IndexBlock {
+ private final ByteBuf blocks;
+ private final ByteBuf streamRanges;
+
+ public IndexBlock(ByteBuf blocks, ByteBuf streamRanges) {
+ this.blocks = blocks;
+ this.streamRanges = streamRanges;
+ }
+
+ public static IndexBlock parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
+ long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - 48);
+ int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
+ if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
+ throw new IndexBlockParseException(indexBlockPosition);
+ } else {
+ int indexRelativePosition = objectTailBuf.readableBytes() - (int) (objectSize - indexBlockPosition);
+ ByteBuf indexBlockBuf = objectTailBuf.slice(indexRelativePosition, indexBlockSize);
+ int blockCount = indexBlockBuf.readInt();
+ ByteBuf blocks = indexBlockBuf.slice(indexBlockBuf.readerIndex(), blockCount * 16);
+ indexBlockBuf.skipBytes(blockCount * 16);
+ ByteBuf streamRanges = indexBlockBuf.slice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
+ return new IndexBlock(blocks, streamRanges);
+ }
+ }
+
+ public List<DataBlockIndex> find(long streamId, long startOffset, long endOffset) {
+ // TODO: binary search
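+ // Each stream range entry is 24 bytes: streamId(8) + startOffset(8) + offsetDelta(4) + blockId(4).
+ // Each block index entry is 16 bytes: position(8) + size(4) + recordCount(4).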
+ long nextStartOffset = startOffset;
+ List<DataBlockIndex> rst = new LinkedList<>();
+ for (int i = 0; i < streamRanges.readableBytes(); i += 24) {
+ long rangeStreamId = streamRanges.getLong(i);
+ long rangeStartOffset = streamRanges.getLong(i + 8);
+ long rangeEndOffset = rangeStartOffset + streamRanges.getInt(i + 16);
+ int rangeBlockId = streamRanges.getInt(i + 20);
+ if (rangeStreamId == streamId && rangeStartOffset < endOffset && rangeEndOffset > nextStartOffset) {
+ nextStartOffset = rangeEndOffset;
+ long blockPosition = blocks.getLong(rangeBlockId * 16);
+ int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
+ int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
+ rst.add(new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount));
+ if (nextStartOffset >= endOffset) {
+ break;
+ }
+ }
+ }
+ return rst;
+ }
+
+ }
+
+ static class IndexBlockParseException extends Exception {
+ long indexBlockPosition;
+
+ public IndexBlockParseException(long indexBlockPosition) {
+ this.indexBlockPosition = indexBlockPosition;
+ }
+
+ }
+
+
+ public static class DataBlockIndex {
+ private final int blockId;
+ private final long startPosition;
+ private final int size;
+ private final int recordCount;
+
+ public DataBlockIndex(int blockId, long startPosition, int size, int recordCount) {
+ this.blockId = blockId;
+ this.startPosition = startPosition;
+ this.size = size;
+ this.recordCount = recordCount;
+ }
+
+ public int blockId() {
+ return blockId;
+ }
+
+ public long startPosition() {
+ return startPosition;
+ }
+
+ public long endPosition() {
+ return startPosition + size;
+ }
+
+ public int recordCount() {
+ return recordCount;
+ }
+ }
+
+ public static class DataBlock {
+ private final ByteBuf buf;
+ private final int recordCount;
+
+ public DataBlock(ByteBuf buf, int recordCount) {
+ this.buf = buf;
+ this.recordCount = recordCount;
+ }
+
+ public CloseableIterator<StreamRecordBatch> iterator() {
+ ByteBuf buf = this.buf.duplicate();
+ AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
+ DataInputStream in = new DataInputStream(ZstdFactory.wrapForInput(buf.nioBuffer(), (byte) 0, BufferSupplier.NO_CACHING));
+ return new CloseableIterator<>() {
+ @Override
+ public boolean hasNext() {
+ // in.available() is not reliable. ZstdInputStreamNoFinalizer might return 1 when there is no more data.
+ return remainingRecordCount.get() != 0;
+ }
+
+ @Override
+ public StreamRecordBatch next() {
+ if (remainingRecordCount.decrementAndGet() < 0) {
+ throw new NoSuchElementException();
+ }
+ return StreamRecordBatchCodec.decode(in);
+ }
+
+ @Override
+ public void close() {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new KafkaException("Failed to close object block stream", e);
+ }
+ }
+ };
+ }
+ }
+
+}
diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
new file mode 100644
index 0000000000..62833973c9
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.ObjectStreamRange;
+import kafka.log.s3.operator.S3Operator;
+import kafka.log.s3.operator.Writer;
+import org.apache.kafka.common.compress.ZstdFactory;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+// TODO: memory optimization
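+/**
+ * Writes stream records into a single S3 object laid out as: data block 1 | ... | data block N | index block | footer.
+ * Each data block is a zstd-compressed run of encoded records; the index block records every block's position,
+ * size and record count plus the stream ranges it covers; the 48-byte footer points back at the index block.
+ */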
+public class ObjectWriter {
+ private int blockSizeThreshold;
+ private int partSizeThreshold;
+ private final S3Operator s3Operator;
+ private final List<DataBlock> waitingUploadBlocks;
+ private final List<DataBlock> completedBlocks;
+ private IndexBlock indexBlock;
+ private final Writer writer;
+ private final String objectKey;
+ private long nextDataBlockPosition;
+
+ private long size;
+
+ private DataBlock dataBlock;
+
+ public ObjectWriter(String objectKey, S3Operator s3Operator, int blockSizeThreshold, int partSizeThreshold) {
+ this.objectKey = objectKey;
+ this.s3Operator = s3Operator;
+ this.blockSizeThreshold = blockSizeThreshold;
+ this.partSizeThreshold = partSizeThreshold;
+ waitingUploadBlocks = new LinkedList<>();
+ completedBlocks = new LinkedList<>();
+ writer = s3Operator.writer(objectKey);
+ }
+
+ public ObjectWriter(String objectKey, S3Operator s3Operator) {
+ this(objectKey, s3Operator, 16 * 1024 * 1024, 32 * 1024 * 1024);
+ }
+
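+ // A data block is sealed once it reaches blockSizeThreshold; sealed blocks are buffered and handed
+ // to the underlying writer as a part once their total size reaches partSizeThreshold.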
+ public void write(StreamRecordBatch record) {
+ if (dataBlock == null) {
+ dataBlock = new DataBlock(nextDataBlockPosition);
+ }
+ if (dataBlock.write(record)) {
+ waitingUploadBlocks.add(dataBlock);
+ nextDataBlockPosition += dataBlock.size();
+ dataBlock = null;
+ tryUploadPart();
+ }
+ }
+
+ private void tryUploadPart() {
+ long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(DataBlock::size).sum();
+ if (waitingUploadSize >= partSizeThreshold) {
+ for (DataBlock block : waitingUploadBlocks) {
+ writer.write(block.buffer());
+ completedBlocks.add(block);
+ }
+ waitingUploadBlocks.clear();
+ }
+ }
+
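+ // Seals the in-flight data block, writes any remaining blocks together with the index block and
+ // footer, and completes the underlying writer.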
+ public CompletableFuture<Void> close() {
+ CompositeByteBuf buf = Unpooled.compositeBuffer();
+ if (dataBlock != null) {
+ dataBlock.close();
+ nextDataBlockPosition += dataBlock.size();
+ waitingUploadBlocks.add(dataBlock);
+ completedBlocks.add(dataBlock);
+ dataBlock = null;
+ }
+ for (DataBlock block : waitingUploadBlocks) {
+ buf.addComponent(true, block.buffer());
+ }
+ waitingUploadBlocks.clear();
+ indexBlock = new IndexBlock();
+ buf.addComponent(true, indexBlock.buffer());
+ Footer footer = new Footer();
+ buf.addComponent(true, footer.buffer());
+ writer.write(buf.duplicate());
+ size = indexBlock.position() + indexBlock.size() + footer.size();
+ return writer.close();
+ }
+
+ public List<ObjectStreamRange> getStreamRanges() {
+ List<ObjectStreamRange> streamRanges = new LinkedList<>();
+ ObjectStreamRange lastStreamRange = null;
+ for (DataBlock block : completedBlocks) {
+ for (ObjectStreamRange streamRange : block.getStreamRanges()) {
+ if (lastStreamRange == null || lastStreamRange.getStreamId() != streamRange.getStreamId()) {
+ lastStreamRange = new ObjectStreamRange();
+ lastStreamRange.setStreamId(streamRange.getStreamId());
+ lastStreamRange.setEpoch(streamRange.getEpoch());
+ lastStreamRange.setStartOffset(streamRange.getStartOffset());
+ streamRanges.add(lastStreamRange);
+ }
+ lastStreamRange.setEndOffset(streamRange.getEndOffset());
+
+ }
+ }
+ return streamRanges;
+ }
+
+ public long size() {
+ return size;
+ }
+
+ class DataBlock {
+ private final long position;
+ private ByteBufferOutputStream compressedBlock;
+ private OutputStream out;
+ private ByteBuf compressedBlockBuf;
+ private int blockSize;
+ private final List<ObjectStreamRange> streamRanges;
+ private ObjectStreamRange streamRange;
+ private int recordCount = 0;
+
+ public DataBlock(long position) {
+ this.position = position;
+ compressedBlock = new ByteBufferOutputStream(blockSizeThreshold * 3 / 2);
+ out = ZstdFactory.wrapForOutput(compressedBlock);
+ streamRanges = new LinkedList<>();
+ }
+
+ public boolean write(StreamRecordBatch record) {
+ try {
+ recordCount++;
+ return write0(record);
+ } catch (IOException ex) {
+ // won't happen
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public boolean write0(StreamRecordBatch record) throws IOException {
+ if (streamRange == null || streamRange.getStreamId() != record.getStreamId()) {
+ streamRange = new ObjectStreamRange();
+ streamRange.setStreamId(record.getStreamId());
+ streamRange.setEpoch(record.getEpoch());
+ streamRange.setStartOffset(record.getBaseOffset());
+ streamRanges.add(streamRange);
+ }
+ streamRange.setEndOffset(record.getBaseOffset() + record.getRecordBatch().count());
+
+ ByteBuf recordBuf = StreamRecordBatchCodec.encode(record);
+ int recordSize = recordBuf.readableBytes();
+ out.write(recordBuf.array(), recordBuf.arrayOffset(), recordSize);
+ recordBuf.release();
+ blockSize += recordSize;
+ if (blockSize >= blockSizeThreshold) {
+ close();
+ return true;
+ }
+ return false;
+ }
+
+ public void close() {
+ try {
+ close0();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void close0() throws IOException {
+ out.close();
+ compressedBlock.close();
+ compressedBlockBuf = Unpooled.wrappedBuffer(compressedBlock.buffer().duplicate().flip());
+ out = null;
+ compressedBlock = null;
+ }
+
+ public long position() {
+ return position;
+ }
+
+ public int size() {
+ return compressedBlockBuf.readableBytes();
+ }
+
+ public int recordCount() {
+ return recordCount;
+ }
+
+ public List<ObjectStreamRange> getStreamRanges() {
+ return streamRanges;
+ }
+
+ public ByteBuf buffer() {
+ return compressedBlockBuf.duplicate();
+ }
+ }
+
+ class IndexBlock {
+ private final ByteBuf buf;
+ private final long position;
+
+ public IndexBlock() {
+ position = nextDataBlockPosition;
+ buf = Unpooled.buffer(1024 * 1024);
+ buf.writeInt(completedBlocks.size()); // block count
+ // block index
+ for (DataBlock block : completedBlocks) {
+ buf.writeLong(block.position());
+ buf.writeInt(block.size());
+ buf.writeInt(block.recordCount());
+ }
+ // object stream range
+ for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
+ DataBlock block = completedBlocks.get(blockIndex);
+ for (ObjectStreamRange range : block.getStreamRanges()) {
+ buf.writeLong(range.getStreamId());
+ buf.writeLong(range.getStartOffset());
+ buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
+ buf.writeInt(blockIndex);
+ }
+ }
+ }
+
+ public ByteBuf buffer() {
+ return buf.duplicate();
+ }
+
+ public long position() {
+ return position;
+ }
+
+ public int size() {
+ return buf.readableBytes();
+ }
+ }
+
+ class Footer {
+ private static final int FOOTER_SIZE = 48;
+ private static final long MAGIC = 0x88e241b785f4cff7L;
+ private final ByteBuf buf;
+
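+ // Footer layout (48 bytes): indexBlockPosition(8) + indexBlockSize(4) + reserved(28) + magic(8).
+ // ObjectReader locates the index block by reading these fields from the object tail.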
+ public Footer() {
+ buf = Unpooled.buffer(FOOTER_SIZE);
+ buf.writeLong(indexBlock.position());
+ buf.writeInt(indexBlock.size());
+ buf.writeZero(40 - 8 - 4);
+ buf.writeLong(MAGIC);
+ }
+
+ public ByteBuf buffer() {
+ return buf.duplicate();
+ }
+
+ public int size() {
+ return FOOTER_SIZE;
+ }
+
+ }
+}
diff --git a/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java
deleted file mode 100644
index fa83fafe66..0000000000
--- a/core/src/main/scala/kafka/log/s3/RecordBatchCodec.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log.s3;
-
-import com.automq.elasticstream.client.FlatRecordBatchWithContext;
-import com.automq.elasticstream.client.api.RecordBatch;
-import com.automq.elasticstream.client.api.RecordBatchWithContext;
-import com.automq.elasticstream.client.flatc.records.KeyValue;
-import com.automq.elasticstream.client.flatc.records.RecordBatchMeta;
-import com.automq.elasticstream.client.flatc.records.RecordBatchMetaT;
-import com.google.flatbuffers.FlatBufferBuilder;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class RecordBatchCodec {
- private static final byte MAGIC_V0 = 0x22;
- private static final ThreadLocal<ByteBuffer> META_BUF = ThreadLocal.withInitial(() -> ByteBuffer.allocate(4096));
- private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;
-
- /**
- * Encode RecordBatch to storage format record.
- *
- * @param recordBatch {@link RecordBatch}
- * @return storage format record bytes.
- */
- public static ByteBuf encode(long streamId, long baseOffset, RecordBatch recordBatch) {
-
- int totalLength = 0;
-
- totalLength += 1; // magic
-
- FlatBufferBuilder metaBuilder = new FlatBufferBuilder(META_BUF.get());
-
- Integer propsVector = null;
- int propsSize = recordBatch.properties().size();
- if (propsSize > 0) {
- int[] kvs = new int[propsSize];
- int index = 0;
- for (Map.Entry<String, String> kv : recordBatch.properties().entrySet()) {
- int k = metaBuilder.createString(kv.getKey());
- int v = metaBuilder.createString(kv.getValue());
- int kvPtr = KeyValue.createKeyValue(metaBuilder, k, v);
- kvs[index++] = kvPtr;
- }
- propsVector = RecordBatchMeta.createPropertiesVector(metaBuilder, kvs);
- }
-
- // encode RecordBatchMeta
- RecordBatchMeta.startRecordBatchMeta(metaBuilder);
- RecordBatchMeta.addStreamId(metaBuilder, streamId);
- RecordBatchMeta.addBaseOffset(metaBuilder, baseOffset);
- RecordBatchMeta.addLastOffsetDelta(metaBuilder, recordBatch.count());
- RecordBatchMeta.addBaseTimestamp(metaBuilder, recordBatch.baseTimestamp());
- if (null != propsVector) {
- RecordBatchMeta.addProperties(metaBuilder, propsVector);
- }
- int ptr = RecordBatchMeta.endRecordBatchMeta(metaBuilder);
- metaBuilder.finish(ptr);
-
- // The data in this ByteBuffer does NOT start at 0, but at buf.position().
- // The number of bytes is buf.remaining().
- ByteBuffer metaBuf = metaBuilder.dataBuffer();
-
- totalLength += 4; // meta length
- totalLength += metaBuf.remaining(); // RecordBatchMeta
-
- totalLength += 4; // payload length
- totalLength += recordBatch.rawPayload().remaining(); // payload
-
- ByteBuf buf = ALLOCATOR.heapBuffer(totalLength);
- buf.writeByte(MAGIC_V0); // magic
- buf.writeInt(metaBuf.remaining()); // meta length
- buf.writeBytes(metaBuf); // RecordBatchMeta
- buf.writeInt(recordBatch.rawPayload().remaining()); // payload length
- buf.writeBytes(recordBatch.rawPayload()); // payload
-
- META_BUF.get().clear();
- return buf;
- }
-
- /**
- * Decode storage format record to RecordBatchWithContext list.
- *
- * @param storageFormatBytes storage format bytes.
- * @return RecordBatchWithContext list.
- */
- public static List<RecordBatchWithContext> decode(ByteBuffer storageFormatBytes) {
- ByteBuf buf = Unpooled.wrappedBuffer(storageFormatBytes);
- List<RecordBatchWithContext> recordBatchList = new LinkedList<>();
- while (buf.isReadable()) {
- buf.readByte(); // magic
- int metaLength = buf.readInt();
- ByteBuf metaBuf = buf.slice(buf.readerIndex(), metaLength);
- RecordBatchMetaT recordBatchMetaT = RecordBatchMeta.getRootAsRecordBatchMeta(metaBuf.nioBuffer()).unpack();
- buf.skipBytes(metaLength);
- int payloadLength = buf.readInt();
- ByteBuf payloadBuf = buf.slice(buf.readerIndex(), payloadLength);
- buf.skipBytes(payloadLength);
- recordBatchList.add(new FlatRecordBatchWithContext(recordBatchMetaT, payloadBuf.nioBuffer()));
- }
- return recordBatchList;
- }
-}
diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index 3a9650d679..bd60952a0c 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -23,16 +23,12 @@
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
-import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.model.StreamRecordBatch;
-import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.streams.StreamManager;
-import java.util.LinkedList;
import java.util.List;
-import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,9 +40,8 @@ public class S3Stream implements Stream {
private final Wal wal;
private final S3BlockCache blockCache;
private final StreamManager streamManager;
- private final ObjectManager objectManager;
- public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager, ObjectManager objectManager) {
+ public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, StreamManager streamManager) {
this.metadata = metadata;
this.streamId = metadata.getStreamId();
this.epoch = metadata.getEpoch();
@@ -54,7 +49,6 @@ public S3Stream(StreamMetadata metadata, Wal wal, S3BlockCache blockCache, Strea
this.wal = wal;
this.blockCache = blockCache;
this.streamManager = streamManager;
- this.objectManager = objectManager;
}
@Override
@@ -82,37 +76,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
//TODO: bound check
- //TODO: concurrent read.
- //TODO: async read.
- List<Long> objects = objectManager.getObjects(streamId, startOffset, endOffset, maxBytes);
- long nextStartOffset = startOffset;
- int nextMaxBytes = maxBytes;
- List records = new LinkedList<>();
- for (long objectId : objects) {
- try {
- ReadDataBlock dataBlock = blockCache.read(objectId, streamId, nextStartOffset, endOffset, nextMaxBytes).get();
- OptionalLong blockStartOffset = dataBlock.startOffset();
- if (blockStartOffset.isEmpty()) {
- throw new IllegalStateException("expect not empty block from object[" + objectId + "]");
- }
- if (blockStartOffset.getAsLong() != nextStartOffset) {
- throw new IllegalStateException("records not continuous, expect start offset[" + nextStartOffset + "], actual["
- + blockStartOffset.getAsLong() + "]");
- }
- records.addAll(dataBlock.getRecords());
- // Already check start offset, so it's safe to get end offset.
- //noinspection OptionalGetWithoutIsPresent
- nextStartOffset = dataBlock.endOffset().getAsLong();
- nextMaxBytes -= Math.min(nextMaxBytes, dataBlock.sizeInBytes());
- if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
- break;
- }
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- }
- //TODO: records integrity check.
- return CompletableFuture.completedFuture(new DefaultFetchResult(records));
+ return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> new DefaultFetchResult(dataBlock.getRecords()));
}
@Override
diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
index 2c473c0d1d..722b7d9b7f 100644
--- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java
+++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java
@@ -51,6 +51,6 @@ public CompletableFuture openStream(long streamId, OpenStreamOptions ope
}
private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
- return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController, objectManager));
+ return streamController.openStream(streamId, epoch).thenApply(metadata -> new S3Stream(metadata, wal, blockCache, streamController));
}
}
diff --git a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
index d38d3a7006..94104d9bad 100644
--- a/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
+++ b/core/src/main/scala/kafka/log/s3/SingleWalObjectWriteTask.java
@@ -17,24 +17,15 @@
package kafka.log.s3;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import kafka.log.s3.exception.StreamFencedException;
-import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.CommitWalObjectRequest;
import kafka.log.s3.objects.CommitWalObjectResponse;
import kafka.log.s3.objects.ObjectManager;
-import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.S3Operator;
import kafka.log.s3.utils.ObjectUtils;
-import org.apache.kafka.common.compress.ZstdFactory;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -44,83 +35,49 @@ public class SingleWalObjectWriteTask {
private final List<WalWriteRequest> requests;
private final ObjectManager objectManager;
private final S3Operator s3Operator;
- private final List<ObjectStreamRange> streamRanges;
- private ByteBuf objectBuf;
+ private ObjectWriter objectWriter;
private CommitWalObjectResponse response;
private volatile boolean isDone = false;
public SingleWalObjectWriteTask(List<WalWriteRequest> records, ObjectManager objectManager, S3Operator s3Operator) {
Collections.sort(records);
this.requests = records;
- this.streamRanges = new LinkedList<>();
this.objectManager = objectManager;
this.s3Operator = s3Operator;
- parse();
}
public CompletableFuture<Void> upload() {
if (isDone) {
return CompletableFuture.completedFuture(null);
}
+ // TODO: partial retry
UploadContext context = new UploadContext();
CompletableFuture<Long> objectIdCf = objectManager.prepareObject(1, TimeUnit.SECONDS.toMillis(30));
CompletableFuture<Void> writeCf = objectIdCf.thenCompose(objectId -> {
context.objectId = objectId;
// TODO: fill cluster name
- return s3Operator.write(ObjectUtils.genKey(0, "todocluster", objectId), objectBuf.duplicate());
+ String objectKey = ObjectUtils.genKey(0, "todocluster", objectId);
+ objectWriter = new ObjectWriter(objectKey, s3Operator);
+ for (WalWriteRequest request : requests) {
+ objectWriter.write(request.record);
+ }
+ return objectWriter.close();
});
return writeCf
.thenCompose(nil -> {
CommitWalObjectRequest request = new CommitWalObjectRequest();
request.setObjectId(context.objectId);
- request.setObjectSize(objectBuf.readableBytes());
- request.setStreamRanges(streamRanges);
+ request.setObjectSize(objectWriter.size());
+ request.setStreamRanges(objectWriter.getStreamRanges());
return objectManager.commitWalObject(request);
})
.thenApply(resp -> {
isDone = true;
response = resp;
- objectBuf.release();
return null;
});
}
- public void parse() {
- int totalSize = requests.stream().mapToInt(r -> r.record.getRecordBatch().rawPayload().remaining()).sum();
- ByteBufferOutputStream compressed = new ByteBufferOutputStream(totalSize);
- OutputStream out = ZstdFactory.wrapForOutput(compressed);
- long streamId = -1;
- long streamStartOffset = -1;
- long streamEpoch = -1;
- long streamEndOffset = -1;
- for (WalWriteRequest request : requests) {
- StreamRecordBatch record = request.record;
- long currentStreamId = record.getStreamId();
- if (streamId != currentStreamId) {
- if (streamId != -1) {
- streamRanges.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset));
- }
- streamId = currentStreamId;
- streamEpoch = record.getEpoch();
- streamStartOffset = record.getBaseOffset();
- }
- streamEndOffset = record.getBaseOffset() + record.getRecordBatch().count();
- ByteBuf recordBuf = RecordBatchCodec.encode(record.getStreamId(), record.getBaseOffset(), record.getRecordBatch());
- try {
- out.write(recordBuf.array(), recordBuf.readerIndex(), recordBuf.readableBytes());
- } catch (IOException e) {
- // won't happen
- throw new RuntimeException(e);
- }
- recordBuf.release();
- }
- // add last stream range
- if (streamId != -1) {
- streamRanges.add(new ObjectStreamRange(streamId, streamEpoch, streamStartOffset, streamEndOffset));
- }
- objectBuf = Unpooled.wrappedBuffer(compressed.buffer().flip());
- }
-
public boolean isDone() {
return isDone;
}
diff --git a/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java
new file mode 100644
index 0000000000..997bf2e2db
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import com.automq.elasticstream.client.DefaultRecordBatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import kafka.log.s3.model.StreamRecordBatch;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class StreamRecordBatchCodec {
+ private static final byte MAGIC_V0 = 0x22;
+ private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT;
+
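+ /**
+ * Wire format: magic(1) + streamId(8) + epoch(8) + baseOffset(8) + lastOffsetDelta(4) + payloadLength(4) + payload.
+ */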
+ public static ByteBuf encode(StreamRecordBatch streamRecord) {
+ int totalLength = 1 // magic
+ + 8 // streamId
+ + 8 // epoch
+ + 8 // baseOffset
+ + 4 // lastOffsetDelta
+ + 4 // payload length
+ + streamRecord.getRecordBatch().rawPayload().remaining(); // payload
+
+ ByteBuf buf = ALLOCATOR.heapBuffer(totalLength);
+ buf.writeByte(MAGIC_V0);
+ buf.writeLong(streamRecord.getStreamId());
+ buf.writeLong(streamRecord.getEpoch());
+ buf.writeLong(streamRecord.getBaseOffset());
+ buf.writeInt(streamRecord.getRecordBatch().count());
+ ByteBuffer payload = streamRecord.getRecordBatch().rawPayload().duplicate();
+ buf.writeInt(payload.remaining());
+ buf.writeBytes(payload);
+ return buf;
+ }
+
+ /**
+ * Decode a stream record batch from a byte buffer and move the reader index.
+ */
+ public static StreamRecordBatch decode(DataInputStream in) {
+ try {
+ in.readByte(); // magic
+ long streamId = in.readLong();
+ long epoch = in.readLong();
+ long baseOffset = in.readLong();
+ int lastOffsetDelta = in.readInt();
+ int payloadLength = in.readInt();
+ ByteBuffer payload = ByteBuffer.allocate(payloadLength);
+ in.readFully(payload.array());
+ DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload);
+ return new StreamRecordBatch(streamId, epoch, baseOffset, defaultRecordBatch);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java
index 2d2fdae366..d4e7cea55b 100644
--- a/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java
@@ -26,6 +26,6 @@
*/
public interface S3BlockCache {
- CompletableFuture<ReadDataBlock> read(long objectId, long streamId, long startOffset, long endOffset, int maxBytes);
+ CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes);
}
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
index 1073a2a449..db47608e3e 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
@@ -66,7 +66,12 @@ public interface ObjectManager {
/**
* Get objects by stream range.
*/
- List<Long> getObjects(long streamId, long startOffset, long endOffset, int maxBytes);
+ List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int maxBytes);
+ /**
+ * Get the WAL objects of the current server.
+ * When the server starts, the WAL needs these objects to recover.
+ */
+ List<S3ObjectMetadata> getServerObjects();
}
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
index 38d807809d..8d620a79bd 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java
@@ -18,17 +18,10 @@
package kafka.log.s3.objects;
public class ObjectStreamRange {
- private final long streamId;
- private final long epoch;
- private final long startOffset;
- private final long endOffset;
-
- public ObjectStreamRange(long streamId, long epoch, long startOffset, long endOffset) {
- this.streamId = streamId;
- this.epoch = epoch;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
+ private long streamId;
+ private long epoch;
+ private long startOffset;
+ private long endOffset;
public long getStreamId() {
return streamId;
@@ -44,4 +37,20 @@ public long getStartOffset() {
public long getEndOffset() {
return endOffset;
}
+
+ public void setStreamId(long streamId) {
+ this.streamId = streamId;
+ }
+
+ public void setEpoch(long epoch) {
+ this.epoch = epoch;
+ }
+
+ public void setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ }
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
}
diff --git a/core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java b/core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java
new file mode 100644
index 0000000000..d8c7a1f429
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/objects/S3ObjectMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.objects;
+
+import kafka.log.s3.utils.ObjectUtils;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+
+public class S3ObjectMetadata {
+ private final long objectId;
+ private final long objectSize;
+ private final S3ObjectType type;
+
+ public S3ObjectMetadata(long objectId, long objectSize, S3ObjectType type) {
+ this.objectId = objectId;
+ this.objectSize = objectSize;
+ this.type = type;
+ }
+
+ public long getObjectId() {
+ return objectId;
+ }
+
+ public long getObjectSize() {
+ return objectSize;
+ }
+
+ public S3ObjectType getType() {
+ return type;
+ }
+
+ public String key() {
+ return ObjectUtils.genKey(0, "todocluster", objectId);
+ }
+}
diff --git a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java
new file mode 100644
index 0000000000..358546b563
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.s3.operator;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import kafka.log.es.FutureUtil;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
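+/**
+ * In-memory S3Operator backed by a heap map; intended for unit tests rather than production use.
+ */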
+public class MemoryS3Operator implements S3Operator {
+ private final Map<String, ByteBuf> storage = new HashMap<>();
+
+ @Override
+ public CompletableFuture<ByteBuf> read(String path) {
+ ByteBuf value = storage.get(path);
+ if (value == null) {
+ return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
+ }
+ return CompletableFuture.completedFuture(value.duplicate());
+ }
+
+ @Override
+ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end) {
+ ByteBuf value = storage.get(path);
+ if (value == null) {
+ return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
+ }
+ return CompletableFuture.completedFuture(value.slice(value.readerIndex() + (int) start, (int) (end - start)));
+ }
+
+ @Override
+ public CompletableFuture<Void> write(String path, ByteBuf data) {
+ storage.put(path, data.duplicate());
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public Writer writer(String path) {
+ ByteBuf buf = Unpooled.buffer();
+ storage.put(path, buf);
+ return new Writer() {
+ @Override
+ public void write(ByteBuf part) {
+ buf.writeBytes(part);
+ }
+
+ @Override
+ public void copyWrite(String sourcePath, long start, long end) {
+ ByteBuf source = storage.get(sourcePath);
+ if (source == null) {
+ throw new IllegalArgumentException("object not exist");
+ }
+ buf.writeBytes(source.slice(source.readerIndex() + (int) start, (int) (end - start)));
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ }
+
+ @Override
+ public CompletableFuture<Void> delete(String path) {
+ storage.remove(path);
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java b/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java
index 681a1c25c2..a480cc0120 100644
--- a/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java
+++ b/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
+import java.util.Random;
public class DefaultRecordBatch implements RecordBatch {
int count;
@@ -30,9 +31,9 @@ public class DefaultRecordBatch implements RecordBatch {
public static RecordBatch of(int count, int size) {
DefaultRecordBatch record = new DefaultRecordBatch();
record.count = count;
- record.payload = ByteBuffer.allocate(size);
- record.payload.position(size);
- record.payload.flip();
+ byte[] bytes = new byte[size];
+ new Random().nextBytes(bytes);
+ record.payload = ByteBuffer.wrap(bytes);
return record;
}
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
new file mode 100644
index 0000000000..e9887343fd
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import com.automq.elasticstream.client.api.RecordBatch;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.ObjectStreamRange;
+import kafka.log.s3.objects.S3ObjectMetadata;
+import kafka.log.s3.operator.MemoryS3Operator;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class ObjectWriterTest {
+
+ @Test
+ public void testWrite() throws ExecutionException, InterruptedException {
+ S3ObjectMetadata metadata = new S3ObjectMetadata(1, 0, S3ObjectType.WAL_LOOSE);
+
+ S3Operator s3Operator = new MemoryS3Operator();
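+ // Small block/part thresholds (1KiB) force the writer to produce multiple data blocks.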
+ ObjectWriter objectWriter = new ObjectWriter(metadata.key(), s3Operator, 1024, 1024);
+ StreamRecordBatch r1 = newRecord(233, 10, 5, 512);
+ objectWriter.write(r1);
+ StreamRecordBatch r2 = newRecord(233, 15, 10, 512);
+ objectWriter.write(r2);
+ StreamRecordBatch r3 = newRecord(233, 25, 5, 512);
+ objectWriter.write(r3);
+ StreamRecordBatch r4 = newRecord(234, 0, 5, 512);
+ objectWriter.write(r4);
+ objectWriter.close().get();
+
+ List<ObjectStreamRange> streamRanges = objectWriter.getStreamRanges();
+ assertEquals(2, streamRanges.size());
+ assertEquals(233, streamRanges.get(0).getStreamId());
+ assertEquals(10, streamRanges.get(0).getStartOffset());
+ assertEquals(30, streamRanges.get(0).getEndOffset());
+ assertEquals(234, streamRanges.get(1).getStreamId());
+ assertEquals(0, streamRanges.get(1).getStartOffset());
+ assertEquals(5, streamRanges.get(1).getEndOffset());
+
+ int objectSize = s3Operator.read(metadata.key()).get().readableBytes();
+ assertEquals(objectSize, objectWriter.size());
+
+ metadata = new S3ObjectMetadata(1, objectSize, S3ObjectType.WAL_LOOSE);
+ ObjectReader objectReader = new ObjectReader(metadata, s3Operator);
+ List<ObjectReader.DataBlockIndex> blockIndexes = objectReader.find(233, 10, 30).get();
+ assertEquals(2, blockIndexes.size());
+ {
+ Iterator<StreamRecordBatch> it = objectReader.read(blockIndexes.get(0)).get().iterator();
+ StreamRecordBatch r = it.next();
+ assertEquals(233L, r.getStreamId());
+ assertEquals(10L, r.getBaseOffset());
+ assertEquals(5L, r.getRecordBatch().count());
+ assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+ r = it.next();
+ assertEquals(233L, r.getStreamId());
+ assertEquals(15L, r.getBaseOffset());
+ assertEquals(10L, r.getRecordBatch().count());
+ assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+ assertFalse(it.hasNext());
+ }
+
+ {
+ Iterator<StreamRecordBatch> it = objectReader.read(blockIndexes.get(1)).get().iterator();
+ StreamRecordBatch r = it.next();
+ assertEquals(233L, r.getStreamId());
+ assertEquals(25L, r.getBaseOffset());
+ assertEquals(5L, r.getRecordBatch().count());
+ assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+ r = it.next();
+ assertEquals(234L, r.getStreamId());
+ assertEquals(0L, r.getBaseOffset());
+ assertEquals(5L, r.getRecordBatch().count());
+ assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
+ assertFalse(it.hasNext());
+ }
+
+ blockIndexes = objectReader.find(234, 1, 2).get();
+ assertEquals(1, blockIndexes.size());
+ assertEquals(1, blockIndexes.get(0).blockId());
+ }
+
+ StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
+ RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize);
+ return new StreamRecordBatch(streamId, 0, offset, recordBatch);
+ }
+}
diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java
index 58bdd3bb36..807b6457f7 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -24,7 +24,6 @@
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.RangeMetadata;
import kafka.log.s3.model.StreamMetadata;
-import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.streams.StreamManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,7 +41,6 @@ public class S3StreamTest {
Wal wal;
S3BlockCache blockCache;
StreamManager streamManager;
- ObjectManager objectManager;
S3Stream stream;
@BeforeEach
@@ -50,26 +48,22 @@ public void setup() {
wal = mock(Wal.class);
blockCache = mock(S3BlockCache.class);
streamManager = mock(StreamManager.class);
- objectManager = mock(ObjectManager.class);
StreamMetadata metadata = new StreamMetadata();
metadata.setStreamId(233);
metadata.setEpoch(1);
metadata.setStartOffset(100);
metadata.setRanges(List.of(new RangeMetadata(1, 50, -1, 10)));
- stream = new S3Stream(metadata, wal, blockCache, streamManager, objectManager);
+ stream = new S3Stream(metadata, wal, blockCache, streamManager);
}
@Test
public void testFetch() throws Throwable {
- when(objectManager.getObjects(eq(233L), eq(110L), eq(120L), eq(100))).thenReturn(List.of(123L, 124L));
- when(blockCache.read(eq(123L), eq(233L), eq(110L), eq(120L), eq(100)))
- .thenReturn(CompletableFuture.completedFuture(newReadDataBlock(110, 115, 10)));
- when(blockCache.read(eq(124L), eq(233L), eq(115L), eq(120L), eq(90)))
- .thenReturn(CompletableFuture.completedFuture(newReadDataBlock(115, 120, 10)));
+ when(blockCache.read(eq(233L), eq(110L), eq(120L), eq(100)))
+ .thenReturn(CompletableFuture.completedFuture(newReadDataBlock(110, 115, 110)));
FetchResult rst = stream.fetch(110, 120, 100).get(1, TimeUnit.SECONDS);
- assertEquals(2, rst.recordBatchList().size());
+ assertEquals(1, rst.recordBatchList().size());
assertEquals(110, rst.recordBatchList().get(0).baseOffset());
- assertEquals(120, rst.recordBatchList().get(1).lastOffset());
+ assertEquals(115, rst.recordBatchList().get(0).lastOffset());
}
ReadDataBlock newReadDataBlock(long start, long end, int size) {
diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java
index 220a25d25a..6795c59a80 100644
--- a/core/src/test/java/kafka/log/s3/S3WalTest.java
+++ b/core/src/test/java/kafka/log/s3/S3WalTest.java
@@ -22,7 +22,7 @@
import kafka.log.s3.objects.CommitWalObjectResponse;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.ObjectStreamRange;
-import kafka.log.s3.operator.S3Operator;
+import kafka.log.s3.operator.MemoryS3Operator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -41,14 +41,12 @@
public class S3WalTest {
ObjectManager objectManager;
- S3Operator s3Operator;
S3Wal s3Wal;
@BeforeEach
public void setup() {
objectManager = mock(ObjectManager.class);
- s3Operator = mock(S3Operator.class);
- s3Wal = new S3Wal(objectManager, s3Operator);
+ s3Wal = new S3Wal(objectManager, new MemoryS3Operator());
}
@Test
@@ -56,7 +54,6 @@ public void testAppend() throws Exception {
when(objectManager.prepareObject(eq(1), anyLong())).thenReturn(CompletableFuture.completedFuture(16L));
CommitWalObjectResponse resp = new CommitWalObjectResponse();
when(objectManager.commitWalObject(any())).thenReturn(CompletableFuture.completedFuture(resp));
- when(s3Operator.write(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture cf1 = s3Wal.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100)));
CompletableFuture cf2 = s3Wal.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100)));