@@ -31,6 +31,7 @@
public class ElasticTimeIndex extends TimeIndex {
private final File file;
private final FileCache cache;
private final long cacheId;
final ElasticStreamSlice stream;

private volatile CompletableFuture<?> lastAppend = CompletableFuture.completedFuture(null);
@@ -46,6 +47,7 @@ public ElasticTimeIndex(
super(file, baseOffset, maxIndexSize, true, true);
this.file = file;
this.cache = cache;
this.cacheId = cache.newCacheId();
this.stream = sliceSupplier.get();
setEntries((int) (stream.nextOffset() / ENTRY_SIZE));
if (entries() == 0) {
@@ -132,7 +134,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
buffer.flip();
long position = stream.nextOffset();
lastAppend = stream.append(RawPayloadRecordBatch.of(buffer));
cache.put(stream.stream().streamId(), position, Unpooled.wrappedBuffer(buffer));
cache.put(cacheId, position, Unpooled.wrappedBuffer(buffer));
incrementEntries();
lastEntry(new TimestampOffset(timestamp, offset));
}
@@ -249,8 +251,8 @@ protected TimestampOffset parseEntry(ByteBuffer buffer, int n) {
return parseEntry(n);
}

private TimestampOffset tryGetEntryFromCache(int n) {
Optional<ByteBuf> rst = cache.get(stream.stream().streamId(), (long) n * ENTRY_SIZE, ENTRY_SIZE);
TimestampOffset tryGetEntryFromCache(int n) {
Optional<ByteBuf> rst = cache.get(cacheId, (long) n * ENTRY_SIZE, ENTRY_SIZE);
if (rst.isPresent()) {
ByteBuf buffer = rst.get();
return new TimestampOffset(buffer.readLong(), baseOffset() + buffer.readInt());
@@ -292,7 +294,7 @@ private TimestampOffset parseEntry0(int n) throws ExecutionException, InterruptedException
}
ByteBuf buf = Unpooled.buffer(records.size() * ENTRY_SIZE);
records.forEach(record -> buf.writeBytes(record.rawPayload()));
cache.put(stream.stream().streamId(), startOffset, buf);
cache.put(cacheId, startOffset, buf);
ByteBuf indexEntry = Unpooled.wrappedBuffer(records.get(0).rawPayload());
timestampOffset = new TimestampOffset(indexEntry.readLong(), baseOffset() + indexEntry.readInt());
rst.free();
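A note on why the cache key changes from the underlying streamId to a per-index cacheId: several index slices can live on the same stream, and each slice caches entries at slice-relative positions starting from 0, so a streamId-keyed cache lets a second slice's lookup at position 0 hit the first slice's bytes. The hypothetical HashMap below (not the project's API) illustrates the aliasing and how distinct cacheIds avoid it.

import java.util.HashMap;
import java.util.Map;

class CacheKeyAliasingSketch {
    public static void main(String[] args) {
        long streamId = 1L; // two index slices share this underlying stream
        Map<String, String> keyedByStreamId = new HashMap<>();
        keyedByStreamId.put(streamId + "@" + 0, "slice1 entry at slice-relative position 0");
        // A second slice on the same stream looks up its own position 0 ...
        String stale = keyedByStreamId.get(streamId + "@" + 0); // ... and gets slice1's bytes instead of a miss

        Map<String, String> keyedByCacheId = new HashMap<>();
        long cacheId1 = 1L, cacheId2 = 2L; // one id per index instance, as FileCache#newCacheId now provides
        keyedByCacheId.put(cacheId1 + "@" + 0, "slice1 entry at slice-relative position 0");
        String miss = keyedByCacheId.get(cacheId2 + "@" + 0); // null: the second index starts cold, as expected
        System.out.println(stale + " / " + miss);
    }
}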
@@ -37,6 +37,7 @@ public class ElasticTransactionIndex extends TransactionIndex {
ElasticStreamSlice stream;
private final FileCache cache;
private final String path;
private final long cacheId;
private volatile LastAppend lastAppend;

private boolean closed = false;
@@ -47,6 +48,7 @@ public ElasticTransactionIndex(long startOffset, File file, StreamSliceSupplier
this.streamSupplier = streamSupplier;
this.stream = streamSupplier.get();
this.cache = cache;
this.cacheId = cache.newCacheId();
this.path = file.getPath();
lastAppend = new LastAppend(stream.nextOffset(), CompletableFuture.completedFuture(null));
}
Expand All @@ -70,7 +72,7 @@ public void append(AbortedTxn abortedTxn) {
long position = stream.nextOffset();
CompletableFuture<?> cf = stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer().duplicate()));
lastAppend = new LastAppend(stream.nextOffset(), cf);
cache.put(stream.stream().streamId(), position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
cache.put(cacheId, position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
}

@Override
@@ -180,15 +182,15 @@ public AbortedTxnWithPosition next() {
return item;
}
int endOffset = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition);
Optional<ByteBuf> cacheDataOpt = cache.get(stream.stream().streamId(), position.value, endOffset - position.value);
Optional<ByteBuf> cacheDataOpt = cache.get(cacheId, position.value, endOffset - position.value);
ByteBuf buf;
if (cacheDataOpt.isPresent()) {
buf = cacheDataOpt.get();
} else {
FetchResult records = fetchStream(position.value, endOffset, endOffset - position.value);
ByteBuf txnListBuf = Unpooled.buffer(records.recordBatchList().size() * AbortedTxn.TOTAL_SIZE);
records.recordBatchList().forEach(r -> txnListBuf.writeBytes(r.rawPayload()));
cache.put(stream.stream().streamId(), position.value, txnListBuf);
cache.put(cacheId, position.value, txnListBuf);
records.free();
buf = txnListBuf;
}
42 changes: 23 additions & 19 deletions core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
@@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.log.streamaspect.ElasticTimeIndex;
import kafka.log.streamaspect.ElasticTransactionIndex;
@@ -61,13 +62,13 @@ public class FileCache {
*/
private final LRUCache<Key, Blocks> lru = new LRUCache<>();
/**
* The cache of streamId to cache blocks.
* The map of cacheId to cache blocks.
* Its value is a {@link NavigableMap} which is used to store the cache blocks in the order of the position.
*
* @see Key#streamId
* @see Key#cacheId
* @see Key#position
*/
final Map<Long /* streamId */, NavigableMap<Long /* position */, Blocks>> stream2cache = new HashMap<>();
final Map<Long /* segment-unique id */, NavigableMap<Long /* position */, Blocks>> cacheMap = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
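For orientation, the structure renamed here is a two-level map: a cacheId (allocated once per index instance via newCacheId() below) maps to a position-sorted NavigableMap of cached runs, and a read only hits if the floor entry at or below the requested position covers the whole requested range. A minimal sketch of that lookup follows, with a simplified Blocks that only tracks its length; merging of adjacent runs, LRU eviction and the mapped file itself are omitted.

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class CacheLookupSketch {
    static class Blocks {
        final int dataLength;
        Blocks(int dataLength) { this.dataLength = dataLength; }
    }

    final Map<Long, NavigableMap<Long, Blocks>> cacheMap = new HashMap<>();

    void put(long cacheId, long position, int length) {
        cacheMap.computeIfAbsent(cacheId, k -> new TreeMap<>()).put(position, new Blocks(length));
    }

    // A hit requires one cached run starting at or before `position` to cover [position, position + length).
    boolean covers(long cacheId, long position, int length) {
        NavigableMap<Long, Blocks> cache = cacheMap.get(cacheId);
        if (cache == null) {
            return false;
        }
        Map.Entry<Long, Blocks> entry = cache.floorEntry(position);
        return entry != null && entry.getKey() + entry.getValue().dataLength >= position + length;
    }
}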
@@ -82,6 +83,7 @@ public class FileCache {
*/
private int freeCheckPoint = 0;
private final MappedByteBuffer cacheByteBuffer;
private final AtomicLong cacheIdAlloc = new AtomicLong(0);

public FileCache(String path, int size, int blockSize) throws IOException {
this.blockSize = blockSize;
@@ -110,11 +112,15 @@ public FileCache(String path, int size) throws IOException {
this(path, size, BLOCK_SIZE);
}

public void put(long streamId, long position, ByteBuf data) {
public long newCacheId() {
return cacheIdAlloc.incrementAndGet();
}

public void put(long cacheId, long position, ByteBuf data) {
writeLock.lock();
try {
int dataLength = data.readableBytes();
NavigableMap<Long, Blocks> cache = stream2cache.computeIfAbsent(streamId, k -> new TreeMap<>());
NavigableMap<Long, Blocks> cache = cacheMap.computeIfAbsent(cacheId, k -> new TreeMap<>());
Map.Entry<Long, Blocks> pos2block = cache.floorEntry(position);
long cacheStartPosition;
long cacheEndPosition;
@@ -149,7 +155,7 @@ public void put(long streamId, long position, ByteBuf data) {
blocks = new Blocks(blocks.indexes, newDataLength);
}
cache.put(cacheStartPosition, blocks);
lru.put(new Key(streamId, cacheStartPosition), blocks);
lru.put(new Key(cacheId, cacheStartPosition), blocks);

// write data to cache
ByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
@@ -174,11 +180,11 @@
}
}

public Optional<ByteBuf> get(long streamId, long position, int length) {
public Optional<ByteBuf> get(long cacheId, long position, int length) {
ByteBuf buf = Unpooled.buffer(length);
readLock.lock();
try {
NavigableMap<Long, Blocks> cache = stream2cache.get(streamId);
NavigableMap<Long, Blocks> cache = cacheMap.get(cacheId);
if (cache == null) {
return Optional.empty();
}
@@ -191,7 +197,7 @@ public Optional<ByteBuf> get(long streamId, long position, int length) {
if (entry.getKey() + entry.getValue().dataLength < position + length) {
return Optional.empty();
}
lru.touchIfExist(new Key(streamId, cacheStartPosition));
lru.touchIfExist(new Key(cacheId, cacheStartPosition));
MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
long nextPosition = position;
int remaining = length;
@@ -237,7 +243,7 @@ private int[] ensureCapacity(long cacheStartPosition, int size) {
}
Key key = entry.getKey();
Blocks blocks = entry.getValue();
stream2cache.get(key.streamId).remove(key.position);
cacheMap.get(key.cacheId).remove(key.position);
if (key.position == cacheStartPosition) {
// eviction is conflict to current cache
for (int i = 0; i < acquiringBlockIndex; i++) {
@@ -279,11 +285,11 @@ private int align(int size) {
}

static class Key implements Comparable<Key> {
Long streamId;
long cacheId;
long position;

public Key(Long streamId, long position) {
this.streamId = streamId;
public Key(Long cacheId, long position) {
this.cacheId = cacheId;
this.position = position;
}

@@ -294,20 +300,18 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
Key key = (Key) o;
return position == key.position && Objects.equals(streamId, key.streamId);
return position == key.position && Objects.equals(cacheId, key.cacheId);
}

@Override
public int hashCode() {
return Objects.hash(streamId, position);
return Objects.hash(cacheId, position);
}

@Override
public int compareTo(Key o) {
if (this.streamId.compareTo(o.streamId) != 0) {
return this.streamId.compareTo(o.streamId);
}
return Long.compare(this.position, o.position);
int compareCacheId = Long.compare(cacheId, o.cacheId);
return compareCacheId == 0 ? Long.compare(position, o.position) : compareCacheId;
}
}

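Taken together, the new FileCache surface is used as sketched below: each index allocates one cacheId up front and keys every put/get with it, so two indexes never see each other's bytes even when their positions coincide. The temp path and sizes are illustrative.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.Optional;
import kafka.log.streamaspect.cache.FileCache;

class FileCacheUsageSketch {
    public static void main(String[] args) throws IOException {
        FileCache cache = new FileCache("/tmp/file-cache-sketch", 10 * 1024); // path and size are illustrative
        long cacheId = cache.newCacheId(); // one id per index / slice instance
        cache.put(cacheId, 0L, Unpooled.wrappedBuffer(new byte[12]));
        Optional<ByteBuf> hit = cache.get(cacheId, 0L, 12);             // present: same namespace
        Optional<ByteBuf> miss = cache.get(cache.newCacheId(), 0L, 12); // empty: a fresh namespace starts cold
        System.out.println(hit.isPresent() + " " + miss.isPresent());   // true false
    }
}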
@@ -10,11 +10,13 @@
*/
package kafka.log.streamaspect;

import com.automq.stream.api.Stream;
import java.io.IOException;
import java.util.List;
import kafka.log.streamaspect.cache.FileCache;
import kafka.utils.TestUtils;
import org.apache.kafka.storage.internals.log.TimestampOffset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@@ -47,6 +49,29 @@ public void testLookUp() throws IOException {

}

@Test
public void testUniqueFileCache() throws IOException {
FileCache cache = new FileCache(TestUtils.tempFile().getPath(), 10 * 1024);
Stream stream = new MemoryClient.StreamImpl(1);
ElasticStreamSlice slice1 = new DefaultElasticStreamSlice(stream, SliceRange.of(0, Offsets.NOOP_OFFSET));
ElasticTimeIndex idx1 = new ElasticTimeIndex(TestUtils.tempFile(), baseOffset, maxEntries * 12,
new IStreamSliceSupplier(slice1), TimestampOffset.UNKNOWN, cache);
long now = System.currentTimeMillis();
idx1.maybeAppend(now, 100L, false);
TimestampOffset to = idx1.tryGetEntryFromCache(0);
Assertions.assertEquals(now, to.timestamp);
Assertions.assertEquals(100L, to.offset);
Assertions.assertEquals(12, slice1.nextOffset());

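// slice2 sits on the same underlying stream as slice1, so its slice-relative positions restart at 0
// and overlap slice1's cached range; idx2's own cacheId must keep it from reading idx1's entry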
ElasticStreamSlice slice2 = new DefaultElasticStreamSlice(stream, SliceRange.of(stream.nextOffset(), Offsets.NOOP_OFFSET));
ElasticTimeIndex idx2 = new ElasticTimeIndex(TestUtils.tempFile(), baseOffset, maxEntries * 12,
new IStreamSliceSupplier(slice2), TimestampOffset.UNKNOWN, cache);
Assertions.assertEquals(0, slice2.nextOffset());

to = idx2.tryGetEntryFromCache(0);
Assertions.assertEquals(TimestampOffset.UNKNOWN, to);
}

void appendEntries(ElasticTimeIndex idx, int numEntries) {
for (int i = 1; i < numEntries; i++) {
idx.maybeAppend(i * 10L, i * 10L + baseOffset, false);
@@ -10,13 +10,15 @@
*/
package kafka.log.streamaspect;

import com.automq.stream.api.Stream;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import kafka.log.streamaspect.cache.FileCache;
import kafka.utils.TestUtils;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@@ -78,14 +80,16 @@ public void test_withFileCache() throws IOException {
}

@Test
public void test_withReusedFileCache() throws IOException {
public void testUniqueFileCache() throws IOException {
String indexFile = TestUtils.tempFile().getPath();
String cacheFile = TestUtils.tempFile().getPath();

FileCache fileCache = new FileCache(cacheFile, 10 * 1024);
ElasticStreamSlice slice = new DefaultElasticStreamSlice(new MemoryClient.StreamImpl(1), SliceRange.of(0, Offsets.NOOP_OFFSET));
Stream stream = new MemoryClient.StreamImpl(1);
ElasticStreamSlice slice = new DefaultElasticStreamSlice(stream, SliceRange.of(0, Offsets.NOOP_OFFSET));
ElasticTransactionIndex index = new ElasticTransactionIndex(0, new File(indexFile), new IStreamSliceSupplier(slice),
fileCache);
Assertions.assertEquals(0, slice.nextOffset());

List<AbortedTxn> abortedTxns = new LinkedList<>();
abortedTxns.add(new AbortedTxn(0L, 0, 10, 11));
@@ -95,13 +99,16 @@ for (AbortedTxn abortedTxn : abortedTxns) {
for (AbortedTxn abortedTxn : abortedTxns) {
index.append(abortedTxn);
}
Assertions.assertEquals((long) abortedTxns.size() * AbortedTxn.TOTAL_SIZE, slice.nextOffset());
Assertions.assertEquals(stream.nextOffset(), slice.nextOffset());

// get from write cache
assertEquals(abortedTxns, index.allAbortedTxns());

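// rebuild the index on a fresh slice over the same stream: its own cacheId keeps the lookups below
// from hitting the batches the first index cached at the same slice-relative positions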
slice = new DefaultElasticStreamSlice(new MemoryClient.StreamImpl(2), SliceRange.of(0, Offsets.NOOP_OFFSET));
slice = new DefaultElasticStreamSlice(stream, SliceRange.of(stream.nextOffset(), Offsets.NOOP_OFFSET));
index = new ElasticTransactionIndex(0, new File(indexFile), new IStreamSliceSupplier(slice),
fileCache);
Assertions.assertEquals(0, slice.nextOffset());

abortedTxns = new LinkedList<>();
abortedTxns.add(new AbortedTxn(5L, 0, 10, 11));