diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.java b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.java index 4866aa304d..1318dd74c6 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.java @@ -31,6 +31,7 @@ public class ElasticTimeIndex extends TimeIndex { private final File file; private final FileCache cache; + private final long cacheId; final ElasticStreamSlice stream; private volatile CompletableFuture lastAppend = CompletableFuture.completedFuture(null); @@ -46,6 +47,7 @@ public ElasticTimeIndex( super(file, baseOffset, maxIndexSize, true, true); this.file = file; this.cache = cache; + this.cacheId = cache.newCacheId(); this.stream = sliceSupplier.get(); setEntries((int) (stream.nextOffset() / ENTRY_SIZE)); if (entries() == 0) { @@ -132,7 +134,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) { buffer.flip(); long position = stream.nextOffset(); lastAppend = stream.append(RawPayloadRecordBatch.of(buffer)); - cache.put(stream.stream().streamId(), position, Unpooled.wrappedBuffer(buffer)); + cache.put(cacheId, position, Unpooled.wrappedBuffer(buffer)); incrementEntries(); lastEntry(new TimestampOffset(timestamp, offset)); } @@ -249,8 +251,8 @@ protected TimestampOffset parseEntry(ByteBuffer buffer, int n) { return parseEntry(n); } - private TimestampOffset tryGetEntryFromCache(int n) { - Optional rst = cache.get(stream.stream().streamId(), (long) n * ENTRY_SIZE, ENTRY_SIZE); + TimestampOffset tryGetEntryFromCache(int n) { + Optional rst = cache.get(cacheId, (long) n * ENTRY_SIZE, ENTRY_SIZE); if (rst.isPresent()) { ByteBuf buffer = rst.get(); return new TimestampOffset(buffer.readLong(), baseOffset() + buffer.readInt()); @@ -292,7 +294,7 @@ private TimestampOffset parseEntry0(int n) throws ExecutionException, Interrupte } ByteBuf buf = Unpooled.buffer(records.size() * ENTRY_SIZE); 
records.forEach(record -> buf.writeBytes(record.rawPayload())); - cache.put(stream.stream().streamId(), startOffset, buf); + cache.put(cacheId, startOffset, buf); ByteBuf indexEntry = Unpooled.wrappedBuffer(records.get(0).rawPayload()); timestampOffset = new TimestampOffset(indexEntry.readLong(), baseOffset() + indexEntry.readInt()); rst.free(); diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.java b/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.java index 0f14ff5d67..48cc1f6057 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.java @@ -37,6 +37,7 @@ public class ElasticTransactionIndex extends TransactionIndex { ElasticStreamSlice stream; private final FileCache cache; private final String path; + private final long cacheId; private volatile LastAppend lastAppend; private boolean closed = false; @@ -47,6 +48,7 @@ public ElasticTransactionIndex(long startOffset, File file, StreamSliceSupplier this.streamSupplier = streamSupplier; this.stream = streamSupplier.get(); this.cache = cache; + this.cacheId = cache.newCacheId(); this.path = file.getPath(); lastAppend = new LastAppend(stream.nextOffset(), CompletableFuture.completedFuture(null)); } @@ -70,7 +72,7 @@ public void append(AbortedTxn abortedTxn) { long position = stream.nextOffset(); CompletableFuture cf = stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer().duplicate())); lastAppend = new LastAppend(stream.nextOffset(), cf); - cache.put(stream.stream().streamId(), position, Unpooled.wrappedBuffer(abortedTxn.buffer())); + cache.put(cacheId, position, Unpooled.wrappedBuffer(abortedTxn.buffer())); } @Override @@ -180,7 +182,7 @@ public AbortedTxnWithPosition next() { return item; } int endOffset = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition); - Optional cacheDataOpt = cache.get(stream.stream().streamId(), position.value, 
endOffset - position.value); + Optional cacheDataOpt = cache.get(cacheId, position.value, endOffset - position.value); ByteBuf buf; if (cacheDataOpt.isPresent()) { buf = cacheDataOpt.get(); @@ -188,7 +190,7 @@ public AbortedTxnWithPosition next() { FetchResult records = fetchStream(position.value, endOffset, endOffset - position.value); ByteBuf txnListBuf = Unpooled.buffer(records.recordBatchList().size() * AbortedTxn.TOTAL_SIZE); records.recordBatchList().forEach(r -> txnListBuf.writeBytes(r.rawPayload())); - cache.put(stream.stream().streamId(), position.value, txnListBuf); + cache.put(cacheId, position.value, txnListBuf); records.free(); buf = txnListBuf; } diff --git a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java index 8c73872afd..9834ae45ee 100644 --- a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java +++ b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java @@ -28,6 +28,7 @@ import java.util.Objects; import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import kafka.log.streamaspect.ElasticTimeIndex; import kafka.log.streamaspect.ElasticTransactionIndex; @@ -61,13 +62,13 @@ public class FileCache { */ private final LRUCache lru = new LRUCache<>(); /** - * The cache of streamId to cache blocks. + * The map of cacheId to cache blocks. * Its value is a {@link NavigableMap} which is used to store the cache blocks in the order of the position. 
* - * @see Key#streamId + * @see Key#cacheId * @see Key#position */ - final Map> stream2cache = new HashMap<>(); + final Map> cacheMap = new HashMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); @@ -82,6 +83,7 @@ public class FileCache { */ private int freeCheckPoint = 0; private final MappedByteBuffer cacheByteBuffer; + private final AtomicLong cacheIdAlloc = new AtomicLong(0); public FileCache(String path, int size, int blockSize) throws IOException { this.blockSize = blockSize; @@ -110,11 +112,15 @@ public FileCache(String path, int size) throws IOException { this(path, size, BLOCK_SIZE); } - public void put(long streamId, long position, ByteBuf data) { + public long newCacheId() { + return cacheIdAlloc.incrementAndGet(); + } + + public void put(long cacheId, long position, ByteBuf data) { writeLock.lock(); try { int dataLength = data.readableBytes(); - NavigableMap cache = stream2cache.computeIfAbsent(streamId, k -> new TreeMap<>()); + NavigableMap cache = cacheMap.computeIfAbsent(cacheId, k -> new TreeMap<>()); Map.Entry pos2block = cache.floorEntry(position); long cacheStartPosition; long cacheEndPosition; @@ -149,7 +155,7 @@ public void put(long streamId, long position, ByteBuf data) { blocks = new Blocks(blocks.indexes, newDataLength); } cache.put(cacheStartPosition, blocks); - lru.put(new Key(streamId, cacheStartPosition), blocks); + lru.put(new Key(cacheId, cacheStartPosition), blocks); // write data to cache ByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate(); @@ -174,11 +180,11 @@ public void put(long streamId, long position, ByteBuf data) { } } - public Optional get(long streamId, long position, int length) { + public Optional get(long cacheId, long position, int length) { ByteBuf buf = Unpooled.buffer(length); readLock.lock(); try { - NavigableMap cache = 
stream2cache.get(streamId); + NavigableMap cache = cacheMap.get(cacheId); if (cache == null) { return Optional.empty(); } @@ -191,7 +197,7 @@ public Optional get(long cacheId, long position, int length) { if (entry.getKey() + entry.getValue().dataLength < position + length) { return Optional.empty(); } - lru.touchIfExist(new Key(streamId, cacheStartPosition)); + lru.touchIfExist(new Key(cacheId, cacheStartPosition)); MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate(); long nextPosition = position; int remaining = length; @@ -237,7 +243,7 @@ private int[] ensureCapacity(long cacheStartPosition, int size) { } Key key = entry.getKey(); Blocks blocks = entry.getValue(); - stream2cache.get(key.streamId).remove(key.position); + cacheMap.get(key.cacheId).remove(key.position); if (key.position == cacheStartPosition) { // eviction is conflict to current cache for (int i = 0; i < acquiringBlockIndex; i++) { @@ -279,11 +285,11 @@ private int align(int size) { } static class Key implements Comparable { - Long streamId; + long cacheId; long position; - public Key(Long streamId, long position) { - this.streamId = streamId; + public Key(long cacheId, long position) { + this.cacheId = cacheId; this.position = position; } @@ -294,20 +300,18 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - return position == key.position && Objects.equals(streamId, key.streamId); + return position == key.position && cacheId == key.cacheId; } @Override public int hashCode() { - return Objects.hash(streamId, position); + return Objects.hash(cacheId, position); } @Override public int compareTo(Key o) { - if (this.streamId.compareTo(o.streamId) != 0) { - return this.streamId.compareTo(o.streamId); - } - return Long.compare(this.position, o.position); + int compareCacheId = Long.compare(cacheId, o.cacheId); + return compareCacheId == 0 ? 
Long.compare(position, o.position) : compareCacheId; } } diff --git a/core/src/test/java/kafka/log/streamaspect/ElasticTimeIndexTest.java b/core/src/test/java/kafka/log/streamaspect/ElasticTimeIndexTest.java index ddf37997d7..6793eddaaf 100644 --- a/core/src/test/java/kafka/log/streamaspect/ElasticTimeIndexTest.java +++ b/core/src/test/java/kafka/log/streamaspect/ElasticTimeIndexTest.java @@ -10,11 +10,13 @@ */ package kafka.log.streamaspect; +import com.automq.stream.api.Stream; import java.io.IOException; import java.util.List; import kafka.log.streamaspect.cache.FileCache; import kafka.utils.TestUtils; import org.apache.kafka.storage.internals.log.TimestampOffset; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -47,6 +49,29 @@ public void testLookUp() throws IOException { } + @Test + public void testUniqueFileCache() throws IOException { + FileCache cache = new FileCache(TestUtils.tempFile().getPath(), 10 * 1024); + Stream stream = new MemoryClient.StreamImpl(1); + ElasticStreamSlice slice1 = new DefaultElasticStreamSlice(stream, SliceRange.of(0, Offsets.NOOP_OFFSET)); + ElasticTimeIndex idx1 = new ElasticTimeIndex(TestUtils.tempFile(), baseOffset, maxEntries * 12, + new IStreamSliceSupplier(slice1), TimestampOffset.UNKNOWN, cache); + long now = System.currentTimeMillis(); + idx1.maybeAppend(now, 100L, false); + TimestampOffset to = idx1.tryGetEntryFromCache(0); + Assertions.assertEquals(now, to.timestamp); + Assertions.assertEquals(100L, to.offset); + Assertions.assertEquals(12, slice1.nextOffset()); + + ElasticStreamSlice slice2 = new DefaultElasticStreamSlice(stream, SliceRange.of(stream.nextOffset(), Offsets.NOOP_OFFSET)); + ElasticTimeIndex idx2 = new ElasticTimeIndex(TestUtils.tempFile(), baseOffset, maxEntries * 12, + new IStreamSliceSupplier(slice2), TimestampOffset.UNKNOWN, cache); + Assertions.assertEquals(0, slice2.nextOffset()); + + to = 
idx2.tryGetEntryFromCache(0); + Assertions.assertEquals(TimestampOffset.UNKNOWN, to); + } + void appendEntries(ElasticTimeIndex idx, int numEntries) { for (int i = 1; i < numEntries; i++) { idx.maybeAppend(i * 10L, i * 10L + baseOffset, false); diff --git a/core/src/test/java/kafka/log/streamaspect/ElasticTransactionIndexTest.java b/core/src/test/java/kafka/log/streamaspect/ElasticTransactionIndexTest.java index 53643600de..774867d09e 100644 --- a/core/src/test/java/kafka/log/streamaspect/ElasticTransactionIndexTest.java +++ b/core/src/test/java/kafka/log/streamaspect/ElasticTransactionIndexTest.java @@ -10,6 +10,7 @@ */ package kafka.log.streamaspect; +import com.automq.stream.api.Stream; import java.io.File; import java.io.IOException; import java.util.LinkedList; @@ -17,6 +18,7 @@ import kafka.log.streamaspect.cache.FileCache; import kafka.utils.TestUtils; import org.apache.kafka.storage.internals.log.AbortedTxn; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -78,14 +80,16 @@ public void test_withFileCache() throws IOException { } @Test - public void test_withReusedFileCache() throws IOException { + public void testUniqueFileCache() throws IOException { String indexFile = TestUtils.tempFile().getPath(); String cacheFile = TestUtils.tempFile().getPath(); FileCache fileCache = new FileCache(cacheFile, 10 * 1024); - ElasticStreamSlice slice = new DefaultElasticStreamSlice(new MemoryClient.StreamImpl(1), SliceRange.of(0, Offsets.NOOP_OFFSET)); + Stream stream = new MemoryClient.StreamImpl(1); + ElasticStreamSlice slice = new DefaultElasticStreamSlice(stream, SliceRange.of(0, Offsets.NOOP_OFFSET)); ElasticTransactionIndex index = new ElasticTransactionIndex(0, new File(indexFile), new IStreamSliceSupplier(slice), fileCache); + Assertions.assertEquals(0, slice.nextOffset()); List abortedTxns = new LinkedList<>(); abortedTxns.add(new AbortedTxn(0L, 0, 10, 11)); @@ -95,13 +99,16 @@ public void 
test_withReusedFileCache() throws IOException { for (AbortedTxn abortedTxn : abortedTxns) { index.append(abortedTxn); } + Assertions.assertEquals((long) abortedTxns.size() * AbortedTxn.TOTAL_SIZE, slice.nextOffset()); + Assertions.assertEquals(stream.nextOffset(), slice.nextOffset()); // get from write cache assertEquals(abortedTxns, index.allAbortedTxns()); - slice = new DefaultElasticStreamSlice(new MemoryClient.StreamImpl(2), SliceRange.of(0, Offsets.NOOP_OFFSET)); + slice = new DefaultElasticStreamSlice(stream, SliceRange.of(stream.nextOffset(), Offsets.NOOP_OFFSET)); index = new ElasticTransactionIndex(0, new File(indexFile), new IStreamSliceSupplier(slice), fileCache); + Assertions.assertEquals(0, slice.nextOffset()); abortedTxns = new LinkedList<>(); abortedTxns.add(new AbortedTxn(5L, 0, 10, 11)); diff --git a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java index 08c0f05f0d..0d51f835d8 100644 --- a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java +++ b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java @@ -38,65 +38,65 @@ public void test() throws IOException { FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024); // occupy block 0,1 - long streamId1 = 1; - fileCache.put(streamId1, 10, genBuf((byte) 1, 2 * 1024)); + long cacheId = 1; + fileCache.put(cacheId, 10, genBuf((byte) 1, 2 * 1024)); - ByteBuf rst = fileCache.get(streamId1, 10 + 1000, 1024).get(); + ByteBuf rst = fileCache.get(cacheId, 10 + 1000, 1024).get(); assertEquals(1024, rst.readableBytes()); assertTrue(verify(rst, (byte) 1)); - Assertions.assertFalse(fileCache.get(streamId1, 10 + 1000, 2048).isPresent()); + Assertions.assertFalse(fileCache.get(cacheId, 10 + 1000, 2048).isPresent()); // occupy block 2,3 - long streamId2 = 2; - fileCache.put(streamId2, 233, genBuf((byte) 2, 1025)); + long cacheId2 = 2; + fileCache.put(cacheId2, 233, genBuf((byte) 2, 
1025)); // occupy block 4~8 - fileCache.put(streamId2, 2048, genBuf((byte) 4, 1024 * 5)); + fileCache.put(cacheId2, 2048, genBuf((byte) 4, 1024 * 5)); // occupy block 9 - fileCache.put(streamId2, 10000, genBuf((byte) 5, 1024)); + fileCache.put(cacheId2, 10000, genBuf((byte) 5, 1024)); // touch lru - assertEquals(1025, fileCache.get(streamId2, 233, 1025).get().readableBytes()); - assertEquals(1024, fileCache.get(streamId2, 10000, 1024).get().readableBytes()); - assertEquals(2048, fileCache.get(streamId1, 10, 2048).get().readableBytes()); - assertEquals(1024 * 5, fileCache.get(streamId2, 2048, 1024 * 5).get().readableBytes()); + assertEquals(1025, fileCache.get(cacheId2, 233, 1025).get().readableBytes()); + assertEquals(1024, fileCache.get(cacheId2, 10000, 1024).get().readableBytes()); + assertEquals(2048, fileCache.get(cacheId, 10, 2048).get().readableBytes()); + assertEquals(1024 * 5, fileCache.get(cacheId2, 2048, 1024 * 5).get().readableBytes()); // expect evict test2-233 and test2-10000 - long streamId3 = 3; - fileCache.put(streamId3, 123, genBuf((byte) 6, 2049)); + long cacheId3 = 3; + fileCache.put(cacheId3, 123, genBuf((byte) 6, 2049)); - FileCache.Blocks blocks = fileCache.stream2cache.get(streamId3).get(123L); + FileCache.Blocks blocks = fileCache.cacheMap.get(cacheId3).get(123L); assertEquals(2049, blocks.dataLength); assertArrayEquals(new int[] {2, 3, 9}, blocks.indexes); - rst = fileCache.get(streamId3, 123, 2049).get(); + rst = fileCache.get(cacheId3, 123, 2049).get(); assertEquals(2049, rst.readableBytes()); assertTrue(verify(rst, (byte) 6)); // expect evict test1-10 and test2-2048 - long streamId4 = 4; - fileCache.put(streamId4, 123, genBuf((byte) 7, 2049)); - blocks = fileCache.stream2cache.get(streamId4).get(123L); + long cacheId4 = 4; + fileCache.put(cacheId4, 123, genBuf((byte) 7, 2049)); + blocks = fileCache.cacheMap.get(cacheId4).get(123L); assertArrayEquals(new int[] {0, 1, 4}, blocks.indexes); - rst = fileCache.get(streamId4, 123, 2049).get(); 
+ rst = fileCache.get(cacheId4, 123, 2049).get(); assertTrue(verify(rst, (byte) 7)); assertEquals(4, fileCache.freeBlockCount); // expect occupy free blocks 5,6,7 - long streamId5 = 5; - fileCache.put(streamId5, 123, genBuf((byte) 8, 2049)); - blocks = fileCache.stream2cache.get(streamId5).get(123L); + long cacheId5 = 5; + fileCache.put(cacheId5, 123, genBuf((byte) 8, 2049)); + blocks = fileCache.cacheMap.get(cacheId5).get(123L); assertArrayEquals(new int[] {5, 6, 7}, blocks.indexes); - rst = fileCache.get(streamId5, 123, 2049).get(); + rst = fileCache.get(cacheId5, 123, 2049).get(); assertTrue(verify(rst, (byte) 8)); assertEquals(1, fileCache.freeBlockCount); - long streamId6 = 6; - fileCache.put(streamId6, 6666, genBuf((byte) 9, 3333)); - rst = fileCache.get(streamId6, 6666L, 3333).get(); + long cacheId6 = 6; + fileCache.put(cacheId6, 6666, genBuf((byte) 9, 3333)); + rst = fileCache.get(cacheId6, 6666L, 3333).get(); assertTrue(verify(rst, (byte) 9)); } @@ -104,61 +104,61 @@ public void test() throws IOException { @Test public void testMergePut() throws IOException { FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024); - long streamId = 1; + long cacheId = 1; CompositeByteBuf buf; { buf = Unpooled.compositeBuffer(); buf.addComponent(true, genBuf((byte) 1, 500)); buf.addComponent(true, genBuf((byte) 2, 500)); buf.addComponent(true, genBuf((byte) 3, 500)); - fileCache.put(streamId, 3333L, buf); + fileCache.put(cacheId, 3333L, buf); } - assertEquals(1, fileCache.stream2cache.get(streamId).size()); - assertEquals(1500, fileCache.stream2cache.get(streamId).get(3333L).dataLength); - assertTrue(verify(fileCache.get(streamId, 3333L, 500).get(), (byte) 1)); - assertTrue(verify(fileCache.get(streamId, 3333L + 500, 500).get(), (byte) 2)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1000, 500).get(), (byte) 3)); + assertEquals(1, fileCache.cacheMap.get(cacheId).size()); + assertEquals(1500, 
fileCache.cacheMap.get(cacheId).get(3333L).dataLength); + assertTrue(verify(fileCache.get(cacheId, 3333L, 500).get(), (byte) 1)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 500, 500).get(), (byte) 2)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1000, 500).get(), (byte) 3)); { buf = Unpooled.compositeBuffer(); buf.addComponent(true, genBuf((byte) 2, 500)); - fileCache.put(streamId, 3333L + 500, buf); + fileCache.put(cacheId, 3333L + 500, buf); } - assertEquals(1, fileCache.stream2cache.get(streamId).size()); - assertEquals(1500, fileCache.stream2cache.get(streamId).get(3333L).dataLength); - assertTrue(verify(fileCache.get(streamId, 3333L, 500).get(), (byte) 1)); - assertTrue(verify(fileCache.get(streamId, 3333L + 500, 500).get(), (byte) 2)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1000, 500).get(), (byte) 3)); + assertEquals(1, fileCache.cacheMap.get(cacheId).size()); + assertEquals(1500, fileCache.cacheMap.get(cacheId).get(3333L).dataLength); + assertTrue(verify(fileCache.get(cacheId, 3333L, 500).get(), (byte) 1)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 500, 500).get(), (byte) 2)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1000, 500).get(), (byte) 3)); { buf = Unpooled.compositeBuffer(); buf.addComponent(true, genBuf((byte) 4, 500)); buf.addComponent(true, genBuf((byte) 5, 500)); buf.addComponent(true, genBuf((byte) 6, 500)); - fileCache.put(streamId, 3333L + 1000, buf); + fileCache.put(cacheId, 3333L + 1000, buf); } - assertEquals(1, fileCache.stream2cache.get(streamId).size()); - assertEquals(2500, fileCache.stream2cache.get(streamId).get(3333L).dataLength); - assertTrue(verify(fileCache.get(streamId, 3333L, 500).get(), (byte) 1)); - assertTrue(verify(fileCache.get(streamId, 3333L + 500, 500).get(), (byte) 2)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1000, 500).get(), (byte) 4)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1500, 500).get(), (byte) 5)); - assertTrue(verify(fileCache.get(streamId, 
3333L + 2000, 500).get(), (byte) 6)); + assertEquals(1, fileCache.cacheMap.get(cacheId).size()); + assertEquals(2500, fileCache.cacheMap.get(cacheId).get(3333L).dataLength); + assertTrue(verify(fileCache.get(cacheId, 3333L, 500).get(), (byte) 1)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 500, 500).get(), (byte) 2)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1000, 500).get(), (byte) 4)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1500, 500).get(), (byte) 5)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 2000, 500).get(), (byte) 6)); { buf = Unpooled.compositeBuffer(); buf.addComponent(true, genBuf((byte) 7, 500)); - fileCache.put(streamId, 3333L + 1000 + 1500, buf); + fileCache.put(cacheId, 3333L + 1000 + 1500, buf); } - assertEquals(1, fileCache.stream2cache.get(streamId).size()); - assertEquals(3000, fileCache.stream2cache.get(streamId).get(3333L).dataLength); - - assertTrue(verify(fileCache.get(streamId, 3333L, 500).get(), (byte) 1)); - assertTrue(verify(fileCache.get(streamId, 3333L + 500, 500).get(), (byte) 2)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1000, 500).get(), (byte) 4)); - assertTrue(verify(fileCache.get(streamId, 3333L + 1500, 500).get(), (byte) 5)); - assertTrue(verify(fileCache.get(streamId, 3333L + 2000, 500).get(), (byte) 6)); - assertTrue(verify(fileCache.get(streamId, 3333L + 2500, 500).get(), (byte) 7)); + assertEquals(1, fileCache.cacheMap.get(cacheId).size()); + assertEquals(3000, fileCache.cacheMap.get(cacheId).get(3333L).dataLength); + + assertTrue(verify(fileCache.get(cacheId, 3333L, 500).get(), (byte) 1)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 500, 500).get(), (byte) 2)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1000, 500).get(), (byte) 4)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 1500, 500).get(), (byte) 5)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 2000, 500).get(), (byte) 6)); + assertTrue(verify(fileCache.get(cacheId, 3333L + 2500, 500).get(), 
(byte) 7)); } ByteBuf genBuf(byte data, int length) {