1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -278,6 +278,7 @@ public StreamRecordBatch next() {
public void close() {
try {
in.close();
buf.release();
} catch (IOException e) {
throw new KafkaException("Failed to close object block stream", e);
}
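The one-line addition releases the block's backing ByteBuf when the stream over it is closed; the buffer is reference-counted, so skipping the release leaves the pooled memory unreturned. A minimal sketch of the same pattern, assuming Netty's ByteBufInputStream and a hypothetical BlockStream wrapper (names not taken from the PR):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;

import java.io.IOException;
import java.io.InputStream;

// Hypothetical wrapper that owns a reference-counted buffer and a stream over it.
class BlockStream implements AutoCloseable {
    private final ByteBuf blockBuf;
    private final InputStream in;

    BlockStream(ByteBuf blockBuf) {
        this.blockBuf = blockBuf;
        // ByteBufInputStream does not release the underlying buffer on close by default.
        this.in = new ByteBufInputStream(blockBuf);
    }

    @Override
    public void close() {
        try {
            in.close();
        } catch (IOException e) {
            throw new RuntimeException("Failed to close block stream", e);
        } finally {
            // Without this release, the pooled (possibly direct) memory behind the
            // block is never returned to the allocator.
            blockBuf.release();
        }
    }
}
```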
222 changes: 106 additions & 116 deletions core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
@@ -109,7 +109,7 @@ public void close() {
public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, ByteBufAllocator alloc) {
end = end - 1;
CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
ByteBuf buf = alloc.heapBuffer((int) (end - start));
ByteBuf buf = alloc.directBuffer((int) (end - start));
rangeRead0(path, start, end, buf, cf);
return cf;
}
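Switching rangeRead from a heap buffer to a direct buffer keeps the fetched object bytes off-heap, which can avoid an extra copy when the data is produced or consumed by native I/O, and it fits the reference-counting discipline the rest of this PR tightens. A minimal sketch of the allocation difference, assuming Netty's pooled allocator (variable names are illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;

public class DirectVsHeap {
    public static void main(String[] args) {
        ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf heap = alloc.heapBuffer(1024);     // backed by a byte[] on the Java heap
        ByteBuf direct = alloc.directBuffer(1024); // backed by off-heap (native) memory

        System.out.println(heap.hasArray());   // true: heap buffers expose their backing array
        System.out.println(direct.hasArray()); // false: direct buffers do not

        // Both are pooled and reference-counted, so both must be released explicitly.
        heap.release();
        direct.release();
    }
}
```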
@@ -192,6 +192,7 @@ private static boolean isUnrecoverable(Throwable ex) {


class DefaultWriter implements Writer {
private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024;
private final String path;
private final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
private volatile String uploadId;
@@ -202,18 +203,7 @@ class DefaultWriter implements Writer {
* The minPartSize represents the minimum size of a part for a multipart object.
*/
private final long minPartSize;
/**
* The cachedBuf is used to combine small parts into a bigger one.
*/
private CompositeByteBuf cachedBuf;
/**
* The size of the cachedBuf.
*/
private long cachedBufSize;
/**
* The last adding component operation to the cachedBuf.
*/
private CompletableFuture<Void> cachedBufLastAddCf;
private ObjectPart objectPart = null;
private final long start = System.nanoTime();

public DefaultWriter(String path) {
@@ -243,50 +233,22 @@ private void init() {
});
}

// To unite the write and copyWrite, we still use cachedBufLastAddCf to add the last part of the source.
@Override
public CompletableFuture<Void> write(ByteBuf part) {
OBJECT_UPLOAD_SIZE.inc(part.readableBytes());
public CompletableFuture<Void> write(ByteBuf data) {
OBJECT_UPLOAD_SIZE.inc(data.readableBytes());

long targetSize = part.readableBytes();
if (cachedBuf == null) {
// Case 1: create cache and add as the first item.
if (targetSize < minPartSize) {
return initCacheWithFirstItem(targetSize, CompletableFuture.completedFuture(part));
}
// Case 2: just upload.
return handleWriteFuturePart(CompletableFuture.completedFuture(null), part, System.nanoTime())
.thenApply(nil -> null);
if (objectPart == null) {
objectPart = new ObjectPart();
}
ObjectPart objectPart = this.objectPart;

// cachedBuf not null, add to the cache.
cachedBufSize += targetSize;
cachedBufLastAddCf = cachedBufLastAddCf
.thenRun(() -> cachedBuf.addComponent(true, part));

// Case 3: cache size is smaller than minPartSize, just add to the cache.
if (cachedBufSize < minPartSize) {
return cachedBufLastAddCf;
objectPart.write(data);
if (objectPart.size() > minPartSize) {
objectPart.upload();
// finish current part.
this.objectPart = null;
}

// Case 4: upload the combined result.
CompletableFuture<CompletedPart> future = handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
clearCache();
return future.thenApply(nil -> null);
}

private CompletableFuture<Void> initCacheWithFirstItem(long targetSize, CompletableFuture<ByteBuf> firstItemFuture) {
cachedBuf = ByteBufAlloc.ALLOC.compositeBuffer();
cachedBufSize = targetSize;
cachedBufLastAddCf = firstItemFuture
.thenAccept(buf -> cachedBuf.addComponent(true, buf));
return cachedBufLastAddCf;
}

private void clearCache() {
cachedBuf = null;
cachedBufLastAddCf = null;
cachedBufSize = 0;
return objectPart.getFuture();
}

private void write0(String uploadId, int partNumber, ByteBuf part, CompletableFuture<CompletedPart> partCf) {
@@ -312,70 +274,27 @@ private void write0(String uploadId, int partNumber, ByteBuf part, CompletableFu
@Override
public void copyWrite(String sourcePath, long start, long end) {
long targetSize = end - start;
if (cachedBuf == null) {
// Case 1: create cache and add as the first item.
if (objectPart == null) {
if (targetSize < minPartSize) {
initCacheWithFirstItem(targetSize, rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC));
} else { // Case 2: just copy.
handleCopyPart(sourcePath, start, end);
this.objectPart = new ObjectPart();
objectPart.readAndWrite(sourcePath, start, end);
} else {
new CopyObjectPart(sourcePath, start, end);
}
return;
}

// cachedBuf not null
long combinedSize = cachedBufSize + targetSize;

// The size of the new part is at most minPartSize, it is ok to read the full range [start, end).
if (combinedSize <= 2 * minPartSize) {
cachedBufSize = combinedSize;
cachedBufLastAddCf = cachedBufLastAddCf
.thenCompose(v -> rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC))
.thenAccept(buf -> cachedBuf.addComponent(true, buf));
// Case 3: just add to cache.
if (combinedSize < minPartSize) {
return;
} else {
if (objectPart.size() + targetSize > MAX_MERGE_WRITE_SIZE) {
long readAndWriteCopyEnd = start + minPartSize - objectPart.size();
objectPart.readAndWrite(sourcePath, start, readAndWriteCopyEnd);
objectPart.upload();
new CopyObjectPart(sourcePath, readAndWriteCopyEnd, end);
} else {
objectPart.readAndWrite(sourcePath, start, end);
if (objectPart.size() > minPartSize) {
objectPart.upload();
this.objectPart = null;
}
}
// Case 4: add to the cache and upload the combined result.
// combine successfully, then upload.
handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
clearCache();
return;
}

// Case 5: It is better to only read a piece of range [start, end).
// Just fill cachedBuf with source until it reaches minPartSize. Upload the cache and the rest of the source.
cachedBufLastAddCf = cachedBufLastAddCf
.thenCompose(v -> rangeRead(sourcePath, start, start + minPartSize - cachedBufSize, ByteBufAlloc.ALLOC))
.thenAccept(buf -> cachedBuf.addComponent(true, buf));
handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
handleCopyPart(sourcePath, start + minPartSize - cachedBufSize, end);
clearCache();
}

private CompletableFuture<CompletedPart> handleWriteFuturePart(CompletableFuture<Void> waitingFuture, ByteBuf part, long startNanoTime) {
int partNumber = nextPartNumber.getAndIncrement();
CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
parts.add(partCf);
waitingFuture.thenCompose(nil -> uploadIdCf)
.thenAccept(uploadId -> write0(uploadId, partNumber, part, partCf));
partCf.whenComplete((nil, ex) -> {
PART_UPLOAD_COST.update(System.nanoTime() - startNanoTime);
part.release();
});
return partCf;
}

private CompletableFuture<CompletedPart> handleCopyPart(String sourcePath, long start, long end) {
long inclusiveEnd = end - 1;
int partNumber = nextPartNumber.getAndIncrement();
CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
parts.add(partCf);
uploadIdCf.thenAccept(uploadId -> {
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
copyWrite0(partNumber, request, partCf);
});
return partCf;
}

private void copyWrite0(int partNumber, UploadPartCopyRequest request, CompletableFuture<CompletedPart> partCf) {
@@ -396,10 +315,10 @@ public CompletableFuture<Void> close() {
return closeCf;
}

// upload the cached buf anyway. Note that the last part can be smaller than minPartSize.
if (cachedBuf != null) {
handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
clearCache();
if (objectPart != null) {
// force upload the last part which can be smaller than minPartSize.
objectPart.upload();
objectPart = null;
}

OBJECT_INTO_CLOSE_COST.update(System.nanoTime() - start);
@@ -448,5 +367,76 @@ private List<CompletedPart> genCompleteParts() {
}
}).collect(Collectors.toList());
}

class ObjectPart {
private final int partNumber = nextPartNumber.getAndIncrement();
private final CompositeByteBuf partBuf = ByteBufAlloc.ALLOC.compositeBuffer();
private CompletableFuture<Void> lastRangeReadCf = CompletableFuture.completedFuture(null);
private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
private long size;

public ObjectPart() {
parts.add(partCf);
}

public void write(ByteBuf data) {
size += data.readableBytes();
// ensure addComponent happens before any following write or copyWrite.
this.lastRangeReadCf = lastRangeReadCf.thenAccept(nil -> partBuf.addComponent(true, data));
}

public void readAndWrite(String sourcePath, long start, long end) {
size += end - start;
// TODO: read ranges in parallel while still adding them to the buffer in order.
this.lastRangeReadCf = lastRangeReadCf
.thenCompose(nil -> rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC))
.thenAccept(buf -> partBuf.addComponent(true, buf));
}

public void upload() {
this.lastRangeReadCf.whenComplete((nil, ex) -> {
if (ex != null) {
partCf.completeExceptionally(ex);
} else {
upload0();
}
});
}

private void upload0() {
long start = System.nanoTime();
uploadIdCf.thenAccept(uploadId -> write0(uploadId, partNumber, partBuf, partCf)).whenComplete((nil, ex) -> {
PART_UPLOAD_COST.update(System.nanoTime() - start);
partBuf.release();
});
}

public long size() {
return size;
}

public CompletableFuture<Void> getFuture() {
return partCf.thenApply(nil -> null);
}
}

class CopyObjectPart {
private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();

public CopyObjectPart(String sourcePath, long start, long end) {
long inclusiveEnd = end - 1;
int partNumber = nextPartNumber.getAndIncrement();
parts.add(partCf);
uploadIdCf.thenAccept(uploadId -> {
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
copyWrite0(partNumber, request, partCf);
});
}

public CompletableFuture<Void> getFuture() {
return partCf.thenApply(nil -> null);
}
}
}
}
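Taken together, the DefaultWriter changes replace the cachedBuf / cachedBufSize / cachedBufLastAddCf trio with two small inner classes: ObjectPart accumulates writes and ranged reads into a single multipart part and uploads it once it passes minPartSize, while CopyObjectPart issues a server-side UploadPartCopy for ranges that never need to be read back. The new MAX_MERGE_WRITE_SIZE (16 MiB) caps how much data one merged part will read back from the source object. The sizing policy behind copyWrite can be summarized by the sketch below; it is an illustrative restatement, not the PR's code, and minPartSize is passed as a parameter because the diff only shows it as a constructor-supplied field.

```java
// Illustrative restatement of the copyWrite sizing policy in the refactored DefaultWriter.
final class CopyWritePolicy {

    enum Action {
        MERGE_INTO_CURRENT_PART,      // read the range and append it to the in-flight part
        SERVER_SIDE_COPY_ONLY,        // issue UploadPartCopy for the whole range
        TOP_UP_THEN_SERVER_SIDE_COPY  // read just enough to finish the part, copy the rest
    }

    static Action decide(long pendingPartSize, long rangeSize, long minPartSize, long maxMergeWriteSize) {
        if (pendingPartSize == 0) {
            // No part is being built: small ranges start a merged part,
            // ranges of at least minPartSize go straight to UploadPartCopy.
            return rangeSize < minPartSize ? Action.MERGE_INTO_CURRENT_PART
                                           : Action.SERVER_SIDE_COPY_ONLY;
        }
        if (pendingPartSize + rangeSize > maxMergeWriteSize) {
            // Merging everything would read too much back from the source: top the pending
            // part up to minPartSize, upload it, and copy the remainder server-side.
            return Action.TOP_UP_THEN_SERVER_SIDE_COPY;
        }
        // Otherwise keep merging; the writer uploads the part once it exceeds minPartSize.
        return Action.MERGE_INTO_CURRENT_PART;
    }

    private CopyWritePolicy() {
    }
}
```

The updated test expectations further down are consistent with this: a source range that the old writer split into a 100-byte top-up upload plus a second UploadPartCopy (bytes=420-599) is now merged into a single 280-byte uploaded part, leaving only one copy request.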
@@ -39,7 +39,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, B
return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
}
int length = (int) (end - start);
return CompletableFuture.completedFuture(value.slice(value.readerIndex() + (int) start, length));
return CompletableFuture.completedFuture(value.retainedSlice(value.readerIndex() + (int) start, length));
}

@Override
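The rangeRead in the hunk above, which appears to belong to the in-memory operator used in tests, now hands out retainedSlice instead of slice. A plain slice shares the parent buffer's reference count without incrementing it, so a consumer that releases the returned buffer (as the reworked writer does after uploading each part) would also release the parent. A minimal sketch of the difference, using Netty's Unpooled helper (not code from this PR):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SliceVsRetainedSlice {
    public static void main(String[] args) {
        ByteBuf parent = Unpooled.buffer(16).writeZero(16);
        System.out.println(parent.refCnt()); // 1

        // slice(): shares the parent's memory and its reference count (still 1).
        // If the consumer released this slice, the parent would drop to refCnt 0 and
        // any later access would throw IllegalReferenceCountException.
        ByteBuf plain = parent.slice(0, 8);
        System.out.println(plain.refCnt());  // 1, same counter as the parent

        // retainedSlice(): shares the memory but bumps the count to 2, so the consumer
        // can release its view independently of the owner.
        ByteBuf retained = parent.retainedSlice(0, 8);
        retained.release();
        System.out.println(parent.refCnt()); // back to 1, parent still usable

        parent.release();
    }
}
```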
16 changes: 8 additions & 8 deletions core/src/test/java/kafka/log/s3/operator/DefaultWriterTest.java
@@ -120,7 +120,7 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
writer = operator.writer("test-path-2", 100);
List<UploadPartRequest> uploadPartRequests = new ArrayList<>();
List<UploadPartCopyRequest> uploadPartCopyRequests = new ArrayList<>();
List<Long> contentLengths = new ArrayList<>();
List<Long> writeContentLengths = new ArrayList<>();

UploadPartResponse.Builder builder = UploadPartResponse.builder();
Method method = builder.getClass().getDeclaredMethod("setETag", String.class);
@@ -142,7 +142,7 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
UploadPartRequest request = invocation.getArgument(0);
uploadPartRequests.add(request);
AsyncRequestBody body = invocation.getArgument(1);
contentLengths.add(body.contentLength().orElse(0L));
writeContentLengths.add(body.contentLength().orElse(0L));
return CompletableFuture.completedFuture(builder.build());
});

@@ -183,22 +183,22 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
assertEquals(3, uploadPartRequests.size());
assertEquals("unit-test-bucket", uploadPartRequests.get(0).bucket());
assertEquals("test-path-2", uploadPartRequests.get(0).key());
assertEquals(List.of(2, 3, 5), uploadPartRequests.stream()
assertEquals(List.of(2, 3, 4), uploadPartRequests.stream()
.map(UploadPartRequest::partNumber)
.collect(Collectors.toList()));
assertEquals(List.of(120L, 100L, 10L), contentLengths);
assertEquals(List.of(120L, 280L, 10L), writeContentLengths);

assertEquals(2, uploadPartCopyRequests.size());
assertEquals(1, uploadPartCopyRequests.size());
assertEquals("unit-test-bucket", uploadPartCopyRequests.get(0).sourceBucket());
assertEquals("unit-test-bucket", uploadPartCopyRequests.get(0).destinationBucket());
assertEquals(List.of("path-1", "path-6"), uploadPartCopyRequests.stream()
assertEquals(List.of("path-1"), uploadPartCopyRequests.stream()
.map(UploadPartCopyRequest::sourceKey)
.collect(Collectors.toList()));
assertEquals("test-path-2", uploadPartCopyRequests.get(0).destinationKey());
assertEquals(List.of(1, 4), uploadPartCopyRequests.stream()
assertEquals(List.of(1), uploadPartCopyRequests.stream()
.map(UploadPartCopyRequest::partNumber)
.collect(Collectors.toList()));
assertEquals(List.of("bytes=0-119", "bytes=420-599"), uploadPartCopyRequests.stream()
assertEquals(List.of("bytes=0-119"), uploadPartCopyRequests.stream()
.map(UploadPartCopyRequest::copySourceRange)
.collect(Collectors.toList()));
}