From 346321049076e44a6d2da77ab422dc96c983b1fe Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Tue, 5 Sep 2023 15:16:45 +0800
Subject: [PATCH 1/2] feat(stream-client): add block cache

Signed-off-by: Robin Han
---
 config/kraft/server.properties                |   2 +-
 .../scala/kafka/log/s3/DefaultS3Client.java   |   2 +-
 .../kafka/log/s3/FlatStreamRecordBatch.java   |   4 +
 .../main/scala/kafka/log/s3/S3Storage.java    |   4 +-
 .../kafka/log/s3/WALObjectUploadTask.java     |   2 +-
 .../scala/kafka/log/s3/cache/BlockCache.java  | 308 ++++++++++++++++++
 .../log/s3/cache/DefaultS3BlockCache.java     |   8 +-
 .../scala/kafka/log/s3/cache/LogCache.java    |   2 +-
 .../main/scala/kafka/server/KafkaConfig.scala |  10 +-
 .../kafka/log/s3/DefaultS3BlockCacheTest.java |   2 +-
 .../test/java/kafka/log/s3/S3StorageTest.java |   2 +-
 .../java/kafka/log/s3/S3StreamMemoryTest.java |   2 +-
 .../kafka/log/s3/cache/BlockCacheTest.java    |  98 ++++++
 13 files changed, 432 insertions(+), 14 deletions(-)
 create mode 100644 core/src/main/scala/kafka/log/s3/cache/BlockCache.java
 create mode 100644 core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java

diff --git a/config/kraft/server.properties b/config/kraft/server.properties
index 5dc99bcd55..4546e9e895 100644
--- a/config/kraft/server.properties
+++ b/config/kraft/server.properties
@@ -157,4 +157,4 @@ elasticstream.endpoint=s3://
 # s3.endpoint=http://127.0.0.1:4566
 s3.endpoint=https://s3.amazonaws.com
 s3.region=us-east-1
-s3.bucket=ko3
+s3.bucket=ko3
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
index 4a427065a1..a716b97429 100644
--- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -60,7 +60,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
         this.requestSender = new ControllerRequestSender(brokerServer);
         this.streamManager = new ControllerStreamManager(this.requestSender, config);
         this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
-        this.blockCache = new DefaultS3BlockCache(objectManager, operator);
+        this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024 * 1024, objectManager, operator);
         this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
         this.kvClient = new ControllerKVClient(this.requestSender);
diff --git a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
index 3e21a52a6b..b3a00fc604 100644
--- a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
+++ b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
@@ -45,6 +45,10 @@ public ByteBuf encodedBuf() {
         return encodedBuf.duplicate();
     }
 
+    public int size() {
+        return encodedBuf.readableBytes();
+    }
+
     @Override
     public int compareTo(FlatStreamRecordBatch o) {
         @SuppressWarnings("DuplicatedCode")
diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index 44f105bda9..21e143af73 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -81,7 +81,7 @@ public void close() {
     public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         //TODO: copy to pooled bytebuffer to reduce gc, convert to flat record
         FlatStreamRecordBatch flatStreamRecordBatch = FlatStreamRecordBatch.from(streamRecord);
-        WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate());
+        WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf());
         CompletableFuture<Void> cf = new CompletableFuture<>();
         WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
         callbackSequencer.before(writeRequest);
@@ -153,7 +153,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {

    private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
         WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
-            config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
+            config.s3ObjectBlockSize(), config.s3ObjectPartSize(), config.s3StreamSplitSize());
         WALObjectUploadTaskContext context = new WALObjectUploadTaskContext();
         context.task = walObjectUploadTask;
         context.cache = logCacheBlock;
diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
index d8003bd096..4130fa6e58 100644
--- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
+++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -82,7 +82,7 @@ private void upload0(long objectId) {

        for (Long streamId : streamIds) {
             List<FlatStreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
-            long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum();
+            long streamSize = streamRecords.stream().mapToLong(FlatStreamRecordBatch::size).sum();
             if (streamSize >= streamSplitSizeThreshold) {
                 streamObjectCfList.add(writeStreamObject(streamRecords));
             } else {
diff --git a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java
new file mode 100644
index 0000000000..a76bd145d7
--- /dev/null
+++ b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.cache;
+
+import kafka.log.s3.FlatStreamRecordBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class BlockCache {
+    private final long maxSize;
+    private final Map<Long, NavigableMap<Long, CacheBlock>> stream2cache = new HashMap<>();
+    private final LRUCache<CacheKey, Integer> inactive = new LRUCache<>();
+    private final LRUCache<CacheKey, Integer> active = new LRUCache<>();
+    private final AtomicLong size = new AtomicLong();
+
+    public BlockCache(long maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    public void put(long streamId, List<FlatStreamRecordBatch> records) {
+        if (maxSize == 0 || records.isEmpty()) {
+            return;
+        }
+        boolean overlapped = false;
+        records = new ArrayList<>(records);
+        NavigableMap<Long, CacheBlock> streamCache = stream2cache.computeIfAbsent(streamId, id -> new TreeMap<>());
+        long startOffset = records.get(0).baseOffset;
+        long endOffset = records.get(records.size() - 1).lastOffset();
+        // TODO: generate readahead.
+        Map.Entry<Long, CacheBlock> floorEntry = streamCache.floorEntry(startOffset);
+        SortedMap<Long, CacheBlock> tailMap = streamCache.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset);
+        // remove overlapped part.
+        for (Map.Entry<Long, CacheBlock> entry : tailMap.entrySet()) {
+            CacheBlock cacheBlock = entry.getValue();
+            if (cacheBlock.firstOffset >= endOffset) {
+                break;
+            }
+            // overlap is a rare case, so removeIf is fine for the performance.
+            if (records.removeIf(record -> record.lastOffset() > cacheBlock.firstOffset && record.baseOffset < cacheBlock.lastOffset)) {
+                overlapped = true;
+            }
+        }
+
+        // ensure the cache size.
+        int size = records.stream().mapToInt(FlatStreamRecordBatch::size).sum();
+        ensureCapacity(size);
+
+        // TODO: split records to 1MB blocks.
+        if (overlapped) {
+            // split to multiple cache blocks.
+            long expectStartOffset = -1L;
+            List<FlatStreamRecordBatch> part = new ArrayList<>(records.size() / 2);
+            for (FlatStreamRecordBatch record : records) {
+                if (expectStartOffset == -1L || record.baseOffset == expectStartOffset) {
+                    part.add(record);
+                } else {
+                    put(streamId, streamCache, new CacheBlock(part));
+                    part = new ArrayList<>(records.size() / 2);
+                    part.add(record);
+                }
+                expectStartOffset = record.lastOffset();
+            }
+            if (!part.isEmpty()) {
+                put(streamId, streamCache, new CacheBlock(part));
+            }
+        } else {
+            put(streamId, streamCache, new CacheBlock(records));
+        }
+
+    }
+
+    public GetCacheResult get(long streamId, long startOffset, long endOffset, int maxBytes) {
+        NavigableMap<Long, CacheBlock> streamCache = stream2cache.get(streamId);
+        if (streamCache == null) {
+            return GetCacheResult.empty();
+        }
+        Map.Entry<Long, CacheBlock> floorEntry = streamCache.floorEntry(startOffset);
+        streamCache = streamCache.tailMap(floorEntry != null ? floorEntry.getKey() : startOffset, true);
+        long nextStartOffset = startOffset;
+        int nextMaxBytes = maxBytes;
+        Readahead readahead = null;
+        LinkedList<FlatStreamRecordBatch> records = new LinkedList<>();
+        for (Map.Entry<Long, CacheBlock> entry : streamCache.entrySet()) {
+            CacheBlock cacheBlock = entry.getValue();
+            if (cacheBlock.lastOffset < nextStartOffset || nextStartOffset < cacheBlock.firstOffset) {
+                break;
+            }
+            if (readahead == null && cacheBlock.readahead != null) {
+                readahead = cacheBlock.readahead;
+            }
+            boolean matched = false;
+            for (FlatStreamRecordBatch record : cacheBlock.records) {
+                if (record.baseOffset <= nextStartOffset && record.lastOffset() > nextStartOffset) {
+                    records.add(record);
+                    nextStartOffset = record.lastOffset();
+                    nextMaxBytes -= record.size();
+                    matched = true;
+                    if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
+                        break;
+                    }
+                } else if (matched) {
+                    break;
+                }
+            }
+            boolean blockCompletedRead = records.getLast().lastOffset() >= cacheBlock.lastOffset;
+            CacheKey cacheKey = new CacheKey(streamId, cacheBlock.firstOffset);
+            if (blockCompletedRead) {
+                active.remove(cacheKey);
+                inactive.put(cacheKey, cacheBlock.size);
+            } else {
+                if (!active.touch(cacheKey)) {
+                    inactive.touch(cacheKey);
+                }
+            }
+
+            if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
+                break;
+            }
+
+        }
+        return GetCacheResult.of(records, readahead);
+    }
+
+    private void ensureCapacity(int size) {
+        if (maxSize - this.size.get() >= size) {
+            return;
+        }
+        for (LRUCache<CacheKey, Integer> lru : List.of(inactive, active)) {
+            for (; ; ) {
+                Map.Entry<CacheKey, Integer> entry = lru.pop();
+                if (entry == null) {
+                    break;
+                }
+                CacheBlock cacheBlock = stream2cache.get(entry.getKey().streamId).remove(entry.getKey().startOffset);
+                cacheBlock.free();
+                if (maxSize - this.size.addAndGet(-entry.getValue()) >= size) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void put(long streamId, NavigableMap<Long, CacheBlock> streamCache, CacheBlock cacheBlock) {
+        streamCache.put(cacheBlock.firstOffset, cacheBlock);
+        active.put(new CacheKey(streamId, cacheBlock.firstOffset), cacheBlock.size);
+        size.getAndAdd(cacheBlock.size);
+    }
+
+    static class CacheKey {
+        final long streamId;
+        final long startOffset;
+
+        public CacheKey(long streamId, long startOffset) {
+            this.streamId = streamId;
+            this.startOffset = startOffset;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(streamId, startOffset);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof CacheKey) {
+                CacheKey other = (CacheKey) obj;
+                return streamId == other.streamId && startOffset == other.startOffset;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    static class CacheBlock {
+        List<FlatStreamRecordBatch> records;
+        long firstOffset;
+        long lastOffset;
+        int size;
+        Readahead readahead;
+
+        public CacheBlock(List<FlatStreamRecordBatch> records) {
+            this.records = records;
+            this.firstOffset = records.get(0).baseOffset;
+            this.lastOffset = records.get(records.size() - 1).lastOffset();
+            this.size = records.stream().mapToInt(FlatStreamRecordBatch::size).sum();
+        }
+
+        public void free() {
+            records.forEach(r -> r.encodedBuf.release());
+            records = null;
+        }
+    }
+
+    static class GetCacheResult {
+        private final List<FlatStreamRecordBatch> records;
+        private final Readahead readahead;
+
+        private GetCacheResult(List<FlatStreamRecordBatch> records, Readahead readahead) {
+            this.records = records;
+            this.readahead = readahead;
+        }
+
+        public static GetCacheResult empty() {
+            return new GetCacheResult(Collections.emptyList(), null);
+        }
+
+        public static GetCacheResult of(List<FlatStreamRecordBatch> records, Readahead readahead) {
+            return new GetCacheResult(records, readahead);
+        }
+
+        public List<FlatStreamRecordBatch> getRecords() {
+            return records;
+        }
+
+        public Optional<Readahead> getReadahead() {
+            if (readahead == null) {
+                return Optional.empty();
+            } else {
+                return Optional.of(readahead);
+            }
+        }
+    }
+
+    static class Readahead {
+        private final long startOffset;
+        private final int size;
+
+        public Readahead(long startOffset, int size) {
+            this.startOffset = startOffset;
+            this.size = size;
+        }
+
+        public long getStartOffset() {
+            return startOffset;
+        }
+
+        public int getSize() {
+            return size;
+        }
+    }
+
+    static class LRUCache<K, V> {
+        private final LinkedHashMap<K, V> cache;
+        private final Set<Map.Entry<K, V>> cacheEntrySet;
+
+        public LRUCache() {
+            cache = new LinkedHashMap<>(16, .75f, true);
+            cacheEntrySet = cache.entrySet();
+        }
+
+        public boolean touch(K key) {
+            return cache.get(key) != null;
+        }
+
+        public void put(K key, V value) {
+            if (cache.put(key, value) != null) {
+                touch(key);
+            }
+        }
+
+        public Map.Entry<K, V> pop() {
+            Iterator<Map.Entry<K, V>> it = cacheEntrySet.iterator();
+            if (!it.hasNext()) {
+                return null;
+            }
+            Map.Entry<K, V> entry = it.next();
+            if (entry == null) {
+                return null;
+            }
+            it.remove();
+            return entry;
+        }
+
+        public boolean remove(K key) {
+            return cache.remove(key) != null;
+        }
+    }
+
+}
diff --git a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
index 61a6e43109..130d48e6bc 100644
--- a/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/DefaultS3BlockCache.java
@@ -20,9 +20,9 @@
 import kafka.log.s3.ObjectReader;
 import kafka.log.s3.model.StreamRecordBatch;
 import kafka.log.s3.objects.ObjectManager;
-import org.apache.kafka.metadata.stream.S3ObjectMetadata;
 import kafka.log.s3.operator.S3Operator;
 import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.metadata.stream.S3ObjectMetadata;

 import java.util.Collections;
 import java.util.LinkedList;
@@ -30,10 +30,12 @@
 import java.util.concurrent.CompletableFuture;

 public class DefaultS3BlockCache implements S3BlockCache {
+    private final BlockCache cache;
     private final ObjectManager objectManager;
     private final S3Operator s3Operator;

-    public DefaultS3BlockCache(ObjectManager objectManager, S3Operator s3Operator) {
+    public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3Operator s3Operator) {
+        this.cache = new BlockCache(cacheBytesSize);
         this.objectManager = objectManager;
         this.s3Operator = s3Operator;
     }
@@ -119,4 +121,6 @@ public ReadContext(List<S3ObjectMetadata> objects, long startOffset, int maxByte
         }
     }

+
+
 }
diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
index e644da77ab..c96c6d5b6d 100644
--- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -113,7 +113,7 @@ public long blockId() {
         public boolean put(FlatStreamRecordBatch recordBatch) {
             List<FlatStreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>());
             streamCache.add(recordBatch);
-            int recordSize = recordBatch.encodedBuf.readableBytes();
+            int recordSize = recordBatch.size();
             size += recordSize;
             return size >= maxSize;
         }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 91299dfa8d..fe9730190e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -683,6 +683,7 @@ object KafkaConfig {
   val S3StreamSplitSizeProp = "s3.stream.object.split.size"
   val S3ObjectBlockSizeProp = "s3.object.block.size"
   val S3ObjectPartSizeProp = "s3.object.part.size"
+  val S3CacheSizeProp = "s3.cache.size"

   val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com."
   val S3RegionDoc = "The S3 region, ex. us-east-1."
@@ -691,6 +692,7 @@ object KafkaConfig {
   val S3StreamSplitSizeDoc = "The S3 stream object split size threshold when upload WAL object or compact object."
   val S3ObjectBlockSizeDoc = "The S3 object compressed block size threshold."
   val S3ObjectPartSizeDoc = "The S3 object multi-part upload part size threshold."
+  val S3CacheSizeDoc = "The S3 block cache size in MiB."

   // Kafka on S3 inject end

@@ -1498,6 +1500,7 @@ object KafkaConfig {
       .define(S3StreamSplitSizeProp, INT, 16777216, MEDIUM, S3StreamSplitSizeDoc)
       .define(S3ObjectBlockSizeProp, INT, 8388608, MEDIUM, S3ObjectBlockSizeDoc)
       .define(S3ObjectPartSizeProp, INT, 16777216, MEDIUM, S3ObjectPartSizeDoc)
+      .define(S3CacheSizeProp, INT, 1024, MEDIUM, S3CacheSizeDoc)

     // Kafka on S3 inject end
   }
@@ -2037,9 +2040,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val s3Region = getString(KafkaConfig.S3RegionProp)
   val s3Bucket = getString(KafkaConfig.S3BucketProp)
   val s3WALObjectSize = getLong(KafkaConfig.S3WALObjectSizeProp)
-  val s3StreamSplitSizeProp = getInt(KafkaConfig.S3StreamSplitSizeProp)
-  val s3ObjectBlockSizeProp = getInt(KafkaConfig.S3ObjectBlockSizeProp)
-  val s3ObjectPartSizeProp = getInt(KafkaConfig.S3ObjectPartSizeProp)
+  val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
+  val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
+  val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
+  val s3CacheSize = getInt(KafkaConfig.S3CacheSizeProp)
   // Kafka on S3 inject end

   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
diff --git a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
index 38069dead6..94e8aeaf98 100644
--- a/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
+++ b/core/src/test/java/kafka/log/s3/DefaultS3BlockCacheTest.java
@@ -48,7 +48,7 @@ public class DefaultS3BlockCacheTest {
     public void setup() {
         objectManager = mock(ObjectManager.class);
         s3Operator = new MemoryS3Operator();
-        s3BlockCache = new DefaultS3BlockCache(objectManager, s3Operator);
+        s3BlockCache = new DefaultS3BlockCache(0, objectManager, s3Operator);
     }

     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java
index 44630c484d..0ef201ed48 100644
--- a/core/src/test/java/kafka/log/s3/S3StorageTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -64,7 +64,7 @@ public class S3StorageTest {
     public void setup() {
         objectManager = mock(ObjectManager.class);
         S3Operator s3Operator = new MemoryS3Operator();
-        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(objectManager, s3Operator), s3Operator);
+        storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, new DefaultS3BlockCache(0, objectManager, s3Operator), s3Operator);
     }

     @Test
diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
index 0ed3508788..6441099416 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
@@ -113,7 +113,7 @@ public void setUp() {
         streamManager = manager;
         objectManager = manager;
         operator = new MemoryS3Operator();
-        blockCache = new DefaultS3BlockCache(objectManager, operator);
+        blockCache = new DefaultS3BlockCache(0, objectManager, operator);
         storage = new S3Storage(KafkaConfig.fromProps(TestUtils.defaultBrokerConfig()), new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         streamClient = new S3StreamClient(streamManager, storage);
     }
diff --git a/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java
new file mode 100644
index 0000000000..dab3adbf62
--- /dev/null
+++ b/core/src/test/java/kafka/log/s3/cache/BlockCacheTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log.s3.cache;
+
+import kafka.log.s3.DefaultRecordBatch;
+import kafka.log.s3.FlatStreamRecordBatch;
+import kafka.log.s3.model.StreamRecordBatch;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class BlockCacheTest {
+
+    @Test
+    public void testPutGet() {
+        BlockCache blockCache = new BlockCache(1024);
+
+        blockCache.put(233L, List.of(
+                newRecord(233L, 10L, 2, 1),
+                newRecord(233L, 12L, 2, 1)
+        ));
+
+        blockCache.put(233L, List.of(
+                newRecord(233L, 16L, 4, 1),
+                newRecord(233L, 20L, 2, 1)
+        ));
+
+        // overlap
+        blockCache.put(233L, List.of(
+                newRecord(233L, 12L, 2, 1),
+                newRecord(233L, 14L, 2, 1),
+                newRecord(233L, 16L, 4, 1)
+        ));
+
+        BlockCache.GetCacheResult rst = blockCache.get(233L, 10L, 20L, 1024);
+        List<FlatStreamRecordBatch> records = rst.getRecords();
+        assertEquals(4, records.size());
+        assertEquals(10L, records.get(0).baseOffset);
+        assertEquals(12L, records.get(1).baseOffset);
+        assertEquals(14L, records.get(2).baseOffset);
+        assertEquals(16L, records.get(3).baseOffset);
+    }
+
+    @Test
+    public void testEvict() {
+        BlockCache blockCache = new BlockCache(4);
+        blockCache.put(233L, List.of(
+                newRecord(233L, 10L, 2, 2),
+                newRecord(233L, 12L, 2, 1)
+        ));
+
+        assertEquals(2, blockCache.get(233L, 10L, 20L, 1000).getRecords().size());
+
+        blockCache.put(233L, List.of(
+                newRecord(233L, 16L, 4, 1),
+                newRecord(233L, 20L, 2, 1)
+        ));
+        assertEquals(0, blockCache.get(233L, 10L, 20L, 1000).getRecords().size());
+        assertEquals(2, blockCache.get(233L, 16, 21L, 1000).getRecords().size());
+    }
+
+    @Test
+    public void testLRU() {
+        BlockCache.LRUCache<Long, Boolean> lru = new BlockCache.LRUCache<>();
+        lru.put(1L, true);
+        lru.put(2L, true);
+        lru.put(3L, true);
+        lru.touch(2L);
+        assertEquals(1, lru.pop().getKey());
+        assertEquals(3, lru.pop().getKey());
+        assertEquals(2, lru.pop().getKey());
+        assertNull(lru.pop());
+    }
+
+    private static FlatStreamRecordBatch newRecord(long streamId, long offset, int count, int size) {
+        StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(count, size));
+        return FlatStreamRecordBatch.from(recordBatch);
+    }
+
+}

From d9a2672653d2e4f4778e4874f518665898910ae5 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Tue, 5 Sep 2023 15:56:31 +0800
Subject: [PATCH 2/2] fix: fix checkstyle

Signed-off-by: Robin Han
---
 .../scala/kafka/log/s3/DefaultS3Client.java   |  2 +-
 .../kafka/log/s3/FlatStreamRecordBatch.java   |  4 ++
 .../scala/kafka/log/s3/cache/BlockCache.java  | 56 ++++++++++++-------
 3 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
index a716b97429..d795bfd696 100644
--- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java
@@ -60,7 +60,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig config, S3Operator
         this.requestSender = new ControllerRequestSender(brokerServer);
         this.streamManager = new ControllerStreamManager(this.requestSender, config);
         this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, this.config);
-        this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024 * 1024, objectManager, operator);
+        this.blockCache = new DefaultS3BlockCache(config.s3CacheSize() * 1024L * 1024, objectManager, operator);
         this.storage = new S3Storage(config, new MemoryWriteAheadLog(), objectManager, blockCache, operator);
         this.streamClient = new S3StreamClient(this.streamManager, this.storage);
         this.kvClient = new ControllerKVClient(this.requestSender);
diff --git a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
index b3a00fc604..691fd6e4bc 100644
--- a/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
+++ b/core/src/main/scala/kafka/log/s3/FlatStreamRecordBatch.java
@@ -49,6 +49,10 @@ public int size() {
         return encodedBuf.readableBytes();
     }

+    public void free() {
+        encodedBuf.release();
+    }
+
     @Override
     public int compareTo(FlatStreamRecordBatch o) {
         @SuppressWarnings("DuplicatedCode")
diff --git a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java
index a76bd145d7..676844d6b4 100644
--- a/core/src/main/scala/kafka/log/s3/cache/BlockCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/BlockCache.java
@@ -65,7 +65,13 @@ public void put(long streamId, List<FlatStreamRecordBatch> records) {
                 break;
             }
             // overlap is a rare case, so removeIf is fine for the performance.
-            if (records.removeIf(record -> record.lastOffset() > cacheBlock.firstOffset && record.baseOffset < cacheBlock.lastOffset)) {
+            if (records.removeIf(record -> {
+                boolean remove = record.lastOffset() > cacheBlock.firstOffset && record.baseOffset < cacheBlock.lastOffset;
+                if (remove) {
+                    record.free();
+                }
+                return remove;
+            })) {
                 overlapped = true;
             }
         }
@@ -117,21 +123,9 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m
             if (readahead == null && cacheBlock.readahead != null) {
                 readahead = cacheBlock.readahead;
             }
-            boolean matched = false;
-            for (FlatStreamRecordBatch record : cacheBlock.records) {
-                if (record.baseOffset <= nextStartOffset && record.lastOffset() > nextStartOffset) {
-                    records.add(record);
-                    nextStartOffset = record.lastOffset();
-                    nextMaxBytes -= record.size();
-                    matched = true;
-                    if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
-                        break;
-                    }
-                } else if (matched) {
-                    break;
-                }
-            }
-            boolean blockCompletedRead = records.getLast().lastOffset() >= cacheBlock.lastOffset;
+            nextMaxBytes = readFromCacheBlock(records, cacheBlock, nextStartOffset, endOffset, nextMaxBytes);
+            nextStartOffset = records.getLast().lastOffset();
+            boolean blockCompletedRead = nextStartOffset >= cacheBlock.lastOffset;
             CacheKey cacheKey = new CacheKey(streamId, cacheBlock.firstOffset);
             if (blockCompletedRead) {
                 active.remove(cacheKey);
@@ -150,6 +144,25 @@ public GetCacheResult get(long streamId, long startOffset, long endOffset, int m
         return GetCacheResult.of(records, readahead);
     }

+    private int readFromCacheBlock(LinkedList<FlatStreamRecordBatch> records, CacheBlock cacheBlock,
+                                   long nextStartOffset, long endOffset, int nextMaxBytes) {
+        boolean matched = false;
+        for (FlatStreamRecordBatch record : cacheBlock.records) {
+            if (record.baseOffset <= nextStartOffset && record.lastOffset() > nextStartOffset) {
+                records.add(record);
+                nextStartOffset = record.lastOffset();
+                nextMaxBytes -= record.size();
+                matched = true;
+                if (nextStartOffset >= endOffset || nextMaxBytes <= 0) {
+                    break;
+                }
+            } else if (matched) {
+                break;
+            }
+        }
+        return nextMaxBytes;
+    }
+
     private void ensureCapacity(int size) {
         if (maxSize - this.size.get() >= size) {
             return;
@@ -207,11 +220,16 @@ static class CacheBlock {
         int size;
         Readahead readahead;

-        public CacheBlock(List<FlatStreamRecordBatch> records) {
+        public CacheBlock(List<FlatStreamRecordBatch> records, Readahead readahead) {
             this.records = records;
             this.firstOffset = records.get(0).baseOffset;
             this.lastOffset = records.get(records.size() - 1).lastOffset();
             this.size = records.stream().mapToInt(FlatStreamRecordBatch::size).sum();
+            this.readahead = readahead;
+        }
+
+        public CacheBlock(List<FlatStreamRecordBatch> records) {
+            this(records, null);
         }

         public void free() {
@@ -220,7 +238,7 @@ public void free() {
         }
     }

-    static class GetCacheResult {
+    public static class GetCacheResult {
         private final List<FlatStreamRecordBatch> records;
         private final Readahead readahead;

@@ -250,7 +268,7 @@ public Optional<Readahead> getReadahead() {
         }
     }

-    static class Readahead {
+    public static class Readahead {
         private final long startOffset;
         private final int size;