diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index dcc0e260bb..b53d4a651d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -48,8 +48,10 @@
 import com.automq.stream.utils.ExceptionUtil;
 import com.automq.stream.utils.FutureTicker;
 import com.automq.stream.utils.FutureUtil;
+import com.automq.stream.utils.Systems;
 import com.automq.stream.utils.ThreadUtils;
 import com.automq.stream.utils.Threads;
+import com.automq.stream.utils.threads.EventLoop;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -89,7 +91,6 @@
 import io.opentelemetry.instrumentation.annotations.SpanAttribute;
 import io.opentelemetry.instrumentation.annotations.WithSpan;
 
-import static com.automq.stream.utils.FutureUtil.suppress;
 
 public class S3Storage implements Storage {
     private static final Logger LOGGER = LoggerFactory.getLogger(S3Storage.class);
@@ -152,6 +153,9 @@ public static LinkRecordDecoder getLinkRecordDecoder() {
      * @see #handleAppendCallback
      */
     private final Lock[] streamCallbackLocks = IntStream.range(0, NUM_STREAM_CALLBACK_LOCKS).mapToObj(i -> new ReentrantLock()).toArray(Lock[]::new);
+    private final EventLoop[] callbackExecutors = IntStream.range(0, Systems.CPU_CORES).mapToObj(i -> new EventLoop("AUTOMQ_S3STREAM_APPEND_CALLBACK-" + i))
+        .toArray(EventLoop[]::new);
+
     private long lastLogTimestamp = 0L;
     private volatile double maxDataWriteRate = 0.0;
@@ -531,6 +535,9 @@ public void shutdown() {
             backgroundExecutor.shutdownNow();
             LOGGER.warn("await backgroundExecutor close fail", e);
         }
+        for (EventLoop executor : callbackExecutors) {
+            executor.shutdownGracefully();
+        }
     }
 
     @Override
@@ -808,7 +815,15 @@ public CompletableFuture<Void> forceUpload(long streamId) {
     }
 
     private void handleAppendCallback(WalWriteRequest request) {
-        suppress(() -> handleAppendCallback0(request), LOGGER);
+        // Execute append callbacks in parallel on streamId-based executors
+        // (same stream -> same executor, preserving per-stream callback order).
+        EventLoop executor = callbackExecutors[Math.abs((int) (request.record.getStreamId() % callbackExecutors.length))];
+        executor.execute(() -> {
+            try {
+                handleAppendCallback0(request);
+            } catch (Throwable e) {
+                LOGGER.error("[UNEXPECTED], handle append callback fail, request {}", request, e);
+            }
+        });
     }
 
     private void handleAppendCallback0(WalWriteRequest request) {
@@ -820,7 +835,6 @@ private void handleAppendCallback0(WalWriteRequest request) {
             // cache block is full, trigger WAL upload.
             uploadDeltaWAL();
         }
-        // TODO: parallel callback
         request.cf.complete(null);
         StorageOperationStats.getInstance().appendCallbackStats.record(TimerUtil.timeElapsedSince(startTime, TimeUnit.NANOSECONDS));
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 22941e5836..1477658b43 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import io.opentelemetry.instrumentation.annotations.SpanAttribute;
@@ -77,7 +78,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
         this.capacity = capacity;
         this.cacheBlockMaxSize = cacheBlockMaxSize;
         this.maxCacheBlockStreamCount = maxCacheBlockStreamCount;
-        this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
+        this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount, s -> 0);
         this.blocks.add(activeBlock);
         this.blockFreeListener = blockFreeListener;
     }
@@ -214,7 +215,15 @@ public LogCacheBlock archiveCurrentBlock() {
         try {
             LogCacheBlock block = activeBlock;
             block.lastRecordOffset = lastRecordOffset;
-            activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
+            activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount, streamId -> {
+                StreamCache previousStreamCache = block.map.get(streamId);
+                if (previousStreamCache != null) {
+                    // Make the initial capacity slightly larger (x 10/9) than the stream's record count in the previous block, to avoid frequent resizing.
+                    return previousStreamCache.count() * 10 / 9;
+                } else {
+                    return 0;
+                }
+            });
             blocks.add(activeBlock);
             blockCount.set(blocks.size());
             return block;
@@ -390,17 +399,19 @@ public static class LogCacheBlock {
         private final long createdTimestamp = System.currentTimeMillis();
         private final AtomicLong size = new AtomicLong();
         private final List freeListeners = new ArrayList<>();
+        private final Function<Long, Integer> streamRecordsCapacityHint;
         volatile boolean free;
         private RecordOffset lastRecordOffset;
 
-        public LogCacheBlock(long maxSize, int maxStreamCount) {
+        public LogCacheBlock(long maxSize, int maxStreamCount, Function<Long, Integer> streamRecordsCapacityHint) {
             this.blockId = BLOCK_ID_ALLOC.getAndIncrement();
             this.maxSize = maxSize;
             this.maxStreamCount = maxStreamCount;
+            this.streamRecordsCapacityHint = streamRecordsCapacityHint;
         }
 
         public LogCacheBlock(long maxSize) {
-            this(maxSize, DEFAULT_MAX_BLOCK_STREAM_COUNT);
+            this(maxSize, DEFAULT_MAX_BLOCK_STREAM_COUNT, s -> 0);
         }
 
         public long blockId() {
@@ -414,7 +425,7 @@ public boolean isFull() {
         public boolean put(StreamRecordBatch recordBatch) {
             map.compute(recordBatch.getStreamId(), (id, cache) -> {
                 if (cache == null) {
-                    cache = new StreamCache();
+                    cache = new StreamCache(streamRecordsCapacityHint.apply(id));
                 }
                 cache.add(recordBatch);
                 return cache;
@@ -536,11 +547,15 @@ public StreamRange(long startOffset, long endOffset) {
     }
 
     static class StreamCache {
-        List<StreamRecordBatch> records = new ArrayList<>();
+        List<StreamRecordBatch> records;
         long startOffset = NOOP_OFFSET;
        long endOffset = NOOP_OFFSET;
         Map<Long, IndexAndCount> offsetIndexMap = new HashMap<>();
 
+        public StreamCache(int initialCapacity) {
+            this.records = new ArrayList<>(initialCapacity);
+        }
+
         synchronized void add(StreamRecordBatch recordBatch) {
             if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) {
                 RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s",
@@ -630,6 +645,10 @@ synchronized long endOffset() {
         synchronized void endOffset(long endOffset) {
             this.endOffset = endOffset;
         }
+
+        synchronized int count() {
+            return records.size();
+        }
     }
 
     static class IndexAndCount {
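Note on the routing scheme in handleAppendCallback above: an EventLoop is a single-threaded executor, and a given streamId always hashes to the same loop, so callbacks for one stream still complete in submission order while distinct streams fan out across up to Systems.CPU_CORES loops. Below is a minimal, self-contained sketch of the same pattern, using plain java.util.concurrent single-thread executors in place of com.automq.stream.utils.threads.EventLoop; the class and method names are illustrative, not part of the patch.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamCallbackRouterSketch {
    // One single-threaded executor per core, standing in for EventLoop.
    private final List<ExecutorService> executors = IntStream
        .range(0, Runtime.getRuntime().availableProcessors())
        .mapToObj(i -> Executors.newSingleThreadExecutor())
        .collect(Collectors.toList());

    // Same streamId -> same executor: one stream's callbacks stay ordered,
    // while distinct streams run in parallel on the other executors.
    public void onAppendComplete(long streamId, Runnable callback) {
        ExecutorService executor = executors.get(Math.abs((int) (streamId % executors.size())));
        executor.execute(() -> {
            try {
                callback.run();
            } catch (Throwable e) {
                // Mirror the patch: one failing callback must not kill the loop.
                System.err.println("handle append callback fail: " + e);
            }
        });
    }

    public void shutdown() {
        executors.forEach(ExecutorService::shutdown);
    }
}

On the LogCache side, the capacity hint presizes each new block's per-stream record list from the just-archived block: a stream that held, say, 900 records gets new ArrayList<>(900 * 10 / 9) = new ArrayList<>(1000), roughly 11% of headroom before the first grow, while a stream absent from the previous block falls back to capacity 0, whose backing array is only allocated on the first add.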