diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
index 466a8b8072..1d79f43f9e 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
@@ -350,10 +350,12 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
   }
 
   override def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampAndOffset] = {
+    // TODO: make the find async to avoid blocking the thread
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     // Search the timestamp
-    Option(_log.searchForTimestamp(timestamp, timestampOffset.offset))
+    val rst = Option(_log.searchForTimestamp(timestamp, timestampOffset.offset))
+    rst
   }
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala
index f2c2615c29..864976fd55 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala
@@ -96,7 +96,8 @@ class ElasticTimeIndex(__file: File, streamSegmentSupplier: StreamSliceSupplier,
     var timestampOffset = tryGetEntryFromCache(n)
     if (timestampOffset == TimestampOffset.Unknown) {
       // cache missing, try read from remote and put it to cache.
-      val rst = stream.fetch(startOffset, Math.min(_entries * entrySize, startOffset + 16 * 1024)).get()
+      // the index interval is 1MiB and the segment size is 1GB, so a binary search only needs 512 entries
+      val rst = stream.fetch(startOffset, Math.min(_entries * entrySize, startOffset + entrySize * 512)).get()
       val records = rst.recordBatchList()
       if (records.size() == 0) {
         throw new IllegalStateException(s"fetch empty from stream $stream at offset $startOffset")
diff --git a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
index 1a3e3044d6..7d334a6502 100644
--- a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
+++ b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
@@ -119,25 +119,25 @@ public Optional<ByteBuf> get(String filePath, long position, int length) {
         if (entry == null) {
             return Optional.empty();
         }
-        long filePosition = entry.getKey();
+        long cacheStartPosition = entry.getKey();
         Value value = entry.getValue();
         if (entry.getKey() + entry.getValue().dataLength < position + length) {
             return Optional.empty();
         }
-        lru.touch(new Key(filePath, filePosition));
+        lru.touch(new Key(filePath, cacheStartPosition));
         MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
+        long nextPosition = position;
         int remaining = length;
-        long blockFilePosition = filePosition - blockSize;
-        for (int blockIndex : value.blocks) {
-            blockFilePosition += blockSize;
-            int blockCachePosition = blockIndex * blockSize;
-            if (blockCachePosition + blockSize < position) {
+        for (int i = 0; i < value.blocks.length; i++) {
+            long cacheBlockEndPosition = cacheStartPosition + (long) (i + 1) * blockSize;
+            if (cacheBlockEndPosition < nextPosition) {
                 continue;
             }
-            int cachePosition = blockCachePosition + (int) (position - blockFilePosition);
-            buf.writeBytes(cacheByteBuffer.slice(cachePosition, Math.min(remaining, blockSize)));
-            remaining -= blockSize;
-            position = blockFilePosition + blockSize;
+            long cacheBlockStartPosition = cacheBlockEndPosition - blockSize;
+            int readSize = (int) Math.min(remaining, cacheBlockEndPosition - nextPosition);
+            buf.writeBytes(cacheByteBuffer.slice(value.blocks[i] * blockSize + (int) (nextPosition - cacheBlockStartPosition), readSize));
+            remaining -= readSize;
+            nextPosition += readSize;
             if (remaining <= 0) {
                 break;
             }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9bc4c84405..2dd27a7f87 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -139,7 +139,7 @@ object Defaults {
   val LogCleanerMinCompactionLagMs = 0L
   val LogCleanerMaxCompactionLagMs = Long.MaxValue
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
-  val LogIndexIntervalBytes = 4096
+  val LogIndexIntervalBytes = 1024 * 1024
   val LogFlushIntervalMessages = Long.MaxValue
   val LogDeleteDelayMs = 60000
   val LogFlushSchedulerIntervalMs = Long.MaxValue
diff --git a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
index 7802b921e8..dc27141862 100644
--- a/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
+++ b/core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
@@ -66,7 +66,7 @@ public void test() throws IOException {
 
         FileCache.Value value = fileCache.path2cache.get("test3").get(123L);
         assertEquals(2049, value.dataLength);
-        assertArrayEquals(new int[] {2, 3, 9}, value.blocks);
+        assertArrayEquals(new int[]{2, 3, 9}, value.blocks);
 
         rst = fileCache.get("test3", 123, 2049).get();
 
@@ -76,7 +76,7 @@ public void test() throws IOException {
         // expect evict test1-10 and test2-2048
         fileCache.put("test4", 123, genBuf((byte) 7, 2049));
         value = fileCache.path2cache.get("test4").get(123L);
-        assertArrayEquals(new int[] {0, 1, 4}, value.blocks);
+        assertArrayEquals(new int[]{0, 1, 4}, value.blocks);
 
         rst = fileCache.get("test4", 123, 2049).get();
         assertTrue(verify(rst, (byte) 7));
@@ -85,10 +85,15 @@ public void test() throws IOException {
         // expect occupy free blocks 5,6,7
        fileCache.put("test5", 123, genBuf((byte) 8, 2049));
         value = fileCache.path2cache.get("test5").get(123L);
-        assertArrayEquals(new int[] {5, 6, 7}, value.blocks);
+        assertArrayEquals(new int[]{5, 6, 7}, value.blocks);
         rst = fileCache.get("test5", 123, 2049).get();
         assertTrue(verify(rst, (byte) 8));
         assertEquals(1, fileCache.freeBlockCount);
+
+        fileCache.put("test6", 6666, genBuf((byte) 9, 3333));
+        rst = fileCache.get("test6", 6666L, 3333).get();
+        assertTrue(verify(rst, (byte) 9));
+
     }
 
     ByteBuf genBuf(byte data, int length) {
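
A quick sanity check on the numbers behind the ElasticTimeIndex and KafkaConfig hunks: with the default index interval raised from 4096 to 1024 * 1024 bytes, a 1 GiB segment produces at most 1024 index entries, so the patch's 512-entry fetch window covers half the index in a single remote read. The sketch below just redoes that arithmetic; it assumes Kafka's 12-byte time-index entry layout (8-byte timestamp plus 4-byte relative offset), and every name in it is illustrative, not taken from the patch.

    // TimeIndexSizing.java - back-of-the-envelope numbers for the new fetch bound
    public class TimeIndexSizing {
        public static void main(String[] args) {
            final int entrySize = 12;                      // 8-byte timestamp + 4-byte relative offset
            final long indexIntervalBytes = 1024 * 1024;   // new LogIndexIntervalBytes default (was 4096)
            final long segmentBytes = 1024L * 1024 * 1024; // 1 GiB segment

            long maxEntries = segmentBytes / indexIntervalBytes; // 1024 entries per segment
            long fetchWindow = (long) entrySize * 512;           // bound used in the patch: 6144 bytes
            long oldFetchWindow = 16 * 1024;                     // previous hard-coded bound: 16384 bytes

            System.out.println("entries per 1 GiB segment: " + maxEntries);     // 1024
            System.out.println("512-entry fetch window:    " + fetchWindow);    // 6144
            System.out.println("old fetch window:          " + oldFetchWindow); // 16384
        }
    }

Under the old 4096-byte interval the same segment would index 262144 entries, which is why the previous 16 KiB window was only an arbitrary cap; with the 1 MiB interval the entire index fits in two 512-entry reads.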
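The FileCache.java hunk is easiest to read as a coordinate fix: the old loop compared blockCachePosition + blockSize (an offset inside the cache file) against position (an offset inside the source file), and it always advanced by a full blockSize even after copying a partial block, so reads that were not block aligned could return the wrong bytes. The rewrite keeps every cursor in file coordinates (nextPosition, cacheBlockStartPosition, cacheBlockEndPosition) and clamps each copy to readSize. Below is a minimal, self-contained model of the corrected loop, with a plain byte[] standing in for the MappedByteBuffer; BLOCK_SIZE and all names here are invented for illustration, and only the position bookkeeping mirrors the patch.

    // BlockReadSketch.java - standalone model of the corrected copy loop in FileCache.get
    import java.util.Arrays;

    public class BlockReadSketch {
        static final int BLOCK_SIZE = 1024;

        // cacheStartPosition: file position where the cached run begins
        // blocks:             cache block indices holding the run, in file order
        // position/length:    requested file range (need not be block aligned)
        static byte[] read(byte[] cache, long cacheStartPosition, int[] blocks, long position, int length) {
            byte[] out = new byte[length];
            int outIndex = 0;
            long nextPosition = position; // next file position still to copy; file coordinates throughout
            int remaining = length;
            for (int i = 0; i < blocks.length; i++) {
                // file position one past the end of the i-th cached block
                long cacheBlockEndPosition = cacheStartPosition + (long) (i + 1) * BLOCK_SIZE;
                if (cacheBlockEndPosition < nextPosition) {
                    continue; // this block ends before the requested range starts
                }
                long cacheBlockStartPosition = cacheBlockEndPosition - BLOCK_SIZE;
                // copy the rest of the request or the rest of this block, whichever is smaller
                int readSize = (int) Math.min(remaining, cacheBlockEndPosition - nextPosition);
                int srcOffset = blocks[i] * BLOCK_SIZE + (int) (nextPosition - cacheBlockStartPosition);
                System.arraycopy(cache, srcOffset, out, outIndex, readSize);
                outIndex += readSize;
                remaining -= readSize;
                nextPosition += readSize;
                if (remaining <= 0) {
                    break;
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // blocks 2, 3 and 9 of a 10-block cache hold bytes 123..3194 of some file
            byte[] cache = new byte[10 * BLOCK_SIZE];
            int[] blocks = {2, 3, 9};
            for (int i = 0; i < blocks.length; i++) {
                Arrays.fill(cache, blocks[i] * BLOCK_SIZE, (blocks[i] + 1) * BLOCK_SIZE, (byte) (i + 1));
            }
            // read a range that starts mid-block and crosses a block boundary
            byte[] rst = read(cache, 123L, blocks, 200L, 1200);
            System.out.println(rst[0] + " " + rst[946] + " " + rst[947]); // prints: 1 1 2
        }
    }

The new test6 case in FileCacheTest (put at position 6666, then get 3333 bytes) exercises exactly this unaligned path end-to-end.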