20 changes: 17 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -48,8 +48,10 @@
import com.automq.stream.utils.ExceptionUtil;
import com.automq.stream.utils.FutureTicker;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Systems;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.threads.EventLoop;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
@@ -89,7 +91,6 @@
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;

import static com.automq.stream.utils.FutureUtil.suppress;

public class S3Storage implements Storage {
private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
@@ -152,6 +153,9 @@ public static LinkRecordDecoder getLinkRecordDecoder() {
* @see #handleAppendCallback
*/
private final Lock[] streamCallbackLocks = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new);
private final EventLoop[] callbackExecutors = IntStream.range(0, Systems.CPU_CORES).mapToObj(i -> new EventLoop("AUTOMQ_S3STREAM_APPEND_CALLBACK-" + i))
.toArray(EventLoop[]::new);

private long lastLogTimestamp = 0L;
private volatile double maxDataWriteRate = 0.0;

@@ -523,6 +527,9 @@ public void shutdown() {
FutureUtil.suppress(() -> delayTrim.close(), LOGGER);
deltaWAL.shutdownGracefully();
ThreadUtils.shutdownExecutor(backgroundExecutor, 10, TimeUnit.SECONDS, LOGGER);
for (EventLoop executor : callbackExecutors) {
executor.shutdownGracefully();
}
}

@Override
@@ -800,7 +807,15 @@ public CompletableFuture<Void> forceUpload(long streamId) {
}

private void handleAppendCallback(WalWriteRequest request) {
suppress(() -> handleAppendCallback0(request), LOGGER);
// Execute append callbacks in parallel, routing each to an executor keyed by streamId.
EventLoop executor = callbackExecutors[Math.abs((int) (request.record.getStreamId() % callbackExecutors.length))];
executor.execute(() -> {
try {
handleAppendCallback0(request);
} catch (Throwable e) {
LOGGER.error("[UNEXPECTED], handle append callback fail, request {}", request, e);
}
});
}

private void handleAppendCallback0(WalWriteRequest request) {
@@ -812,7 +827,6 @@ private void handleAppendCallback0(WalWriteRequest request) {
// cache block is full, trigger WAL upload.
uploadDeltaWAL();
}
// TODO: parallel callback
request.cf.complete(null);
StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.timeElapsedSince(startTime, TimeUnit.NANOSECONDS));
}
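The change above routes every append callback through a fixed pool of EventLoops, with the slot chosen purely by streamId. Callbacks for the same stream therefore stay in submission order on one executor while different streams complete in parallel, which is what lets the old single-threaded `suppress(...)` path and the `// TODO: parallel callback` note go away. Below is a minimal sketch of the routing idea, assuming EventLoop behaves like a single-threaded executor; the `StreamCallbackRouter` class is illustrative, not part of s3stream:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the streamId-keyed routing in handleAppendCallback.
public final class StreamCallbackRouter {
    private final ExecutorService[] slots;

    public StreamCallbackRouter(int n) {
        slots = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            // Single-threaded: tasks routed to one slot run strictly in submission order.
            slots[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void execute(long streamId, Runnable callback) {
        // streamId % slots.length can be negative for negative ids; Math.abs keeps the
        // index valid, and the remainder's magnitude is always < slots.length, so the
        // Integer.MIN_VALUE corner case of Math.abs can never occur here.
        slots[Math.abs((int) (streamId % slots.length))].execute(callback);
    }

    public void shutdownGracefully() throws InterruptedException {
        for (ExecutorService slot : slots) {
            slot.shutdown();
            slot.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

Pinning a stream to one loop preserves per-stream callback ordering without any coordination across executors; the trade-off is that a single hot stream is confined to one loop rather than spread over the pool.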
31 changes: 25 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -43,6 +43,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.opentelemetry.instrumentation.annotations.SpanAttribute;
@@ -77,7 +78,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
this.capacity = capacity;
this.cacheBlockMaxSize = cacheBlockMaxSize;
this.maxCacheBlockStreamCount = maxCacheBlockStreamCount;
this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount, s -> 0);
this.blocks.add(activeBlock);
this.blockFreeListener = blockFreeListener;
}
@@ -214,7 +215,15 @@ public LogCacheBlock archiveCurrentBlock() {
try {
LogCacheBlock block = activeBlock;
block.lastRecordOffset = lastRecordOffset;
activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount, streamId -> {
StreamCache previousStreamCache = block.map.get(streamId);
if (previousStreamCache != null) {
// Give the initial capacity about 11% (a 10/9 factor) of head-room over the previous block's stream cache count to avoid frequent resizing.
return previousStreamCache.count() * 10 / 9;
} else {
return 0;
}
});
blocks.add(activeBlock);
blockCount.set(blocks.size());
return block;
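The hint above is plain integer arithmetic: `previousStreamCache.count() * 10 / 9` yields roughly 11% head-room over the previous block's record count for that stream. A self-contained illustration of what pre-sizing buys (the counts are made up; the growth behavior described is the JDK's standard ~1.5x `ArrayList` policy):

```java
import java.util.ArrayList;
import java.util.List;

public final class CapacityHintDemo {
    public static void main(String[] args) {
        int previousCount = 9_000;         // hypothetical: records the stream held in the last block
        int hint = previousCount * 10 / 9; // 10_000, ~11% head-room in integer arithmetic

        // Pre-sized: a single backing-array allocation for a similarly sized block.
        List<Long> presized = new ArrayList<>(hint);

        // Default-sized: starts at capacity 10 and grows by ~1.5x on each overflow,
        // re-copying the backing array about 18 times on its way to 10_000 elements.
        List<Long> defaultSized = new ArrayList<>();

        for (long i = 0; i < 10_000; i++) {
            presized.add(i);
            defaultSized.add(i);
        }
    }
}
```

For blocks that keep a similar per-stream record count, the hint turns those repeated grow-and-copy cycles into one allocation per stream.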
@@ -390,17 +399,19 @@ public static class LogCacheBlock {
private final long createdTimestamp = System.currentTimeMillis();
private final AtomicLong size = new AtomicLong();
private final List<FreeListener> freeListeners = new ArrayList<>();
private final Function<Long, Integer> streamRecordsCapacityHint;
volatile boolean free;
private RecordOffset lastRecordOffset;

public LogCacheBlock(long maxSize, int maxStreamCount) {
public LogCacheBlock(long maxSize, int maxStreamCount, Function<Long, Integer> streamRecordsCapacityHint) {
this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
this.maxSize = maxSize;
this.maxStreamCount = maxStreamCount;
this.streamRecordsCapacityHint = streamRecordsCapacityHint;
}

public LogCacheBlock(long maxSize) {
this(maxSize, DEFAULT_MAX_BLOCK_STREAM_COUNT);
this(maxSize, DEFAULT_MAX_BLOCK_STREAM_COUNT, s -> 0);
}

public long blockId() {
@@ -414,7 +425,7 @@ public boolean isFull() {
public boolean put(StreamRecordBatch recordBatch) {
map.compute(recordBatch.getStreamId(), (id, cache) -> {
if (cache == null) {
cache = new StreamCache();
cache = new StreamCache(streamRecordsCapacityHint.apply(id));
}
cache.add(recordBatch);
return cache;
@@ -536,11 +547,15 @@ public StreamRange(long startOffset, long endOffset) {
}

static class StreamCache {
List<StreamRecordBatch> records = new ArrayList<>();
List<StreamRecordBatch> records;
long startOffset = NOOP_OFFSET;
long endOffset = NOOP_OFFSET;
Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();

public StreamCache(int initialCapacity) {
this.records = new ArrayList<>(initialCapacity);
}

synchronized void add(StreamRecordBatch recordBatch) {
if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) {
RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s",
@@ -630,6 +645,10 @@ synchronized long endOffset() {
synchronized void endOffset(long endOffset) {
this.endOffset = endOffset;
}

synchronized int count() {
return records.size();
}
}

static class IndexAndCount {
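End to end, the hint is just a `Function<Long, Integer>` captured when the current block is archived; the new active block consults it lazily, the first time `put` creates a `StreamCache` for a stream. A condensed sketch of that wiring, with class and field names simplified from the diff (not the real `LogCache` types):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Each new active block sizes a stream's record list from the archived block's count.
final class BlockSketch {
    final Map<Long, Integer> recordCounts = new ConcurrentHashMap<>();
    final Function<Long, Integer> capacityHint;

    BlockSketch(Function<Long, Integer> capacityHint) {
        this.capacityHint = capacityHint;
    }
}

final class CacheSketch {
    BlockSketch active = new BlockSketch(streamId -> 0); // the first block has no history

    BlockSketch archiveCurrentBlock() {
        BlockSketch archived = active;
        active = new BlockSketch(streamId -> {
            Integer prev = archived.recordCounts.get(streamId);
            // ~11% head-room over the previous block; 0 for streams with no history.
            return prev == null ? 0 : prev * 10 / 9;
        });
        return archived;
    }

    public static void main(String[] args) {
        CacheSketch cache = new CacheSketch();
        cache.active.recordCounts.put(42L, 900);
        cache.archiveCurrentBlock();
        System.out.println(cache.active.capacityHint.apply(42L)); // 1000 = 900 * 10 / 9
    }
}
```

Because the lambda captures the archived block by reference, the previous count is read at first-put time on the new block rather than snapshotted at archive time.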