From 837d1107ac63e23c3e2da1a256e283a721ed7df3 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Thu, 24 Aug 2023 09:40:06 +0800
Subject: [PATCH 1/2] feat(stream-client): stream read

Signed-off-by: Robin Han
---
 .../src/main/scala/kafka/log/s3/S3Stream.java |   7 +-
 .../log/s3/cache/DefaultS3BlockCache.java     | 118 ++++++++++++++++++
 .../kafka/log/s3/cache/ReadDataBlock.java     |  16 +--
 .../kafka/log/s3/model/StreamRecordBatch.java |   4 +
 .../kafka/log/s3/objects/ObjectManager.java   |   7 +-
 .../kafka/log/s3/DefaultS3BlockCacheTest.java |  96 ++++++++++++++
 .../java/kafka/log/s3/ObjectWriterTest.java   |   2 +
 .../test/java/kafka/log/s3/S3StreamTest.java  |   8 +-
 .../src/test/java/kafka/log/s3/S3WalTest.java |   2 +
 9 files changed, 247 insertions(+), 13 deletions(-)
 create mode 100644 core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
 create mode 100644 core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java

diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java
index bd60952a0c..1505ebec11 100644
--- a/core/src/main/scala/kafka/log/s3/S3Stream.java
+++ b/core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -23,6 +23,7 @@
 import com.automq.elasticstream.client.api.RecordBatch;
 import com.automq.elasticstream.client.api.RecordBatchWithContext;
 import com.automq.elasticstream.client.api.Stream;
+import kafka.log.es.RecordBatchWithContextWrapper;
 import kafka.log.s3.cache.S3BlockCache;
 import kafka.log.s3.model.StreamMetadata;
 import kafka.log.s3.model.StreamRecordBatch;
@@ -31,6 +32,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public class S3Stream implements Stream {
     private final StreamMetadata metadata;
@@ -76,7 +78,10 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
     @Override
     public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
         //TODO: bound check
-        return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> new DefaultFetchResult(dataBlock.getRecords()));
+        return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
+            List<RecordBatchWithContext> records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
+            return new DefaultFetchResult(records);
+        });
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
new file mode 100644
index 0000000000..8886c0159a
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.cache;
+
+import kafka.log.s3.ObjectReader;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.objects.S3ObjectMetadata;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.common.utils.CloseableIterator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class DefaultS3BlockCache implements S3BlockCache {
+    private final ObjectManager objectManager;
+    private final S3Operator s3Operator;
+
+    public DefaultS3BlockCache(ObjectManager objectManager, S3Operator s3Operator) {
+        this.objectManager = objectManager;
+        this.s3Operator = s3Operator;
+    }
+
+    @Override
+    public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
+        List<S3ObjectMetadata> objects = objectManager.getObjects(streamId, startOffset, endOffset, 2);
+        ReadContext context = new ReadContext(objects, startOffset, maxBytes);
+        return read0(streamId, endOffset, context);
+    }
+
+    private CompletableFuture<ReadDataBlock> read0(long streamId, long endOffset, ReadContext context) {
+        if (context.blocks == null || context.blocks.size() <= context.blockIndex) {
+            if (context.objectIndex >= context.objects.size()) {
+                context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2);
+                context.objectIndex = 0;
+            }
+            // The previous object has been fully read, or this is the first object.
+            context.reader = new ObjectReader(context.objects.get(context.objectIndex), s3Operator);
+            context.objectIndex++;
+            return context.reader.find(streamId, context.nextStartOffset, endOffset).thenCompose(blocks -> {
+                context.blocks = blocks;
+                context.blockIndex = 0;
+                return read0(streamId, endOffset, context);
+            });
+        } else {
+            return context.reader.read(context.blocks.get(context.blockIndex)).thenCompose(dataBlock -> {
+                context.blockIndex++;
+                long nextStartOffset = context.nextStartOffset;
+                int nextMaxBytes = context.nextMaxBytes;
+                boolean matched = false;
+                try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
+                    while (it.hasNext()) {
+                        StreamRecordBatch recordBatch = it.next();
+                        if (recordBatch.getStreamId() != streamId) {
+                            if (matched) {
+                                break;
+                            }
+                            continue;
+                        }
+                        matched = true;
+                        if (recordBatch.getLastOffset() <= nextStartOffset) {
+                            continue;
+                        }
+                        context.records.add(recordBatch);
+                        nextStartOffset = recordBatch.getLastOffset();
+                        nextMaxBytes -= Math.min(nextMaxBytes, recordBatch.getRecordBatch().rawPayload().remaining());
+                        if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
+                            break;
+                        }
+                        // TODO: cache the remaining records
+                    }
+                }
+                context.nextStartOffset = nextStartOffset;
+                context.nextMaxBytes = nextMaxBytes;
+                if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
+                    return CompletableFuture.completedFuture(new ReadDataBlock(context.records));
+                } else {
+                    return read0(streamId, endOffset, context);
+                }
+            });
+        }
+    }
+
+    static class ReadContext {
+        List<S3ObjectMetadata> objects;
+        int objectIndex;
+        ObjectReader reader;
+        List<ObjectReader.DataBlockIndex> blocks;
+        int blockIndex;
+        List<StreamRecordBatch> records;
+        long nextStartOffset;
+        int nextMaxBytes;
+
+        public ReadContext(List<S3ObjectMetadata> objects, long startOffset, int maxBytes) {
+            this.objects = objects;
+            this.records = new LinkedList<>();
+            this.nextStartOffset = startOffset;
+            this.nextMaxBytes = maxBytes;
+        }
+
+    }
+}
diff --git a/core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java b/core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java
index 35110b35b7..459c9316e2 100644
--- a/core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java
+++ b/core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java
@@ -17,23 +17,23 @@
 package kafka.log.s3.cache;
 
-import com.automq.elasticstream.client.api.RecordBatchWithContext;
+import kafka.log.s3.model.StreamRecordBatch;
 
 import java.util.List;
 import java.util.OptionalLong;
 
 public class ReadDataBlock {
-    private List<RecordBatchWithContext> records;
+    private List<StreamRecordBatch> records;
 
-    public ReadDataBlock(List<RecordBatchWithContext> records) {
+    public ReadDataBlock(List<StreamRecordBatch> records) {
         this.records = records;
     }
 
-    public List<RecordBatchWithContext> getRecords() {
+    public List<StreamRecordBatch> getRecords() {
         return records;
     }
 
-    public void setRecords(List<RecordBatchWithContext> records) {
+    public void setRecords(List<StreamRecordBatch> records) {
         this.records = records;
     }
 
@@ -41,7 +41,7 @@ public OptionalLong startOffset() {
         if (records.isEmpty()) {
             return OptionalLong.empty();
         } else {
-            return OptionalLong.of(records.get(0).baseOffset());
+            return OptionalLong.of(records.get(0).getBaseOffset());
         }
     }
 
@@ -49,11 +49,11 @@ public OptionalLong endOffset() {
         if (records.isEmpty()) {
             return OptionalLong.empty();
         } else {
-            return OptionalLong.of(records.get(records.size() - 1).lastOffset());
+            return OptionalLong.of(records.get(records.size() - 1).getLastOffset());
         }
     }
 
     public int sizeInBytes() {
-        return records.stream().mapToInt(r -> r.rawPayload().remaining()).sum();
+        return records.stream().mapToInt(r -> r.getRecordBatch().rawPayload().remaining()).sum();
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
index 28cedb13da..809090fd81 100644
--- a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
+++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
@@ -44,6 +44,10 @@ public long getBaseOffset() {
         return baseOffset;
     }
 
+    public long getLastOffset() {
+        return baseOffset + recordBatch.count();
+    }
+
     public RecordBatch getRecordBatch() {
         return recordBatch;
     }
diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
index db47608e3e..b2604f4cd2 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
@@ -65,8 +65,13 @@ public interface ObjectManager {
 
     /**
      * Get objects by stream range.
+     * @param streamId stream id.
+     * @param startOffset range start offset.
+     * @param endOffset range end offset.
+     * @param limit max object count. A limit is used instead of maxBytes because the stream size cannot be derived from object metadata.
+     * @return {@link S3ObjectMetadata} list.
      */
-    List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int maxBytes);
+    List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit);
 
     /**
      * Get current server wal objects.
diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
new file mode 100644
index 0000000000..3f3a8df4fc
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3;
+
+import com.automq.elasticstream.client.api.RecordBatch;
+import kafka.log.s3.cache.DefaultS3BlockCache;
+import kafka.log.s3.cache.ReadDataBlock;
+import kafka.log.s3.model.StreamRecordBatch;
+import kafka.log.s3.objects.ObjectManager;
+import kafka.log.s3.objects.S3ObjectMetadata;
+import kafka.log.s3.operator.MemoryS3Operator;
+import kafka.log.s3.operator.S3Operator;
+import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Tag("s3Unit")
+public class DefaultS3BlockCacheTest {
+    ObjectManager objectManager;
+    S3Operator s3Operator;
+    DefaultS3BlockCache s3BlockCache;
+
+    @BeforeEach
+    public void setup() {
+        objectManager = mock(ObjectManager.class);
+        s3Operator = new MemoryS3Operator();
+        s3BlockCache = new DefaultS3BlockCache(objectManager, s3Operator);
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, 0, S3ObjectType.WAL_LOOSE);
+        ObjectWriter objectWriter = new ObjectWriter(metadata1.key(), s3Operator, 1024, 1024);
+        objectWriter.write(newRecord(233, 10, 5, 512));
+        objectWriter.write(newRecord(233, 15, 10, 512));
+        objectWriter.write(newRecord(233, 25, 5, 512));
+        objectWriter.write(newRecord(234, 0, 5, 512));
+        objectWriter.close();
+        metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+
+        S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, 0, S3ObjectType.WAL_LOOSE);
+        objectWriter = new ObjectWriter(metadata2.key(), s3Operator, 1024, 1024);
+        objectWriter.write(newRecord(233, 30, 10, 512));
+        objectWriter.close();
+        metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+
+        S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, 0, S3ObjectType.WAL_LOOSE);
+        objectWriter = new ObjectWriter(metadata3.key(), s3Operator, 1024, 1024);
+        objectWriter.write(newRecord(233, 40, 20, 512));
+        objectWriter.close();
+        metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE);
+
+        when(objectManager.getObjects(eq(233L), eq(11L), eq(60L), eq(2))).thenReturn(List.of(
+                metadata1, metadata2
+        ));
+        when(objectManager.getObjects(eq(233L), eq(40L), eq(60L), eq(2))).thenReturn(List.of(
+                metadata3
+        ));
+
+        ReadDataBlock rst = s3BlockCache.read(233L, 11L, 60L, 10000).get(3, TimeUnit.SECONDS);
+        assertEquals(5, rst.getRecords().size());
+        assertEquals(10, rst.getRecords().get(0).getBaseOffset());
+        assertEquals(60, rst.getRecords().get(4).getLastOffset());
+    }
+
+    StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
+        RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize);
+        return new StreamRecordBatch(streamId, 0, offset, recordBatch);
+    }
+
+
+}
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
index e9887343fd..0f5b5cf348 100644
--- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -24,6 +24,7 @@
 import kafka.log.s3.operator.MemoryS3Operator;
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.metadata.stream.S3ObjectType;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.Iterator;
@@ -33,6 +34,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
+@Tag("esUnit")
 public class ObjectWriterTest {
 
     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java
index 807b6457f7..2ea64413e4 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -19,13 +19,14 @@
 
 import com.automq.elasticstream.client.api.FetchResult;
 import com.automq.elasticstream.client.api.RecordBatch;
-import com.automq.elasticstream.client.api.RecordBatchWithContext;
 import kafka.log.s3.cache.ReadDataBlock;
 import kafka.log.s3.cache.S3BlockCache;
 import kafka.log.s3.model.RangeMetadata;
 import kafka.log.s3.model.StreamMetadata;
+import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.streams.StreamManager;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
@@ -37,6 +38,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@Tag("esUnit")
 public class S3StreamTest {
     Wal wal;
     S3BlockCache blockCache;
@@ -68,7 +70,7 @@ public void testFetch() throws Throwable {
 
     ReadDataBlock newReadDataBlock(long start, long end, int size) {
         RecordBatch recordBatch = DefaultRecordBatch.of((int) (end - start), size);
-        RecordBatchWithContext recordBatchWithContext = new DefaultRecordBatchWithContext(recordBatch, start);
-        return new ReadDataBlock(List.of(recordBatchWithContext));
+        StreamRecordBatch record = new StreamRecordBatch(0, 0, start, recordBatch);
+        return new ReadDataBlock(List.of(record));
     }
 }
diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java
index 6795c59a80..9a16b3fa3c 100644
--- a/core/src/test/java/kafka/log/s3/S3WalTest.java
+++ b/core/src/test/java/kafka/log/s3/S3WalTest.java
@@ -24,6 +24,7 @@
 import kafka.log.s3.objects.ObjectStreamRange;
 import kafka.log.s3.operator.MemoryS3Operator;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -39,6 +40,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@Tag("esUnit")
 public class S3WalTest {
     ObjectManager objectManager;
     S3Wal s3Wal;

From 194b2bedc9c4b6b651a10590a17e41f5aa065caa Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Thu, 24 Aug 2023 10:08:10 +0800
Subject: [PATCH 2/2] fix(test): add test tag

Signed-off-by: Robin Han
---
 core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java | 2 +-
 core/src/test/java/kafka/log/s3/ObjectWriterTest.java        | 2 +-
 core/src/test/java/kafka/log/s3/S3StreamTest.java            | 2 +-
 core/src/test/java/kafka/log/s3/S3WalTest.java               | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
index 3f3a8df4fc..ea6617d867 100644
--- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
+++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -38,7 +38,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@Tag("s3Unit")
+@Tag("S3Unit")
 public class DefaultS3BlockCacheTest {
     ObjectManager objectManager;
     S3Operator s3Operator;
diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
index 0f5b5cf348..892191f18e 100644
--- a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -34,7 +34,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
-@Tag("esUnit")
+@Tag("S3Unit")
 public class ObjectWriterTest {
 
     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java
index 2ea64413e4..84c50c3c0b 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -38,7 +38,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@Tag("esUnit")
+@Tag("S3Unit")
 public class S3StreamTest {
     Wal wal;
     S3BlockCache blockCache;
diff --git a/core/src/test/java/kafka/log/s3/S3WalTest.java b/core/src/test/java/kafka/log/s3/S3WalTest.java
index 9a16b3fa3c..648566237e 100644
--- a/core/src/test/java/kafka/log/s3/S3WalTest.java
+++ b/core/src/test/java/kafka/log/s3/S3WalTest.java
@@ -40,7 +40,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@Tag("esUnit")
+@Tag("S3Unit")
 public class S3WalTest {
     ObjectManager objectManager;
    S3Wal s3Wal;
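
Reviewer note (not part of the patch): below is a minimal sketch of how the new
block-cache read path is driven end to end. It uses only types introduced or
touched by this series; the class and method names (BlockCacheReadSketch,
readExample) are illustrative, and a real ObjectManager implementation is
assumed to be wired up (the tests use a Mockito mock plus MemoryS3Operator).

    import kafka.log.s3.cache.DefaultS3BlockCache;
    import kafka.log.s3.cache.ReadDataBlock;
    import kafka.log.s3.model.StreamRecordBatch;
    import kafka.log.s3.objects.ObjectManager;
    import kafka.log.s3.operator.MemoryS3Operator;

    public class BlockCacheReadSketch {
        public static void readExample(ObjectManager objectManager) throws Exception {
            // The cache resolves object metadata through ObjectManager and fetches
            // data blocks through the S3Operator.
            DefaultS3BlockCache cache = new DefaultS3BlockCache(objectManager, new MemoryS3Operator());
            // Read stream 233's records in [11, 60) under a 10 KiB byte budget. Internally
            // the cache pages object metadata two entries at a time and recurses until the
            // requested range is covered or the budget is spent.
            ReadDataBlock block = cache.read(233L, 11L, 60L, 10 * 1024).get();
            for (StreamRecordBatch batch : block.getRecords()) {
                System.out.printf("batch range [%d, %d)%n", batch.getBaseOffset(), batch.getLastOffset());
            }
        }
    }

As the testRead case asserts, results are clipped at record-batch granularity:
a read starting at offset 11 still returns the whole batch covering [10, 15),
so callers should expect the first returned batch to begin at or before the
requested startOffset.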