Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class ElasticTimeIndex extends TimeIndex {
private final File file;
private final FileCache cache;
private final Long cachePathId;
final ElasticStreamSlice stream;

private volatile CompletableFuture<?> lastAppend = CompletableFuture.completedFuture(null);
Expand All @@ -46,6 +47,7 @@ public ElasticTimeIndex(
super(file, baseOffset, maxIndexSize, true, true);
this.file = file;
this.cache = cache;
this.cachePathId = cache.newPathId();
this.stream = sliceSupplier.get();
setEntries((int) (stream.nextOffset() / ENTRY_SIZE));
if (entries() == 0) {
Expand Down Expand Up @@ -132,7 +134,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
buffer.flip();
long position = stream.nextOffset();
lastAppend = stream.append(RawPayloadRecordBatch.of(buffer));
cache.put(file.getPath(), position, Unpooled.wrappedBuffer(buffer));
cache.put(cachePathId, position, Unpooled.wrappedBuffer(buffer));
incrementEntries();
lastEntry(new TimestampOffset(timestamp, offset));
}
Expand Down Expand Up @@ -250,7 +252,7 @@ protected TimestampOffset parseEntry(ByteBuffer buffer, int n) {
}

private TimestampOffset tryGetEntryFromCache(int n) {
Optional<ByteBuf> rst = cache.get(file.getPath(), (long) n * ENTRY_SIZE, ENTRY_SIZE);
Optional<ByteBuf> rst = cache.get(cachePathId, (long) n * ENTRY_SIZE, ENTRY_SIZE);
if (rst.isPresent()) {
ByteBuf buffer = rst.get();
return new TimestampOffset(buffer.readLong(), baseOffset() + buffer.readInt());
Expand Down Expand Up @@ -292,7 +294,7 @@ private TimestampOffset parseEntry0(int n) throws ExecutionException, Interrupte
}
ByteBuf buf = Unpooled.buffer(records.size() * ENTRY_SIZE);
records.forEach(record -> buf.writeBytes(record.rawPayload()));
cache.put(file.getPath(), startOffset, buf);
cache.put(cachePathId, startOffset, buf);
ByteBuf indexEntry = Unpooled.wrappedBuffer(records.get(0).rawPayload());
timestampOffset = new TimestampOffset(indexEntry.readLong(), baseOffset() + indexEntry.readInt());
rst.free();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ElasticTransactionIndex extends TransactionIndex {
private final StreamSliceSupplier streamSupplier;
ElasticStreamSlice stream;
private final FileCache cache;
private final Long cachePathId;
private final String path;
private volatile LastAppend lastAppend;

Expand All @@ -47,6 +48,7 @@ public ElasticTransactionIndex(long startOffset, File file, StreamSliceSupplier
this.streamSupplier = streamSupplier;
this.stream = streamSupplier.get();
this.cache = cache;
this.cachePathId = cache.newPathId();
this.path = file.getPath();
lastAppend = new LastAppend(stream.nextOffset(), CompletableFuture.completedFuture(null));
}
Expand All @@ -70,7 +72,7 @@ public void append(AbortedTxn abortedTxn) {
long position = stream.nextOffset();
CompletableFuture<?> cf = stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer().duplicate()));
lastAppend = new LastAppend(stream.nextOffset(), cf);
cache.put(path, position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
cache.put(cachePathId, position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
}

@Override
Expand Down Expand Up @@ -179,18 +181,18 @@ public AbortedTxnWithPosition next() {
+ AbortedTxn.CURRENT_VERSION);
return item;
}
int getLength = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition);
Optional<ByteBuf> cacheDataOpt = cache.get(path, position.value, getLength);
int endOffset = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition);
Optional<ByteBuf> cacheDataOpt = cache.get(cachePathId, position.value, endOffset - position.value);
ByteBuf buf;
if (cacheDataOpt.isPresent()) {
buf = cacheDataOpt.get();
} else {
FetchResult records = fetchStream(position.value, getLength, getLength);
FetchResult records = fetchStream(position.value, endOffset, endOffset - position.value);
ByteBuf txnListBuf = Unpooled.buffer(records.recordBatchList().size() * AbortedTxn.TOTAL_SIZE);
records.recordBatchList().forEach(r -> {
txnListBuf.writeBytes(r.rawPayload());
});
cache.put(path, position.value, txnListBuf);
cache.put(cachePathId, position.value, txnListBuf);
records.free();
buf = txnListBuf;
}
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -49,13 +50,14 @@ public class FileCache {
private final int blockSize;
private final BitSet freeBlocks;
private final LRUCache<Key, Value> lru = new LRUCache<>();
final Map<String, NavigableMap<Long, Value>> path2cache = new HashMap<>();
final Map<Long, NavigableMap<Long, Value>> path2cache = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
int freeBlockCount;
private int freeCheckPoint = 0;
private final MappedByteBuffer cacheByteBuffer;
private final AtomicLong pathIdAlloc = new AtomicLong();

public FileCache(String path, int size, int blockSize) throws IOException {
this.blockSize = blockSize;
Expand Down Expand Up @@ -84,7 +86,11 @@ public FileCache(String path, int size) throws IOException {
this(path, size, BLOCK_SIZE);
}

public void put(String path, long position, ByteBuf data) {
public Long newPathId() {
return pathIdAlloc.incrementAndGet();
}

public void put(Long path, long position, ByteBuf data) {
writeLock.lock();
try {
int dataLength = data.readableBytes();
Expand Down Expand Up @@ -140,7 +146,7 @@ public void put(String path, long position, ByteBuf data) {
}
}

public Optional<ByteBuf> get(String filePath, long position, int length) {
public Optional<ByteBuf> get(Long filePath, long position, int length) {
ByteBuf buf = Unpooled.buffer(length);
readLock.lock();
try {
Expand Down Expand Up @@ -243,10 +249,10 @@ private int align(int size) {
}

static class Key implements Comparable<Key> {
String path;
Long path;
long position;

public Key(String path, long position) {
public Key(Long path, long position) {
this.path = path;
this.position = position;
}
Expand Down
81 changes: 44 additions & 37 deletions core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,99 +39,106 @@ public void test() throws IOException {
FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);

// occupy block 0,1
fileCache.put("test1", 10, genBuf((byte) 1, 2 * 1024));
Long pathId1 = fileCache.newPathId();
fileCache.put(pathId1, 10, genBuf((byte) 1, 2 * 1024));

ByteBuf rst = fileCache.get("test1", 10 + 1000, 1024).get();
ByteBuf rst = fileCache.get(pathId1, 10 + 1000, 1024).get();
assertEquals(1024, rst.readableBytes());
assertTrue(verify(rst, (byte) 1));

Assertions.assertFalse(fileCache.get("test1", 10 + 1000, 2048).isPresent());
Assertions.assertFalse(fileCache.get(pathId1, 10 + 1000, 2048).isPresent());

// occupy block 2,3
fileCache.put("test2", 233, genBuf((byte) 2, 1025));
Long pathId2 = fileCache.newPathId();
fileCache.put(pathId2, 233, genBuf((byte) 2, 1025));

// occupy block 4~8
fileCache.put("test2", 2048, genBuf((byte) 4, 1024 * 5));
fileCache.put(pathId2, 2048, genBuf((byte) 4, 1024 * 5));

// occupy block 9
fileCache.put("test2", 10000, genBuf((byte) 5, 1024));
fileCache.put(pathId2, 10000, genBuf((byte) 5, 1024));

// touch lru
assertEquals(1025, fileCache.get("test2", 233, 1025).get().readableBytes());
assertEquals(1024, fileCache.get("test2", 10000, 1024).get().readableBytes());
assertEquals(2048, fileCache.get("test1", 10, 2048).get().readableBytes());
assertEquals(1024 * 5, fileCache.get("test2", 2048, 1024 * 5).get().readableBytes());
assertEquals(1025, fileCache.get(pathId2, 233, 1025).get().readableBytes());
assertEquals(1024, fileCache.get(pathId2, 10000, 1024).get().readableBytes());
assertEquals(2048, fileCache.get(pathId1, 10, 2048).get().readableBytes());
assertEquals(1024 * 5, fileCache.get(pathId2, 2048, 1024 * 5).get().readableBytes());

// expect evict test2-233 and test2-10000
fileCache.put("test3", 123, genBuf((byte) 6, 2049));
Long pathId3 = fileCache.newPathId();
fileCache.put(pathId3, 123, genBuf((byte) 6, 2049));

FileCache.Value value = fileCache.path2cache.get("test3").get(123L);
FileCache.Value value = fileCache.path2cache.get(pathId3).get(123L);
assertEquals(2049, value.dataLength);
assertArrayEquals(new int[]{2, 3, 9}, value.blocks);


rst = fileCache.get("test3", 123, 2049).get();
rst = fileCache.get(pathId3, 123, 2049).get();
assertEquals(2049, rst.readableBytes());
assertTrue(verify(rst, (byte) 6));

// expect evict test1-10 and test2-2048
fileCache.put("test4", 123, genBuf((byte) 7, 2049));
value = fileCache.path2cache.get("test4").get(123L);
Long pathId4 = fileCache.newPathId();
fileCache.put(pathId4, 123, genBuf((byte) 7, 2049));
value = fileCache.path2cache.get(pathId4).get(123L);
assertArrayEquals(new int[]{0, 1, 4}, value.blocks);
rst = fileCache.get("test4", 123, 2049).get();
rst = fileCache.get(pathId4, 123, 2049).get();
assertTrue(verify(rst, (byte) 7));

assertEquals(4, fileCache.freeBlockCount);

// expect occupy free blocks 5,6,7
fileCache.put("test5", 123, genBuf((byte) 8, 2049));
value = fileCache.path2cache.get("test5").get(123L);
Long pathId5 = fileCache.newPathId();
fileCache.put(pathId5, 123, genBuf((byte) 8, 2049));
value = fileCache.path2cache.get(pathId5).get(123L);
assertArrayEquals(new int[]{5, 6, 7}, value.blocks);
rst = fileCache.get("test5", 123, 2049).get();
rst = fileCache.get(pathId5, 123, 2049).get();
assertTrue(verify(rst, (byte) 8));
assertEquals(1, fileCache.freeBlockCount);

fileCache.put("test6", 6666, genBuf((byte) 9, 3333));
rst = fileCache.get("test6", 6666L, 3333).get();
Long pathId6 = fileCache.newPathId();
fileCache.put(pathId6, 6666, genBuf((byte) 9, 3333));
rst = fileCache.get(pathId6, 6666L, 3333).get();
assertTrue(verify(rst, (byte) 9));

}

@Test
public void testMergePut() throws IOException {
FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);
Long pathId = fileCache.newPathId();
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 1, 500));
buf.addComponent(true, genBuf((byte) 2, 500));
buf.addComponent(true, genBuf((byte) 3, 500));
fileCache.put("test", 3333L, buf);
fileCache.put(pathId, 3333L, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(1500, fileCache.path2cache.get("test").get(3333L).dataLength);
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(1500, fileCache.path2cache.get(pathId).get(3333L).dataLength);
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 4, 500));
buf.addComponent(true, genBuf((byte) 5, 500));
buf.addComponent(true, genBuf((byte) 6, 500));
fileCache.put("test", 3333L + 1000, buf);
fileCache.put(pathId, 3333L + 1000, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(2500, fileCache.path2cache.get("test").get(3333L).dataLength);
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(2500, fileCache.path2cache.get(pathId).get(3333L).dataLength);
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 7, 500));
fileCache.put("test", 3333L + 1000 + 1500, buf);
fileCache.put(pathId, 3333L + 1000 + 1500, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(3000, fileCache.path2cache.get("test").get(3333L).dataLength);

assertTrue(verify(fileCache.get("test", 3333L, 500).get(), (byte) 1));
assertTrue(verify(fileCache.get("test", 3333L + 500, 500).get(), (byte) 2));
assertTrue(verify(fileCache.get("test", 3333L + 1000, 500).get(), (byte) 4));
assertTrue(verify(fileCache.get("test", 3333L + 1500, 500).get(), (byte) 5));
assertTrue(verify(fileCache.get("test", 3333L + 2000, 500).get(), (byte) 6));
assertTrue(verify(fileCache.get("test", 3333L + 2500, 500).get(), (byte) 7));
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(3000, fileCache.path2cache.get(pathId).get(3333L).dataLength);

assertTrue(verify(fileCache.get(pathId, 3333L, 500).get(), (byte) 1));
assertTrue(verify(fileCache.get(pathId, 3333L + 500, 500).get(), (byte) 2));
assertTrue(verify(fileCache.get(pathId, 3333L + 1000, 500).get(), (byte) 4));
assertTrue(verify(fileCache.get(pathId, 3333L + 1500, 500).get(), (byte) 5));
assertTrue(verify(fileCache.get(pathId, 3333L + 2000, 500).get(), (byte) 6));
assertTrue(verify(fileCache.get(pathId, 3333L + 2500, 500).get(), (byte) 7));
}

ByteBuf genBuf(byte data, int length) {
Expand Down