15 changes: 15 additions & 0 deletions core/src/main/scala/kafka/log/es/FutureUtil.java
@@ -50,6 +50,9 @@ public static <T> void propagate(CompletableFuture<T> source, CompletableFuture<
});
}

/**
* Run the supplier, catching any Throwable as a last resort so the caller always
* receives a future that completes instead of silently hanging.
*/
public static <T> CompletableFuture<T> exec(Supplier<CompletableFuture<T>> run, Logger logger, String name) {
try {
return run.get();
@@ -58,4 +61,16 @@ public static <T> CompletableFuture<T> exec(Supplier<CompletableFuture<T>> run,
return failedFuture(ex);
}
}

/**
* Run the task, catching any Throwable as a last resort so {@code cf} always
* completes instead of leaving its waiters hanging.
*/
public static <T> void exec(Runnable run, CompletableFuture<T> cf, Logger logger, String name) {
try {
run.run();
} catch (Throwable ex) {
logger.error("{} run with unexpected exception", name, ex);
cf.completeExceptionally(ex);
}
}
}
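
A minimal usage sketch of the new Runnable overload (hypothetical caller, not part of this PR): any Throwable thrown inside the task is logged under the given name and routed into the supplied future, so a caller waiting on it is never left hanging.

import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.log.es.FutureUtil;

public class FutureUtilExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(FutureUtilExample.class);

    public static void main(String[] args) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        // The thrown exception is logged with the task name and completes cf
        // exceptionally instead of silently escaping the Runnable.
        FutureUtil.exec(() -> {
            throw new IllegalStateException("boom");
        }, cf, LOGGER, "example-task");
        System.out.println(cf.isCompletedExceptionally()); // prints: true
    }
}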
75 changes: 61 additions & 14 deletions core/src/main/scala/kafka/log/s3/S3Storage.java
@@ -35,6 +35,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
@@ -51,6 +52,8 @@ public class S3Storage implements Storage {
private final WriteAheadLog log;
private final LogCache logCache;
private final WALCallbackSequencer callbackSequencer = new WALCallbackSequencer();
private final Queue<WALObjectUploadTaskContext> walObjectPrepareQueue = new LinkedList<>();
private final Queue<WALObjectUploadTaskContext> walObjectCommitQueue = new LinkedList<>();
private final ScheduledExecutorService mainExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-main", false));
private final ScheduledExecutorService backgroundExecutor = Executors.newSingleThreadScheduledExecutor(
@@ -139,27 +142,65 @@ private void handleAppendCallback(WalWriteRequest request) {
});
}

private CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
/**
* Upload the cache block to S3. Earlier cache blocks get smaller objectIds and commit first.
*/
CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
CompletableFuture<Void> cf = new CompletableFuture<>();
backgroundExecutor.execute(() -> uploadWALObject0(logCacheBlock, cf));
return cf;
}

private void uploadWALObject0(LogCache.LogCacheBlock logCacheBlock, CompletableFuture<Void> cf) {
// TODO: pipeline the WAL object upload to accelerate the upload.
try {
WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
walObjectUploadTask.prepare().get();
walObjectUploadTask.upload().get();
walObjectUploadTask.commit().get();
log.trim(logCacheBlock.confirmOffset());
freeCache(logCacheBlock.blockId());
cf.complete(null);
} catch (Throwable e) {
LOGGER.error("unexpect upload wal object fail", e);
cf.completeExceptionally(e);
WALObjectUploadTask walObjectUploadTask = new WALObjectUploadTask(logCacheBlock.records(), objectManager, s3Operator,
config.s3ObjectBlockSizeProp(), config.s3ObjectPartSizeProp(), config.s3StreamSplitSizeProp());
WALObjectUploadTaskContext context = new WALObjectUploadTaskContext();
context.task = walObjectUploadTask;
context.cache = logCacheBlock;
context.cf = cf;

boolean walObjectPrepareQueueEmpty = walObjectPrepareQueue.isEmpty();
walObjectPrepareQueue.add(context);
if (!walObjectPrepareQueueEmpty) {
// another WAL object upload task is already preparing; the in-flight chain will reach this one in order.
return;
}
prepareWALObject(context);
}

private void prepareWALObject(WALObjectUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
// 1. poll out current task and trigger upload.
WALObjectUploadTaskContext peek = walObjectPrepareQueue.poll();
Objects.requireNonNull(peek).task.upload();
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walObjectCommitQueue.isEmpty();
walObjectCommitQueue.add(peek);
if (walObjectCommitQueueEmpty) {
commitWALObject(peek);
}
// 3. trigger next task to prepare.
WALObjectUploadTaskContext next = walObjectPrepareQueue.peek();
if (next != null) {
prepareWALObject(next);
}
}, backgroundExecutor);
}

private void commitWALObject(WALObjectUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
// 1. poll out current task
walObjectCommitQueue.poll();
log.trim(context.cache.confirmOffset());
freeCache(context.cache.blockId());
context.cf.complete(null);

// 2. trigger next task to commit.
WALObjectUploadTaskContext next = walObjectCommitQueue.peek();
if (next != null) {
commitWALObject(next);
}
}, backgroundExecutor);
}

private void freeCache(long blockId) {
@@ -234,4 +275,10 @@ public long getWALConfirmOffset() {
}

}

static class WALObjectUploadTaskContext {
WALObjectUploadTask task;
LogCache.LogCacheBlock cache;
CompletableFuture<Void> cf;
}
}
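
The two queues above are what give the upload path its ordering guarantee: prepare() calls start strictly in submission order, the uploads themselves may overlap, and commit() calls again run strictly in submission order, so an earlier cache block always commits before a later one. A stripped-down sketch of the same chaining pattern (hypothetical Task interface; as in S3Storage, all queue access is confined to a single-threaded executor):

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class OrderedTwoStagePipeline {
    interface Task {
        CompletableFuture<Void> prepare();
        CompletableFuture<Void> upload();
        CompletableFuture<Void> commit();
    }

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Queue<Task> prepareQueue = new LinkedList<>();
    private final Queue<Task> commitQueue = new LinkedList<>();

    void submit(Task task) {
        executor.execute(() -> {
            boolean idle = prepareQueue.isEmpty();
            prepareQueue.add(task);
            if (idle) {
                prepare(task); // nothing in flight: start the chain
            }
            // otherwise the running chain reaches this task in order
        });
    }

    private void prepare(Task task) {
        task.prepare().thenAcceptAsync(nil -> {
            Task head = prepareQueue.poll(); // == task
            head.upload();                   // uploads may overlap freely
            boolean commitIdle = commitQueue.isEmpty();
            commitQueue.add(head);
            if (commitIdle) {
                commit(head);
            }
            Task next = prepareQueue.peek();
            if (next != null) {
                prepare(next);               // keep preparing in order
            }
        }, executor);
    }

    private void commit(Task task) {
        task.commit().thenAcceptAsync(nil -> {
            commitQueue.poll();
            Task next = commitQueue.peek();
            if (next != null) {
                commit(next);                // commit strictly in order
            }
        }, executor);
    }
}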
77 changes: 40 additions & 37 deletions core/src/main/scala/kafka/log/s3/WALObjectUploadTask.java
@@ -33,6 +33,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static kafka.log.es.FutureUtil.exec;

public class WALObjectUploadTask {
private static final Logger LOGGER = LoggerFactory.getLogger(WALObjectUploadTask.class);
private final Map<Long, List<FlatStreamRecordBatch>> streamRecordsMap;
@@ -65,49 +67,50 @@ public CompletableFuture<Long> prepare() {
}

public CompletableFuture<CommitWALObjectRequest> upload() {
prepareCf.thenAccept(objectId -> {
List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
Collections.sort(streamIds);
CommitWALObjectRequest request = new CommitWALObjectRequest();
prepareCf.thenAccept(objectId -> exec(() -> upload0(objectId), uploadCf, LOGGER, "upload"));
return uploadCf;
}

ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);
private void upload0(long objectId) {
List<Long> streamIds = new ArrayList<>(streamRecordsMap.keySet());
Collections.sort(streamIds);
CommitWALObjectRequest request = new CommitWALObjectRequest();

List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();
ObjectWriter walObject = new ObjectWriter(objectId, s3Operator, objectBlockSize, objectPartSize);

for (Long streamId : streamIds) {
List<FlatStreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum();
if (streamSize >= streamSplitSizeThreshold) {
streamObjectCfList.add(writeStreamObject(streamRecords));
} else {
for (FlatStreamRecordBatch record : streamRecords) {
walObject.write(record);
}
long startOffset = streamRecords.get(0).baseOffset;
long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// a log object block only contains a single stream's data.
walObject.closeCurrentBlock();
List<CompletableFuture<StreamObject>> streamObjectCfList = new LinkedList<>();

for (Long streamId : streamIds) {
List<FlatStreamRecordBatch> streamRecords = streamRecordsMap.get(streamId);
long streamSize = streamRecords.stream().mapToLong(r -> r.encodedBuf.readableBytes()).sum();
if (streamSize >= streamSplitSizeThreshold) {
streamObjectCfList.add(writeStreamObject(streamRecords));
} else {
for (FlatStreamRecordBatch record : streamRecords) {
walObject.write(record);
}
long startOffset = streamRecords.get(0).baseOffset;
long endOffset = streamRecords.get(streamRecords.size() - 1).lastOffset();
request.addStreamRange(new ObjectStreamRange(streamId, -1L, startOffset, endOffset));
// a log object block only contains a single stream's data.
walObject.closeCurrentBlock();
}
request.setObjectId(objectId);
request.setOrderId(objectId);
CompletableFuture<Void> walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size()));
for (CompletableFuture<StreamObject> streamObjectCf : streamObjectCfList) {
streamObjectCf.thenAccept(request::addStreamObject);
}
List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
allCf.add(walObjectCf);

CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
commitWALObjectRequest = request;
uploadCf.complete(request);
}).exceptionally(ex -> {
uploadCf.completeExceptionally(ex);
return null;
});
}
request.setObjectId(objectId);
request.setOrderId(objectId);
CompletableFuture<Void> walObjectCf = walObject.close().thenAccept(nil -> request.setObjectSize(walObject.size()));
for (CompletableFuture<StreamObject> streamObjectCf : streamObjectCfList) {
streamObjectCf.thenAccept(request::addStreamObject);
}
List<CompletableFuture<?>> allCf = new LinkedList<>(streamObjectCfList);
allCf.add(walObjectCf);
CompletableFuture.allOf(allCf.toArray(new CompletableFuture[0])).thenAccept(nil -> {
commitWALObjectRequest = request;
uploadCf.complete(request);
}).exceptionally(ex -> {
uploadCf.completeExceptionally(ex);
return null;
});
return uploadCf;
}

public CompletableFuture<Void> commit() {
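
One subtlety the refactor addresses: prepareCf.thenAccept(...) swallows any exception thrown by its callback into a derived future that nothing observes, so without the exec(...) wrapper a failure inside upload0 would leave uploadCf incomplete forever. A minimal sketch of the guard (hypothetical standalone class, reusing FutureUtil from this PR):

import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static kafka.log.es.FutureUtil.exec;

public class UploadHangExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(UploadHangExample.class);

    public static void main(String[] args) {
        CompletableFuture<String> uploadCf = new CompletableFuture<>();
        CompletableFuture<Long> prepareCf = CompletableFuture.completedFuture(1L);
        prepareCf.thenAccept(objectId ->
                // without exec(...) the throw would vanish into the ignored
                // future returned by thenAccept, and uploadCf would hang forever
                exec(() -> { throw new IllegalStateException("upload failed"); },
                        uploadCf, LOGGER, "upload"));
        System.out.println(uploadCf.isCompletedExceptionally()); // prints: true
    }
}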
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/log/s3/cache/LogCache.java
@@ -154,5 +154,9 @@ public long confirmOffset() {
return confirmOffset;
}

public void confirmOffset(long confirmOffset) {
this.confirmOffset = confirmOffset;
}

}
}
53 changes: 53 additions & 0 deletions core/src/test/java/kafka/log/s3/S3StorageTest.java
@@ -18,6 +18,7 @@
package kafka.log.s3;

import kafka.log.s3.cache.DefaultS3BlockCache;
import kafka.log.s3.cache.LogCache;
import kafka.log.s3.cache.ReadDataBlock;
import kafka.log.s3.model.StreamRecordBatch;
import kafka.log.s3.objects.CommitWALObjectRequest;
@@ -35,15 +36,22 @@
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@@ -115,6 +123,51 @@ public void testWALCallbackSequencer() {
assertEquals(103L, seq.getWALConfirmOffset());
}

@Test
public void testUploadWALObject_sequence() throws ExecutionException, InterruptedException, TimeoutException {
List<CompletableFuture<Long>> objectIdCfList = new LinkedList<>();
doAnswer(invocation -> {
CompletableFuture<Long> objectIdCf = new CompletableFuture<>();
objectIdCfList.add(objectIdCf);
return objectIdCf;
}).when(objectManager).prepareObject(anyInt(), anyLong());

LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024);
logCacheBlock1.put(newRecord(233L, 10L));
logCacheBlock1.confirmOffset(10L);
CompletableFuture<Void> cf1 = storage.uploadWALObject(logCacheBlock1);

LogCache.LogCacheBlock logCacheBlock2 = new LogCache.LogCacheBlock(1024);
logCacheBlock2.put(newRecord(233L, 20L));
logCacheBlock2.confirmOffset(20L);
CompletableFuture<Void> cf2 = storage.uploadWALObject(logCacheBlock2);

// objectIds are fetched sequentially: only the first prepareObject call is in flight so far
verify(objectManager, timeout(1000).times(1)).prepareObject(anyInt(), anyLong());

List<CompletableFuture<CommitWALObjectResponse>> commitCfList = new LinkedList<>();
doAnswer(invocation -> {
CompletableFuture<CommitWALObjectResponse> cf = new CompletableFuture<>();
commitCfList.add(cf);
return cf;
}).when(objectManager).commitWALObject(any());

objectIdCfList.get(0).complete(1L);
// completing the first prepare lets the next upload prepare its objectId
verify(objectManager, timeout(1000).times(2)).prepareObject(anyInt(), anyLong());
verify(objectManager, times(1)).commitWALObject(any());

objectIdCfList.get(1).complete(2L);
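// brief pause: the second commit must not fire while the first is still pending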
Thread.sleep(10);
verify(objectManager, times(1)).commitWALObject(any());

commitCfList.get(0).complete(new CommitWALObjectResponse());
verify(objectManager, timeout(1000).times(2)).commitWALObject(any());
commitCfList.get(1).complete(new CommitWALObjectResponse());
cf1.get(1, TimeUnit.SECONDS);
cf2.get(1, TimeUnit.SECONDS);
}

private static FlatStreamRecordBatch newRecord(long streamId, long offset) {
StreamRecordBatch recordBatch = new StreamRecordBatch(streamId, 0, offset, DefaultRecordBatch.of(1, 1024));
return FlatStreamRecordBatch.from(recordBatch);