Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AsyncNetworkBandwidthLimiter {
public class AsyncNetworkBandwidthLimiter implements NetworkBandwidthLimiter {
private static final Logger LOGGER = new LogContext().logger(AsyncNetworkBandwidthLimiter.class);
private static final float DEFAULT_EXTRA_TOKEN_RATIO = 0.1f;
private static final long MAX_TOKEN_PART_SIZE = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.network;

import java.util.concurrent.CompletableFuture;

public interface NetworkBandwidthLimiter {
NetworkBandwidthLimiter NOOP = new Noop();

CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long size);

long getMaxTokens();

long getAvailableTokens();


class Noop implements NetworkBandwidthLimiter {
@Override
public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long size) {
return CompletableFuture.completedFuture(null);
}

@Override
public long getMaxTokens() {
return Long.MAX_VALUE;
}

@Override
public long getAvailableTokens() {
return Long.MAX_VALUE;
}
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
Expand All @@ -22,7 +23,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class MemoryObjectStorage extends AbstractObjectStorage {
Expand All @@ -38,38 +38,33 @@ public MemoryObjectStorage() {
}

@Override
void doRangeRead(String path, long start, long end, Consumer<Throwable> failHandler,
Consumer<CompositeByteBuf> successHandler) {
CompletableFuture<ByteBuf> doRangeRead(ReadOptions options, String path, long start, long end) {
ByteBuf value = storage.get(path);
if (value == null) {
failHandler.accept(new IllegalArgumentException("object not exist"));
return;
return FutureUtil.failedFuture(new IllegalArgumentException("object not exist"));
}
int length = end != -1L ? (int) (end - start) : (int) (value.readableBytes() - start);
ByteBuf rst = value.retainedSlice(value.readerIndex() + (int) start, length);
CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
buf.addComponent(true, rst);
if (delay == 0) {
successHandler.accept(buf);
return CompletableFuture.completedFuture(buf);
} else {
Threads.COMMON_SCHEDULER.schedule(() -> successHandler.accept(buf), delay, TimeUnit.MILLISECONDS);
CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
Threads.COMMON_SCHEDULER.schedule(() -> cf.complete(buf), delay, TimeUnit.MILLISECONDS);
return cf;
}
}

@Override
void doWrite(String path, ByteBuf data, Consumer<Throwable> failHandler, Runnable successHandler) {
try {
if (data == null) {
failHandler.accept(new IllegalArgumentException("data to write cannot be null"));
return;
}
ByteBuf buf = Unpooled.buffer(data.readableBytes());
buf.writeBytes(data.duplicate());
storage.put(path, buf);
successHandler.run();
} catch (Exception ex) {
failHandler.accept(ex);
CompletableFuture<Void> doWrite(WriteOptions options, String path, ByteBuf data) {
if (data == null) {
return FutureUtil.failedFuture(new IllegalArgumentException("data to write cannot be null"));
}
ByteBuf buf = Unpooled.buffer(data.readableBytes());
buf.writeBytes(data.duplicate());
storage.put(path, buf);
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down Expand Up @@ -117,45 +112,38 @@ public CompletableFuture<Void> release() {
}

@Override
void doCreateMultipartUpload(String path,
Consumer<Throwable> failHandler, Consumer<String> successHandler) {
failHandler.accept(new UnsupportedOperationException());
CompletableFuture<String> doCreateMultipartUpload(WriteOptions options, String path) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

@Override
void doUploadPart(String path, String uploadId, int partNumber, ByteBuf part,
Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler) {
failHandler.accept(new UnsupportedOperationException());
CompletableFuture<ObjectStorageCompletedPart> doUploadPart(WriteOptions options, String path, String uploadId,
int partNumber, ByteBuf part) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

@Override
void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber,
long apiCallAttemptTimeout, Consumer<Throwable> failHandler,
Consumer<ObjectStorageCompletedPart> successHandler) {
failHandler.accept(new UnsupportedOperationException());
CompletableFuture<ObjectStorageCompletedPart> doUploadPartCopy(WriteOptions options, String sourcePath, String path,
long start, long end, String uploadId, int partNumber) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

@Override
void doCompleteMultipartUpload(String path, String uploadId, List<ObjectStorageCompletedPart> parts,
Consumer<Throwable> failHandler, Runnable successHandler) {
failHandler.accept(new UnsupportedOperationException());
CompletableFuture<Void> doCompleteMultipartUpload(WriteOptions options, String path, String uploadId,
List<ObjectStorageCompletedPart> parts) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

@Override
void doDeleteObjects(List<String> objectKeys, Consumer<Throwable> failHandler, Runnable successHandler) {
CompletableFuture<Void> doDeleteObjects(List<String> objectKeys) {
objectKeys.forEach(storage::remove);
successHandler.run();
return CompletableFuture.completedFuture(null);
}

@Override
boolean isUnrecoverable(Throwable ex) {
if (ex instanceof UnsupportedOperationException) {
return true;
}
if (ex instanceof IllegalArgumentException) {
return true;
}
return false;
Throwable cause = FutureUtil.cause(ex);
return cause instanceof UnsupportedOperationException || cause instanceof IllegalArgumentException;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public MultiPartWriter(ObjectStorage.WriteOptions writeOptions, AbstractObjectSt

private void init() {
FutureUtil.propagate(
operator.createMultipartUpload(path).thenApply(uploadId -> {
operator.createMultipartUpload(writeOptions, path).thenApply(uploadId -> {
this.uploadId = uploadId;
return uploadId;
}),
Expand Down Expand Up @@ -150,7 +150,7 @@ public CompletableFuture<Void> close() {
S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
closeCf = new CompletableFuture<>();
CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(writeOptions, path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
Expand Down Expand Up @@ -234,7 +234,7 @@ public void upload() {

private void upload0() {
TimerUtil timerUtil = new TimerUtil();
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(writeOptions, path, uploadId, partNumber, partBuf)), partCf);
partCf.whenComplete((nil, ex) -> {
S3ObjectStats.getInstance().objectStageUploadPartStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
Expand All @@ -259,7 +259,7 @@ class CopyObjectPart {
public CopyObjectPart(String sourcePath, long start, long end) {
int partNumber = nextPartNumber.getAndIncrement();
parts.add(partCf);
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPartCopy(sourcePath, path, start, end, uploadId, partNumber)), partCf);
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPartCopy(writeOptions, sourcePath, path, start, end, uploadId, partNumber)), partCf);
}

public CompletableFuture<Void> getFuture() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class WriteOptions {

private ThrottleStrategy throttleStrategy = ThrottleStrategy.BYPASS;
private int allocType = ByteBufAlloc.DEFAULT;
private long apiCallAttemptTimeout = -1L;

public WriteOptions throttleStrategy(ThrottleStrategy throttleStrategy) {
this.throttleStrategy = throttleStrategy;
Expand All @@ -91,6 +92,11 @@ public WriteOptions allocType(int allocType) {
return this;
}

public WriteOptions apiCallAttemptTimeout(long apiCallAttemptTimeout) {
this.apiCallAttemptTimeout = apiCallAttemptTimeout;
return this;
}

public ThrottleStrategy throttleStrategy() {
return throttleStrategy;
}
Expand All @@ -99,6 +105,10 @@ public int allocType() {
return allocType;
}

public long apiCallAttemptTimeout() {
return apiCallAttemptTimeout;
}

}

class ReadOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@
class ProxyWriter implements Writer {
final ObjectWriter objectWriter = new ObjectWriter();
private final WriteOptions writeOptions;
private final AbstractObjectStorage operator;
private final AbstractObjectStorage objectStorage;
private final String path;
private final long minPartSize;
Writer multiPartWriter = null;

public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage operator, String path, long minPartSize) {
public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage objectStorage, String path, long minPartSize) {
this.writeOptions = writeOptions;
this.operator = operator;
this.objectStorage = objectStorage;
this.path = path;
this.minPartSize = minPartSize;
}

public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage operator, String path) {
this(writeOptions, operator, path, Writer.MIN_PART_SIZE);
public ProxyWriter(WriteOptions writeOptions, AbstractObjectStorage objectStorage, String path) {
this(writeOptions, objectStorage, path, Writer.MIN_PART_SIZE);
}

@Override
Expand Down Expand Up @@ -105,7 +105,7 @@ public CompletableFuture<Void> release() {
}

private void newMultiPartWriter() {
this.multiPartWriter = new MultiPartWriter(writeOptions, operator, path, minPartSize);
this.multiPartWriter = new MultiPartWriter(writeOptions, objectStorage, path, minPartSize);
if (objectWriter.data.readableBytes() > 0) {
FutureUtil.propagate(multiPartWriter.write(objectWriter.data), objectWriter.cf);
} else {
Expand Down Expand Up @@ -153,7 +153,7 @@ public boolean hasBatchingPart() {
public CompletableFuture<Void> close() {
S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
int size = data.readableBytes();
FutureUtil.propagate(operator.write(path, data, writeOptions.throttleStrategy()), cf);
FutureUtil.propagate(objectStorage.write(writeOptions, path, data), cf);
cf.whenComplete((nil, e) -> {
S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testReadBlockGroup() throws ExecutionException, InterruptedException
int indexSize = buf.readableBytes() - indexPosition;
buf.writeBytes(new ObjectWriter.Footer(indexPosition, indexSize).buffer());
int objectSize = buf.readableBytes();
objectStorage.write(WriteOptions.DEFAULT, ObjectUtils.genKey(0, 1L), buf);
objectStorage.write(WriteOptions.DEFAULT, ObjectUtils.genKey(0, 1L), buf).get();
try (ObjectReader reader = ObjectReader.reader(new S3ObjectMetadata(1L, objectSize, S3ObjectType.STREAM), objectStorage)) {
ObjectReader.FindIndexResult rst = reader.find(233L, 10L, 14L, 1024).get();
assertEquals(1, rst.streamDataBlocks().size());
Expand Down
Loading