From 55b06479bf4eeef3ae9c3e934fc6dcc788d0ffd2 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Mon, 29 Jan 2024 16:11:40 +0800
Subject: [PATCH] feat(log): file cache support merge put
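
When a put() lands next to or inside an existing cache entry of the same
file, merge it into that entry instead of indexing it separately: put()
looks up the floor entry for the target position, grows that entry's
block list through ensureCapacity(), and writes the new bytes at their
offset inside the merged entry. ensureCapacity() now aborts (returns
null) when eviction would free the very entry being extended.

A sketch of the merged behavior (buffer names are illustrative; see
FileCacheTest#testMergePut below for the real assertions):

    FileCache cache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);
    cache.put("test", 3333L, buf1500);         // new entry covering [3333, 4833)
    cache.put("test", 3333L + 1000, buf1500);  // merged into the same entry, now [3333, 5833)
    cache.get("test", 3333L, 2500);            // served by the single merged entry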

Signed-off-by: Robin Han
---
 .../log/streamaspect/cache/FileCache.java     | 69 +++++++++++++++----
 .../log/streamaspect/cache/FileCacheTest.java | 38 ++++++++++
 2 files changed, 95 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
index 7d334a6502..42d426f2e7 100644
--- a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
+++ b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
@@ -42,6 +42,7 @@
  */
 public class FileCache {
     private static final int BLOCK_SIZE = 4 * 1024;
+    private final int maxSize;
     private final int blockSize;
     private final BitSet freeBlocks;
     private final LRUCache<Key, Value> lru = new LRUCache<>();
@@ -56,6 +57,7 @@ public class FileCache {
     public FileCache(String path, int size, int blockSize) throws IOException {
         this.blockSize = blockSize;
         size = align(size);
+        this.maxSize = size;
         int blockCount = size / blockSize;
         this.freeBlocks = new BitSet(blockCount);
         this.freeBlocks.set(0, blockCount, true);
@@ -78,25 +80,48 @@ public void put(String path, long position, ByteBuf data) {
         writeLock.lock();
         try {
             int dataLength = data.readableBytes();
-            int[] blocks = ensureCapacity(dataLength);
-            if (blocks == null) {
-                return;
-            }
-            Key key = new Key(path, position);
-            Value value = new Value(blocks, dataLength);
-            lru.put(key, value);
             NavigableMap<Long, Value> cache = path2cache.computeIfAbsent(path, k -> new TreeMap<>());
-            cache.put(position, value);
+            Map.Entry<Long, Value> pos2value = cache.floorEntry(position);
+            long cacheStartPosition;
+            Value value;
+            if (pos2value == null || pos2value.getKey() + pos2value.getValue().dataLength < position) {
+                cacheStartPosition = position;
+                value = Value.EMPTY;
+            } else {
+                cacheStartPosition = pos2value.getKey();
+                value = pos2value.getValue();
+            }
+            // ensure the capacity; if the block list grows, the cache index is updated below
+            int moreCapacity = (int) ((position + dataLength) - (cacheStartPosition + value.blocks.length * (long) blockSize));
+            int newDataLength = (int) (position + dataLength - cacheStartPosition);
+            if (moreCapacity > 0) {
+                int[] blocks = ensureCapacity(cacheStartPosition, moreCapacity);
+                if (blocks == null) {
+                    return;
+                }
+                int[] newBlocks = new int[value.blocks.length + blocks.length];
+                System.arraycopy(value.blocks, 0, newBlocks, 0, value.blocks.length);
+                System.arraycopy(blocks, 0, newBlocks, value.blocks.length, blocks.length);
+                value = new Value(newBlocks, newDataLength);
+            } else {
+                value = new Value(value.blocks, newDataLength);
+            }
+            cache.put(cacheStartPosition, value);
+            lru.put(new Key(path, cacheStartPosition), value);
+            // write data to cache
             ByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
+            int positionDelta = (int) (position - cacheStartPosition);
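+            // positionDelta is the offset of the incoming bytes within the merged entry;
+            // the loop below maps (positionDelta + written) to a block index plus an
+            // intra-block offset before copying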
             int written = 0;
             ByteBuffer[] nioBuffers = data.nioBuffers();
+            int[] blocks = value.blocks;
             for (ByteBuffer nioBuffer : nioBuffers) {
                 ByteBuf buf = Unpooled.wrappedBuffer(nioBuffer);
                 while (buf.readableBytes() > 0) {
-                    int block = blocks[written / blockSize];
-                    cacheByteBuffer.position(block * blockSize + written % blockSize);
-                    int length = Math.min(buf.readableBytes(), blockSize - written % blockSize);
+                    int writePosition = positionDelta + written;
+                    int block = blocks[writePosition / blockSize];
+                    cacheByteBuffer.position(block * blockSize + writePosition % blockSize);
+                    int length = Math.min(buf.readableBytes(), blockSize - writePosition % blockSize);
                     cacheByteBuffer.put(buf.slice(buf.readerIndex(), length).nioBuffer());
                     buf.skipBytes(length);
                     written += length;
@@ -148,7 +173,17 @@ public Optional<ByteBuf> get(String filePath, long position, int length) {
         }
     }
 
-    private int[] ensureCapacity(int size) {
+    /**
+     * Ensure the cache has enough free blocks for the incoming data, evicting LRU entries if needed.
+     *
+     * @param cacheStartPosition start position of the cache entry being extended; if eviction would free that entry, null is returned
+     * @param size the size of the data to cache
+     * @return the acquired cache blocks, or null if the capacity cannot be ensured
+     */
+    private int[] ensureCapacity(long cacheStartPosition, int size) {
+        if (size > this.maxSize) {
+            return null;
+        }
         int requiredBlockCount = align(size) / blockSize;
         int[] blocks = new int[requiredBlockCount];
         int acquiringBlockIndex = 0;
@@ -160,6 +195,14 @@ private int[] ensureCapacity(int size) {
             Key key = entry.getKey();
             Value value = entry.getValue();
             path2cache.get(key.path).remove(key.position);
+            if (key.position == cacheStartPosition) {
+                // the eviction conflicts with the entry being extended: release the acquired blocks and abort
+                for (int i = 0; i < acquiringBlockIndex; i++) {
+                    freeBlockCount++;
+                    freeBlocks.set(blocks[i], true);
+                }
+                return null;
+            }
             for (int blockIndex : value.blocks) {
                 if (acquiringBlockIndex < blocks.length) {
                     blocks[acquiringBlockIndex++] = blockIndex;
@@ -224,6 +267,8 @@ public int compareTo(Key o) {
     }
 
     static class Value {
+        static final Value EMPTY = new Value(new int[0], 0);
+
         int[] blocks;
         int dataLength;
 
diff --git a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
index dc27141862..4ca7db0b1c 100644
--- a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
+++ b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
@@ -18,6 +18,7 @@
 package kafka.log.streamaspect.cache;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
@@ -96,6 +97,43 @@ public void test() throws IOException {
 
     }
 
+    @Test
+    public void testMergePut() throws IOException {
+        FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);
+        {
+            CompositeByteBuf buf = Unpooled.compositeBuffer();
+            buf.addComponent(true, genBuf((byte) 1, 500));
+            buf.addComponent(true, genBuf((byte) 2, 500));
+            buf.addComponent(true, genBuf((byte) 3, 500));
+            fileCache.put("test", 3333L, buf);
+        }
+        assertEquals(1, fileCache.path2cache.get("test").size());
+        assertEquals(1500, fileCache.path2cache.get("test").get(3333L).dataLength);
+        {
+            CompositeByteBuf buf = Unpooled.compositeBuffer();
+            buf.addComponent(true, genBuf((byte) 4, 500));
+            buf.addComponent(true, genBuf((byte) 5, 500));
+            buf.addComponent(true, genBuf((byte) 6, 500));
+            fileCache.put("test", 3333L + 1000, buf);
+        }
+        assertEquals(1, fileCache.path2cache.get("test").size());
+        assertEquals(2500, fileCache.path2cache.get("test").get(3333L).dataLength);
+        {
+            CompositeByteBuf buf = Unpooled.compositeBuffer();
+            buf.addComponent(true, genBuf((byte) 7, 500));
+            fileCache.put("test", 3333L + 1000 + 1500, buf);
+        }
+        assertEquals(1, fileCache.path2cache.get("test").size());
+        assertEquals(3000, fileCache.path2cache.get("test").get(3333L).dataLength);
+
+        assertTrue(verify(fileCache.get("test", 3333L, 500).get(), (byte) 1));
+        assertTrue(verify(fileCache.get("test", 3333L + 500, 500).get(), (byte) 2));
+        assertTrue(verify(fileCache.get("test", 3333L + 1000, 500).get(), (byte) 4));
+        assertTrue(verify(fileCache.get("test", 3333L + 1500, 500).get(), (byte) 5));
+        assertTrue(verify(fileCache.get("test", 3333L + 2000, 500).get(), (byte) 6));
+        assertTrue(verify(fileCache.get("test", 3333L + 2500, 500).get(), (byte) 7));
+    }
+
     ByteBuf genBuf(byte data, int length) {
         byte[] bytes = new byte[length];
         Arrays.fill(bytes, data);