diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
index b3a5f0c899..83f6aa86a7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
@@ -31,7 +31,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class AsyncNetworkBandwidthLimiter {
+public class AsyncNetworkBandwidthLimiter implements NetworkBandwidthLimiter {
     private static final Logger LOGGER = new LogContext().logger(AsyncNetworkBandwidthLimiter.class);
     private static final float DEFAULT_EXTRA_TOKEN_RATIO = 0.1f;
     private static final long MAX_TOKEN_PART_SIZE = 1024 * 1024;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java
new file mode 100644
index 0000000000..258566b55b
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/NetworkBandwidthLimiter.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3.network;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface NetworkBandwidthLimiter {
+    NetworkBandwidthLimiter NOOP = new Noop();
+
+    CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long size);
+
+    long getMaxTokens();
+
+    long getAvailableTokens();
+
+
+    class Noop implements NetworkBandwidthLimiter {
+        @Override
+        public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long size) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public long getMaxTokens() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public long getAvailableTokens() {
+            return Long.MAX_VALUE;
+        }
+    }
+}
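Note on the new interface: `NOOP` is a null object, so call sites can depend on `NetworkBandwidthLimiter` unconditionally instead of branching on `null` (the constructor change in `AbstractObjectStorage` below relies on exactly this). A minimal usage sketch, assuming it sits alongside the interface in `com.automq.stream.s3.network` and borrowing `ThrottleStrategy.BYPASS` from the existing code:

```java
package com.automq.stream.s3.network;

import java.util.concurrent.CompletableFuture;

public class NoopLimiterSketch {
    public static void main(String[] args) {
        // With NOOP, consume() completes immediately, so throttled and
        // unthrottled paths share the same code shape.
        NetworkBandwidthLimiter limiter = NetworkBandwidthLimiter.NOOP;
        CompletableFuture<Void> grant = limiter.consume(ThrottleStrategy.BYPASS, 1024);
        grant.thenRun(() -> System.out.println("tokens granted, proceed with I/O")).join();
    }
}
```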
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java
index be686deddb..48fcfc69c0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java
@@ -18,13 +18,12 @@
 import com.automq.stream.s3.metrics.stats.S3OperationStats;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
-import com.automq.stream.s3.network.ThrottleStrategy;
+import com.automq.stream.s3.network.NetworkBandwidthLimiter;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.ThreadUtils;
 import com.automq.stream.utils.Threads;
 import com.automq.stream.utils.Utils;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
@@ -42,7 +41,6 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +58,8 @@ public abstract class AbstractObjectStorage implements ObjectStorage {
     private final Semaphore inflightReadLimiter;
     private final Semaphore inflightWriteLimiter;
     private final List<ReadTask> waitingReadTasks = new LinkedList<>();
-    private final AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter;
-    private final AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter;
-    private final ExecutorService readLimiterCallbackExecutor = Threads.newFixedThreadPoolWithMonitor(1,
-        "s3-read-limiter-cb-executor", true, LOGGER);
+    private final NetworkBandwidthLimiter networkInboundBandwidthLimiter;
+    private final NetworkBandwidthLimiter networkOutboundBandwidthLimiter;
     private final ExecutorService writeLimiterCallbackExecutor = Threads.newFixedThreadPoolWithMonitor(1,
         "s3-write-limiter-cb-executor", true, LOGGER);
     private final ExecutorService readCallbackExecutor = Threads.newFixedThreadPoolWithMonitor(1,
@@ -74,11 +70,11 @@ public abstract class AbstractObjectStorage implements ObjectStorage {
         ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);
     final ScheduledExecutorService scheduler = Threads.newSingleThreadScheduledExecutor(
         ThreadUtils.createThreadFactory("objectStorage", true), LOGGER);
-    boolean checkS3ApiMode = false;
+    final boolean checkS3ApiMode;
 
     private AbstractObjectStorage(
-        AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
-        AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkInboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkOutboundBandwidthLimiter,
         int maxObjectStorageConcurrency,
         int currentIndex,
         boolean readWriteIsolate,
@@ -88,8 +84,8 @@ private AbstractObjectStorage(
         this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
         this.inflightWriteLimiter = new Semaphore(maxObjectStorageConcurrency);
         this.inflightReadLimiter = readWriteIsolate ? new Semaphore(maxObjectStorageConcurrency) : inflightWriteLimiter;
-        this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
-        this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
+        this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter != null ? networkInboundBandwidthLimiter : NetworkBandwidthLimiter.NOOP;
+        this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter != null ? networkOutboundBandwidthLimiter : NetworkBandwidthLimiter.NOOP;
         this.checkS3ApiMode = checkS3ApiMode;
         if (!manualMergeRead) {
             scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
@@ -100,8 +96,8 @@ private AbstractObjectStorage(
     }
 
     public AbstractObjectStorage(
-        AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
-        AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkInboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkOutboundBandwidthLimiter,
         boolean readWriteIsolate,
         boolean checkS3ApiMode) {
         this(networkInboundBandwidthLimiter, networkOutboundBandwidthLimiter, getMaxObjectStorageConcurrency(),
@@ -110,7 +106,7 @@ public AbstractObjectStorage(
 
     // used for test only
     public AbstractObjectStorage(boolean manualMergeRead) {
-        this(null, null, 50, 0, true, false, manualMergeRead);
+        this(NetworkBandwidthLimiter.NOOP, NetworkBandwidthLimiter.NOOP, 50, 0, true, false, manualMergeRead);
     }
 
     @Override
@@ -131,42 +127,56 @@ public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPa
             return cf;
         }
 
-        if (networkInboundBandwidthLimiter != null) {
-            TimerUtil timerUtil = new TimerUtil();
-            networkInboundBandwidthLimiter.consume(options.throttleStrategy(), end - start).whenCompleteAsync((v, ex) -> {
-                NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, options.throttleStrategy())
-                    .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-                if (ex != null) {
-                    cf.completeExceptionally(ex);
-                } else {
-                    rangeRead0(objectPath, start, end, cf);
+        TimerUtil timerUtil = new TimerUtil();
+        networkInboundBandwidthLimiter.consume(options.throttleStrategy(), end - start).whenComplete((v, ex) -> {
+            NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, options.throttleStrategy())
+                .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            if (ex != null) {
+                cf.completeExceptionally(ex);
+            } else {
+                synchronized (waitingReadTasks) {
+                    waitingReadTasks.add(new AbstractObjectStorage.ReadTask(options, objectPath, start, end, cf));
                 }
-            }, readLimiterCallbackExecutor);
-        } else {
-            rangeRead0(objectPath, start, end, cf);
-
-        }
-        Timeout timeout = timeoutDetect.newTimeout(t -> {
-            LOGGER.warn("rangeRead {} {}-{} timeout", objectPath, start, end);
-            System.out.println("timeout");
-        }, 1, TimeUnit.MINUTES);
+            }
+        });
+        Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("rangeRead {} {}-{} timeout", objectPath, start, end), 1, TimeUnit.MINUTES);
         return cf.whenComplete((rst, ex) -> timeout.cancel());
     }
 
-    public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy throttleStrategy) {
+    @Override
+    public CompletableFuture<Void> write(WriteOptions options, String objectPath, ByteBuf data) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         CompletableFuture<Void> retCf = acquireWritePermit(cf);
         if (retCf.isDone()) {
             return retCf;
         }
-        writeWithCf(path, data, throttleStrategy, cf);
+        TimerUtil timerUtil = new TimerUtil();
+        networkOutboundBandwidthLimiter
+            .consume(options.throttleStrategy(), data.readableBytes())
+            .whenCompleteAsync((v, ex) -> {
+                NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
+                    .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                if (ex != null) {
+                    cf.completeExceptionally(ex);
+                } else {
+                    write0(options, objectPath, data, cf);
+                }
+            }, writeLimiterCallbackExecutor);
        return retCf;
    }
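The rewritten `rangeRead`/`write` paths above share one shape: consume from the limiter, then either fail the caller's future or hand off to the `*0` worker method. A stripped-down sketch of that consume-then-proceed pattern (the `Limiter` stub and names here are illustrative, not from the diff; the real code also records queue-time metrics and runs the write callback on `writeLimiterCallbackExecutor`):

```java
import java.util.concurrent.CompletableFuture;

public class ConsumeThenProceedSketch {
    interface Limiter {
        CompletableFuture<Void> consume(long size);
    }

    static CompletableFuture<Void> write(Limiter limiter, byte[] data) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        limiter.consume(data.length).whenComplete((v, ex) -> {
            if (ex != null) {
                cf.completeExceptionally(ex); // limiter failure short-circuits the operation
            } else {
                cf.complete(null);            // stand-in for write0(options, path, data, cf)
            }
        });
        return cf;
    }

    public static void main(String[] args) {
        Limiter noop = size -> CompletableFuture.completedFuture(null);
        write(noop, new byte[16]).join();
        System.out.println("write proceeded after the limiter grant");
    }
}
```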
-    private void writeWithCf(String path, ByteBuf data, ThrottleStrategy throttleStrategy, CompletableFuture<Void> cf) {
+    private void write0(WriteOptions options, String path, ByteBuf data, CompletableFuture<Void> cf) {
         TimerUtil timerUtil = new TimerUtil();
         long objectSize = data.readableBytes();
-        Consumer<Throwable> failHandler = ex -> {
+        doWrite(options, path, data).thenAccept(nil -> {
+            S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
+            S3OperationStats.getInstance().putObjectStats(objectSize, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            }
+            data.release();
+            cf.complete(null);
+        }).exceptionally(ex -> {
             S3OperationStats.getInstance().putObjectStats(objectSize, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (isUnrecoverable(ex) || checkS3ApiMode) {
                 LOGGER.error("PutObject for object {} fail", path, ex);
@@ -174,88 +184,69 @@ private void writeWithCf(String path, ByteBuf data, ThrottleStr
                 data.release();
             } else {
                 LOGGER.warn("PutObject for object {} fail, retry later", path, ex);
-                scheduler.schedule(() -> writeWithCf(path, data, throttleStrategy, cf), 100, TimeUnit.MILLISECONDS);
+                scheduler.schedule(() -> write0(options, path, data, cf), 100, TimeUnit.MILLISECONDS);
             }
-        };
-        Runnable successHandler = () -> {
-            S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
-            S3OperationStats.getInstance().putObjectStats(objectSize, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-            LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-            data.release();
-            cf.complete(null);
-        };
-        if (networkOutboundBandwidthLimiter != null) {
-            networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
-                NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
-                    .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-                if (ex != null) {
-                    cf.completeExceptionally(ex);
-                } else {
-                    doWrite(path, data, failHandler, successHandler);
-                }
-            }, writeLimiterCallbackExecutor);
-        } else {
-            doWrite(path, data, failHandler, successHandler);
-        }
+            return null;
+        });
     }
 
-    public CompletableFuture<String> createMultipartUpload(String path) {
+    public CompletableFuture<String> createMultipartUpload(WriteOptions options, String path) {
         CompletableFuture<String> cf = new CompletableFuture<>();
         CompletableFuture<String> retCf = acquireWritePermit(cf);
         if (retCf.isDone()) {
             return retCf;
         }
-
-        createMultipartUploadWithCf(path, cf);
+        createMultipartUpload0(options, path, cf);
         return retCf;
     }
 
-    private void createMultipartUploadWithCf(String path, CompletableFuture<String> cf) {
+    private void createMultipartUpload0(WriteOptions options, String path, CompletableFuture<String> cf) {
         TimerUtil timerUtil = new TimerUtil();
-        Consumer<String> successHandler = uploadId -> {
+        doCreateMultipartUpload(options, path).thenAccept(uploadId -> {
             S3OperationStats.getInstance().createMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             cf.complete(uploadId);
-        };
-        Consumer<Throwable> failHandler = ex -> {
+        }).exceptionally(ex -> {
             S3OperationStats.getInstance().createMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (isUnrecoverable(ex) || checkS3ApiMode) {
                 LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
                 cf.completeExceptionally(ex);
             } else {
                 LOGGER.warn("CreateMultipartUpload for object {} fail, retry later", path, ex);
-                scheduler.schedule(() -> createMultipartUploadWithCf(path, cf), 100, TimeUnit.MILLISECONDS);
+                scheduler.schedule(() -> createMultipartUpload0(options, path, cf), 100, TimeUnit.MILLISECONDS);
             }
-        };
-        doCreateMultipartUpload(path, failHandler, successHandler);
+            return null;
+        });
     }
 
-    public CompletableFuture<ObjectStorageCompletedPart> uploadPart(String path, String uploadId, int partNumber,
-        ByteBuf data, ThrottleStrategy throttleStrategy) {
+    public CompletableFuture<ObjectStorageCompletedPart> uploadPart(WriteOptions options, String path, String uploadId,
+        int partNumber, ByteBuf data) {
         CompletableFuture<ObjectStorageCompletedPart> cf = new CompletableFuture<>();
         CompletableFuture<ObjectStorageCompletedPart> refCf = acquireWritePermit(cf);
         if (refCf.isDone()) {
             return refCf;
         }
-
-        uploadPartWithCf(path, uploadId, partNumber, data, throttleStrategy, cf);
+        networkOutboundBandwidthLimiter
+            .consume(options.throttleStrategy(), data.readableBytes())
+            .whenCompleteAsync((v, ex) -> {
+                if (ex != null) {
+                    cf.completeExceptionally(ex);
+                } else {
+                    uploadPart0(options, path, uploadId, partNumber, data, cf);
+                }
+            }, writeLimiterCallbackExecutor);
         return refCf;
     }
 
-    private void uploadPartWithCf(String path,
-        String uploadId,
-        int partNumber,
-        ByteBuf data,
-        ThrottleStrategy throttleStrategy,
-        CompletableFuture<ObjectStorageCompletedPart> cf) {
+    private void uploadPart0(WriteOptions options, String path, String uploadId, int partNumber, ByteBuf data,
+        CompletableFuture<ObjectStorageCompletedPart> cf) {
         TimerUtil timerUtil = new TimerUtil();
         int size = data.readableBytes();
-        Consumer<ObjectStorageCompletedPart> successHandler = objectStorageCompletedPart -> {
+        doUploadPart(options, path, uploadId, partNumber, data).thenAccept(part -> {
             S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
             S3OperationStats.getInstance().uploadPartStats(size, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             data.release();
-            cf.complete(objectStorageCompletedPart);
-        };
-        Consumer<Throwable> failHandler = ex -> {
+            cf.complete(part);
+        }).exceptionally(ex -> {
             S3OperationStats.getInstance().uploadPartStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (isUnrecoverable(ex) || checkS3ApiMode) {
                 LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
@@ -263,85 +254,63 @@ private void uploadPartWithCf(String path,
                 cf.completeExceptionally(ex);
             } else {
                 LOGGER.warn("UploadPart for object {}-{} fail, retry later", path, partNumber, ex);
-                scheduler.schedule(() -> uploadPartWithCf(path, uploadId, partNumber, data, throttleStrategy, cf), 100, TimeUnit.MILLISECONDS);
+                scheduler.schedule(() -> uploadPart0(options, path, uploadId, partNumber, data, cf), 100, TimeUnit.MILLISECONDS);
             }
-        };
-        if (networkOutboundBandwidthLimiter != null) {
-            networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
-                if (ex != null) {
-                    cf.completeExceptionally(ex);
-                } else {
-                    doUploadPart(path, uploadId, partNumber, data, failHandler, successHandler);
-                }
-            }, writeLimiterCallbackExecutor);
-        } else {
-            doUploadPart(path, uploadId, partNumber, data, failHandler, successHandler);
-        }
+            return null;
+        });
     }
 
-    public CompletableFuture<ObjectStorageCompletedPart> uploadPartCopy(String sourcePath, String path, long start,
-        long end,
-        String uploadId, int partNumber) {
+    public CompletableFuture<ObjectStorageCompletedPart> uploadPartCopy(WriteOptions options, String sourcePath,
+        String path, long start, long end, String uploadId, int partNumber) {
         CompletableFuture<ObjectStorageCompletedPart> cf = new CompletableFuture<>();
         CompletableFuture<ObjectStorageCompletedPart> retCf = acquireWritePermit(cf);
         if (retCf.isDone()) {
             return retCf;
         }
-        uploadPartCopyWithCf(sourcePath, path, start, end, uploadId, partNumber, cf);
+        options.apiCallAttemptTimeout(DEFAULT_UPLOAD_PART_COPY_TIMEOUT);
+        uploadPartCopy0(options, sourcePath, path, start, end, uploadId, partNumber, cf);
         return retCf;
     }
 
-    private void uploadPartCopyWithCf(String sourcePath,
-        String path,
-        long start,
-        long end,
-        String uploadId,
-        int partNumber,
-        CompletableFuture<ObjectStorageCompletedPart> cf) {
+    private void uploadPartCopy0(WriteOptions options, String sourcePath, String path, long start, long end,
+        String uploadId, int partNumber, CompletableFuture<ObjectStorageCompletedPart> cf) {
         TimerUtil timerUtil = new TimerUtil();
-        Consumer<ObjectStorageCompletedPart> successHandler = objectStorageCompletedPart -> {
+        doUploadPartCopy(options, sourcePath, path, start, end, uploadId, partNumber).thenAccept(part -> {
             S3OperationStats.getInstance().uploadPartCopyStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-            cf.complete(objectStorageCompletedPart);
-        };
-        Consumer<Throwable> failHandler = ex -> {
+            cf.complete(part);
+        }).exceptionally(ex -> {
             S3OperationStats.getInstance().uploadPartCopyStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (isUnrecoverable(ex) || checkS3ApiMode) {
                 LOGGER.warn("UploadPartCopy for object {}-{} [{}, {}] fail", path, partNumber, start, end, ex);
                 cf.completeExceptionally(ex);
             } else {
-                long nextApiCallAttemptTimeout = Math.min(DEFAULT_UPLOAD_PART_COPY_TIMEOUT * 2, TimeUnit.MINUTES.toMillis(10));
+                long nextApiCallAttemptTimeout = Math.min(options.apiCallAttemptTimeout() * 2, TimeUnit.MINUTES.toMillis(10));
                 LOGGER.warn("UploadPartCopy for object {}-{} [{}, {}] fail, retry later with apiCallAttemptTimeout={}", path, partNumber, start, end, nextApiCallAttemptTimeout, ex);
-                scheduler.schedule(() -> uploadPartCopyWithCf(sourcePath, path, start, end, uploadId, partNumber, cf), 1000, TimeUnit.MILLISECONDS);
+                options.apiCallAttemptTimeout(nextApiCallAttemptTimeout);
+                scheduler.schedule(() -> uploadPartCopy0(options, sourcePath, path, start, end, uploadId, partNumber, cf), 1000, TimeUnit.MILLISECONDS);
             }
-        };
-
-        // TODO: get default timeout by latency baseline
-        doUploadPartCopy(sourcePath, path, start, end, uploadId, partNumber, DEFAULT_UPLOAD_PART_COPY_TIMEOUT, failHandler, successHandler);
+            return null;
+        });
    }
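`uploadPartCopy0` above now carries its per-attempt timeout in `WriteOptions` and doubles it on every retry, capped at 10 minutes. A sketch of how that sequence grows, assuming a 60 s `DEFAULT_UPLOAD_PART_COPY_TIMEOUT` (the constant's actual value is not shown in the diff):

```java
import java.util.concurrent.TimeUnit;

public class TimeoutBackoffSketch {
    public static void main(String[] args) {
        long timeoutMs = TimeUnit.SECONDS.toMillis(60); // assumed default, for illustration only
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.printf("attempt %d: apiCallAttemptTimeout=%dms%n", attempt, timeoutMs);
            timeoutMs = Math.min(timeoutMs * 2, TimeUnit.MINUTES.toMillis(10)); // same cap as the diff
        }
    }
}
```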
-    public CompletableFuture<Void> completeMultipartUpload(String path, String uploadId,
+    public CompletableFuture<Void> completeMultipartUpload(WriteOptions options, String path, String uploadId,
         List<ObjectStorageCompletedPart> parts) {
-
         CompletableFuture<Void> cf = new CompletableFuture<>();
         CompletableFuture<Void> retCf = acquireWritePermit(cf);
         if (retCf.isDone()) {
             return retCf;
         }
-
-        completeMultipartUploadWithCf(path, uploadId, parts, cf);
+        completeMultipartUpload0(options, path, uploadId, parts, cf);
         return retCf;
     }
 
-    private void completeMultipartUploadWithCf(String path,
-        String uploadId,
-        List<ObjectStorageCompletedPart> parts,
-        CompletableFuture<Void> cf) {
+    private void completeMultipartUpload0(WriteOptions options, String path, String uploadId,
+        List<ObjectStorageCompletedPart> parts, CompletableFuture<Void> cf) {
         TimerUtil timerUtil = new TimerUtil();
-        Runnable successHandler = () -> {
+        doCompleteMultipartUpload(options, path, uploadId, parts).thenAccept(nil -> {
             S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             cf.complete(null);
-        };
-        Consumer<Throwable> failHandler = ex -> {
+        }).exceptionally(ex -> {
             S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (isUnrecoverable(ex) || checkS3ApiMode) {
                 LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
@@ -351,29 +320,25 @@ private void completeMultipartUploadWithCf(String path,
                 cf.completeExceptionally(new IllegalArgumentException("Part numbers are not continuous"));
             } else {
                 LOGGER.warn("CompleteMultipartUpload for object {} fail, retry later", path, ex);
-                scheduler.schedule(() -> completeMultipartUploadWithCf(path, uploadId, parts, cf), 100, TimeUnit.MILLISECONDS);
+                scheduler.schedule(() -> completeMultipartUpload0(options, path, uploadId, parts, cf), 100, TimeUnit.MILLISECONDS);
             }
-        };
-
-        doCompleteMultipartUpload(path, uploadId, parts, failHandler, successHandler);
+            return null;
+        });
     }
 
     @Override
     public CompletableFuture<Void> delete(List<ObjectPath> objectPaths) {
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        List<String> objectKeys = objectPaths.stream()
-            .map(ObjectPath::key)
-            .collect(Collectors.toList());
-        Runnable successHandler = () -> {
-            LOGGER.info("Delete objects finished, count: {}, cost: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+        List<String> objectKeys = objectPaths.stream().map(ObjectPath::key).collect(Collectors.toList());
+        doDeleteObjects(objectKeys).thenAccept(nil -> {
+            LOGGER.info("Delete objects finished, count: {}, cost: {}ms", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
             S3OperationStats.getInstance().deleteObjectsStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             cf.complete(null);
-        };
-        Consumer<Throwable> failHandler = ex -> {
+        }).exceptionally(ex -> {
             if (ex instanceof DeleteObjectsException) {
                 DeleteObjectsException deleteObjectsException = (DeleteObjectsException) ex;
-                LOGGER.info("Delete objects failed, count: {}, cost: {}, failedKeys: {}",
+                LOGGER.warn("Delete objects failed, count: {}, cost: {}, failedKeys: {}",
                     deleteObjectsException.getFailedKeys().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS),
                     deleteObjectsException.getFailedKeys());
             } else {
@@ -383,23 +348,18 @@ public CompletableFuture<Void> delete(List<ObjectPath> objectPaths) {
                     objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
             }
             cf.completeExceptionally(ex);
-        };
-        doDeleteObjects(objectKeys, failHandler, successHandler);
+            return null;
+        });
         return cf;
     }
 
-    @Override
-    public CompletableFuture<Void> write(WriteOptions options, String objectPath, ByteBuf buf) {
-        return write(objectPath, buf, options.throttleStrategy());
-    }
-
     @Override
     public CompletableFuture<List<ObjectInfo>> list(String prefix) {
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<List<ObjectInfo>> cf = doList(prefix);
         cf.thenAccept(keyList -> {
             S3OperationStats.getInstance().listObjectsStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-            LOGGER.info("List objects finished, count: {}, cost: {}", keyList.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            LOGGER.info("List objects finished, count: {}, cost: {}ms", keyList.size(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
         }).exceptionally(ex -> {
             S3OperationStats.getInstance().listObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             LOGGER.info("List objects failed, cost: {}, ex: {}", timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
@@ -410,31 +370,27 @@ public CompletableFuture<List<ObjectInfo>> list(String prefix) {
 
     @Override
     public void close() {
-        readLimiterCallbackExecutor.shutdown();
         readCallbackExecutor.shutdown();
         scheduler.shutdown();
         doClose();
     }
 
-    abstract void doRangeRead(String path, long start, long end, Consumer<Throwable> failHandler,
-        Consumer<CompositeByteBuf> successHandler);
+    abstract CompletableFuture<ByteBuf> doRangeRead(ReadOptions options, String path, long start, long end);
 
-    abstract void doWrite(String path, ByteBuf data, Consumer<Throwable> failHandler, Runnable successHandler);
+    abstract CompletableFuture<Void> doWrite(WriteOptions options, String path, ByteBuf data);
 
-    abstract void doCreateMultipartUpload(String path, Consumer<Throwable> failHandler,
-        Consumer<String> successHandler);
+    abstract CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String path);
 
-    abstract void doUploadPart(String path, String uploadId, int partNumber, ByteBuf part,
-        Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler);
+    abstract CompletableFuture<ObjectStorageCompletedPart> doUploadPart(WriteOptions options, String path,
+        String uploadId, int partNumber, ByteBuf part);
 
-    abstract void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId,
-        int partNumber, long apiCallAttemptTimeout,
-        Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler);
+    abstract CompletableFuture<ObjectStorageCompletedPart> doUploadPartCopy(WriteOptions options, String sourcePath,
+        String path, long start, long end, String uploadId, int partNumber);
 
-    abstract void doCompleteMultipartUpload(String path, String uploadId, List<ObjectStorageCompletedPart> parts,
-        Consumer<Throwable> failHandler, Runnable successHandler);
+    abstract CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options, String path, String uploadId,
+        List<ObjectStorageCompletedPart> parts);
 
-    abstract void doDeleteObjects(List<String> objectKeys, Consumer<Throwable> failHandler, Runnable successHandler);
+    abstract CompletableFuture<Void> doDeleteObjects(List<String> objectKeys);
 
     abstract boolean isUnrecoverable(Throwable ex);
 
@@ -447,12 +403,6 @@ private static boolean checkPartNumbers(List<ObjectStorageCompletedPart> parts)
         return maxOpt.isPresent() && maxOpt.get() == parts.size();
     }
 
-    private void rangeRead0(String objectPath, long start, long end, CompletableFuture<ByteBuf> cf) {
-        synchronized (waitingReadTasks) {
-            waitingReadTasks.add(new AbstractObjectStorage.ReadTask(objectPath, start, end, cf));
-        }
-    }
-
     void tryMergeRead() {
         try {
             tryMergeRead0();
@@ -502,7 +452,7 @@ private void tryMergeRead0() {
                         path, mergedReadTask.start, mergedReadTask.end,
                         mergedReadTask.end - mergedReadTask.start, mergedReadTask.dataSparsityRate);
                 }
-                mergedRangeRead(path, mergedReadTask.start, mergedReadTask.end)
+                mergedRangeRead(mergedReadTask.readTasks.get(0).options, path, mergedReadTask.start, mergedReadTask.end)
                     .whenComplete((rst, ex) -> FutureUtil.suppress(() -> mergedReadTask.handleReadCompleted(rst, ex), LOGGER));
             }
         );
@@ -512,31 +462,20 @@ private int availableReadPermit() {
         return inflightReadLimiter.availablePermits();
     }
 
-    CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
+    CompletableFuture<ByteBuf> mergedRangeRead(ReadOptions options, String path, long start, long end) {
         CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
         CompletableFuture<ByteBuf> retCf = acquireReadPermit(cf);
         if (retCf.isDone()) {
             return retCf;
         }
-        mergedRangeRead0(path, start, end, cf);
+        mergedRangeRead0(options, path, start, end, cf);
         return retCf;
     }
 
-    private void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
+    private void mergedRangeRead0(ReadOptions options, String path, long start, long end, CompletableFuture<ByteBuf> cf) {
         TimerUtil timerUtil = new TimerUtil();
         long size = end - start;
-        Consumer<Throwable> failHandler = ex -> {
-            if (isUnrecoverable(ex) || checkS3ApiMode) {
-                LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
-                cf.completeExceptionally(ex);
-            } else {
-                LOGGER.warn("GetObject for object {} [{}, {}) fail, retry later", path, start, end, ex);
-                scheduler.schedule(() -> mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
-            }
-            S3OperationStats.getInstance().getObjectStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-        };
-
-        Consumer<ByteBuf> successHandler = buf -> {
+        doRangeRead(options, path, start, end).thenAccept(buf -> {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("[S3BlockCache] getObject from path: {}, {}-{}, size: {}, cost: {} ms",
                     path, start, end, size, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
@@ -544,9 +483,17 @@ private void mergedRangeRead0(String path, long start, long end, CompletableFutu
             S3OperationStats.getInstance().downloadSizeTotalStats.add(MetricsLevel.INFO, size);
             S3OperationStats.getInstance().getObjectStats(size, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             cf.complete(buf);
-        };
-
-        doRangeRead(path, start, end, failHandler, successHandler);
+        }).exceptionally(ex -> {
+            if (isUnrecoverable(ex) || checkS3ApiMode) {
+                LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
+                cf.completeExceptionally(ex);
+            } else {
+                LOGGER.warn("GetObject for object {} [{}, {}) fail, retry later", path, start, end, ex);
+                scheduler.schedule(() -> mergedRangeRead0(options, path, start, end, cf), 100, TimeUnit.MILLISECONDS);
+            }
+            S3OperationStats.getInstance().getObjectStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            return null;
+        });
     }
 
     static int getMaxObjectStorageConcurrency() {
@@ -719,18 +666,24 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) {
     }
 
     static final class ReadTask {
+        private final ReadOptions options;
         private final String objectPath;
         private final long start;
         private final long end;
         private final CompletableFuture<ByteBuf> cf;
 
-        ReadTask(String objectPath, long start, long end, CompletableFuture<ByteBuf> cf) {
+        ReadTask(ReadOptions options, String objectPath, long start, long end, CompletableFuture<ByteBuf> cf) {
+            this.options = options;
             this.objectPath = objectPath;
             this.start = start;
             this.end = end;
             this.cf = cf;
         }
 
+        public ReadOptions options() {
+            return options;
+        }
+
         public String objectPath() {
             return objectPath;
        }
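The mechanical core of this file's refactor: a `(successHandler, failHandler)` callback pair carries the same information as one `CompletableFuture<T>`, so each `do*` primitive can return a future and the retry/metrics wrappers chain `thenAccept`/`exceptionally` on it. A self-contained sketch of that equivalence (names are illustrative, not from the diff):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackToFutureSketch {
    // Old shape: an operation reporting through two callbacks.
    static void legacyOp(Consumer<String> successHandler, Consumer<Throwable> failHandler) {
        successHandler.accept("upload-id-123"); // pretend the S3 call succeeded
    }

    // New shape: the same operation surfaced as a single future.
    static CompletableFuture<String> futureOp() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        legacyOp(cf::complete, cf::completeExceptionally);
        return cf;
    }

    public static void main(String[] args) {
        futureOp().thenAccept(id -> System.out.println("got " + id)).join();
    }
}
```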
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java
index 14423dac9d..71aefb9384 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java
@@ -12,7 +12,7 @@
 package com.automq.stream.s3.operator;
 
 import com.automq.stream.s3.ByteBufAlloc;
-import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
+import com.automq.stream.s3.network.NetworkBandwidthLimiter;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
@@ -25,8 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -46,10 +44,10 @@
 import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
-import software.amazon.awssdk.services.s3.model.DeletedObject;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
@@ -58,7 +56,6 @@
 import software.amazon.awssdk.services.s3.model.Tagging;
 import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
-import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
 import static com.automq.stream.s3.metadata.ObjectUtils.tagging;
 import static com.automq.stream.utils.FutureUtil.cause;
@@ -71,13 +68,12 @@ public class AwsObjectStorage extends AbstractObjectStorage {
     private final Tagging tagging;
     private final S3AsyncClient readS3Client;
     private final S3AsyncClient writeS3Client;
-    private boolean deleteObjectsReturnSuccessKeys;
 
     public AwsObjectStorage(String endpoint, Map<String, String> tagging, String region, String bucket,
         boolean forcePathStyle,
         List<AwsCredentialsProvider> credentialsProviders,
-        AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
-        AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkInboundBandwidthLimiter,
+        NetworkBandwidthLimiter networkOutboundBandwidthLimiter,
         boolean readWriteIsolate,
         boolean checkMode) {
         super(networkInboundBandwidthLimiter, networkOutboundBandwidthLimiter, readWriteIsolate, checkMode);
@@ -85,7 +81,7 @@ public AwsObjectStorage(String endpoint, Map<String, String> tagging, String reg
         this.tagging = tagging(tagging);
         this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders, getMaxObjectStorageConcurrency());
         this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders, getMaxObjectStorageConcurrency()) : writeS3Client;
-        this.deleteObjectsReturnSuccessKeys = getDeleteObjectsMode();
+        readinessCheck();
     }
 
     // used for test only
@@ -106,189 +102,111 @@ public static Builder builder() {
         return new Builder();
     }
 
-    static void handleDeleteObjectsResponse(DeleteObjectsResponse response,
-        boolean deleteObjectsReturnSuccessKeys) throws Exception {
+    static void checkDeleteObjectsResponse(DeleteObjectsResponse response) throws Exception {
         int errDeleteCount = 0;
         ArrayList<String> failedKeys = new ArrayList<>();
         ArrayList<String> errorsMessages = new ArrayList<>();
-        if (deleteObjectsReturnSuccessKeys) {
-            // expect NoSuchKey is not response because s3 api won't return this in errors.
-            for (S3Error error : response.errors()) {
-                if (errDeleteCount < 30) {
-                    LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
-                        error.key(), error.code(), error.message());
-                }
-                failedKeys.add(error.key());
-                errorsMessages.add(error.message());
-                errDeleteCount++;
-
+        for (S3Error error : response.errors()) {
+            if (S3_API_NO_SUCH_KEY.equals(error.code())) {
+                // ignore for delete objects.
+                continue;
             }
-        } else {
-            for (S3Error error : response.errors()) {
-                if (S3_API_NO_SUCH_KEY.equals(error.code())) {
-                    // ignore for delete objects.
-                    continue;
-                }
-                if (errDeleteCount < 30) {
-                    LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
-                        error.key(), error.code(), error.message());
-                }
-                failedKeys.add(error.key());
-                errorsMessages.add(error.message());
-                errDeleteCount++;
+            if (errDeleteCount < 5) {
+                LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
+                    error.key(), error.code(), error.message());
             }
+            failedKeys.add(error.key());
+            errorsMessages.add(error.message());
+            errDeleteCount++;
         }
         if (errDeleteCount > 0) {
             throw new DeleteObjectsException("Failed to delete objects", failedKeys, errorsMessages);
         }
     }
 
-    static boolean checkIfDeleteObjectsWillReturnSuccessDeleteKeys(List<String> path, DeleteObjectsResponse resp) {
-        // BOS S3 API works as quiet mode
-        // in this mode success delete objects won't be returned.
-        // which could cause object not deleted in metadata.
-        //
-        // BOS doc: https://cloud.baidu.com/doc/BOS/s/tkc5twspg
-        // S3 doc: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestBody
-
-        boolean hasDeleted = resp.hasDeleted() && !resp.deleted().isEmpty();
-        boolean hasErrors = resp.hasErrors() && !resp.errors().isEmpty();
-        boolean hasErrorsWithoutNoSuchKey = resp.errors().stream().filter(s3Error -> !S3_API_NO_SUCH_KEY.equals(s3Error.code())).count() != 0;
-        boolean allDeleteKeyMatch = resp.deleted().stream().map(DeletedObject::key).sorted().collect(Collectors.toList()).equals(path);
-
-        if (hasDeleted && !hasErrors && allDeleteKeyMatch) {
-            LOGGER.info("call deleteObjects deleteObjectKeys returned.");
-
-            return true;
-
-        } else if (!hasDeleted && !hasErrorsWithoutNoSuchKey) {
-            LOGGER.info("call deleteObjects but deleteObjectKeys not returned. set deleteObjectsReturnSuccessKeys = false");
-
-            return false;
-        }
-
-        IllegalStateException exception = new IllegalStateException();
-
-        LOGGER.error("error when check if delete objects will return success." +
-                " delete keys {} resp {}, requestId {},httpCode {} httpText {}",
-            path, resp, resp.responseMetadata().requestId(),
-            resp.sdkHttpResponse().statusCode(), resp.sdkHttpResponse().statusText(), exception);
-
-        throw exception;
-    }
-
     @Override
-    void doRangeRead(String path, long start, long end,
-        Consumer<Throwable> failHandler, Consumer<CompositeByteBuf> successHandler) {
-        GetObjectRequest request = GetObjectRequest.builder()
-            .bucket(bucket)
-            .key(path)
-            .range(range(start, end))
-            .build();
+    CompletableFuture<ByteBuf> doRangeRead(ReadOptions options, String path, long start, long end) {
+        GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
+        CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
         readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
             .thenAccept(responsePublisher -> {
                 CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
                 responsePublisher.subscribe(bytes -> {
                     // the aws client will copy DefaultHttpContent to heap ByteBuffer
                     buf.addComponent(true, Unpooled.wrappedBuffer(bytes));
-                }).thenAccept(v -> {
-                    successHandler.accept(buf);
-                }).exceptionally(ex -> {
-                    buf.release();
-                    failHandler.accept(ex);
-                    return null;
+                }).whenComplete((rst, ex) -> {
+                    if (ex != null) {
+                        buf.release();
+                        cf.completeExceptionally(ex);
+                    } else {
+                        cf.complete(buf);
+                    }
                 });
             })
             .exceptionally(ex -> {
-                failHandler.accept(ex);
+                cf.completeExceptionally(ex);
                 return null;
             });
+        return cf;
     }
 
     @Override
-    void doWrite(String path, ByteBuf data,
-        Consumer<Throwable> failHandler, Runnable successHandler) {
+    CompletableFuture<Void> doWrite(WriteOptions options, String path, ByteBuf data) {
         PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(path);
         if (null != tagging) {
             builder.tagging(tagging);
         }
         PutObjectRequest request = builder.build();
         AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
-        writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
-            successHandler.run();
-        }).exceptionally(ex -> {
-            failHandler.accept(ex);
-            return null;
-        });
+        return writeS3Client.putObject(request, body).thenApply(rst -> null);
     }
 
     @Override
-    void doCreateMultipartUpload(String path,
-        Consumer<Throwable> failHandler, Consumer<String> successHandler) {
+    CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String path) {
         CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder().bucket(bucket).key(path);
         if (null != tagging) {
             builder.tagging(tagging);
         }
         CreateMultipartUploadRequest request = builder.build();
-        writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
-            successHandler.accept(createMultipartUploadResponse.uploadId());
-        }).exceptionally(ex -> {
-            failHandler.accept(ex);
-            return null;
-        });
+        return writeS3Client.createMultipartUpload(request).thenApply(CreateMultipartUploadResponse::uploadId);
     }
 
     @Override
-    void doUploadPart(String path, String uploadId, int partNumber, ByteBuf part,
-        Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler) {
+    CompletableFuture<ObjectStorageCompletedPart> doUploadPart(WriteOptions options, String path, String uploadId,
+        int partNumber, ByteBuf part) {
         AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
         UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
             .partNumber(partNumber).build();
-        CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
-        uploadPartCf.thenAccept(uploadPartResponse -> {
-            ObjectStorageCompletedPart objectStorageCompletedPart = new ObjectStorageCompletedPart(partNumber, uploadPartResponse.eTag());
-            successHandler.accept(objectStorageCompletedPart);
-        }).exceptionally(ex -> {
-            failHandler.accept(ex);
-            return null;
-        });
+        return writeS3Client.uploadPart(request, body)
+            .thenApply(resp -> new ObjectStorageCompletedPart(partNumber, resp.eTag()));
     }
 
     @Override
-    void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber,
-        long apiCallAttemptTimeout,
-        Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler) {
+    CompletableFuture<ObjectStorageCompletedPart> doUploadPartCopy(WriteOptions options, String sourcePath,
+        String path, long start, long end, String uploadId, int partNumber) {
         UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
             .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, end)).uploadId(uploadId).partNumber(partNumber)
-            .overrideConfiguration(AwsRequestOverrideConfiguration.builder().apiCallAttemptTimeout(Duration.ofMillis(apiCallAttemptTimeout)).apiCallTimeout(Duration.ofMillis(apiCallAttemptTimeout)).build())
+            .overrideConfiguration(
+                AwsRequestOverrideConfiguration.builder()
+                    .apiCallAttemptTimeout(Duration.ofMillis(options.apiCallAttemptTimeout()))
+                    .apiCallTimeout(Duration.ofMillis(options.apiCallAttemptTimeout())).build()
+            )
             .build();
-        writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
-            ObjectStorageCompletedPart completedPart = new ObjectStorageCompletedPart(partNumber, uploadPartCopyResponse.copyPartResult().eTag());
-            successHandler.accept(completedPart);
-        }).exceptionally(ex -> {
-            failHandler.accept(ex);
-            return null;
-        });
+        return writeS3Client.uploadPartCopy(request).thenApply(resp -> new ObjectStorageCompletedPart(partNumber, resp.copyPartResult().eTag()));
     }
 
     @Override
-    public void doCompleteMultipartUpload(String path, String uploadId, List<ObjectStorageCompletedPart> parts,
-        Consumer<Throwable> failHandler, Runnable successHandler) {
+    public CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options, String path, String uploadId,
        List<ObjectStorageCompletedPart> parts) {
         List<CompletedPart> completedParts = parts.stream()
             .map(part -> CompletedPart.builder().partNumber(part.getPartNumber()).eTag(part.getPartId()).build())
             .collect(Collectors.toList());
         CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
         CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();
-        writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
-            successHandler.run();
-        }).exceptionally(ex -> {
-            failHandler.accept(ex);
-            return null;
-        });
+        return writeS3Client.completeMultipartUpload(request).thenApply(resp -> null);
     }
 
-    public void doDeleteObjects(List<String> objectKeys,
-        Consumer<Throwable> failHandler, Runnable successHandler) {
+    public CompletableFuture<Void> doDeleteObjects(List<String> objectKeys) {
         ObjectIdentifier[] toDeleteKeys = objectKeys.stream().map(key ->
             ObjectIdentifier.builder()
                 .key(key)
@@ -300,19 +218,21 @@ public void doDeleteObjects(List<String> objectKeys,
             .delete(Delete.builder().objects(toDeleteKeys).build())
             .build();
 
+        CompletableFuture<Void> cf = new CompletableFuture<>();
         this.writeS3Client.deleteObjects(request)
             .thenAccept(resp -> {
                 try {
-                    handleDeleteObjectsResponse(resp, deleteObjectsReturnSuccessKeys);
-                    successHandler.run();
-                } catch (Exception ex) {
-                    failHandler.accept(ex);
+                    checkDeleteObjectsResponse(resp);
+                    cf.complete(null);
+                } catch (Throwable ex) {
+                    cf.completeExceptionally(ex);
                 }
             })
             .exceptionally(ex -> {
-                failHandler.accept(ex);
+                cf.completeExceptionally(ex);
                 return null;
             });
+        return cf;
    }
 
     @Override
@@ -351,39 +271,17 @@ private String range(long start, long end) {
         return "bytes=" + start + "-" + (end - 1);
     }
 
-    private boolean getDeleteObjectsMode() {
+    private void readinessCheck() {
         try {
-            return asyncCheckDeleteObjectsReturnSuccessDeleteKeys().get(30, TimeUnit.SECONDS);
+            String path = String.format("/automq/readiness_check/%d", System.nanoTime());
+            byte[] content = new Date().toString().getBytes(StandardCharsets.UTF_8);
+            doWrite(new WriteOptions(), path, Unpooled.wrappedBuffer(content)).get();
+            doDeleteObjects(List.of(path)).get();
         } catch (Throwable e) {
-            LOGGER.error("Failed to check if the s3 `deleteObjects` api will return deleteKeys", e);
             throw new RuntimeException(e);
         }
     }
 
-    private CompletableFuture<Boolean> asyncCheckDeleteObjectsReturnSuccessDeleteKeys() {
-        byte[] content = new Date().toString().getBytes(StandardCharsets.UTF_8);
-        String path1 = String.format("check_available/deleteObjectsMode/%d", System.nanoTime());
-        String path2 = String.format("check_available/deleteObjectsMode/%d", System.nanoTime() + 1);
-        List<String> path = List.of(path1, path2);
-
-        ObjectIdentifier[] toDeleteKeys = path.stream().map(key ->
-            ObjectIdentifier.builder()
-                .key(key)
-                .build()
-        ).toArray(ObjectIdentifier[]::new);
-        DeleteObjectsRequest request = DeleteObjectsRequest.builder()
-            .bucket(bucket)
-            .delete(Delete.builder().objects(toDeleteKeys).build())
-            .build();
-
-        return CompletableFuture.allOf(
-            write(path1, Unpooled.wrappedBuffer(content), WriteOptions.DEFAULT.throttleStrategy()),
-            write(path2, Unpooled.wrappedBuffer(content), WriteOptions.DEFAULT.throttleStrategy())
-        )
-            .thenCompose(__ -> this.writeS3Client.deleteObjects(request))
-            .thenApply(resp -> checkIfDeleteObjectsWillReturnSuccessDeleteKeys(path, resp));
-    }
-
     private S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle,
         List<AwsCredentialsProvider> credentialsProviders, int maxConcurrency) {
         S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
@@ -422,8 +320,8 @@ public static class Builder {
         private boolean forcePathStyle;
         private List<AwsCredentialsProvider> credentialsProviders;
         private Map<String, String> tagging;
-        private AsyncNetworkBandwidthLimiter inboundLimiter;
-        private AsyncNetworkBandwidthLimiter outboundLimiter;
+        private NetworkBandwidthLimiter inboundLimiter = NetworkBandwidthLimiter.NOOP;
+        private NetworkBandwidthLimiter outboundLimiter = NetworkBandwidthLimiter.NOOP;
         private boolean readWriteIsolate;
         private int maxReadConcurrency = 50;
         private int maxWriteConcurrency = 50;
@@ -459,12 +357,12 @@ public Builder tagging(Map<String, String> tagging) {
             return this;
         }
 
-        public Builder inboundLimiter(AsyncNetworkBandwidthLimiter inboundLimiter) {
+        public Builder inboundLimiter(NetworkBandwidthLimiter inboundLimiter) {
             this.inboundLimiter = inboundLimiter;
             return this;
         }
 
-        public Builder outboundLimiter(AsyncNetworkBandwidthLimiter outboundLimiter) {
+        public Builder outboundLimiter(NetworkBandwidthLimiter outboundLimiter) {
             this.outboundLimiter = outboundLimiter;
             return this;
        }
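`readinessCheck()` above replaces the old delete-mode probing with a simpler startup probe: write one marker object, delete it, and fail construction if either primitive is broken. (The probe path originally read `"/automq/readiness_check/%d" + System.nanoTime()`, concatenating a literal `%d`; it is rendered here with `String.format`, which appears to be the intent.) A self-contained sketch of the same flow, with the two lambdas standing in for `doWrite`/`doDeleteObjects` (hypothetical stand-ins, not the real class):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReadinessCheckSketch {
    interface Write { CompletableFuture<Void> run(String path, byte[] data); }
    interface Delete { CompletableFuture<Void> run(List<String> keys); }

    static void readinessCheck(Write write, Delete delete) {
        try {
            String path = String.format("automq/readiness_check/%d", System.nanoTime());
            byte[] content = "probe".getBytes(StandardCharsets.UTF_8);
            write.run(path, content).get();   // blocking is fine: runs once at startup
            delete.run(List.of(path)).get();
        } catch (Throwable e) {
            throw new RuntimeException(e);    // fail fast, as the diff does
        }
    }

    public static void main(String[] args) {
        readinessCheck(
            (path, data) -> CompletableFuture.completedFuture(null),
            keys -> CompletableFuture.completedFuture(null));
        System.out.println("object storage is ready");
    }
}
```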
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryObjectStorage.java
index fa0d8df0fd..6bb225655e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryObjectStorage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryObjectStorage.java
@@ -13,6 +13,7 @@
 import com.automq.stream.s3.ByteBufAlloc;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.Threads;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -22,7 +23,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 public class MemoryObjectStorage extends AbstractObjectStorage {
@@ -38,38 +38,33 @@ public MemoryObjectStorage() {
     }
 
     @Override
-    void doRangeRead(String path, long start, long end, Consumer<Throwable> failHandler,
-        Consumer<CompositeByteBuf> successHandler) {
+    CompletableFuture<ByteBuf> doRangeRead(ReadOptions options, String path, long start, long end) {
         ByteBuf value = storage.get(path);
         if (value == null) {
-            failHandler.accept(new IllegalArgumentException("object not exist"));
-            return;
+            return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
         }
         int length = end != -1L ? (int) (end - start) : (int) (value.readableBytes() - start);
         ByteBuf rst = value.retainedSlice(value.readerIndex() + (int) start, length);
         CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
         buf.addComponent(true, rst);
         if (delay == 0) {
-            successHandler.accept(buf);
+            return CompletableFuture.completedFuture(buf);
         } else {
-            Threads.COMMON_SCHEDULER.schedule(() -> successHandler.accept(buf), delay, TimeUnit.MILLISECONDS);
+            CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
+            Threads.COMMON_SCHEDULER.schedule(() -> cf.complete(buf), delay, TimeUnit.MILLISECONDS);
+            return cf;
         }
     }
 
     @Override
-    void doWrite(String path, ByteBuf data, Consumer<Throwable> failHandler, Runnable successHandler) {
-        try {
-            if (data == null) {
-                failHandler.accept(new IllegalArgumentException("data to write cannot be null"));
-                return;
-            }
-            ByteBuf buf = Unpooled.buffer(data.readableBytes());
-            buf.writeBytes(data.duplicate());
-            storage.put(path, buf);
-            successHandler.run();
-        } catch (Exception ex) {
-            failHandler.accept(ex);
+    CompletableFuture<Void> doWrite(WriteOptions options, String path, ByteBuf data) {
+        if (data == null) {
+            return FutureUtil.failedFuture(new IllegalArgumentException("data to write cannot be null"));
         }
+        ByteBuf buf = Unpooled.buffer(data.readableBytes());
+        buf.writeBytes(data.duplicate());
+        storage.put(path, buf);
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -117,45 +112,38 @@ public CompletableFuture<Void> release() {
     }
 
     @Override
-    void doCreateMultipartUpload(String path,
-        Consumer<Throwable> failHandler, Consumer<String> successHandler) {
-        failHandler.accept(new UnsupportedOperationException());
+    CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String path) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
-    void doUploadPart(String path, String uploadId, int partNumber, ByteBuf part,
-        Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler) {
-        failHandler.accept(new UnsupportedOperationException());
+    CompletableFuture<ObjectStorageCompletedPart> doUploadPart(WriteOptions options, String path, String uploadId,
+        int partNumber, ByteBuf part) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
-    void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber,
-        long apiCallAttemptTimeout, Consumer<Throwable> failHandler,
-        Consumer<ObjectStorageCompletedPart> successHandler) {
-        failHandler.accept(new UnsupportedOperationException());
+    CompletableFuture<ObjectStorageCompletedPart> doUploadPartCopy(WriteOptions options, String sourcePath, String path,
+        long start, long end, String uploadId, int partNumber) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
-    void doCompleteMultipartUpload(String path, String uploadId, List<ObjectStorageCompletedPart> parts,
-        Consumer<Throwable> failHandler, Runnable successHandler) {
-        failHandler.accept(new UnsupportedOperationException());
+    CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options, String path, String uploadId,
+        List<ObjectStorageCompletedPart> parts) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
-    void doDeleteObjects(List<String> objectKeys, Consumer<Throwable> failHandler, Runnable successHandler) {
+    CompletableFuture<Void> doDeleteObjects(List<String> objectKeys) {
         objectKeys.forEach(storage::remove);
-        successHandler.run();
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     boolean isUnrecoverable(Throwable ex) {
-        if (ex instanceof UnsupportedOperationException) {
-            return true;
-        }
-        if (ex instanceof IllegalArgumentException) {
-            return true;
-        }
-        return false;
+        Throwable cause = FutureUtil.cause(ex);
+        return cause instanceof UnsupportedOperationException || cause instanceof IllegalArgumentException;
     }
 
    @Override
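The `isUnrecoverable` change above is subtle but load-bearing: once errors travel through `CompletableFuture` chains, they typically arrive wrapped in `CompletionException`, so a bare `instanceof` on the wrapper would stop matching. The sketch below demonstrates the wrapping and an unwrap helper in the spirit of `FutureUtil.cause` (whose exact implementation is not shown in the diff):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CauseUnwrapSketch {
    static Throwable cause(Throwable ex) {
        return ex instanceof CompletionException && ex.getCause() != null ? ex.getCause() : ex;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        cf.completeExceptionally(new UnsupportedOperationException());
        try {
            cf.join();
        } catch (CompletionException wrapped) {
            // The raw wrapper is not an UnsupportedOperationException...
            System.out.println(wrapped instanceof UnsupportedOperationException);        // false
            // ...but the unwrapped cause is, which is what isUnrecoverable now checks.
            System.out.println(cause(wrapped) instanceof UnsupportedOperationException); // true
        }
    }
}
```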
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 80641167ee..cee15cda29 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -58,7 +58,7 @@ public MultiPartWriter(ObjectStorage.WriteOptions writeOptions, AbstractObjectSt
 
     private void init() {
         FutureUtil.propagate(
-            operator.createMultipartUpload(path).thenApply(uploadId -> {
+            operator.createMultipartUpload(writeOptions, path).thenApply(uploadId -> {
                 this.uploadId = uploadId;
                 return uploadId;
             }),
@@ -150,7 +150,7 @@ public CompletableFuture<Void> close() {
         S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
         closeCf = new CompletableFuture<>();
         CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
-        FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
+        FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(writeOptions, path, uploadId, genCompleteParts())), closeCf);
         closeCf.whenComplete((nil, ex) -> {
             S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
@@ -234,7 +234,7 @@ public void upload() {
 
         private void upload0() {
             TimerUtil timerUtil = new TimerUtil();
-            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
+            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(writeOptions, path, uploadId, partNumber, partBuf)), partCf);
             partCf.whenComplete((nil, ex) -> {
                 S3ObjectStats.getInstance().objectStageUploadPartStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             });
@@ -259,7 +259,7 @@ class CopyObjectPart {
         public CopyObjectPart(String sourcePath, long start, long end) {
             int partNumber = nextPartNumber.getAndIncrement();
             parts.add(partCf);
-            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPartCopy(sourcePath, path, start, end, uploadId, partNumber)), partCf);
+            FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPartCopy(writeOptions, sourcePath, path, start, end, uploadId, partNumber)), partCf);
         }
 
         public CompletableFuture<Void> getFuture() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
index 4221144cad..6ed5104c9f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
@@ -80,6 +80,7 @@ class WriteOptions {
 
         private ThrottleStrategy throttleStrategy = ThrottleStrategy.BYPASS;
         private int allocType = ByteBufAlloc.DEFAULT;
+        private long apiCallAttemptTimeout = -1L;
 
         public WriteOptions throttleStrategy(ThrottleStrategy throttleStrategy) {
             this.throttleStrategy = throttleStrategy;
@@ -91,6 +92,11 @@ public WriteOptions allocType(int allocType) {
             return this;
         }
 
+        public WriteOptions apiCallAttemptTimeout(long apiCallAttemptTimeout) {
+            this.apiCallAttemptTimeout = apiCallAttemptTimeout;
+            return this;
+        }
+
         public ThrottleStrategy throttleStrategy() {
             return throttleStrategy;
         }
@@ -99,6 +105,10 @@ public int allocType() {
             return allocType;
         }
 
+        public long apiCallAttemptTimeout() {
+            return apiCallAttemptTimeout;
+        }
+
     }
 
     class ReadOptions {
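Usage sketch for the new `WriteOptions` knob (illustrative values; `-1L` is the unset default, and callers normally do not set it themselves because `uploadPartCopy(...)` seeds it with `DEFAULT_UPLOAD_PART_COPY_TIMEOUT` before the first attempt):

```java
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.ObjectStorage.WriteOptions;

public class WriteOptionsSketch {
    public static void main(String[] args) {
        WriteOptions options = new WriteOptions()
            .throttleStrategy(ThrottleStrategy.BYPASS) // chained like the existing options
            .apiCallAttemptTimeout(60_000L);           // milliseconds, example value
        System.out.println(options.apiCallAttemptTimeout());
    }
}
```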
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
index b409cfcc06..05fce0d637 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
@@ -31,20 +31,20 @@ class ProxyWriter implements Writer {
     final ObjectWriter objectWriter = new ObjectWriter();
     private final WriteOptions writeOptions;
-    private final AbstractObjectStorage operator;
+    private final AbstractObjectStorage objectStorage;
     private final String path;
     private final long minPartSize;
     Writer multiPartWriter = null;
 
-    public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage operator, String path, long minPartSize) {
+    public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage objectStorage, String path, long minPartSize) {
         this.writeOptions = writeOptions;
-        this.operator = operator;
+        this.objectStorage = objectStorage;
         this.path = path;
         this.minPartSize = minPartSize;
     }
 
-    public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage operator, String path) {
-        this(writeOptions, operator, path, Writer.MIN_PART_SIZE);
+    public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage objectStorage, String path) {
+        this(writeOptions, objectStorage, path, Writer.MIN_PART_SIZE);
     }
 
     @Override
@@ -105,7 +105,7 @@ public CompletableFuture<Void> release() {
     }
 
     private void newMultiPartWriter() {
-        this.multiPartWriter = new MultiPartWriter(writeOptions, operator, path, minPartSize);
+        this.multiPartWriter = new MultiPartWriter(writeOptions, objectStorage, path, minPartSize);
         if (objectWriter.data.readableBytes() > 0) {
             FutureUtil.propagate(multiPartWriter.write(objectWriter.data), objectWriter.cf);
         } else {
@@ -153,7 +153,7 @@ public boolean hasBatchingPart() {
         public CompletableFuture<Void> close() {
             S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             int size = data.readableBytes();
-            FutureUtil.propagate(operator.write(path, data, writeOptions.throttleStrategy()), cf);
+            FutureUtil.propagate(objectStorage.write(writeOptions, path, data), cf);
             cf.whenComplete((nil, e) -> {
                 S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
                 S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
index 2971d110de..1bec602ffd 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -122,7 +122,7 @@ public void testReadBlockGroup() throws ExecutionException, InterruptedException
         int indexSize = buf.readableBytes() - indexPosition;
         buf.writeBytes(new ObjectWriter.Footer(indexPosition, indexSize).buffer());
         int objectSize = buf.readableBytes();
-        objectStorage.write(WriteOptions.DEFAULT, ObjectUtils.genKey(0, 1L), buf);
+        objectStorage.write(WriteOptions.DEFAULT, ObjectUtils.genKey(0, 1L), buf).get();
         try (ObjectReader reader = ObjectReader.reader(new S3ObjectMetadata(1L, objectSize, S3ObjectType.STREAM), objectStorage)) {
             ObjectReader.FindIndexResult rst = reader.find(233L, 10L, 14L, 1024).get();
             assertEquals(1, rst.streamDataBlocks().size());
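// The ObjectReaderTest change above appends .get() because write(...) is
// asynchronous: without blocking, ObjectReader.reader could race the write
// and observe an object that is not fully stored yet. A minimal illustration
// of the await-before-read pattern the test now uses:
import java.util.concurrent.CompletableFuture;

class AwaitWriteSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for objectStorage.write(...), which completes later on
        // another thread.
        CompletableFuture<Void> write = CompletableFuture.runAsync(() -> { });
        write.get(); // block until the write has completed before reading back
        System.out.println("safe to read");
    }
}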
diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java
index 599d30c742..d8b0f84d73 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java
@@ -11,28 +11,26 @@
 
 package com.automq.stream.s3.operator;
 
-
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.metadata.S3ObjectType;
 import com.automq.stream.s3.operator.ObjectStorage.ReadOptions;
 import io.netty.buffer.ByteBuf;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
-
 class AbstractObjectStorageTest {
 
     AbstractObjectStorage objectStorage;
@@ -51,13 +49,13 @@ public void tearDown() {
     public void testMergeTask() {
         S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(1, 3000, S3ObjectType.STREAM);
         AbstractObjectStorage.MergedReadTask mergedReadTask = new AbstractObjectStorage.MergedReadTask(
-            new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 0, 1024, new CompletableFuture<>()), 0);
-        boolean ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 1024, 2048, new CompletableFuture<>()));
+            new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 0, 1024, new CompletableFuture<>()), 0);
+        boolean ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 1024, 2048, new CompletableFuture<>()));
         assertTrue(ret);
         assertEquals(0, mergedReadTask.dataSparsityRate);
         assertEquals(0, mergedReadTask.start);
         assertEquals(2048, mergedReadTask.end);
-        ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 2049, 3000, new CompletableFuture<>()));
+        ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 2049, 3000, new CompletableFuture<>()));
         assertFalse(ret);
         assertEquals(0, mergedReadTask.dataSparsityRate);
         assertEquals(0, mergedReadTask.start);
@@ -68,13 +66,13 @@ public void testMergeTask() {
     public void testMergeTask2() {
         S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(1, 4096, S3ObjectType.STREAM);
         AbstractObjectStorage.MergedReadTask mergedReadTask = new AbstractObjectStorage.MergedReadTask(
-            new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 0, 1024, new CompletableFuture<>()), 0.5f);
-        boolean ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 2048, 4096, new CompletableFuture<>()));
+            new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 0, 1024, new CompletableFuture<>()), 0.5f);
+        boolean ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 2048, 4096, new CompletableFuture<>()));
         assertTrue(ret);
         assertEquals(0.25, mergedReadTask.dataSparsityRate, 0.01);
         assertEquals(0, mergedReadTask.start);
         assertEquals(4096, mergedReadTask.end);
-        ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(s3ObjectMetadata.key(), 1024, 1536, new CompletableFuture<>()));
+        ret = mergedReadTask.tryMerge(new AbstractObjectStorage.ReadTask(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 1024, 1536, new CompletableFuture<>()));
         assertTrue(ret);
         assertEquals(0.125, mergedReadTask.dataSparsityRate, 0.01);
         assertEquals(0, mergedReadTask.start);
@@ -83,15 +81,13 @@ public void testMergeTask2() {
 
     @Test
     void testMergeRead() throws ExecutionException, InterruptedException {
-        objectStorage = new MemoryObjectStorage(true) {
-            @Override
-            CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
-                return CompletableFuture.completedFuture(TestUtils.random((int) (end - start + 1)));
-            }
-        };
-        objectStorage = spy(objectStorage);
         S3ObjectMetadata s3ObjectMetadata1 = new S3ObjectMetadata(1, 33554944, S3ObjectType.STREAM);
         S3ObjectMetadata s3ObjectMetadata2 = new S3ObjectMetadata(2, 3072, S3ObjectType.STREAM);
+        objectStorage = new MemoryObjectStorage(true);
+        objectStorage.write(ObjectStorage.WriteOptions.DEFAULT, s3ObjectMetadata1.key(), TestUtils.random((int) s3ObjectMetadata1.objectSize())).get();
+        objectStorage.write(ObjectStorage.WriteOptions.DEFAULT, s3ObjectMetadata2.key(), TestUtils.random((int) s3ObjectMetadata2.objectSize())).get();
+
+        objectStorage = spy(objectStorage);
         // obj0_0_1024 obj_1_1024_2048 obj_0_16776192_16777216 obj_0_2048_4096 obj_0_16777216_16778240
         CompletableFuture<ByteBuf> cf1 = objectStorage.rangeRead(ReadOptions.DEFAULT, s3ObjectMetadata1.key(), 0, 1024);
         CompletableFuture<ByteBuf> cf2 = objectStorage.rangeRead(ReadOptions.DEFAULT, s3ObjectMetadata2.key(), 1024, 3072);
@@ -101,10 +97,10 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
 
         objectStorage.tryMergeRead();
 
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata1.key()), eq(0L), eq(4096L));
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata2.key()), eq(1024L), eq(3072L));
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata1.key()), eq(31457280L), eq(31461376L));
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata1.key()), eq(33554432L), eq(33554944L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata1.key()), eq(0L), eq(4096L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata2.key()), eq(1024L), eq(3072L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata1.key()), eq(31457280L), eq(31461376L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata1.key()), eq(33554432L), eq(33554944L));
 
         ByteBuf buf = cf1.get();
         assertEquals(1024, buf.readableBytes());
@@ -123,7 +119,6 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
         buf.release();
     }
 
-
     @Test
     void testReadToEndOfObject() throws ExecutionException, InterruptedException {
         objectStorage = new MemoryObjectStorage(true);
@@ -136,9 +131,9 @@ void testReadToEndOfObject() throws ExecutionException, InterruptedException {
         CompletableFuture<ByteBuf> cf2 = objectStorage.rangeRead(ReadOptions.DEFAULT, s3ObjectMetadata.key(), 2048L, -1L);
 
         objectStorage.tryMergeRead();
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata.key()), eq(0L), eq(1024L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata.key()), eq(0L), eq(1024L));
         objectStorage.tryMergeRead();
-        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(eq(s3ObjectMetadata.key()), eq(2048L), eq(-1L));
+        verify(objectStorage, timeout(1000L).times(1)).mergedRangeRead(any(), eq(s3ObjectMetadata.key()), eq(2048L), eq(-1L));
 
         ByteBuf buf = cf1.get();
         assertEquals(1024, buf.readableBytes());
diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java
index ba2318fc83..3c32ca3b9e 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java
@@ -14,7 +14,6 @@
 import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.metadata.S3ObjectType;
-import com.automq.stream.s3.network.ThrottleStrategy;
 import io.netty.buffer.ByteBuf;
 import java.util.concurrent.CompletableFuture;
 import org.junit.jupiter.api.Assertions;
@@ -50,21 +49,20 @@ public void setup() {
     public void testWrite_onePart() {
         writer.write(TestUtils.random(15 * 1024 * 1024));
         writer.write(TestUtils.random(1024 * 1024));
-        when(operator.write(eq("testpath"), any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(operator.write(any(), eq("testpath"), any())).thenReturn(CompletableFuture.completedFuture(null));
         assertTrue(writer.hasBatchingPart());
         assertTrue(writer.close().isDone());
         ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
-        ArgumentCaptor<ThrottleStrategy> captor2 = ArgumentCaptor.forClass(ThrottleStrategy.class);
-        verify(operator, times(1)).write(eq("testpath"), captor.capture(), captor2.capture());
+        verify(operator, times(1)).write(any(), eq("testpath"), captor.capture());
         Assertions.assertEquals(16 * 1024 * 1024, captor.getValue().readableBytes());
     }
 
     @Test
     public void testWrite_dataLargerThanMaxUploadSize() {
-        when(operator.createMultipartUpload(eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
-        when(operator.uploadPart(eq("testpath"), eq("test_upload_id"), eq(1), any(), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1")));
-        when(operator.uploadPart(eq("testpath"), eq("test_upload_id"), eq(2), any(), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(2, "etag2")));
-        when(operator.completeMultipartUpload(eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(operator.createMultipartUpload(any(), eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
+        when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(1), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1")));
+        when(operator.uploadPart(any(), eq("testpath"), eq("test_upload_id"), eq(2), any())).thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(2, "etag2")));
+        when(operator.completeMultipartUpload(any(), eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
         writer.write(TestUtils.random(17 * 1024 * 1024));
         assertTrue(writer.hasBatchingPart());
         assertNull(writer.multiPartWriter);
@@ -75,21 +73,21 @@ public void testWrite_dataLargerThanMaxUploadSize() {
         assertNotNull(writer.multiPartWriter);
         assertFalse(writer.hasBatchingPart());
         writer.close();
-        verify(operator, times(2)).uploadPart(any(), any(), anyInt(), any(), any());
+        verify(operator, times(2)).uploadPart(any(), any(), any(), anyInt(), any());
     }
 
     @Test
     public void testWrite_copyWrite() {
-        when(operator.createMultipartUpload(eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
-        when(operator.uploadPartCopy(eq("test_src_path"), eq("testpath"), eq(0L), eq(15L * 1024 * 1024), eq("test_upload_id"), eq(1)))
+        when(operator.createMultipartUpload(any(), eq("testpath"))).thenReturn(CompletableFuture.completedFuture("test_upload_id"));
+        when(operator.uploadPartCopy(any(), eq("test_src_path"), eq("testpath"), eq(0L), eq(15L * 1024 * 1024), eq("test_upload_id"), eq(1)))
             .thenReturn(CompletableFuture.completedFuture(new AbstractObjectStorage.ObjectStorageCompletedPart(1, "etag1")));
-        when(operator.completeMultipartUpload(eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(operator.completeMultipartUpload(any(), eq("testpath"), eq("test_upload_id"), any())).thenReturn(CompletableFuture.completedFuture(null));
         S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(1, 15 * 1024 * 1024, S3ObjectType.STREAM);
         writer.copyWrite(s3ObjectMetadata, 0, 15 * 1024 * 1024);
         Assertions.assertTrue(writer.close().isDone());
-        verify(operator, times(1)).uploadPartCopy(any(), any(), anyLong(), anyLong(), any(), anyInt());
+        verify(operator, times(1)).uploadPartCopy(any(), any(), any(), anyLong(), anyLong(), any(), anyInt());
     }
 }
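// The Mockito churn in ProxyWriterTest above is mechanical: the refactor
// prepends an options argument, and once one argument uses a matcher, Mockito
// requires matchers for all of them, so every stub and verify gains a leading
// any() for the new WriteOptions/ReadOptions slot. A minimal sketch of the
// updated stubbing pattern (placed in the same package so package-private
// methods are visible; the class name is hypothetical):
package com.automq.stream.s3.operator;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;

class StubbingSketch {
    static void stubCreate(AbstractObjectStorage operator) {
        // Leading any() matches the new WriteOptions parameter.
        when(operator.createMultipartUpload(any(), eq("testpath")))
            .thenReturn(CompletableFuture.completedFuture("test_upload_id"));
    }
}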