Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
*/
public class FileCache {
private static final int BLOCK_SIZE = 4 * 1024;
private final int maxSize;
private final int blockSize;
private final BitSet freeBlocks;
private final LRUCache<Key, Value> lru = new LRUCache<>();
Expand All @@ -56,6 +57,7 @@ public class FileCache {
public FileCache(String path, int size, int blockSize) throws IOException {
this.blockSize = blockSize;
size = align(size);
this.maxSize = size;
int blockCount = size / blockSize;
this.freeBlocks = new BitSet(blockCount);
this.freeBlocks.set(0, blockCount, true);
Expand All @@ -78,25 +80,48 @@ public void put(String path, long position, ByteBuf data) {
writeLock.lock();
try {
int dataLength = data.readableBytes();
int[] blocks = ensureCapacity(dataLength);
if (blocks == null) {
return;
}
Key key = new Key(path, position);
Value value = new Value(blocks, dataLength);
lru.put(key, value);
NavigableMap<Long, Value> cache = path2cache.computeIfAbsent(path, k -> new TreeMap<>());
cache.put(position, value);
Map.Entry<Long, Value> pos2value = cache.floorEntry(position);
long cacheStartPosition;
Value value;
if (pos2value == null || pos2value.getKey() + pos2value.getValue().dataLength < position) {
cacheStartPosition = position;
value = Value.EMPTY;
} else {
cacheStartPosition = pos2value.getKey();
value = pos2value.getValue();
}
// ensure the capacity, if the capacity change then update the cache index
int moreCapacity = (int) ((position + dataLength) - (cacheStartPosition + value.blocks.length * (long) blockSize));
int newDataLength = (int) (position + dataLength - cacheStartPosition);
if (moreCapacity > 0) {
int[] blocks = ensureCapacity(cacheStartPosition, moreCapacity);
if (blocks == null) {
return;
}
int[] newBlocks = new int[value.blocks.length + blocks.length];
System.arraycopy(value.blocks, 0, newBlocks, 0, value.blocks.length);
System.arraycopy(blocks, 0, newBlocks, value.blocks.length, blocks.length);
value = new Value(newBlocks, newDataLength);
} else {
value = new Value(value.blocks, newDataLength);
}
cache.put(cacheStartPosition, value);
lru.put(new Key(path, cacheStartPosition), value);

// write data to cache
ByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
int positionDelta = (int) (position - cacheStartPosition);
int written = 0;
ByteBuffer[] nioBuffers = data.nioBuffers();
int[] blocks = value.blocks;
for (ByteBuffer nioBuffer : nioBuffers) {
ByteBuf buf = Unpooled.wrappedBuffer(nioBuffer);
while (buf.readableBytes() > 0) {
int block = blocks[written / blockSize];
cacheByteBuffer.position(block * blockSize + written % blockSize);
int length = Math.min(buf.readableBytes(), blockSize - written % blockSize);
int writePosition = positionDelta + written;
int block = blocks[writePosition / blockSize];
cacheByteBuffer.position(block * blockSize + writePosition % blockSize);
int length = Math.min(buf.readableBytes(), blockSize - writePosition % blockSize);
cacheByteBuffer.put(buf.slice(buf.readerIndex(), length).nioBuffer());
buf.skipBytes(length);
written += length;
Expand Down Expand Up @@ -148,7 +173,17 @@ public Optional<ByteBuf> get(String filePath, long position, int length) {
}
}

private int[] ensureCapacity(int size) {
/**
* Ensure the capacity of cache
*
* @param cacheStartPosition if the eviction entries contain the current cache, then ensure capacity will return null.
* @param size size of data
* @return the cache blocks
*/
private int[] ensureCapacity(long cacheStartPosition, int size) {
if (size > this.maxSize) {
return null;
}
int requiredBlockCount = align(size) / blockSize;
int[] blocks = new int[requiredBlockCount];
int acquiringBlockIndex = 0;
Expand All @@ -160,6 +195,14 @@ private int[] ensureCapacity(int size) {
Key key = entry.getKey();
Value value = entry.getValue();
path2cache.get(key.path).remove(key.position);
if (key.position == cacheStartPosition) {
// eviction is conflict to current cache
for (int i = 0; i < acquiringBlockIndex; i++) {
freeBlockCount++;
freeBlocks.set(blocks[i], true);
}
return null;
}
for (int blockIndex : value.blocks) {
if (acquiringBlockIndex < blocks.length) {
blocks[acquiringBlockIndex++] = blockIndex;
Expand Down Expand Up @@ -224,6 +267,8 @@ public int compareTo(Key o) {
}

static class Value {
static final Value EMPTY = new Value(new int[0], 0);

int[] blocks;
int dataLength;

Expand Down
38 changes: 38 additions & 0 deletions core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.log.streamaspect.cache;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -96,6 +97,43 @@ public void test() throws IOException {

}

// Verifies that puts whose ranges are contiguous with an existing cache entry
// are merged into that entry (single index entry, growing dataLength), and
// that reads across the merged region return the bytes written by each put.
@Test
public void testMergePut() throws IOException {
// Cache of 10 KiB total with 1 KiB blocks.
FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);
{
// First put: 1500 bytes at position 3333 (three 500-byte segments
// with fill bytes 1, 2, 3).
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 1, 500));
buf.addComponent(true, genBuf((byte) 2, 500));
buf.addComponent(true, genBuf((byte) 3, 500));
fileCache.put("test", 3333L, buf);
}
// One entry anchored at 3333 covering the 1500 bytes just written.
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(1500, fileCache.path2cache.get("test").get(3333L).dataLength);
{
// Second put overlaps the tail of the first (starts at 3333 + 1000,
// i.e. 500 bytes before the first entry ends) — fill bytes 4, 5, 6
// overwrite the segment previously filled with 3.
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 4, 500));
buf.addComponent(true, genBuf((byte) 5, 500));
buf.addComponent(true, genBuf((byte) 6, 500));
fileCache.put("test", 3333L + 1000, buf);
}
// Still a single merged entry at 3333; length grew to cover 2500 bytes.
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(2500, fileCache.path2cache.get("test").get(3333L).dataLength);
{
// Third put is exactly adjacent to the merged entry's end
// (3333 + 2500) — should extend it rather than create a new entry.
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 7, 500));
fileCache.put("test", 3333L + 1000 + 1500, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(3000, fileCache.path2cache.get("test").get(3333L).dataLength);

// Read back each 500-byte segment: bytes 1, 2 survive from the first put,
// 4, 5, 6 come from the overlapping second put, 7 from the third.
assertTrue(verify(fileCache.get("test", 3333L, 500).get(), (byte) 1));
assertTrue(verify(fileCache.get("test", 3333L + 500, 500).get(), (byte) 2));
assertTrue(verify(fileCache.get("test", 3333L + 1000, 500).get(), (byte) 4));
assertTrue(verify(fileCache.get("test", 3333L + 1500, 500).get(), (byte) 5));
assertTrue(verify(fileCache.get("test", 3333L + 2000, 500).get(), (byte) 6));
assertTrue(verify(fileCache.get("test", 3333L + 2500, 500).get(), (byte) 7));
}

ByteBuf genBuf(byte data, int length) {
byte[] bytes = new byte[length];
Arrays.fill(bytes, data);
Expand Down