From 71b20b6a5ce0e3ff7b9f0b9c3190fd702ddd0dac Mon Sep 17 00:00:00 2001 From: Robin Han Date: Mon, 4 Sep 2023 15:34:26 +0800 Subject: [PATCH] feat(stream-client): pipeline upload WAL object Signed-off-by: Robin Han --- .../main/scala/kafka/log/es/FutureUtil.java | 15 ++++ .../main/scala/kafka/log/s3/S3Storage.java | 75 ++++++++++++++---- .../kafka/log/s3/WALObjectUploadTask.java | 77 ++++++++++--------- .../scala/kafka/log/s3/cache/LogCache.java | 4 + .../test/java/kafka/log/s3/S3StorageTest.java | 53 +++++++++++++ 5 files changed, 173 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/kafka/log/es/FutureUtil.java b/core/src/main/scala/kafka/log/es/FutureUtil.java index f4415b9088..bf26490523 100644 --- a/core/src/main/scala/kafka/log/es/FutureUtil.java +++ b/core/src/main/scala/kafka/log/es/FutureUtil.java @@ -50,6 +50,9 @@ public static void propagate(CompletableFuture source, CompletableFuture< }); } + /** + * Catch exceptions as a last resort to avoid unresponsiveness. + */ public static CompletableFuture exec(Supplier> run, Logger logger, String name) { try { return run.get(); @@ -58,4 +61,16 @@ public static CompletableFuture exec(Supplier> run, return failedFuture(ex); } } + + /** + * Catch exceptions as a last resort to avoid unresponsiveness. + */ + public static void exec(Runnable run, CompletableFuture cf, Logger logger, String name) { + try { + run.run(); + } catch (Throwable ex) { + logger.error("{} run with unexpected exception", name, ex); + cf.completeExceptionally(ex); + } + } } diff --git a/core/src/main/scala/kafka/log/s3/S3Storage.java b/core/src/main/scala/kafka/log/s3/S3Storage.java index de53caaf0d..44f105bda9 100644 --- a/core/src/main/scala/kafka/log/s3/S3Storage.java +++ b/core/src/main/scala/kafka/log/s3/S3Storage.java @@ -35,6 +35,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; @@ -51,6 +52,8 @@ public class S3Storage implements Storage { private final WriteAheadLog log; private final LogCache logCache; private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer(); + private final Queue walObjectPrepareQueue = new LinkedList<>(); + private final Queue walObjectCommitQueue = new LinkedList<>(); private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3-storage-main", false)); private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor( @@ -139,27 +142,65 @@ private void handleAppendCallback(WalWriteRequest request) { }); } - private CompletableFuture uploadWALObject(LogCache.LogCacheBlock logCacheBlock) { + /** + * Upload cache block to S3. The earlier cache block will have smaller objectId and commit first. + */ + CompletableFuture uploadWALObject(LogCache.LogCacheBlock logCacheBlock) { CompletableFuture cf = new CompletableFuture<>(); backgroundExecutor.execute(() -> uploadWALObject0(logCacheBlock, cf)); return cf; } private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture cf) { - // TODO: pipeline the WAL object upload to accelerate the upload. 
-        try {
-            WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
-                    config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
-            walObjectUploadTask.prepare().get();
-            walObjectUploadTask.upload().get();
-            walObjectUploadTask.commit().get();
-            log.trim(logCacheBlock.confirmOffset());
-            freeCache(logCacheBlock.blockId());
-            cf.complete(null);
-        } catch (Throwable e) {
-            LOGGER.error("unexpect upload wal object fail", e);
-            cf.completeExceptionally(e);
+        WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
+                config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
+        WALObjectUploadTaskContext context = new WALObjectUploadTaskContext();
+        context.task = walObjectUploadTask;
+        context.cache = logCacheBlock;
+        context.cf = cf;
+
+        boolean walObjectPrepareQueueEmpty = walObjectPrepareQueue.isEmpty();
+        walObjectPrepareQueue.add(context);
+        if (!walObjectPrepareQueueEmpty) {
+            // another WAL object upload task is already preparing; it will trigger this one when its prepare completes, so just return.
+            return;
         }
+        prepareWALObject(context);
+    }
+
+    private void prepareWALObject(WALObjectUploadTaskContext context) {
+        context.task.prepare().thenAcceptAsync(nil -> {
+            // 1. poll out the current task and trigger its upload.
+            WALObjectUploadTaskContext peek = walObjectPrepareQueue.poll();
+            Objects.requireNonNull(peek).task.upload();
+            // 2. add the task to the commit queue.
+            boolean walObjectCommitQueueEmpty = walObjectCommitQueue.isEmpty();
+            walObjectCommitQueue.add(peek);
+            if (walObjectCommitQueueEmpty) {
+                commitWALObject(peek);
+            }
+            // 3. trigger the next task to prepare.
+            WALObjectUploadTaskContext next = walObjectPrepareQueue.peek();
+            if (next != null) {
+                prepareWALObject(next);
+            }
+        }, backgroundExecutor);
+    }
+
+    private void commitWALObject(WALObjectUploadTaskContext context) {
+        context.task.commit().thenAcceptAsync(nil -> {
+            // 1. poll out the current task.
+            walObjectCommitQueue.poll();
+            log.trim(context.cache.confirmOffset());
+            freeCache(context.cache.blockId());
+            context.cf.complete(null);
+
+            // 2. trigger the next task to commit.
+            WALObjectUploadTaskContext next = walObjectCommitQueue.peek();
+            if (next != null) {
+                commitWALObject(next);
+            }
+        }, backgroundExecutor);
     }

     private void freeCache(long blockId) {
@@ -234,4 +275,10 @@ public long getWALConfirmOffset() {
         }
     }
+
+    static class WALObjectUploadTaskContext {
+        WALObjectUploadTask task;
+        LogCache.LogCacheBlock cache;
+        CompletableFuture<Void> cf;
+    }
 }
diff --git a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
index e2ef6e8ebb..d8003bd096 100644
--- a/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
+++ b/core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -33,6 +33,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;

+import static kafka.log.es.FutureUtil.exec;
+
 public class WALObjectUploadTask {
     private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class);
     private final Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap;
@@ -65,49 +67,50 @@ public CompletableFuture<Long> prepare() {
     }

     public CompletableFuture<CommitWALObjectRequest> upload() {
-        prepareCf.thenAccept(objectId -> {
-            List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
-            Collections.sort(streamIds);
-            CommitWALObjectRequest request = new CommitWALObjectRequest();
+        prepareCf.thenAccept(objectId -> exec(() -> upload0(objectId), uploadCf, LOGGER, "upload"));
+        return uploadCf;
+    }

-            ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);
+    private void upload0(long objectId) {
+        List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
+        Collections.sort(streamIds);
+        CommitWALObjectRequest request = new CommitWALObjectRequest();

-            List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
+        ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);

-            for (Long streamId : streamIds) {
-                List<FlatStreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
-                long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum();
-                if (streamSize >= streamSplitSizeThreshold) {
-                    streamObjectCfList.add(writeStreamObject(streamRecords));
-                } else {
-                    for (FlatStreamRecordBatch record : streamRecords) {
-                        walObject.write(record);
-                    }
-                    long startOffset = streamRecords.get(0).baseOffset;
-                    long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
-                    request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
-                    // log object block only contain single stream's data.
-                    walObject.closeCurrentBlock();
+        List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
+
+        for (Long streamId : streamIds) {
+            List<FlatStreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
+            long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum();
+            if (streamSize >= streamSplitSizeThreshold) {
+                streamObjectCfList.add(writeStreamObject(streamRecords));
+            } else {
+                for (FlatStreamRecordBatch record : streamRecords) {
+                    walObject.write(record);
                 }
+                long startOffset = streamRecords.get(0).baseOffset;
+                long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
+                request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
+                // a log object block only contains a single stream's data.
+ walObject.closeCurrentBlock(); } - request.setObjectId(objectId); - request.setOrderId(objectId); - CompletableFuture walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size())); - for (CompletableFuture streamObjectCf : streamObjectCfList) { - streamObjectCf.thenAccept(request::addStreamObject); - } - List> allCf = new LinkedList<>(streamObjectCfList); - allCf.add(walObjectCf); - - CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> { - commitWALObjectRequest = request; - uploadCf.complete(request); - }).exceptionally(ex -> { - uploadCf.completeExceptionally(ex); - return null; - }); + } + request.setObjectId(objectId); + request.setOrderId(objectId); + CompletableFuture walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size())); + for (CompletableFuture streamObjectCf : streamObjectCfList) { + streamObjectCf.thenAccept(request::addStreamObject); + } + List> allCf = new LinkedList<>(streamObjectCfList); + allCf.add(walObjectCf); + CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> { + commitWALObjectRequest = request; + uploadCf.complete(request); + }).exceptionally(ex -> { + uploadCf.completeExceptionally(ex); + return null; }); - return uploadCf; } public CompletableFuture commit() { diff --git a/core/src/main/scala/kafka/log/s3/cache/LogCache.java b/core/src/main/scala/kafka/log/s3/cache/LogCache.java index c840e014e2..e644da77ab 100644 --- a/core/src/main/scala/kafka/log/s3/cache/LogCache.java +++ b/core/src/main/scala/kafka/log/s3/cache/LogCache.java @@ -154,5 +154,9 @@ public long confirmOffset() { return confirmOffset; } + public void confirmOffset(long confirmOffset) { + this.confirmOffset = confirmOffset; + } + } } diff --git a/core/src/test/java/kafka/log/s3/S3StorageTest.java b/core/src/test/java/kafka/log/s3/S3StorageTest.java index 497ab07fd8..44630c484d 100644 --- a/core/src/test/java/kafka/log/s3/S3StorageTest.java +++ b/core/src/test/java/kafka/log/s3/S3StorageTest.java @@ -18,6 +18,7 @@ package kafka.log.s3; import kafka.log.s3.cache.DefaultS3BlockCache; +import kafka.log.s3.cache.LogCache; import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.objects.CommitWALObjectRequest; @@ -35,15 +36,22 @@ import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -115,6 +123,51 @@ public void testWALCallbackSequencer() { assertEquals(103L, seq.getWALConfirmOffset()); } + @Test + public void testUploadWALObject_sequence() throws ExecutionException, InterruptedException, TimeoutException { + List> objectIdCfList = new LinkedList<>(); + doAnswer(invocation -> { + CompletableFuture objectIdCf = new CompletableFuture<>(); + objectIdCfList.add(objectIdCf); + return 
objectIdCf;
+        }).when(objectManager).prepareObject(anyInt(), anyLong());
+
+        LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024);
+        logCacheBlock1.put(newRecord(233L, 10L));
+        logCacheBlock1.confirmOffset(10L);
+        CompletableFuture<Void> cf1 = storage.uploadWALObject(logCacheBlock1);
+
+        LogCache.LogCacheBlock logCacheBlock2 = new LogCache.LogCacheBlock(1024);
+        logCacheBlock2.put(newRecord(233L, 20L));
+        logCacheBlock2.confirmOffset(20L);
+        CompletableFuture<Void> cf2 = storage.uploadWALObject(logCacheBlock2);
+
+        // object ids are requested one at a time, in submission order.
+        verify(objectManager, timeout(1000).times(1)).prepareObject(anyInt(), anyLong());
+
+        List<CompletableFuture<CommitWALObjectResponse>> commitCfList = new LinkedList<>();
+        doAnswer(invocation -> {
+            CompletableFuture<CommitWALObjectResponse> cf = new CompletableFuture<>();
+            commitCfList.add(cf);
+            return cf;
+        }).when(objectManager).commitWALObject(any());
+
+        objectIdCfList.get(0).complete(1L);
+        // completing the first prepare triggers the next task's prepareObject call.
+        verify(objectManager, timeout(1000).times(2)).prepareObject(anyInt(), anyLong());
+        verify(objectManager, times(1)).commitWALObject(any());
+
+        objectIdCfList.get(1).complete(2L);
+        Thread.sleep(10);
+        verify(objectManager, times(1)).commitWALObject(any());
+
+        commitCfList.get(0).complete(new CommitWALObjectResponse());
+        verify(objectManager, timeout(1000).times(2)).commitWALObject(any());
+        commitCfList.get(1).complete(new CommitWALObjectResponse());
+        cf1.get(1, TimeUnit.SECONDS);
+        cf2.get(1, TimeUnit.SECONDS);
+    }
+
     private static FlatStreamRecordBatch newRecord(long streamId, long offset) {
         StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(1, 1024));
         return FlatStreamRecordBatch.from(recordBatch);
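
Note on the ordering guarantee introduced by this patch (not part of the diff): uploadWALObject submissions flow through two FIFO queues on the single-threaded background executor. Only the head of walObjectPrepareQueue calls prepareObject, so earlier cache blocks always receive smaller object ids; each upload starts as soon as its own prepare finishes and may overlap with other uploads; commits are only ever issued for the head of walObjectCommitQueue, so WAL objects are committed, the WAL is trimmed, and the cache is freed strictly in submission order. The sketch below is a minimal, self-contained illustration of the same head-of-queue chaining; the Task interface and class name are hypothetical stand-ins rather than the patch's actual types, and Task.commit() is assumed to wait internally for its own upload, as WALObjectUploadTask.commit does by chaining on its upload future.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Illustrative sketch only: hypothetical Task type, not the patch's classes.
// Mirrors the prepare-queue / commit-queue chaining added to S3Storage.
public class WALUploadPipelineSketch {

    interface Task {
        CompletableFuture<Long> prepare();  // e.g. acquire an object id
        CompletableFuture<Void> upload();   // may overlap with other tasks' uploads
        CompletableFuture<Void> commit();   // assumed to wait internally for its own upload
    }

    private final Queue<Task> prepareQueue = new ArrayDeque<>();
    private final Queue<Task> commitQueue = new ArrayDeque<>();
    // Single-threaded executor: every queue mutation happens on one thread, so no extra locking is needed.
    private final Executor executor = Executors.newSingleThreadExecutor();

    public void submit(Task task) {
        executor.execute(() -> {
            boolean prepareIdle = prepareQueue.isEmpty();
            prepareQueue.add(task);
            if (prepareIdle) {
                prepare(task); // only the queue head is ever preparing, so ids are assigned in submit order
            }
        });
    }

    private void prepare(Task task) {
        task.prepare().thenAcceptAsync(objectId -> {
            Task head = prepareQueue.poll(); // head == task
            head.upload();                   // fire the upload; do not wait for it here
            boolean commitIdle = commitQueue.isEmpty();
            commitQueue.add(head);
            if (commitIdle) {
                commit(head);                // commits chain head-to-head, preserving submit order
            }
            Task next = prepareQueue.peek();
            if (next != null) {
                prepare(next);               // start preparing the next waiting task
            }
        }, executor);
    }

    private void commit(Task task) {
        task.commit().thenAcceptAsync(nil -> {
            commitQueue.poll();
            Task next = commitQueue.peek();
            if (next != null) {
                commit(next);                // earlier tasks always finish committing before later ones
            }
        }, executor);
    }
}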