diff --git a/core/src/main/scala/kafka/log/s3/ObjectReader.java b/core/src/main/scala/kafka/log/s3/ObjectReader.java
index 556e7c0598..c7dafdb802 100644
--- a/core/src/main/scala/kafka/log/s3/ObjectReader.java
+++ b/core/src/main/scala/kafka/log/s3/ObjectReader.java
@@ -278,6 +278,7 @@ public StreamRecordBatch next() {
         public void close() {
             try {
                 in.close();
+                buf.release();
             } catch (IOException e) {
                 throw new KafkaException("Failed to close object block stream", e);
             }
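
Note on the `buf.release()` above: Netty `ByteBuf`s are reference-counted, and a `ByteBufInputStream` does not release its backing buffer on `close()` unless it was constructed with `releaseOnClose = true`, so the buffer's owner must drop its reference explicitly. A minimal sketch of that contract, using a hypothetical class rather than the actual `ObjectReader` internals:

```java
import java.io.IOException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;

// Hypothetical illustration: the stream and its backing buffer have separate
// lifecycles, so close() must release the buffer or the (possibly pooled)
// memory leaks.
class BlockStreamSketch implements AutoCloseable {
    private final ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
    private final ByteBufInputStream in = new ByteBufInputStream(buf);

    @Override
    public void close() throws IOException {
        in.close();    // closes the stream view only
        buf.release(); // drops our reference; at refCnt 0 the memory is reclaimed
    }
}
```
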
diff --git a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
index a62d9e8706..dab6ceb00d 100644
--- a/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
+++ b/core/src/main/scala/kafka/log/s3/operator/DefaultS3Operator.java
@@ -109,7 +109,7 @@ public void close() {
     public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, ByteBufAllocator alloc) {
         end = end - 1;
         CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
-        ByteBuf buf = alloc.heapBuffer((int) (end - start));
+        ByteBuf buf = alloc.directBuffer((int) (end - start));
         rangeRead0(path, start, end, buf, cf);
         return cf;
     }
@@ -192,6 +192,7 @@ private static boolean isUnrecoverable(Throwable ex) {

     class DefaultWriter implements Writer {
+        private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024;
         private final String path;
         private final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
         private volatile String uploadId;
@@ -202,18 +203,7 @@ class DefaultWriter implements Writer {
         /**
          * The minPartSize represents the minimum size of a part for a multipart object.
          */
         private final long minPartSize;
-        /**
-         * The cachedBuf is used to combine small parts into a bigger one.
-         */
-        private CompositeByteBuf cachedBuf;
-        /**
-         * The size of the cachedBuf.
-         */
-        private long cachedBufSize;
-        /**
-         * The last adding component operation to the cachedBuf.
-         */
-        private CompletableFuture<Void> cachedBufLastAddCf;
+        private ObjectPart objectPart = null;
         private final long start = System.nanoTime();
         public DefaultWriter(String path) {
@@ -243,50 +233,22 @@ private void init() {
             });
         }

-        // To unite the write and copyWrite, we still use cachedBufLastAddCf to add the last part of the source.
         @Override
-        public CompletableFuture<Void> write(ByteBuf part) {
-            OBJECT_UPLOAD_SIZE.inc(part.readableBytes());
+        public CompletableFuture<Void> write(ByteBuf data) {
+            OBJECT_UPLOAD_SIZE.inc(data.readableBytes());

-            long targetSize = part.readableBytes();
-            if (cachedBuf == null) {
-                // Case 1: create cache and add as the first item.
-                if (targetSize < minPartSize) {
-                    return initCacheWithFirstItem(targetSize, CompletableFuture.completedFuture(part));
-                }
-                // Case 2: just upload.
-                return handleWriteFuturePart(CompletableFuture.completedFuture(null), part, System.nanoTime())
-                        .thenApply(nil -> null);
+            if (objectPart == null) {
+                objectPart = new ObjectPart();
             }
+            ObjectPart objectPart = this.objectPart;

-            // cachedBuf not null, add to the cache.
-            cachedBufSize += targetSize;
-            cachedBufLastAddCf = cachedBufLastAddCf
-                    .thenRun(() -> cachedBuf.addComponent(true, part));
-
-            // Case 3: cache size is smaller than minPartSize, just add to the cache.
-            if (cachedBufSize < minPartSize) {
-                return cachedBufLastAddCf;
+            objectPart.write(data);
+            if (objectPart.size() > minPartSize) {
+                objectPart.upload();
+                // finish current part.
+                this.objectPart = null;
             }
-
-            // Case 4: upload the combined result.
-            CompletableFuture<CompletedPart> future = handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
-            clearCache();
-            return future.thenApply(nil -> null);
-        }
-
-        private CompletableFuture<Void> initCacheWithFirstItem(long targetSize, CompletableFuture<ByteBuf> firstItemFuture) {
-            cachedBuf = ByteBufAlloc.ALLOC.compositeBuffer();
-            cachedBufSize = targetSize;
-            cachedBufLastAddCf = firstItemFuture
-                    .thenAccept(buf -> cachedBuf.addComponent(true, buf));
-            return cachedBufLastAddCf;
-        }
-
-        private void clearCache() {
-            cachedBuf = null;
-            cachedBufLastAddCf = null;
-            cachedBufSize = 0;
+            return objectPart.getFuture();
         }

         private void write0(String uploadId, int partNumber, ByteBuf part, CompletableFuture<CompletedPart> partCf) {
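
The rewritten `write()` collapses the old four-case cache logic into a single accumulate-then-seal rule: append each chunk to the current part and seal the part as soon as it exceeds `minPartSize`, so every uploaded part except the last satisfies S3's minimum part size. A standalone sketch of just that policy, with hypothetical names and a shrunken size constant:

```java
import java.util.ArrayList;
import java.util.List;

// Accumulate-then-seal sketch of the policy behind the new write().
class PartPolicySketch {
    static final long MIN_PART_SIZE = 100; // S3's real minimum is 5 MiB

    private long currentPartSize = 0;             // objectPart.size()
    final List<Long> sealedPartSizes = new ArrayList<>();

    void write(long chunkSize) {
        currentPartSize += chunkSize;             // objectPart.write(data)
        if (currentPartSize > MIN_PART_SIZE) {
            sealedPartSizes.add(currentPartSize); // objectPart.upload()
            currentPartSize = 0;                  // this.objectPart = null
        }
    }
}
```

Feeding it chunks of 60, 60, 10, and 120 seals parts of 120 and 130, mirroring how small writes are merged while a short remainder is left for `close()` to flush.
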
@@ -312,70 +274,27 @@
         @Override
         public void copyWrite(String sourcePath, long start, long end) {
             long targetSize = end - start;
-            if (cachedBuf == null) {
-                // Case 1: create cache and add as the first item.
+            if (objectPart == null) {
                 if (targetSize < minPartSize) {
-                    initCacheWithFirstItem(targetSize, rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC));
-                } else { // Case 2: just copy.
-                    handleCopyPart(sourcePath, start, end);
+                    this.objectPart = new ObjectPart();
+                    objectPart.readAndWrite(sourcePath, start, end);
+                } else {
+                    new CopyObjectPart(sourcePath, start, end);
                 }
-                return;
-            }
-
-            // cachedBuf not null
-            long combinedSize = cachedBufSize + targetSize;
-
-            // The size of the new part is at most minPartSize, it is ok to read the full range [start, end).
-            if (combinedSize <= 2 * minPartSize) {
-                cachedBufSize = combinedSize;
-                cachedBufLastAddCf = cachedBufLastAddCf
-                        .thenCompose(v -> rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC))
-                        .thenAccept(buf -> cachedBuf.addComponent(true, buf));
-                // Case 3: just add to cache.
-                if (combinedSize < minPartSize) {
-                    return;
+            } else {
+                if (objectPart.size() + targetSize > MAX_MERGE_WRITE_SIZE) {
+                    long readAndWriteCopyEnd = start + minPartSize - objectPart.size();
+                    objectPart.readAndWrite(sourcePath, start, readAndWriteCopyEnd);
+                    objectPart.upload();
+                    new CopyObjectPart(sourcePath, readAndWriteCopyEnd, end);
+                } else {
+                    objectPart.readAndWrite(sourcePath, start, end);
+                    if (objectPart.size() > minPartSize) {
+                        objectPart.upload();
+                        this.objectPart = null;
+                    }
                 }
-                // Case 4: add to the cache and upload the combined result.
-                // combine successfully, then upload.
-                handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
-                clearCache();
-                return;
             }
-
-            // Case 5: It is better to only read a piece of range [start, end).
-            // Just fill cachedBuf with source until it reaches minPartSize. Upload the cache and the rest of the source.
-            cachedBufLastAddCf = cachedBufLastAddCf
-                    .thenCompose(v -> rangeRead(sourcePath, start, start + minPartSize - cachedBufSize, ByteBufAlloc.ALLOC))
-                    .thenAccept(buf -> cachedBuf.addComponent(true, buf));
-            handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
-            handleCopyPart(sourcePath, start + minPartSize - cachedBufSize, end);
-            clearCache();
-        }
-
-        private CompletableFuture<CompletedPart> handleWriteFuturePart(CompletableFuture<Void> waitingFuture, ByteBuf part, long startNanoTime) {
-            int partNumber = nextPartNumber.getAndIncrement();
-            CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
-            parts.add(partCf);
-            waitingFuture.thenCompose(nil -> uploadIdCf)
-                    .thenAccept(uploadId -> write0(uploadId, partNumber, part, partCf));
-            partCf.whenComplete((nil, ex) -> {
-                PART_UPLOAD_COST.update(System.nanoTime() - startNanoTime);
-                part.release();
-            });
-            return partCf;
-        }
-
-        private CompletableFuture<CompletedPart> handleCopyPart(String sourcePath, long start, long end) {
-            long inclusiveEnd = end - 1;
-            int partNumber = nextPartNumber.getAndIncrement();
-            CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
-            parts.add(partCf);
-            uploadIdCf.thenAccept(uploadId -> {
-                UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
-                        .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
-                copyWrite0(partNumber, request, partCf);
-            });
-            return partCf;
         }

         private void copyWrite0(int partNumber, UploadPartCopyRequest request, CompletableFuture<CompletedPart> partCf) {
@@ -396,10 +315,10 @@ public CompletableFuture<Void> close() {
                 return closeCf;
             }

-            // upload the cached buf anyway. Note that the last part can be smaller than minPartSize.
-            if (cachedBuf != null) {
-                handleWriteFuturePart(cachedBufLastAddCf, cachedBuf, System.nanoTime());
-                clearCache();
+            if (objectPart != null) {
+                // force upload the last part which can be smaller than minPartSize.
+                objectPart.upload();
+                objectPart = null;
             }

             OBJECT_INTO_CLOSE_COST.update(System.nanoTime() - start);
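
In the new `copyWrite()`, a pending small part is merged with the next source range by reading the bytes back (`readAndWrite`), but only while the merged part stays under `MAX_MERGE_WRITE_SIZE`; past that threshold the code reads just enough to top the pending part up to `minPartSize` and hands the remainder to a server-side `UploadPartCopy`. A worked example of the split arithmetic, with hypothetical sizes:

```java
// Hypothetical numbers: a 3 MiB part is pending, minPartSize is 5 MiB, and the
// source range is 20 MiB, so pending + range exceeds MAX_MERGE_WRITE_SIZE.
public class CopySplitSketch {
    public static void main(String[] args) {
        long mib = 1024 * 1024;
        long minPartSize = 5 * mib;
        long pendingSize = 3 * mib;      // objectPart.size()
        long start = 0, end = 20 * mib;  // source range [start, end)

        // Read only enough bytes to top the pending part up to minPartSize ...
        long readAndWriteCopyEnd = start + minPartSize - pendingSize;
        System.out.println(readAndWriteCopyEnd / mib);          // 2: read [0, 2 MiB)

        // ... sealing a part of exactly minPartSize, while the rest of the
        // source, [2 MiB, 20 MiB), becomes a server-side copy part.
        System.out.println((end - readAndWriteCopyEnd) / mib);  // 18
    }
}
```
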
@@ -448,5 +367,76 @@ private List<CompletedPart> genCompleteParts() {
                 }
             }).collect(Collectors.toList());
         }
+
+        class ObjectPart {
+            private final int partNumber = nextPartNumber.getAndIncrement();
+            private final CompositeByteBuf partBuf = ByteBufAlloc.ALLOC.compositeBuffer();
+            private CompletableFuture<Void> lastRangeReadCf = CompletableFuture.completedFuture(null);
+            private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
+            private long size;
+
+            public ObjectPart() {
+                parts.add(partCf);
+            }
+
+            public void write(ByteBuf data) {
+                size += data.readableBytes();
+                // ensure addComponent happens before the following write or copyWrite.
+                this.lastRangeReadCf = lastRangeReadCf.thenAccept(nil -> partBuf.addComponent(true, data));
+            }
+
+            public void readAndWrite(String sourcePath, long start, long end) {
+                size += end - start;
+                // TODO: parallel read and sequential add.
+                this.lastRangeReadCf = lastRangeReadCf
+                        .thenCompose(nil -> rangeRead(sourcePath, start, end, ByteBufAlloc.ALLOC))
+                        .thenAccept(buf -> partBuf.addComponent(true, buf));
+            }
+
+            public void upload() {
+                this.lastRangeReadCf.whenComplete((nil, ex) -> {
+                    if (ex != null) {
+                        partCf.completeExceptionally(ex);
+                    } else {
+                        upload0();
+                    }
+                });
+            }
+
+            private void upload0() {
+                long start = System.nanoTime();
+                uploadIdCf.thenAccept(uploadId -> write0(uploadId, partNumber, partBuf, partCf)).whenComplete((nil, ex) -> {
+                    PART_UPLOAD_COST.update(System.nanoTime() - start);
+                    partBuf.release();
+                });
+            }
+
+            public long size() {
+                return size;
+            }
+
+            public CompletableFuture<Void> getFuture() {
+                return partCf.thenApply(nil -> null);
+            }
+        }
+
+        class CopyObjectPart {
+            private final CompletableFuture<CompletedPart> partCf = new CompletableFuture<>();
+
+            public CopyObjectPart(String sourcePath, long start, long end) {
+                long inclusiveEnd = end - 1;
+                int partNumber = nextPartNumber.getAndIncrement();
+                parts.add(partCf);
+                uploadIdCf.thenAccept(uploadId -> {
+                    UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
+                            .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
+                    copyWrite0(partNumber, request, partCf);
+                });
+            }
+
+            public CompletableFuture<Void> getFuture() {
+                return partCf.thenApply(nil -> null);
+            }
+        }
     }
 }
diff --git a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java
index 7f2dce27f7..2d236c78c0 100644
--- a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java
+++ b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java
@@ -39,7 +39,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, B
             return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
         }
         int length = (int) (end - start);
-        return CompletableFuture.completedFuture(value.slice(value.readerIndex() + (int) start, length));
+        return CompletableFuture.completedFuture(value.retainedSlice(value.readerIndex() + (int) start, length));
     }

     @Override
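
The switch to `retainedSlice` matters because `DefaultWriter` now releases every part buffer once its upload completes (`partBuf.release()` above, which also releases the composite's components). A plain `slice()` shares the parent's reference count without incrementing it, so the writer's release would free the buffer still stored inside `MemoryS3Operator`. A minimal sketch of the difference:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// slice() shares the parent's reference count without incrementing it;
// retainedSlice() retains the parent, so the consumer may release its view
// independently of the stored buffer.
public class SliceSketch {
    public static void main(String[] args) {
        ByteBuf stored = Unpooled.copiedBuffer(new byte[100]); // refCnt = 1

        ByteBuf view = stored.retainedSlice(10, 20);           // refCnt = 2
        view.release();                                        // refCnt = 1
        System.out.println(stored.refCnt());                   // 1: still usable

        // With stored.slice(10, 20) instead, the same release() would drop the
        // shared count to 0 and free the stored buffer as well.
    }
}
```
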
diff --git a/core/src/test/java/kafka/log/s3/operator/DefaultWriterTest.java b/core/src/test/java/kafka/log/s3/operator/DefaultWriterTest.java
index 57ce789507..13f5b8e567 100644
--- a/core/src/test/java/kafka/log/s3/operator/DefaultWriterTest.java
+++ b/core/src/test/java/kafka/log/s3/operator/DefaultWriterTest.java
@@ -120,7 +120,7 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
         writer = operator.writer("test-path-2", 100);
         List<UploadPartRequest> uploadPartRequests = new ArrayList<>();
         List<UploadPartCopyRequest> uploadPartCopyRequests = new ArrayList<>();
-        List<Long> contentLengths = new ArrayList<>();
+        List<Long> writeContentLengths = new ArrayList<>();

         UploadPartResponse.Builder builder = UploadPartResponse.builder();
         Method method = builder.getClass().getDeclaredMethod("setETag", String.class);
@@ -142,7 +142,7 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
             UploadPartRequest request = invocation.getArgument(0);
             uploadPartRequests.add(request);
             AsyncRequestBody body = invocation.getArgument(1);
-            contentLengths.add(body.contentLength().orElse(0L));
+            writeContentLengths.add(body.contentLength().orElse(0L));
             return CompletableFuture.completedFuture(builder.build());
         });

@@ -183,22 +183,22 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il
         assertEquals(3, uploadPartRequests.size());
         assertEquals("unit-test-bucket", uploadPartRequests.get(0).bucket());
         assertEquals("test-path-2", uploadPartRequests.get(0).key());
-        assertEquals(List.of(2, 3, 5), uploadPartRequests.stream()
+        assertEquals(List.of(2, 3, 4), uploadPartRequests.stream()
                 .map(UploadPartRequest::partNumber)
                 .collect(Collectors.toList()));
-        assertEquals(List.of(120L, 100L, 10L), contentLengths);
+        assertEquals(List.of(120L, 280L, 10L), writeContentLengths);

-        assertEquals(2, uploadPartCopyRequests.size());
+        assertEquals(1, uploadPartCopyRequests.size());
         assertEquals("unit-test-bucket", uploadPartCopyRequests.get(0).sourceBucket());
         assertEquals("unit-test-bucket", uploadPartCopyRequests.get(0).destinationBucket());
-        assertEquals(List.of("path-1", "path-6"), uploadPartCopyRequests.stream()
+        assertEquals(List.of("path-1"), uploadPartCopyRequests.stream()
                 .map(UploadPartCopyRequest::sourceKey)
                 .collect(Collectors.toList()));
         assertEquals("test-path-2", uploadPartCopyRequests.get(0).destinationKey());
-        assertEquals(List.of(1, 4), uploadPartCopyRequests.stream()
+        assertEquals(List.of(1), uploadPartCopyRequests.stream()
                 .map(UploadPartCopyRequest::partNumber)
                 .collect(Collectors.toList()));
-        assertEquals(List.of("bytes=0-119", "bytes=420-599"), uploadPartCopyRequests.stream()
+        assertEquals(List.of("bytes=0-119"), uploadPartCopyRequests.stream()
                 .map(UploadPartCopyRequest::copySourceRange)
                 .collect(Collectors.toList()));
     }
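
The updated expectations (three merged write parts of 120, 280, and 10 bytes, plus a single copy part) follow from the new merge policy above. A condensed sketch of the stubbing pattern this test uses to capture uploaded part sizes, assuming Mockito and the AWS SDK v2 async client as the captured invocation arguments suggest:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

// Record every uploaded part's content length on a mocked S3AsyncClient so
// assertions can verify how writes were merged into parts.
class UploadCaptureSketch {
    static List<Long> stubUploadPart(S3AsyncClient mockedS3) {
        List<Long> writeContentLengths = new ArrayList<>();
        when(mockedS3.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
                .thenAnswer(invocation -> {
                    AsyncRequestBody body = invocation.getArgument(1);
                    writeContentLengths.add(body.contentLength().orElse(0L));
                    return CompletableFuture.completedFuture(UploadPartResponse.builder().build());
                });
        return writeContentLengths;
    }
}
```
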