132 changes: 88 additions & 44 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -29,23 +29,26 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
-    private final BlockingQueue<WalWriteRequest> waitingLogConfirmedRequests;
     private final WriteAheadLog log;
     private final LogCache logCache;
-    private final AtomicLong logConfirmOffset = new AtomicLong();
-    private final AtomicLong processedLogConfirmOffset = new AtomicLong(-1L);
+    private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
     private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
        ThreadUtils.createThreadFactory("s3-storage-main", false));
     private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -55,7 +58,6 @@ public class S3Storage implements Storage {
     private final S3BlockCache blockCache;
 
     public S3Storage(WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
-        this.waitingLogConfirmedRequests = new ArrayBlockingQueue<>(16384);
         this.log = log;
         this.logCache = new LogCache(512 * 1024 * 1024);
         this.objectManager = objectManager;
@@ -76,17 +78,8 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate());
         CompletableFuture<Void> cf = new CompletableFuture<>();
         WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
-        try {
-            waitingLogConfirmedRequests.put(writeRequest);
-        } catch (InterruptedException e) {
-            cf.completeExceptionally(e);
-        }
-        appendResult.future.thenAccept(nil -> {
-            // TODO: callback is out of order, we need reorder ack in stream dimension.
-            // TODO: cache end offset update should consider log hollow.
-            logConfirmOffset.getAndUpdate(operand -> Math.max(operand, appendResult.offset));
-            handleAppendCallback(writeRequest);
-        });
+        callbackSequencer.before(writeRequest);
+        appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest));
         return cf;
     }
 
@@ -130,37 +123,19 @@ public CompletableFuture<Void> forceUpload(long streamId) {
 
     private void handleAppendCallback(WalWriteRequest request) {
         mainExecutor.execute(() -> {
-            putToCache(request);
-            if (processedLogConfirmOffset.get() == logConfirmOffset.get()) {
-                return;
+            List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
+            for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
+                if (logCache.put(waitingAckRequest.record)) {
+                    // cache block is full, trigger WAL object upload.
+                    logCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
+                    LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
+                    uploadWALObject(logCacheBlock);
+                }
+                waitingAckRequest.cf.complete(null);
             }
-            tryCallback();
         });
     }
 
-    private void tryCallback() {
-        long walConfirmOffset = this.logConfirmOffset.get();
-        for (; ; ) {
-            WalWriteRequest request = waitingLogConfirmedRequests.peek();
-            if (request == null) break;
-            if (request.offset <= walConfirmOffset) {
-                waitingLogConfirmedRequests.poll();
-                request.cf.complete(null);
-            } else {
-                break;
-            }
-        }
-        processedLogConfirmOffset.set(walConfirmOffset);
-    }
-
-    private void putToCache(WalWriteRequest request) {
-        if (logCache.put(request.record, request.offset)) {
-            // cache block is full, trigger WAL object upload.
-            LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
-            uploadWALObject(logCacheBlock);
-        }
-    }
-
     private CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         backgroundExecutor.execute(() -> uploadWALObject0(logCacheBlock, cf));
@@ -174,7 +149,7 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();
-            log.trim(logCacheBlock.maxOffset());
+            log.trim(logCacheBlock.confirmOffset());
             freeCache(logCacheBlock.blockId());
             cf.complete(null);
         } catch (Throwable e) {
@@ -186,4 +161,73 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
     private void freeCache(long blockId) {
         mainExecutor.execute(() -> logCache.free(blockId));
     }
+
+    static class WALCallbackSequencer {
+        public static final long NOOP_OFFSET = -1L;
+        private final Map<Long, Queue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
+        private final BlockingQueue<WalWriteRequest> walRequests = new ArrayBlockingQueue<>(4096);
+        private long walConfirmOffset = NOOP_OFFSET;
+
+        /**
+         * Add the request to the WAL order queue and its per-stream queue.
+         */
+        public void before(WalWriteRequest request) {
+            try {
+                walRequests.put(request);
+                Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.streamId, s -> new LinkedBlockingQueue<>());
+                streamRequests.add(request);
+            } catch (InterruptedException ex) {
+                request.cf.completeExceptionally(ex);
+            }
+        }
+
+        /**
+         * Mark the request persisted, advance the WAL inclusive confirm offset, and pop the sequentially persisted requests from the stream queue.
+         *
+         * @return the popped requests, now safe to ack in stream offset order.
+         */
+        public List<WalWriteRequest> after(WalWriteRequest request) {
+            request.persisted = true;
+            // Advance the WAL inclusive confirm offset.
+            for (; ; ) {
+                WalWriteRequest peek = walRequests.peek();
+                if (peek == null || !peek.persisted) {
+                    break;
+                }
+                walRequests.poll();
+                walConfirmOffset = peek.offset;
+            }
+
+            // Pop the sequentially persisted requests at the head of the stream queue.
+            long streamId = request.record.streamId;
+            Queue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
+            WalWriteRequest peek = streamRequests.peek();
+            if (peek == null || peek.offset != request.offset) {
+                return Collections.emptyList();
+            }
+            List<WalWriteRequest> rst = new ArrayList<>();
+            rst.add(streamRequests.poll());
+            for (; ; ) {
+                peek = streamRequests.peek();
+                if (peek == null || !peek.persisted) {
+                    break;
+                }
+                rst.add(streamRequests.poll());
+            }
+            if (streamRequests.isEmpty()) {
+                stream2requests.computeIfPresent(streamId, (id, requests) -> requests.isEmpty() ? null : requests);
+            }
+            return rst;
+        }
+
+        /**
+         * Get the WAL inclusive confirm offset.
+         *
+         * @return the inclusive confirm offset.
+         */
+        public long getWALConfirmOffset() {
+            return walConfirmOffset;
+        }
+
+    }
 }
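
Reviewer note: WAL persist callbacks may complete out of order, but clients must be acked in offset order within each stream, and the WAL may only be trimmed below the offset up to which everything is contiguously persisted. WALCallbackSequencer tracks both: a global queue in append order drives the inclusive confirm offset (the trim point), while per-stream queues release acks. The sketch below is a minimal, self-contained re-implementation of just the global half, for illustration only; every name in it is made up.

// A self-contained sketch of the global-ordering half of WALCallbackSequencer.
// Illustrative re-implementation only; the per-stream queues of the real
// class are omitted.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class SequencerSketch {
    static class Req {
        final long offset;   // WAL offset, assigned in append order
        boolean persisted;   // set once the WAL write lands
        Req(long offset) { this.offset = offset; }
    }

    private final Queue<Req> walOrder = new ArrayDeque<>(); // append order
    long confirmOffset = -1L;                               // inclusive trim point

    void before(Req r) { walOrder.add(r); }

    // On a persist callback: mark the request done, then release every request
    // at the head of the queue that has no unpersisted predecessor.
    List<Req> after(Req r) {
        r.persisted = true;
        List<Req> ready = new ArrayList<>();
        while (!walOrder.isEmpty() && walOrder.peek().persisted) {
            Req head = walOrder.poll();
            confirmOffset = head.offset;
            ready.add(head);
        }
        return ready;
    }

    public static void main(String[] args) {
        SequencerSketch seq = new SequencerSketch();
        Req a = new Req(100L), b = new Req(101L), c = new Req(102L);
        seq.before(a); seq.before(b); seq.before(c);
        System.out.println(seq.after(c).size()); // 0: 100 and 101 still pending
        System.out.println(seq.after(a).size()); // 1: only 100 may be confirmed
        System.out.println(seq.after(b).size()); // 2: 101 plus the parked 102
        System.out.println(seq.confirmOffset);   // 102, now safe to trim
    }
}
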
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/s3/WalWriteRequest.java
@@ -24,6 +24,7 @@ public class WalWriteRequest implements Comparable<WalWriteRequest> {
     final FlatStreamRecordBatch record;
     final long offset;
     final CompletableFuture<Void> cf;
+    boolean persisted;
 
     public WalWriteRequest(FlatStreamRecordBatch record, long offset, CompletableFuture<Void> cf) {
         this.record = record;
19 changes: 12 additions & 7 deletions core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -32,14 +32,15 @@ public class LogCache {
     private final int cacheBlockMaxSize;
     private final List<LogCacheBlock> archiveBlocks = new ArrayList<>();
     private LogCacheBlock activeBlock;
+    private long confirmOffset;
 
     public LogCache(int cacheBlockMaxSize) {
         this.cacheBlockMaxSize = cacheBlockMaxSize;
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }
 
-    public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
-        return activeBlock.put(recordBatch, offset);
+    public boolean put(FlatStreamRecordBatch recordBatch) {
+        return activeBlock.put(recordBatch);
     }
 
     /**
@@ -70,6 +71,7 @@ public List<FlatStreamRecordBatch> get(long streamId, long startOffset, long end
 
     public LogCacheBlock archiveCurrentBlock() {
         LogCacheBlock block = activeBlock;
+        block.confirmOffset = confirmOffset;
         archiveBlocks.add(block);
         activeBlock = new LogCacheBlock(cacheBlockMaxSize);
         return block;
@@ -87,13 +89,17 @@ public void free(long blockId) {
         archiveBlocks.removeIf(b -> b.blockId == blockId);
     }
 
+    public void setConfirmOffset(long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         private final long blockId;
         private final int maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
         private int size = 0;
-        private long maxOffset;
+        private long confirmOffset;
 
         public LogCacheBlock(int maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -104,12 +110,11 @@ public long blockId() {
             return blockId;
         }
 
-        public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
+        public boolean put(FlatStreamRecordBatch recordBatch) {
             List<FlatStreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>());
             streamCache.add(recordBatch);
             int recordSize = recordBatch.encodedBuf.readableBytes();
             size += recordSize;
-            maxOffset = offset;
             return size >= maxSize;
         }
 
@@ -145,8 +150,8 @@ public Map<Long, List<FlatStreamRecordBatch>> records() {
             return map;
         }
 
-        public long maxOffset() {
-            return maxOffset;
+        public long confirmOffset() {
+            return confirmOffset;
         }
 
     }
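
Reviewer note: these LogCache changes pair with the log.trim() change in S3Storage. As the old TODO ("cache end offset update should consider log hollow") hinted, a later WAL offset can be persisted before an earlier one, so a sealed cache block may contain records beyond the contiguously persisted point; trimming at the block's max offset could discard WAL entries in that gap and lose them on recovery. The PR therefore snapshots the sequencer's confirm offset when the block is sealed and trims to that instead. An annotated excerpt of the upload path from this diff (comments added here; callbacks run on the single-threaded mainExecutor):

// From S3Storage.handleAppendCallback, once the sequencer releases a request:
if (logCache.put(waitingAckRequest.record)) {
    // Active block is full. Snapshot the WAL inclusive confirm offset *before*
    // sealing, so the archived block remembers the highest offset below which
    // every WAL entry is known to be persisted.
    logCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
    LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
    // After a successful upload, uploadWALObject0 trims the WAL at
    // logCacheBlock.confirmOffset() instead of the old maxOffset().
    uploadWALObject(logCacheBlock);
}
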
27 changes: 25 additions & 2 deletions core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -32,6 +32,7 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -91,7 +92,29 @@ public void testAppend() throws Exception {
     }
 
     @Test
-    public void testAppend_outOfOrder() {
-        // TODO: test out of order write task complete.
+    public void testWALCallbackSequencer() {
+        S3Storage.WALCallbackSequencer seq = new S3Storage.WALCallbackSequencer();
+        WalWriteRequest r0 = new WalWriteRequest(newRecord(233L, 10L), 100L, new CompletableFuture<>());
+        seq.before(r0);
+        WalWriteRequest r1 = new WalWriteRequest(newRecord(233L, 11L), 101L, new CompletableFuture<>());
+        seq.before(r1);
+        WalWriteRequest r2 = new WalWriteRequest(newRecord(234L, 20L), 102L, new CompletableFuture<>());
+        seq.before(r2);
+        WalWriteRequest r3 = new WalWriteRequest(newRecord(234L, 21L), 103L, new CompletableFuture<>());
+        seq.before(r3);
+
+        assertEquals(Collections.emptyList(), seq.after(r3));
+        assertEquals(-1L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r2, r3), seq.after(r2));
+        assertEquals(-1L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r0), seq.after(r0));
+        assertEquals(100L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r1), seq.after(r1));
+        assertEquals(103L, seq.getWALConfirmOffset());
+    }
+
+    private static FlatStreamRecordBatch newRecord(long streamId, long offset) {
+        StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(1, 1024));
+        return FlatStreamRecordBatch.from(recordBatch);
     }
 }
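
Note on the test: r2 and r3 (stream 234, WAL offsets 102 and 103) are released for ack as soon as both are persisted, while the WAL confirm offset stays at -1 because stream 233's r0 and r1 (offsets 100 and 101) are still outstanding; once those complete, the confirm offset advances to 100 and then 103. That is the decoupling the sequencer provides: per-stream acks run ahead while the trim point stays conservative.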