diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java
index 123d63249d..b17f98b02f 100644
--- a/core/src/main/scala/kafka/log/s3/S3Storage.java
+++ b/core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -29,23 +29,26 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
-    private final BlockingQueue<WalWriteRequest> waitingLogConfirmedRequests;
     private final WriteAheadLog log;
     private final LogCache logCache;
-    private final AtomicLong logConfirmOffset = new AtomicLong();
-    private final AtomicLong processedLogConfirmOffset = new AtomicLong(-1L);
+    private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
     private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("s3-storage-main", false));
     private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -55,7 +58,6 @@ public class S3Storage implements Storage {
     private final S3BlockCache blockCache;
 
     public S3Storage(WriteAheadLog log, ObjectManager objectManager, S3BlockCache blockCache, S3Operator s3Operator) {
-        this.waitingLogConfirmedRequests = new ArrayBlockingQueue<>(16384);
         this.log = log;
         this.logCache = new LogCache(512 * 1024 * 1024);
         this.objectManager = objectManager;
@@ -76,17 +78,8 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         WriteAheadLog.AppendResult appendResult = log.append(flatStreamRecordBatch.encodedBuf.duplicate());
         CompletableFuture<Void> cf = new CompletableFuture<>();
         WalWriteRequest writeRequest = new WalWriteRequest(flatStreamRecordBatch, appendResult.offset, cf);
-        try {
-            waitingLogConfirmedRequests.put(writeRequest);
-        } catch (InterruptedException e) {
-            cf.completeExceptionally(e);
-        }
-        appendResult.future.thenAccept(nil -> {
-            // TODO: callback is out of order, we need reorder ack in stream dimension.
-            // TODO: cache end offset update should consider log hollow.
-            logConfirmOffset.getAndUpdate(operand -> Math.max(operand, appendResult.offset));
-            handleAppendCallback(writeRequest);
-        });
+        callbackSequencer.before(writeRequest);
+        appendResult.future.thenAccept(nil -> handleAppendCallback(writeRequest));
         return cf;
     }
 
@@ -130,37 +123,19 @@ public CompletableFuture<Void> forceUpload(long streamId) {
 
     private void handleAppendCallback(WalWriteRequest request) {
         mainExecutor.execute(() -> {
-            putToCache(request);
-            if (processedLogConfirmOffset.get() == logConfirmOffset.get()) {
-                return;
+            List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
+            for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
+                if (logCache.put(waitingAckRequest.record)) {
+                    // cache block is full, trigger WAL object upload.
+                    logCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
+                    LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
+                    uploadWALObject(logCacheBlock);
+                }
+                waitingAckRequest.cf.complete(null);
             }
-            tryCallback();
         });
     }
 
-    private void tryCallback() {
-        long walConfirmOffset = this.logConfirmOffset.get();
-        for (; ; ) {
-            WalWriteRequest request = waitingLogConfirmedRequests.peek();
-            if (request == null) break;
-            if (request.offset <= walConfirmOffset) {
-                waitingLogConfirmedRequests.poll();
-                request.cf.complete(null);
-            } else {
-                break;
-            }
-        }
-        processedLogConfirmOffset.set(walConfirmOffset);
-    }
-
-    private void putToCache(WalWriteRequest request) {
-        if (logCache.put(request.record, request.offset)) {
-            // cache block is full, trigger WAL object upload.
-            LogCache.LogCacheBlock logCacheBlock = logCache.archiveCurrentBlock();
-            uploadWALObject(logCacheBlock);
-        }
-    }
-
     private CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         backgroundExecutor.execute(() -> uploadWALObject0(logCacheBlock, cf));
@@ -174,7 +149,7 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableF
             walObjectUploadTask.prepare().get();
             walObjectUploadTask.upload().get();
             walObjectUploadTask.commit().get();
-            log.trim(logCacheBlock.maxOffset());
+            log.trim(logCacheBlock.confirmOffset());
             freeCache(logCacheBlock.blockId());
             cf.complete(null);
         } catch (Throwable e) {
@@ -186,4 +161,73 @@ private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableF
     private void freeCache(long blockId) {
         mainExecutor.execute(() -> logCache.free(blockId));
     }
+
+    static class WALCallbackSequencer {
+        public static final long NOOP_OFFSET = -1L;
+        private final Map<Long, Queue<WalWriteRequest>> stream2requests = new ConcurrentHashMap<>();
+        private final BlockingQueue<WalWriteRequest> walRequests = new ArrayBlockingQueue<>(4096);
+        private long walConfirmOffset = NOOP_OFFSET;
+
+        /**
+         * Add the request to the stream's sequence queue.
+         */
+        public void before(WalWriteRequest request) {
+            try {
+                walRequests.put(request);
+                Queue<WalWriteRequest> streamRequests = stream2requests.computeIfAbsent(request.record.streamId, s -> new LinkedBlockingQueue<>());
+                streamRequests.add(request);
+            } catch (InterruptedException ex) {
+                request.cf.completeExceptionally(ex);
+            }
+        }
+
+        /**
+         * Try to pop sequentially persisted requests from the stream queue and move the WAL inclusive confirm offset forward.
+         *
+         * @return popped sequentially persisted requests.
+         */
+        public List<WalWriteRequest> after(WalWriteRequest request) {
+            request.persisted = true;
+            // move the WAL inclusive confirm offset forward.
+            for (; ; ) {
+                WalWriteRequest peek = walRequests.peek();
+                if (peek == null || !peek.persisted) {
+                    break;
+                }
+                walRequests.poll();
+                walConfirmOffset = peek.offset;
+            }
+
+            // pop sequentially persisted requests for the stream.
+            long streamId = request.record.streamId;
+            Queue<WalWriteRequest> streamRequests = stream2requests.get(streamId);
+            WalWriteRequest peek = streamRequests.peek();
+            if (peek == null || peek.offset != request.offset) {
+                return Collections.emptyList();
+            }
+            List<WalWriteRequest> rst = new ArrayList<>();
+            rst.add(streamRequests.poll());
+            for (; ; ) {
+                peek = streamRequests.peek();
+                if (peek == null || !peek.persisted) {
+                    break;
+                }
+                rst.add(streamRequests.poll());
+            }
+            if (streamRequests.isEmpty()) {
+                stream2requests.computeIfPresent(streamId, (id, requests) -> requests.isEmpty() ? null : requests);
+            }
+            return rst;
+        }
+
+        /**
+         * Get the WAL inclusive confirm offset.
+         *
+         * @return inclusive confirm offset.
+         */
+        public long getWALConfirmOffset() {
+            return walConfirmOffset;
+        }
+
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
index fb29497641..422a287136 100644
--- a/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
+++ b/core/src/main/scala/kafka/log/s3/WalWriteRequest.java
@@ -24,6 +24,7 @@ public class WalWriteRequest implements Comparable<WalWriteRequest> {
     final FlatStreamRecordBatch record;
     final long offset;
    final CompletableFuture<Void> cf;
+    boolean persisted;
 
     public WalWriteRequest(FlatStreamRecordBatch record, long offset, CompletableFuture<Void> cf) {
         this.record = record;
diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
index ffcd2d8c70..ca730e50c3 100644
--- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java
+++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -32,14 +32,15 @@ public class LogCache {
     private final int cacheBlockMaxSize;
     private final List<LogCacheBlock> archiveBlocks = new ArrayList<>();
     private LogCacheBlock activeBlock;
+    private long confirmOffset;
 
     public LogCache(int cacheBlockMaxSize) {
         this.cacheBlockMaxSize = cacheBlockMaxSize;
         this.activeBlock = new LogCacheBlock(cacheBlockMaxSize);
     }
 
-    public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
-        return activeBlock.put(recordBatch, offset);
+    public boolean put(FlatStreamRecordBatch recordBatch) {
+        return activeBlock.put(recordBatch);
     }
 
     /**
@@ -70,6 +71,7 @@ public List<FlatStreamRecordBatch> get(long streamId, long startOffset, long end
 
     public LogCacheBlock archiveCurrentBlock() {
         LogCacheBlock block = activeBlock;
+        block.confirmOffset = confirmOffset;
         archiveBlocks.add(block);
         activeBlock = new LogCacheBlock(cacheBlockMaxSize);
         return block;
@@ -87,13 +89,17 @@ public void free(long blockId) {
         archiveBlocks.removeIf(b -> b.blockId == blockId);
     }
 
+    public void setConfirmOffset(long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         private final long blockId;
         private final int maxSize;
         private final Map<Long, List<FlatStreamRecordBatch>> map = new HashMap<>();
         private int size = 0;
-        private long maxOffset;
+        private long confirmOffset;
 
         public LogCacheBlock(int maxSize) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
@@ -104,12 +110,11 @@ public long blockId() {
             return blockId;
         }
 
-        public boolean put(FlatStreamRecordBatch recordBatch, long offset) {
+        public boolean put(FlatStreamRecordBatch recordBatch) {
             List<FlatStreamRecordBatch> streamCache = map.computeIfAbsent(recordBatch.streamId, id -> new ArrayList<>());
             streamCache.add(recordBatch);
             int recordSize = recordBatch.encodedBuf.readableBytes();
             size += recordSize;
-            maxOffset = offset;
             return size >= maxSize;
         }
 
@@ -145,8 +150,8 @@ public Map<Long, List<FlatStreamRecordBatch>> records() {
             return map;
         }
 
-        public long maxOffset() {
-            return maxOffset;
+        public long confirmOffset() {
+            return confirmOffset;
         }
     }
diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java
index 9ba321be43..423cc0c4fd 100644
--- a/core/src/test/java/kafka/log/s3/S3StorageTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -32,6 +32,7 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -91,7 +92,29 @@ public void testAppend() throws Exception {
     }
 
     @Test
-    public void testAppend_outOfOrder() {
-        // TODO: test out of order write task complete.
+    public void testWALCallbackSequencer() {
+        S3Storage.WALCallbackSequencer seq = new S3Storage.WALCallbackSequencer();
+        WalWriteRequest r0 = new WalWriteRequest(newRecord(233L, 10L), 100L, new CompletableFuture<>());
+        seq.before(r0);
+        WalWriteRequest r1 = new WalWriteRequest(newRecord(233L, 11L), 101L, new CompletableFuture<>());
+        seq.before(r1);
+        WalWriteRequest r2 = new WalWriteRequest(newRecord(234L, 20L), 102L, new CompletableFuture<>());
+        seq.before(r2);
+        WalWriteRequest r3 = new WalWriteRequest(newRecord(234L, 21L), 103L, new CompletableFuture<>());
+        seq.before(r3);
+
+        assertEquals(Collections.emptyList(), seq.after(r3));
+        assertEquals(-1L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r2, r3), seq.after(r2));
+        assertEquals(-1L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r0), seq.after(r0));
+        assertEquals(100L, seq.getWALConfirmOffset());
+        assertEquals(List.of(r1), seq.after(r1));
+        assertEquals(103L, seq.getWALConfirmOffset());
+    }
+
+    private static FlatStreamRecordBatch newRecord(long streamId, long offset) {
+        StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(1, 1024));
+        return FlatStreamRecordBatch.from(recordBatch);
+    }
 }
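
Reviewer note: below is a minimal, single-threaded sketch of the two-level ordering that WALCallbackSequencer implements, for readers who want the idea without the Kafka types. Every name in it (SequencerSketch, Req, the main driver, its offsets) is hypothetical and not part of this patch; the production class uses ArrayBlockingQueue and ConcurrentHashMap because before() runs on the append path while after() runs on the WAL callback path. The invariant: a global FIFO advances the inclusive WAL confirm offset only up to the first not-yet-persisted request, while per-stream FIFOs release acks strictly in stream order even when WAL callbacks complete out of order.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class SequencerSketch {
    static class Req {
        final long streamId;
        final long offset;   // WAL offset, assigned in append order
        boolean persisted;

        Req(long streamId, long offset) {
            this.streamId = streamId;
            this.offset = offset;
        }
    }

    private final Queue<Req> wal = new ArrayDeque<>();             // every request, in WAL offset order
    private final Map<Long, Queue<Req>> streams = new HashMap<>(); // per-stream queues, in append order
    long confirm = -1L;                                            // inclusive confirm offset (NOOP_OFFSET)

    // Mirrors before(): called in append order, before the WAL write completes.
    void before(Req r) {
        wal.add(r);
        streams.computeIfAbsent(r.streamId, id -> new ArrayDeque<>()).add(r);
    }

    // Mirrors after(): called from the WAL callback, which may fire out of order.
    List<Req> after(Req r) {
        r.persisted = true;
        // Advance the confirm offset up to (not past) the first unpersisted request.
        while (!wal.isEmpty() && wal.peek().persisted) {
            confirm = wal.poll().offset;
        }
        // Release acks only from the head of the stream queue, preserving stream order.
        Queue<Req> q = streams.get(r.streamId);
        List<Req> acks = new ArrayList<>();
        if (q.peek() == r) {
            while (!q.isEmpty() && q.peek().persisted) {
                acks.add(q.poll());
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        SequencerSketch s = new SequencerSketch();
        Req r0 = new Req(233, 100), r1 = new Req(233, 101);
        Req r2 = new Req(234, 102), r3 = new Req(234, 103);
        s.before(r0); s.before(r1); s.before(r2); s.before(r3);

        System.out.println(s.after(r3).size()); // 0: r2 is still the head of stream 234
        System.out.println(s.after(r2).size()); // 2: r2 and r3 are released together
        System.out.println(s.confirm);          // -1: the WAL still has a hole at r0
        System.out.println(s.after(r0).size()); // 1: r0 is released, r1 not yet persisted
        System.out.println(s.confirm);          // 100
        System.out.println(s.after(r1).size()); // 1: r1 is released
        System.out.println(s.confirm);          // 103: no holes remain
    }
}

The same hole-free invariant is what makes the maxOffset() -> confirmOffset() switch in uploadWALObject0 safe: a cache block's largest offset can lie beyond a still-unpersisted WAL offset (the "log hollow" from the removed TODO), so trimming the WAL at maxOffset could discard an unacknowledged record, whereas everything at or below the confirm offset captured in archiveCurrentBlock() is already persisted and sitting in the cache blocks being uploaded.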