7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/log/s3/S3Stream.java
@@ -23,6 +23,7 @@
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import kafka.log.es.RecordBatchWithContextWrapper;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.model.StreamRecordBatch;
@@ -31,6 +32,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class S3Stream implements Stream {
private final StreamMetadata metadata;
@@ -76,7 +78,10 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
//TODO: bound check
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> new DefaultFetchResult(dataBlock.getRecords()));
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
List<RecordBatchWithContext> records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
return new DefaultFetchResult(records);
});
}

@Override
118 changes: 118 additions & 0 deletions core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3.cache;

import kafka.log.s3.ObjectReader;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.S3ObjectMetadata;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.common.utils.CloseableIterator;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DefaultS3BlockCache implements S3BlockCache {
private final ObjectManager objectManager;
private final S3Operator s3Operator;

public DefaultS3BlockCache(ObjectManager objectManager, S3Operator s3Operator) {
this.objectManager = objectManager;
this.s3Operator = s3Operator;
}

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
List<S3ObjectMetadata> objects = objectManager.getObjects(streamId, startOffset, endOffset, 2);
ReadContext context = new ReadContext(objects, startOffset, maxBytes);
return read0(streamId, endOffset, context);
}

private CompletableFuture<ReadDataBlock> read0(long streamId, long endOffset, ReadContext context) {
if (context.blocks == null || context.blocks.size() <= context.blockIndex) {
if (context.objectIndex >= context.objects.size()) {
context.objects = objectManager.getObjects(streamId, context.nextStartOffset, endOffset, 2);
context.objectIndex = 0;
}
// the previous object has been fully read, or this is the first object.
context.reader = new ObjectReader(context.objects.get(context.objectIndex), s3Operator);
context.objectIndex++;
return context.reader.find(streamId, context.nextStartOffset, endOffset).thenCompose(blocks -> {
context.blocks = blocks;
context.blockIndex = 0;
return read0(streamId, endOffset, context);
});
} else {
return context.reader.read(context.blocks.get(context.blockIndex)).thenCompose(dataBlock -> {
context.blockIndex++;
long nextStartOffset = context.nextStartOffset;
int nextMaxBytes = context.nextMaxBytes;
boolean matched = false;
try (CloseableIterator<StreamRecordBatch> it = dataBlock.iterator()) {
while (it.hasNext()) {
StreamRecordBatch recordBatch = it.next();
if (recordBatch.getStreamId() != streamId) {
if (matched) {
break;
}
continue;
}
matched = true;
if (recordBatch.getLastOffset() <= nextStartOffset) {
continue;
}
context.records.add(recordBatch);
nextStartOffset = recordBatch.getLastOffset();
nextMaxBytes -= Math.min(nextMaxBytes, recordBatch.getRecordBatch().rawPayload().remaining());
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
break;
}
// TODO: cache the remaining records
}
}
context.nextStartOffset = nextStartOffset;
context.nextMaxBytes = nextMaxBytes;
if (nextStartOffset >= endOffset || nextMaxBytes == 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(context.records));
} else {
return read0(streamId, endOffset, context);
}
});
}
}

static class ReadContext {
List<S3ObjectMetadata> objects;
int objectIndex;
ObjectReader reader;
List<ObjectReader.DataBlockIndex> blocks;
int blockIndex;
List<StreamRecordBatch> records;
long nextStartOffset;
int nextMaxBytes;

public ReadContext(List<S3ObjectMetadata> objects, long startOffset, int maxBytes) {
this.objects = objects;
this.records = new LinkedList<>();
this.nextStartOffset = startOffset;
this.nextMaxBytes = maxBytes;
}

}
}
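read0 pages through data blocks by chaining CompletableFuture stages: each stage reads one block, accumulates matching records, and either completes or recurses via thenCompose. A standalone sketch of that recursion pattern, under illustrative names (PagedReader, fetchPage) rather than the classes in this PR:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Minimal sketch of the read0 recursion: fetch one page asynchronously, then either
// complete with the accumulated result or recurse via thenCompose. Names are illustrative.
class PagedReader {
    CompletableFuture<List<Long>> fetchPage(long from, long to) {
        // Stand-in for ObjectReader.read(...): returns offsets [from, min(from + 3, to)).
        List<Long> page = new ArrayList<>();
        for (long offset = from; offset < Math.min(from + 3, to); offset++) {
            page.add(offset);
        }
        return CompletableFuture.completedFuture(page);
    }

    CompletableFuture<List<Long>> readAll(long from, long to, List<Long> acc) {
        return fetchPage(from, to).thenCompose(page -> {
            acc.addAll(page);
            long next = from + page.size();
            if (page.isEmpty() || next >= to) {
                return CompletableFuture.completedFuture(acc); // range satisfied
            }
            return readAll(next, to, acc);                     // recurse for the next page
        });
    }

    public static void main(String[] args) {
        List<Long> offsets = new PagedReader().readAll(10, 20, new ArrayList<>()).join();
        System.out.println(offsets); // [10, 11, ..., 19]
    }
}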
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/log/s3/cache/ReadDataBlock.java
@@ -17,43 +17,43 @@

package kafka.log.s3.cache;

import com.automq.elasticstream.client.api.RecordBatchWithContext;
import kafka.log.s3.model.StreamRecordBatch;

import java.util.List;
import java.util.OptionalLong;

public class ReadDataBlock {
private List<RecordBatchWithContext> records;
private List<StreamRecordBatch> records;

public ReadDataBlock(List<RecordBatchWithContext> records) {
public ReadDataBlock(List<StreamRecordBatch> records) {
this.records = records;
}

public List<RecordBatchWithContext> getRecords() {
public List<StreamRecordBatch> getRecords() {
return records;
}

public void setRecords(List<RecordBatchWithContext> records) {
public void setRecords(List<StreamRecordBatch> records) {
this.records = records;
}

public OptionalLong startOffset() {
if (records.isEmpty()) {
return OptionalLong.empty();
} else {
return OptionalLong.of(records.get(0).baseOffset());
return OptionalLong.of(records.get(0).getBaseOffset());
}
}

public OptionalLong endOffset() {
if (records.isEmpty()) {
return OptionalLong.empty();
} else {
return OptionalLong.of(records.get(records.size() - 1).lastOffset());
return OptionalLong.of(records.get(records.size() - 1).getLastOffset());
}
}

public int sizeInBytes() {
return records.stream().mapToInt(r -> r.rawPayload().remaining()).sum();
return records.stream().mapToInt(r -> r.getRecordBatch().rawPayload().remaining()).sum();
}
}
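With ReadDataBlock now holding StreamRecordBatch directly, the offset helpers delegate to the batch itself. A hedged sketch of the resulting behavior, written in the style of DefaultS3BlockCacheTest below and reusing its helpers (DefaultRecordBatch.of and the StreamRecordBatch constructor arguments follow that test, not a re-checked API):

@Test
public void readDataBlockOffsets() {
    // 5 records starting at offset 10; constructor arguments mirror DefaultS3BlockCacheTest below.
    RecordBatch batch = DefaultRecordBatch.of(5, 512);
    StreamRecordBatch record = new StreamRecordBatch(233L, 0, 10L, batch);
    ReadDataBlock block = new ReadDataBlock(List.of(record));

    assertEquals(OptionalLong.of(10), block.startOffset()); // first batch's base offset
    assertEquals(OptionalLong.of(15), block.endOffset());   // baseOffset + count, via getLastOffset()
}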
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java
@@ -44,6 +44,10 @@ public long getBaseOffset() {
return baseOffset;
}

public long getLastOffset() {
return baseOffset + recordBatch.count();
}

public RecordBatch getRecordBatch() {
return recordBatch;
}
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/log/s3/objects/ObjectManager.java
@@ -65,8 +65,13 @@ public interface ObjectManager {

/**
* Get objects by stream range.
* @param streamId stream id.
* @param startOffset range start offset.
* @param endOffset range end offset.
* @param limit max object count; a count limit is used instead of maxBytes because stream data size cannot be derived from object metadata.
* @return list of {@link S3ObjectMetadata} covering the range.
*/
List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int maxBytes);
List<S3ObjectMetadata> getObjects(long streamId, long startOffset, long endOffset, int limit);

/**
* Get current server wal objects.
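Because limit caps the object count rather than the byte size, a caller that needs a whole range pages through getObjects and re-queries from the last offset it actually covered, which is exactly what DefaultS3BlockCache.read0 does through ReadContext.nextStartOffset. A hedged sketch of that calling pattern (readObjectsAndAdvance is a hypothetical helper standing in for the block-reading loop, not part of this PR):

// Hedged sketch of paging with the limit parameter; readObjectsAndAdvance is hypothetical.
long nextStartOffset = startOffset;
while (nextStartOffset < endOffset) {
    // At most two candidate objects covering [nextStartOffset, endOffset), mirroring read0.
    List<S3ObjectMetadata> objects = objectManager.getObjects(streamId, nextStartOffset, endOffset, 2);
    if (objects.isEmpty()) {
        break; // nothing left covers the range
    }
    // Read the data blocks of these objects and advance to the last offset actually consumed
    // (read0 tracks the same value in ReadContext.nextStartOffset).
    nextStartOffset = readObjectsAndAdvance(objects, nextStartOffset, endOffset);
}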
96 changes: 96 additions & 0 deletions core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.s3;

import com.automq.elasticstream.client.api.RecordBatch;
import kafka.log.s3.cache.DefaultS3BlockCache;
import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.ObjectManager;
import kafka.log.s3.objects.S3ObjectMetadata;
import kafka.log.s3.operator.MemoryS3Operator;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3ObjectType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class DefaultS3BlockCacheTest {
ObjectManager objectManager;
S3Operator s3Operator;
DefaultS3BlockCache s3BlockCache;

@BeforeEach
public void setup() {
objectManager = mock(ObjectManager.class);
s3Operator = new MemoryS3Operator();
s3BlockCache = new DefaultS3BlockCache(objectManager, s3Operator);
}

@Test
public void testRead() throws Exception {
S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, 0, S3ObjectType.WAL_LOOSE);
ObjectWriter objectWriter = new ObjectWriter(metadata1.key(), s3Operator, 1024, 1024);
objectWriter.write(newRecord(233, 10, 5, 512));
objectWriter.write(newRecord(233, 15, 10, 512));
objectWriter.write(newRecord(233, 25, 5, 512));
objectWriter.write(newRecord(234, 0, 5, 512));
objectWriter.close();
metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE);

S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, 0, S3ObjectType.WAL_LOOSE);
objectWriter = new ObjectWriter(metadata2.key(), s3Operator, 1024, 1024);
objectWriter.write(newRecord(233, 30, 10, 512));
objectWriter.close();
metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE);

S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, 0, S3ObjectType.WAL_LOOSE);
objectWriter = new ObjectWriter(metadata3.key(), s3Operator, 1024, 1024);
objectWriter.write(newRecord(233, 40, 20, 512));
objectWriter.close();
metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE);

when(objectManager.getObjects(eq(233L), eq(11L), eq(60L), eq(2))).thenReturn(List.of(
metadata1, metadata2
));
when(objectManager.getObjects(eq(233L), eq(40L), eq(60L), eq(2))).thenReturn(List.of(
metadata3
));

ReadDataBlock rst = s3BlockCache.read(233L, 11L, 60L, 10000).get(3, TimeUnit.SECONDS);
assertEquals(5, rst.getRecords().size());
assertEquals(10, rst.getRecords().get(0).getBaseOffset());
assertEquals(60, rst.getRecords().get(4).getLastOffset());
}

StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) {
RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize);
return new StreamRecordBatch(streamId, 0, offset, recordBatch);
}


}
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/log/s3/ObjectWriterTest.java
@@ -24,6 +24,7 @@
import kafka.log.s3.operator.MemoryS3Operator;
import kafka.log.s3.operator.S3Operator;
import org.apache.kafka.metadata.stream.S3ObjectType;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.Iterator;
@@ -33,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

@Tag("S3Unit")
public class ObjectWriterTest {

@Test
8 changes: 5 additions & 3 deletions core/src/test/java/kafka/log/s3/S3StreamTest.java
@@ -19,13 +19,14 @@

import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatch;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.cache.S3BlockCache;
import kafka.log.s3.model.RangeMetadata;
import kafka.log.s3.model.StreamMetadata;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.streams.StreamManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.List;
@@ -37,6 +38,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class S3StreamTest {
Wal wal;
S3BlockCache blockCache;
@@ -68,7 +70,7 @@ public void testFetch() throws Throwable {

ReadDataBlock newReadDataBlock(long start, long end, int size) {
RecordBatch recordBatch = DefaultRecordBatch.of((int) (end - start), size);
RecordBatchWithContext recordBatchWithContext = new DefaultRecordBatchWithContext(recordBatch, start);
return new ReadDataBlock(List.of(recordBatchWithContext));
StreamRecordBatch record = new StreamRecordBatch(0, 0, start, recordBatch);
return new ReadDataBlock(List.of(record));
}
}
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/log/s3/S3WalTest.java
@@ -24,6 +24,7 @@
import kafka.log.s3.objects.ObjectStreamRange;
import kafka.log.s3.operator.MemoryS3Operator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

@@ -39,6 +40,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class S3WalTest {
ObjectManager objectManager;
S3Wal s3Wal;