diff --git a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala index af755e2e2e..2ba14de424 100644 --- a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala +++ b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala @@ -74,6 +74,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, cache.put(record.rawPayload()) } }) + rst.free() } else { return timestampOffset } diff --git a/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala b/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala index 2a900baa59..2511ae9636 100644 --- a/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala +++ b/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala @@ -93,7 +93,8 @@ class ElasticTransactionIndex(@volatile private var _file: File, streamSliceSupp return queue.dequeue() } try { - val records = stream.fetch(position, stream.nextOffset(), AbortedTxn.TotalSize * 128).recordBatchList() + val rst = stream.fetch(position, stream.nextOffset(), AbortedTxn.TotalSize * 128) + val records = rst.recordBatchList() records.forEach(recordBatch => { val readBuf = Unpooled.wrappedBuffer(recordBatch.rawPayload()) val size = readBuf.readableBytes() @@ -110,6 +111,7 @@ class ElasticTransactionIndex(@volatile private var _file: File, streamSliceSupp } position += size }) + rst.free() queue.dequeue() } catch { case e: IOException => diff --git a/core/src/main/scala/kafka/log/es/MetaStream.java b/core/src/main/scala/kafka/log/es/MetaStream.java index 4065fb4fde..feae520d00 100644 --- a/core/src/main/scala/kafka/log/es/MetaStream.java +++ b/core/src/main/scala/kafka/log/es/MetaStream.java @@ -35,6 +35,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + +import io.netty.buffer.Unpooled; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,7 +179,7 @@ public Map replay() throws IOException { FetchResult fetchRst = fetch(pos, endOffset, 64 * 1024).get(); for (RecordBatchWithContext context : fetchRst.recordBatchList()) { try { - MetaKeyValue kv = MetaKeyValue.decode(context.rawPayload()); + MetaKeyValue kv = MetaKeyValue.decode(Unpooled.copiedBuffer(context.rawPayload()).nioBuffer()); metaCache.put(kv.getKey(), Pair.of(context.baseOffset(), kv.getValue())); } catch (Exception e) { LOGGER.error("{} streamId {}: decode meta failed, offset: {}, error: {}", logIdent, streamId(), context.baseOffset(), e.getMessage()); @@ -187,6 +189,7 @@ public Map replay() throws IOException { if (pos >= endOffset) { done = true; } + fetchRst.free(); } } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { diff --git a/core/src/main/scala/kafka/log/s3/ByteBufAlloc.java b/core/src/main/scala/kafka/log/s3/ByteBufAlloc.java new file mode 100644 index 0000000000..2ae6b108bc --- /dev/null +++ b/core/src/main/scala/kafka/log/s3/ByteBufAlloc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.s3; + +import io.netty.buffer.PooledByteBufAllocator; + +public class ByteBufAlloc { + + public static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; + +} diff --git a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java deleted file mode 100644 index 691fd6e4bc..0000000000 --- a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3; - -import io.netty.buffer.ByteBuf; -import kafka.log.s3.model.StreamRecordBatch; - -public class FlatStreamRecordBatch implements Comparable { - public long streamId; - public long epoch; - public long baseOffset; - public int count; - public ByteBuf encodedBuf; - - public static FlatStreamRecordBatch from(StreamRecordBatch streamRecord) { - FlatStreamRecordBatch self = new FlatStreamRecordBatch(); - self.streamId = streamRecord.getStreamId(); - self.epoch = streamRecord.getEpoch(); - self.baseOffset = streamRecord.getBaseOffset(); - self.count = streamRecord.getRecordBatch().count(); - self.encodedBuf = StreamRecordBatchCodec.encode(streamRecord); - return self; - } - - public long lastOffset() { - return baseOffset + count; - } - - public ByteBuf encodedBuf() { - return encodedBuf.duplicate(); - } - - public int size() { - return encodedBuf.readableBytes(); - } - - public void free() { - encodedBuf.release(); - } - - @Override - public int compareTo(FlatStreamRecordBatch o) { - @SuppressWarnings("DuplicatedCode") - int rst = Long.compare(streamId, o.streamId); - if (rst != 0) { - return rst; - } - rst = Long.compare(epoch, o.epoch); - if (rst != 0) { - return rst; - } - return Long.compare(baseOffset, o.baseOffset); - } -} diff --git a/core/src/main/scala/kafka/log/s3/ObjectWriter.java b/core/src/main/scala/kafka/log/s3/ObjectWriter.java index 66d3605921..78dde0287a 100644 --- a/core/src/main/scala/kafka/log/s3/ObjectWriter.java +++ b/core/src/main/scala/kafka/log/s3/ObjectWriter.java @@ -20,12 +20,13 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.operator.S3Operator; import 
kafka.log.s3.operator.Writer; -import org.apache.kafka.metadata.stream.ObjectUtils; import org.apache.kafka.common.compress.ZstdFactory; import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.metadata.stream.ObjectUtils; import java.io.IOException; import java.io.OutputStream; @@ -58,7 +59,7 @@ public ObjectWriter(long objectId, S3Operator s3Operator, int blockSizeThreshold writer = s3Operator.writer(objectKey); } - public void write(FlatStreamRecordBatch record) { + public void write(StreamRecordBatch record) { if (dataBlock == null) { dataBlock = new DataBlock(nextDataBlockPosition); } @@ -159,7 +160,7 @@ public DataBlock(long position) { streamRanges = new LinkedList<>(); } - public boolean write(FlatStreamRecordBatch record) { + public boolean write(StreamRecordBatch record) { try { recordCount++; return write0(record); @@ -169,19 +170,18 @@ public boolean write(FlatStreamRecordBatch record) { } } - public boolean write0(FlatStreamRecordBatch record) throws IOException { - if (streamRange == null || streamRange.getStreamId() != record.streamId) { + public boolean write0(StreamRecordBatch record) throws IOException { + if (streamRange == null || streamRange.getStreamId() != record.getStreamId()) { streamRange = new ObjectStreamRange(); - streamRange.setStreamId(record.streamId); - streamRange.setEpoch(record.epoch); - streamRange.setStartOffset(record.baseOffset); + streamRange.setStreamId(record.getStreamId()); + streamRange.setEpoch(record.getEpoch()); + streamRange.setStartOffset(record.getBaseOffset()); streamRanges.add(streamRange); } - streamRange.setEndOffset(record.lastOffset()); + streamRange.setEndOffset(record.getLastOffset()); - ByteBuf recordBuf = record.encodedBuf(); + ByteBuf recordBuf = record.encoded(); out.write(recordBuf.array(), recordBuf.arrayOffset(), recordBuf.readableBytes()); - recordBuf.release(); blockSize += recordBuf.readableBytes(); if (blockSize >= blockSizeThreshold) { close(); diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java index 21e143af73..1e52fc19b1 100644 --- a/core/src/main/scala/kafka/log/s3/S3Storage.java +++ b/core/src/main/scala/kafka/log/s3/S3Storage.java @@ -79,11 +79,9 @@ public void close() { @Override public CompletableFuture append(StreamRecordBatch streamRecord) { - //TODO: copy to pooled bytebuffer to reduce gc, convert to flat record - FlatStreamRecordBatch flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord); - WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf()); + WriteAheadLog.AppendResult appendResult = log.append(streamRecord.encoded()); CompletableFuture cf = new CompletableFuture<>(); - WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf); + WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.offset, cf); callbackSequencer.before(writeRequest); appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest)); return cf; @@ -97,9 +95,9 @@ public CompletableFuture read(long streamId, long startOffset, lo } private CompletableFuture read0(long streamId, long startOffset, long endOffset, int maxBytes) { - List records = logCache.get(streamId, startOffset, endOffset, maxBytes); + List records = logCache.get(streamId, startOffset, endOffset, maxBytes); if (!records.isEmpty()) { - return CompletableFuture.completedFuture(new ReadDataBlock(StreamRecordBatchCodec.decode(records))); + return 
CompletableFuture.completedFuture(new ReadDataBlock(records)); } return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> { long nextStartOffset = readDataBlock.endOffset().orElse(startOffset); @@ -108,7 +106,7 @@ private CompletableFuture read0(long streamId, long startOffset, return readDataBlock; } List finalRecords = new LinkedList<>(readDataBlock.getRecords()); - finalRecords.addAll(StreamRecordBatchCodec.decode(logCache.get(streamId, nextStartOffset, endOffset, maxBytes))); + finalRecords.addAll(logCache.get(streamId, nextStartOffset, endOffset, maxBytes)); return new ReadDataBlock(finalRecords); }); } @@ -192,6 +190,8 @@ private void commitWALObject(WALObjectUploadTaskContext context) { // 1. poll out current task walObjectCommitQueue.poll(); log.trim(context.cache.confirmOffset()); + // transfer records ownership to block cache. + blockCache.put(context.cache.records()); freeCache(context.cache.blockId()); context.cf.complete(null); @@ -219,7 +219,7 @@ static class WALCallbackSequencer { public void before(WalWriteRequest request) { try { walRequests.put(request); - Queue streamRequests = stream2requests.computeIfAbsent(request.record.streamId, s -> new LinkedBlockingQueue<>()); + Queue streamRequests = stream2requests.computeIfAbsent(request.record.getStreamId(), s -> new LinkedBlockingQueue<>()); streamRequests.add(request); } catch (InterruptedException ex) { request.cf.completeExceptionally(ex); @@ -244,7 +244,7 @@ public List after(WalWriteRequest request) { } // pop sequence success stream request. - long streamId = request.record.streamId; + long streamId = request.record.getStreamId(); Queue streamRequests = stream2requests.get(streamId); WalWriteRequest peek = streamRequests.peek(); if (peek == null || peek.offset != request.offset) { diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index 9f62f442ab..5f0ddd623f 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -25,6 +25,7 @@ import com.automq.elasticstream.client.api.RecordBatchWithContext; import com.automq.elasticstream.client.api.Stream; import com.automq.elasticstream.client.flatc.header.ErrorCode; +import io.netty.buffer.Unpooled; import kafka.log.es.FutureUtil; import kafka.log.es.RecordBatchWithContextWrapper; import kafka.log.s3.model.StreamRecordBatch; @@ -90,7 +91,7 @@ private CompletableFuture append0(RecordBatch recordBatch) { return FutureUtil.failedFuture(new ElasticStreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable")); } long offset = nextOffset.getAndAdd(recordBatch.count()); - StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch); + StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload())); CompletableFuture cf = storage.append(streamRecordBatch).thenApply(nil -> { updateConfirmOffset(offset + recordBatch.count()); return new DefaultAppendResult(offset); @@ -128,7 +129,7 @@ private CompletableFuture fetch0(long startOffset, long endOffset, )); } return storage.read(streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> { - List records = dataBlock.getRecords().stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); + List records = dataBlock.getRecords(); LOGGER.trace("{} stream fetch, 
startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size()); return new DefaultFetchResult(records); }); @@ -184,16 +185,24 @@ private void updateConfirmOffset(long newOffset) { } } + static class DefaultFetchResult implements FetchResult { - private final List records; + private final List records; - public DefaultFetchResult(List records) { + public DefaultFetchResult(List records) { this.records = records; } @Override public List recordBatchList() { - return records; + return records.stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); + } + + @Override + public void free() { + for (StreamRecordBatch record : records) { + record.release(); + } } } diff --git a/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java index 06c78098fc..5d9d575b2e 100644 --- a/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java +++ b/core/src/main/scala/kafka/log/s3/StreamRecordBatchCodec.java @@ -17,21 +17,15 @@ package kafka.log.s3; -import com.automq.elasticstream.client.DefaultRecordBatch; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import kafka.log.s3.model.StreamRecordBatch; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; public class StreamRecordBatchCodec { - private static final byte MAGIC_V0 = 0x22; - private static final PooledByteBufAllocator ALLOCATOR = PooledByteBufAllocator.DEFAULT; + public static final byte MAGIC_V0 = 0x22; public static ByteBuf encode(StreamRecordBatch streamRecord) { int totalLength = 1 // magic @@ -42,7 +36,7 @@ public static ByteBuf encode(StreamRecordBatch streamRecord) { + 4 // payload length + streamRecord.getRecordBatch().rawPayload().remaining(); // payload - ByteBuf buf = ALLOCATOR.heapBuffer(totalLength); + ByteBuf buf = ByteBufAlloc.ALLOC.heapBuffer(totalLength); buf.writeByte(MAGIC_V0); buf.writeLong(streamRecord.getStreamId()); buf.writeLong(streamRecord.getEpoch()); @@ -65,10 +59,10 @@ public static StreamRecordBatch decode(DataInputStream in) { long baseOffset = in.readLong(); int lastOffsetDelta = in.readInt(); int payloadLength = in.readInt(); - ByteBuffer payload = ByteBuffer.allocate(payloadLength); - in.readFully(payload.array()); - DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload); - return new StreamRecordBatch(streamId, epoch, baseOffset, defaultRecordBatch); + ByteBuf payload = ByteBufAlloc.ALLOC.heapBuffer(payloadLength); + in.readFully(payload.array(), payload.arrayOffset(), payloadLength); + payload.writerIndex(payload.readerIndex() + payloadLength); + return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload); } catch (IOException e) { throw new RuntimeException(e); } @@ -81,13 +75,8 @@ public static StreamRecordBatch decode(ByteBuf buf) { long baseOffset = buf.readLong(); int lastOffsetDelta = buf.readInt(); int payloadLength = buf.readInt(); - ByteBuffer payload = buf.slice(buf.readerIndex(), payloadLength).nioBuffer(); + ByteBuf payload = buf.slice(buf.readerIndex(), payloadLength); buf.skipBytes(payloadLength); - DefaultRecordBatch defaultRecordBatch = new DefaultRecordBatch(lastOffsetDelta, 0, Collections.emptyMap(), payload); - return new StreamRecordBatch(streamId, epoch, 
baseOffset, defaultRecordBatch); - } - - public static List decode(List records) { - return records.stream().map(r -> decode(r.encodedBuf())).collect(Collectors.toList()); + return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload); } } diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java index 4130fa6e58..2e323f3853 100644 --- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java +++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java @@ -17,6 +17,7 @@ package kafka.log.s3; +import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.CommitWALObjectRequest; import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.objects.ObjectStreamRange; @@ -37,7 +38,7 @@ public class WALObjectUploadTask { private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class); - private final Map> streamRecordsMap; + private final Map> streamRecordsMap; private final int objectBlockSize; private final int objectPartSize; private final int streamSplitSizeThreshold; @@ -47,7 +48,7 @@ public class WALObjectUploadTask { private volatile CommitWALObjectRequest commitWALObjectRequest; private final CompletableFuture uploadCf = new CompletableFuture<>(); - public WALObjectUploadTask(Map> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator, + public WALObjectUploadTask(Map> streamRecordsMap, ObjectManager objectManager, S3Operator s3Operator, int objectBlockSize, int objectPartSize, int streamSplitSizeThreshold) { this.streamRecordsMap = streamRecordsMap; this.objectBlockSize = objectBlockSize; @@ -81,16 +82,16 @@ private void upload0(long objectId) { List> streamObjectCfList = new LinkedList<>(); for (Long streamId : streamIds) { - List streamRecords = streamRecordsMap.get(streamId); - long streamSize = streamRecords.stream().mapToLong(FlatStreamRecordBatch::size).sum(); + List streamRecords = streamRecordsMap.get(streamId); + long streamSize = streamRecords.stream().mapToLong(StreamRecordBatch::size).sum(); if (streamSize >= streamSplitSizeThreshold) { streamObjectCfList.add(writeStreamObject(streamRecords)); } else { - for (FlatStreamRecordBatch record : streamRecords) { + for (StreamRecordBatch record : streamRecords) { walObject.write(record); } - long startOffset = streamRecords.get(0).baseOffset; - long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset(); + long startOffset = streamRecords.get(0).getBaseOffset(); + long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset)); // log object block only contain single stream's data. 
walObject.closeCurrentBlock(); @@ -120,17 +121,17 @@ public CompletableFuture commit() { })); } - private CompletableFuture writeStreamObject(List streamRecords) { + private CompletableFuture writeStreamObject(List streamRecords) { CompletableFuture objectIdCf = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)); // TODO: retry until success return objectIdCf.thenCompose(objectId -> { ObjectWriter streamObjectWriter = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize); - for (FlatStreamRecordBatch record : streamRecords) { + for (StreamRecordBatch record : streamRecords) { streamObjectWriter.write(record); } - long streamId = streamRecords.get(0).streamId; - long startOffset = streamRecords.get(0).baseOffset; - long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset(); + long streamId = streamRecords.get(0).getStreamId(); + long startOffset = streamRecords.get(0).getBaseOffset(); + long endOffset = streamRecords.get(streamRecords.size() - 1).getLastOffset(); StreamObject streamObject = new StreamObject(); streamObject.setObjectId(objectId); streamObject.setStreamId(streamId); diff --git a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java index 422a287136..e9c1351b08 100644 --- a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java +++ b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java @@ -18,15 +18,17 @@ package kafka.log.s3; +import kafka.log.s3.model.StreamRecordBatch; + import java.util.concurrent.CompletableFuture; public class WalWriteRequest implements Comparable { - final FlatStreamRecordBatch record; + final StreamRecordBatch record; final long offset; final CompletableFuture cf; boolean persisted; - public WalWriteRequest(FlatStreamRecordBatch record, long offset, CompletableFuture cf) { + public WalWriteRequest(StreamRecordBatch record, long offset, CompletableFuture cf) { this.record = record; this.offset = offset; this.cf = cf; diff --git a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java index 676844d6b4..f5c57d7903 100644 --- a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java @@ -17,7 +17,8 @@ package kafka.log.s3.cache; -import kafka.log.s3.FlatStreamRecordBatch; + +import kafka.log.s3.model.StreamRecordBatch; import java.util.ArrayList; import java.util.Collections; @@ -46,15 +47,15 @@ public BlockCache(long maxSize) { this.maxSize = maxSize; } - public void put(long streamId, List records) { + public void put(long streamId, List records) { if (maxSize == 0 || records.isEmpty()) { return; } boolean overlapped = false; records = new ArrayList<>(records); NavigableMap streamCache = stream2cache.computeIfAbsent(streamId, id -> new TreeMap<>()); - long startOffset = records.get(0).baseOffset; - long endOffset = records.get(records.size() - 1).lastOffset(); + long startOffset = records.get(0).getBaseOffset(); + long endOffset = records.get(records.size() - 1).getLastOffset(); // TODO: generate readahead. Map.Entry floorEntry = streamCache.floorEntry(startOffset); SortedMap tailMap = streamCache.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset); @@ -66,9 +67,9 @@ public void put(long streamId, List records) { } // overlap is a rare case, so removeIf is fine for the performance. 
if (records.removeIf(record -> { - boolean remove = record.lastOffset() > cacheBlock.firstOffset && record.baseOffset < cacheBlock.lastOffset; + boolean remove = record.getLastOffset() > cacheBlock.firstOffset && record.getBaseOffset() < cacheBlock.lastOffset; if (remove) { - record.free(); + record.release(); } return remove; })) { @@ -77,23 +78,23 @@ public void put(long streamId, List records) { } // ensure the cache size. - int size = records.stream().mapToInt(FlatStreamRecordBatch::size).sum(); + int size = records.stream().mapToInt(StreamRecordBatch::size).sum(); ensureCapacity(size); // TODO: split records to 1MB blocks. if (overlapped) { // split to multiple cache blocks. long expectStartOffset = -1L; - List part = new ArrayList<>(records.size() / 2); - for (FlatStreamRecordBatch record : records) { - if (expectStartOffset == -1L || record.baseOffset == expectStartOffset) { + List part = new ArrayList<>(records.size() / 2); + for (StreamRecordBatch record : records) { + if (expectStartOffset == -1L || record.getBaseOffset() == expectStartOffset) { part.add(record); } else { put(streamId, streamCache, new CacheBlock(part)); part = new ArrayList<>(records.size() / 2); part.add(record); } - expectStartOffset = record.lastOffset(); + expectStartOffset = record.getLastOffset(); } if (!part.isEmpty()) { put(streamId, streamCache, new CacheBlock(part)); @@ -104,6 +105,10 @@ public void put(long streamId, List records) { } + /** + * Get records from cache. + * Note: the records is retained, the caller should release it. + */ public GetCacheResult get(long streamId, long startOffset, long endOffset, int maxBytes) { NavigableMap streamCache = stream2cache.get(streamId); if (streamCache == null) { @@ -114,7 +119,7 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m long nextStartOffset = startOffset; int nextMaxBytes = maxBytes; Readahead readahead = null; - LinkedList records = new LinkedList<>(); + LinkedList records = new LinkedList<>(); for (Map.Entry entry : streamCache.entrySet()) { CacheBlock cacheBlock = entry.getValue(); if (cacheBlock.lastOffset < nextStartOffset || nextStartOffset < cacheBlock.firstOffset) { @@ -124,7 +129,7 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m readahead = cacheBlock.readahead; } nextMaxBytes = readFromCacheBlock(records, cacheBlock, nextStartOffset, endOffset, nextMaxBytes); - nextStartOffset = records.getLast().lastOffset(); + nextStartOffset = records.getLast().getLastOffset(); boolean blockCompletedRead = nextStartOffset >= cacheBlock.lastOffset; CacheKey cacheKey = new CacheKey(streamId, cacheBlock.firstOffset); if (blockCompletedRead) { @@ -141,16 +146,18 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m } } + + records.forEach(StreamRecordBatch::retain); return GetCacheResult.of(records, readahead); } - private int readFromCacheBlock(LinkedList records, CacheBlock cacheBlock, + private int readFromCacheBlock(LinkedList records, CacheBlock cacheBlock, long nextStartOffset, long endOffset, int nextMaxBytes) { boolean matched = false; - for (FlatStreamRecordBatch record : cacheBlock.records) { - if (record.baseOffset <= nextStartOffset && record.lastOffset() > nextStartOffset) { + for (StreamRecordBatch record : cacheBlock.records) { + if (record.getBaseOffset() <= nextStartOffset && record.getLastOffset() > nextStartOffset) { records.add(record); - nextStartOffset = record.lastOffset(); + nextStartOffset = record.getLastOffset(); nextMaxBytes -= 
record.size(); matched = true; if (nextStartOffset >= endOffset || nextMaxBytes <= 0) { @@ -214,35 +221,35 @@ public boolean equals(Object obj) { } static class CacheBlock { - List records; + List records; long firstOffset; long lastOffset; int size; Readahead readahead; - public CacheBlock(List records, Readahead readahead) { + public CacheBlock(List records, Readahead readahead) { this.records = records; - this.firstOffset = records.get(0).baseOffset; - this.lastOffset = records.get(records.size() - 1).lastOffset(); - this.size = records.stream().mapToInt(FlatStreamRecordBatch::size).sum(); + this.firstOffset = records.get(0).getBaseOffset(); + this.lastOffset = records.get(records.size() - 1).getLastOffset(); + this.size = records.stream().mapToInt(StreamRecordBatch::size).sum(); this.readahead = readahead; } - public CacheBlock(List records) { + public CacheBlock(List records) { this(records, null); } public void free() { - records.forEach(r -> r.encodedBuf.release()); + records.forEach(StreamRecordBatch::release); records = null; } } public static class GetCacheResult { - private final List records; + private final List records; private final Readahead readahead; - private GetCacheResult(List records, Readahead readahead) { + private GetCacheResult(List records, Readahead readahead) { this.records = records; this.readahead = readahead; } @@ -251,11 +258,11 @@ public static GetCacheResult empty() { return new GetCacheResult(Collections.emptyList(), null); } - public static GetCacheResult of(List records, Readahead readahead) { + public static GetCacheResult of(List records, Readahead readahead) { return new GetCacheResult(records, readahead); } - public List getRecords() { + public List getRecords() { return records; } diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java index 130d48e6bc..a7ec4f8582 100644 --- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class DefaultS3BlockCache implements S3BlockCache { @@ -50,6 +51,11 @@ public CompletableFuture read(long streamId, long startOffset, lo return read0(streamId, endOffset, context); } + @Override + public void put(Map> stream2records) { + stream2records.forEach(cache::put); + } + private CompletableFuture read0(long streamId, long endOffset, ReadContext context) { if (context.blocks == null || context.blocks.size() <= context.blockIndex) { if (context.objectIndex >= context.objects.size()) { @@ -74,6 +80,7 @@ private CompletableFuture read0(long streamId, long endOffset, Re while (it.hasNext()) { StreamRecordBatch recordBatch = it.next(); if (recordBatch.getStreamId() != streamId) { + recordBatch.release(); if (matched) { break; } @@ -81,6 +88,7 @@ private CompletableFuture read0(long streamId, long endOffset, Re } matched = true; if (recordBatch.getLastOffset() <= nextStartOffset) { + recordBatch.release(); continue; } context.records.add(recordBatch); diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java index c96c6d5b6d..643bac4e2b 100644 --- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java @@ -17,7 +17,7 @@ package kafka.log.s3.cache; -import 
kafka.log.s3.FlatStreamRecordBatch; +import kafka.log.s3.model.StreamRecordBatch; import java.util.ArrayList; import java.util.Collections; @@ -39,32 +39,39 @@ public LogCache(long cacheBlockMaxSize) { this.activeBlock = new LogCacheBlock(cacheBlockMaxSize); } - public boolean put(FlatStreamRecordBatch recordBatch) { + public boolean put(StreamRecordBatch recordBatch) { return activeBlock.put(recordBatch); } /** * Get streamId [startOffset, endOffset) range records with maxBytes limit. * If the cache only contain records after startOffset, the return list is empty. + * Note: the records is retained, the caller should release it. + * */ - public List get(long streamId, long startOffset, long endOffset, int maxBytes) { - List rst = new LinkedList<>(); + public List get(long streamId, long startOffset, long endOffset, int maxBytes) { + List records = get0(streamId, startOffset, endOffset, maxBytes); + records.forEach(StreamRecordBatch::retain); + return records; + } + public List get0(long streamId, long startOffset, long endOffset, int maxBytes) { + List rst = new LinkedList<>(); long nextStartOffset = startOffset; int nextMaxBytes = maxBytes; for (LogCacheBlock archiveBlock : archiveBlocks) { // TODO: fast break when cache doesn't contains the startOffset. - List records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); + List records = archiveBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); if (records.isEmpty()) { continue; } - nextStartOffset = records.get(records.size() - 1).lastOffset(); - nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(r -> r.encodedBuf().readableBytes()).sum()); + nextStartOffset = records.get(records.size() - 1).getLastOffset(); + nextMaxBytes -= Math.min(nextMaxBytes, records.stream().mapToInt(StreamRecordBatch::size).sum()); rst.addAll(records); if (nextStartOffset >= endOffset || nextMaxBytes == 0) { return rst; } } - List records = activeBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); + List records = activeBlock.get(streamId, nextStartOffset, endOffset, nextMaxBytes); rst.addAll(records); return rst; } @@ -97,7 +104,7 @@ public static class LogCacheBlock { private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong(); private final long blockId; private final long maxSize; - private final Map> map = new HashMap<>(); + private final Map> map = new HashMap<>(); private long size = 0; private long confirmOffset; @@ -110,20 +117,20 @@ public long blockId() { return blockId; } - public boolean put(FlatStreamRecordBatch recordBatch) { - List streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>()); + public boolean put(StreamRecordBatch recordBatch) { + List streamCache = map.computeIfAbsent(recordBatch.getStreamId(), id -> new ArrayList<>()); streamCache.add(recordBatch); int recordSize = recordBatch.size(); size += recordSize; return size >= maxSize; } - public List get(long streamId, long startOffset, long endOffset, int maxBytes) { - List streamRecords = map.get(streamId); + public List get(long streamId, long startOffset, long endOffset, int maxBytes) { + List streamRecords = map.get(streamId); if (streamRecords == null) { return Collections.emptyList(); } - if (streamRecords.get(0).baseOffset > startOffset || streamRecords.get(streamRecords.size() - 1).lastOffset() <= startOffset) { + if (streamRecords.get(0).getBaseOffset() > startOffset || streamRecords.get(streamRecords.size() - 1).getLastOffset() <= startOffset) { return Collections.emptyList(); } int startIndex 
= -1; @@ -131,14 +138,14 @@ public List get(long streamId, long startOffset, long end int remainingBytesSize = maxBytes; // TODO: binary search the startOffset. for (int i = 0; i < streamRecords.size(); i++) { - FlatStreamRecordBatch record = streamRecords.get(i); - if (startIndex == -1 && record.baseOffset <= startOffset && record.lastOffset() > startOffset) { + StreamRecordBatch record = streamRecords.get(i); + if (startIndex == -1 && record.getBaseOffset() <= startOffset && record.getLastOffset() > startOffset) { startIndex = i; } if (startIndex != -1) { endIndex = i + 1; - remainingBytesSize -= Math.min(remainingBytesSize, record.encodedBuf().readableBytes()); - if (record.lastOffset() >= endOffset || remainingBytesSize == 0) { + remainingBytesSize -= Math.min(remainingBytesSize, record.size()); + if (record.getLastOffset() >= endOffset || remainingBytesSize == 0) { break; } } @@ -146,7 +153,7 @@ public List get(long streamId, long startOffset, long end return streamRecords.subList(startIndex, endIndex); } - public Map> records() { + public Map> records() { return map; } diff --git a/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java index d4e7cea55b..918a507367 100644 --- a/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/S3BlockCache.java @@ -17,6 +17,10 @@ package kafka.log.s3.cache; +import kafka.log.s3.model.StreamRecordBatch; + +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -28,4 +32,5 @@ public interface S3BlockCache { CompletableFuture read(long streamId, long startOffset, long endOffset, int maxBytes); + void put(Map> stream2records); } diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java index b88135fe69..c599f75f3c 100644 --- a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java +++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java @@ -18,18 +18,37 @@ package kafka.log.s3.model; import com.automq.elasticstream.client.api.RecordBatch; +import io.netty.buffer.ByteBuf; +import kafka.log.s3.StreamRecordBatchCodec; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; public class StreamRecordBatch implements Comparable { private final long streamId; private final long epoch; private final long baseOffset; - private final RecordBatch recordBatch; + private final int count; + private ByteBuf payload; + private ByteBuf encoded; - public StreamRecordBatch(long streamId, long epoch, long baseOffset, RecordBatch recordBatch) { + public StreamRecordBatch(long streamId, long epoch, long baseOffset, int count, ByteBuf payload) { this.streamId = streamId; this.epoch = epoch; this.baseOffset = baseOffset; - this.recordBatch = recordBatch; + this.count = count; + this.payload = payload; + } + + public ByteBuf encoded() { + if (encoded == null) { + encoded = StreamRecordBatchCodec.encode(this); + ByteBuf oldPayload = payload; + payload = encoded.slice(encoded.readerIndex() + encoded.readableBytes() - payload.readableBytes(), payload.readableBytes()); + oldPayload.release(); + } + return encoded.duplicate(); } public long getStreamId() { @@ -45,11 +64,55 @@ public long getBaseOffset() { } public long getLastOffset() { - return baseOffset + recordBatch.count(); + return baseOffset + count; + } + + public ByteBuf getPayload() { + return payload; } public RecordBatch getRecordBatch() { - 
return recordBatch; + return new RecordBatch() { + @Override + public int count() { + return count; + } + + @Override + public long baseTimestamp() { + return 0; + } + + @Override + public Map properties() { + return Collections.emptyMap(); + } + + @Override + public ByteBuffer rawPayload() { + return payload.nioBuffer(); + } + }; + } + + public int size() { + return payload.readableBytes(); + } + + public void retain() { + if (encoded != null) { + encoded.retain(); + } else { + payload.retain(); + } + } + + public void release() { + if (encoded != null) { + encoded.release(); + } else { + payload.release(); + } } @Override diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java index 94e8aeaf98..0c6d300f1a 100644 --- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java +++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java @@ -17,14 +17,13 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.RecordBatch; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.ObjectManager; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -54,20 +53,20 @@ public void setup() { @Test public void testRead() throws Exception { ObjectWriter objectWriter = new ObjectWriter(0, s3Operator, 1024, 1024); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 10, 5, 512))); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 15, 10, 512))); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 25, 5, 512))); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(234, 0, 5, 512))); + objectWriter.write(newRecord(233, 10, 5, 512)); + objectWriter.write(newRecord(233, 15, 10, 512)); + objectWriter.write(newRecord(233, 25, 5, 512)); + objectWriter.write(newRecord(234, 0, 5, 512)); objectWriter.close(); S3ObjectMetadata metadata1 = new S3ObjectMetadata(0, objectWriter.size(), S3ObjectType.WAL_LOOSE); objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 30, 10, 512))); + objectWriter.write(newRecord(233, 30, 10, 512)); objectWriter.close(); S3ObjectMetadata metadata2 = new S3ObjectMetadata(1, objectWriter.size(), S3ObjectType.WAL_LOOSE); objectWriter = new ObjectWriter(2, s3Operator, 1024, 1024); - objectWriter.write(FlatStreamRecordBatch.from(newRecord(233, 40, 20, 512))); + objectWriter.write(newRecord(233, 40, 20, 512)); objectWriter.close(); S3ObjectMetadata metadata3 = new S3ObjectMetadata(2, objectWriter.size(), S3ObjectType.WAL_LOOSE); @@ -85,8 +84,7 @@ public void testRead() throws Exception { } StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) { - RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize); - return new StreamRecordBatch(streamId, 0, offset, recordBatch); + return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize)); } diff --git a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java index 5a10372b44..421c5c4609 100644 --- 
a/core/src/test/java/kafka/log/s3/ObjectWriterTest.java +++ b/core/src/test/java/kafka/log/s3/ObjectWriterTest.java @@ -17,12 +17,11 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.RecordBatch; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.ObjectStreamRange; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectType; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -44,13 +43,13 @@ public void testWrite() throws ExecutionException, InterruptedException { S3Operator s3Operator = new MemoryS3Operator(); ObjectWriter objectWriter = new ObjectWriter(1, s3Operator, 1024, 1024); StreamRecordBatch r1 = newRecord(233, 10, 5, 512); - objectWriter.write(FlatStreamRecordBatch.from(r1)); + objectWriter.write(r1); StreamRecordBatch r2 = newRecord(233, 15, 10, 512); - objectWriter.write(FlatStreamRecordBatch.from(r2)); + objectWriter.write(r2); StreamRecordBatch r3 = newRecord(233, 25, 5, 512); - objectWriter.write(FlatStreamRecordBatch.from(r3)); + objectWriter.write(r3); StreamRecordBatch r4 = newRecord(234, 0, 5, 512); - objectWriter.write(FlatStreamRecordBatch.from(r4)); + objectWriter.write(r4); objectWriter.close().get(); List streamRanges = objectWriter.getStreamRanges(); @@ -111,7 +110,6 @@ public void testWrite_closeBlock() { } StreamRecordBatch newRecord(long streamId, long offset, int count, int payloadSize) { - RecordBatch recordBatch = DefaultRecordBatch.of(count, payloadSize); - return new StreamRecordBatch(streamId, 0, offset, recordBatch); + return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(payloadSize)); } } diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java index 0ef201ed48..45c3610f66 100644 --- a/core/src/test/java/kafka/log/s3/S3StorageTest.java +++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static kafka.log.s3.TestUtils.random; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -73,9 +74,15 @@ public void testAppend() throws Exception { CommitWALObjectResponse resp = new CommitWALObjectResponse(); when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(resp)); - CompletableFuture cf1 = storage.append(new StreamRecordBatch(233, 1, 10, DefaultRecordBatch.of(1, 100))); - CompletableFuture cf2 = storage.append(new StreamRecordBatch(233, 1, 11, DefaultRecordBatch.of(2, 100))); - CompletableFuture cf3 = storage.append(new StreamRecordBatch(234, 3, 100, DefaultRecordBatch.of(1, 100))); + CompletableFuture cf1 = storage.append( + new StreamRecordBatch(233, 1, 10, 1, random(100)) + ); + CompletableFuture cf2 = storage.append( + new StreamRecordBatch(233, 1, 11, 2, random(100)) + ); + CompletableFuture cf3 = storage.append( + new StreamRecordBatch(234, 3, 100, 1, random(100)) + ); cf1.get(3, TimeUnit.SECONDS); cf2.get(3, TimeUnit.SECONDS); @@ -168,8 +175,7 @@ public void testUploadWALObject_sequence() throws ExecutionException, Interrupte cf2.get(1, TimeUnit.SECONDS); } - private static FlatStreamRecordBatch newRecord(long streamId, long offset) { - StreamRecordBatch recordBatch 
= new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(1, 1024)); - return FlatStreamRecordBatch.from(recordBatch); + private static StreamRecordBatch newRecord(long streamId, long offset) { + return new StreamRecordBatch(streamId, 0, offset, 1, random(1)); } } diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 42229e6787..01dfdd6017 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -19,7 +19,6 @@ import com.automq.elasticstream.client.api.ElasticStreamClientException; import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatch; import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; @@ -75,8 +74,7 @@ public void testFetch() throws Throwable { } ReadDataBlock newReadDataBlock(long start, long end, int size) { - RecordBatch recordBatch = DefaultRecordBatch.of((int) (end - start), size); - StreamRecordBatch record = new StreamRecordBatch(0, 0, start, recordBatch); + StreamRecordBatch record = new StreamRecordBatch(0, 0, start, (int) (end - start), TestUtils.random(size)); return new ReadDataBlock(List.of(record)); } } diff --git a/core/src/test/java/kafka/log/s3/TestUtils.java b/core/src/test/java/kafka/log/s3/TestUtils.java new file mode 100644 index 0000000000..20de16704b --- /dev/null +++ b/core/src/test/java/kafka/log/s3/TestUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.s3; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.util.Random; + +public class TestUtils { + + public static ByteBuf random(int size) { + byte[] bytes = new byte[size]; + new Random().nextBytes(bytes); + return Unpooled.wrappedBuffer(bytes); + } + +} diff --git a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java index 5f6538049a..f2345de1f5 100644 --- a/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java +++ b/core/src/test/java/kafka/log/s3/WALObjectUploadTaskTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import static kafka.log.s3.TestUtils.random; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -64,15 +65,15 @@ public void testTryCompact() throws Exception { doAnswer(invocation -> CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement())).when(objectManager).prepareObject(anyInt(), anyLong()); when(objectManager.commitWALObject(any())).thenReturn(CompletableFuture.completedFuture(new CommitWALObjectResponse())); - Map> map = new HashMap<>(); + Map> map = new HashMap<>(); map.put(233L, List.of( - FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 10, DefaultRecordBatch.of(2, 512))), - FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 12, DefaultRecordBatch.of(2, 128))), - FlatStreamRecordBatch.from(new StreamRecordBatch(233, 0, 14, DefaultRecordBatch.of(2, 512))) + new StreamRecordBatch(233, 0, 10, 2, random(512)), + new StreamRecordBatch(233, 0, 12, 2, random(128)), + new StreamRecordBatch(233, 0, 14, 2, random(512)) )); map.put(234L, List.of( - FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 20, DefaultRecordBatch.of(2, 128))), - FlatStreamRecordBatch.from(new StreamRecordBatch(234, 0, 22, DefaultRecordBatch.of(2, 128))) + new StreamRecordBatch(234, 0, 20, 2, random(128)), + new StreamRecordBatch(234, 0, 22, 2, random(128)) )); walObjectUploadTask = new WALObjectUploadTask(map, objectManager, s3Operator, 16 * 1024 * 1024, 16 * 1024 * 1024, 1000); diff --git a/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java index dab3adbf62..b805ec4b3c 100644 --- a/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java +++ b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java @@ -17,8 +17,7 @@ package kafka.log.s3.cache; -import kafka.log.s3.DefaultRecordBatch; -import kafka.log.s3.FlatStreamRecordBatch; +import kafka.log.s3.TestUtils; import kafka.log.s3.model.StreamRecordBatch; import org.junit.jupiter.api.Test; @@ -51,12 +50,12 @@ public void testPutGet() { )); BlockCache.GetCacheResult rst = blockCache.get(233L, 10L, 20L, 1024); - List records = rst.getRecords(); + List records = rst.getRecords(); assertEquals(4, records.size()); - assertEquals(10L, records.get(0).baseOffset); - assertEquals(12L, records.get(1).baseOffset); - assertEquals(14L, records.get(2).baseOffset); - assertEquals(16L, records.get(3).baseOffset); + assertEquals(10L, records.get(0).getBaseOffset()); + assertEquals(12L, records.get(1).getBaseOffset()); + assertEquals(14L, records.get(2).getBaseOffset()); + assertEquals(16L, records.get(3).getBaseOffset()); } @Test @@ -90,9 +89,8 @@ public void testLRU() { assertNull(lru.pop()); } - private static FlatStreamRecordBatch newRecord(long 
streamId, long offset, int count, int size) { - StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(count, size)); - return FlatStreamRecordBatch.from(recordBatch); + private static StreamRecordBatch newRecord(long streamId, long offset, int count, int size) { + return new StreamRecordBatch(streamId, 0, offset, count, TestUtils.random(size)); } }
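
Reviewer note (illustrative, not part of the patch): the change above makes fetched and cached record payloads pooled, reference-counted ByteBufs. LogCache.get and BlockCache.get now retain the returned StreamRecordBatch instances ("the records is retained, the caller should release it"), and FetchResult gains a free() that releases them, which is why rst.free() / fetchRst.free() calls were added to ElasticTimeIndex, ElasticTransactionIndex and MetaStream.replay. The sketch below shows the resulting caller-side contract. It is a minimal sketch, not code from the patch: the asynchronous fetch(...).get() shape is assumed from MetaStream.replay, and FetchAndFreeExample, readAll and the consume callback are placeholder names.

import com.automq.elasticstream.client.api.FetchResult;
import com.automq.elasticstream.client.api.RecordBatchWithContext;
import com.automq.elasticstream.client.api.Stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.function.Consumer;

public final class FetchAndFreeExample {
    // Fetch a range, hand each payload to the caller, then free the result so the
    // retained StreamRecordBatch payloads go back to the pooled allocator.
    public static void readAll(Stream stream, long start, long end, int maxBytes,
                               Consumer<ByteBuf> consume) throws Exception {
        FetchResult rst = stream.fetch(start, end, maxBytes).get();
        try {
            for (RecordBatchWithContext batch : rst.recordBatchList()) {
                // rawPayload() may be backed by a pooled buffer owned by the FetchResult;
                // copy it if the bytes need to outlive free(), as MetaStream.replay now does.
                consume.accept(Unpooled.copiedBuffer(batch.rawPayload()));
            }
        } finally {
            rst.free();
        }
    }
}

The same obligation applies inside the storage layer: whoever consumes the records returned by LogCache.get or BlockCache.get (for example via ReadDataBlock) is responsible for releasing them, while commitWALObject transfers ownership of the uploaded block's records to the block cache.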