diff --git a/core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java b/core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java
index 7d1ac383dd..535c3d5b69 100644
--- a/core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java
+++ b/core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java
@@ -67,10 +67,10 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
                 streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
                 return streamRecordBatch;
             } finally {
-                src.release();
                 buf.release();
             }
         }).whenComplete((rst, ex) -> {
+            src.release();
             if (ex != null) {
                 LOGGER.error("Error while decoding link record, link={}", linkRecord, ex);
             }
diff --git a/core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java b/core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java
index be9e51867b..4f4883fc68 100644
--- a/core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java
+++ b/core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java
@@ -27,6 +27,7 @@
 import com.automq.stream.s3.wal.impl.object.ObjectWALService;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.LogContext;
+import com.automq.stream.utils.Threads;
 
 import org.slf4j.Logger;
 
@@ -44,6 +45,7 @@ public class ObjectRouterChannel implements RouterChannel {
     private static final ExecutorService ASYNC_EXECUTOR = Executors.newCachedThreadPool();
+    private static final long OVER_CAPACITY_RETRY_DELAY_MS = 1000L;
     private final Logger logger;
     private final AtomicLong mockOffset = new AtomicLong(0);
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -82,20 +84,30 @@ public CompletableFuture append(int targetNodeId, short orderHint,
     CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
         StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
-        try {
-            return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
-                readLock.lock();
-                try {
-                    long epoch = this.channelEpoch;
-                    ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
-                    channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
-                    return new AppendResult(epoch, channelOffset.byteBuf());
-                } finally {
-                    readLock.unlock();
-                }
-            });
-        } catch (OverCapacityException e) {
-            return CompletableFuture.failedFuture(e);
+        record.encoded();
+        record.retain();
+        for (; ; ) {
+            try {
+                return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
+                    readLock.lock();
+                    try {
+                        long epoch = this.channelEpoch;
+                        ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
+                        channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
+                        return new AppendResult(epoch, channelOffset.byteBuf());
+                    } finally {
+                        readLock.unlock();
+                    }
+                }).whenComplete((r, e) -> record.release());
+            } catch (OverCapacityException e) {
+                logger.warn("OverCapacityException occurred while appending, err={}", e.getMessage());
+                // Block and retry after a delay so the backpressure propagates to the network layer.
+                Threads.sleep(OVER_CAPACITY_RETRY_DELAY_MS);
+            } catch (Throwable e) {
+                logger.error("[UNEXPECTED] append wal fail", e);
+                record.release();
+                return CompletableFuture.failedFuture(e);
+            }
+        }
     }
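
The append0() rewrite above turns a fail-fast OverCapacityException into a bounded blocking retry, retaining the record so its buffer survives a retried wal.append(). Below is a minimal sketch of that retry shape; Wal, OverCapacityException, and byte[] records are stand-ins for illustration, not the real s3stream API:

import java.util.concurrent.CompletableFuture;

class AppendRetrySketch {
    static class OverCapacityException extends Exception {
    }

    interface Wal {
        CompletableFuture<Long> append(byte[] record) throws OverCapacityException;
    }

    static CompletableFuture<Long> appendWithRetry(Wal wal, byte[] record, long retryDelayMs) {
        for (; ; ) {
            try {
                // Success: hand back the async result untouched.
                return wal.append(record);
            } catch (OverCapacityException e) {
                // The WAL is temporarily full: block the calling thread, which
                // back-pressures the upstream network layer, then retry the same record.
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return CompletableFuture.failedFuture(ie);
                }
            } catch (Throwable e) {
                // Unexpected failures are not retried; surface them to the caller.
                return CompletableFuture.failedFuture(e);
            }
        }
    }
}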
diff --git a/core/src/main/java/kafka/automq/zerozone/RouterInV2.java b/core/src/main/java/kafka/automq/zerozone/RouterInV2.java
index d77e706fe9..f4827b936d 100644
--- a/core/src/main/java/kafka/automq/zerozone/RouterInV2.java
+++ b/core/src/main/java/kafka/automq/zerozone/RouterInV2.java
@@ -47,9 +47,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -71,7 +71,7 @@ public class RouterInV2 implements NonBlockingLocalRouterHandler {
     private final String rack;
     private final RouterInProduceHandler localAppendHandler;
     private RouterInProduceHandler routerInProduceHandler;
-    private final Queue<PartitionProduceRequest> unpackLinkQueue = new ConcurrentLinkedQueue<>();
+    private final BlockingQueue<PartitionProduceRequest> unpackLinkQueue = new ArrayBlockingQueue<>(Systems.CPU_CORES * 8192);
     private final EventLoop[] appendEventLoops;
     private final FastThreadLocal requestLocals = new FastThreadLocal<>() {
         @Override
@@ -115,9 +115,9 @@ private CompletableFuture handleZoneRouterRequest0(Aut
         for (ByteBuf channelOffset : routerRecord.channelOffsets()) {
             PartitionProduceRequest partitionProduceRequest = new PartitionProduceRequest(ChannelOffset.of(channelOffset));
             partitionProduceRequest.unpackLinkCf = routerChannel.get(channelOffset);
-            unpackLinkQueue.add(partitionProduceRequest);
+            addToUnpackLinkQueue(partitionProduceRequest);
             partitionProduceRequest.unpackLinkCf.whenComplete((rst, ex) -> {
-                if (ex != null) {
+                if (ex == null) {
                     size.addAndGet(rst.readableBytes());
                 }
                 handleUnpackLink();
@@ -165,6 +165,16 @@ private void handleUnpackLink() {
         }
     }
 
+    private void addToUnpackLinkQueue(PartitionProduceRequest req) {
+        for (;;) {
+            try {
+                unpackLinkQueue.put(req);
+                return;
+            } catch (InterruptedException ignored) {
+            }
+        }
+    }
+
     @Override
     public CompletableFuture append(
         ChannelOffset channelOffset,
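
addToUnpackLinkQueue() above replaces an unbounded ConcurrentLinkedQueue with a bounded ArrayBlockingQueue whose put() blocks when full. A self-contained sketch of that uninterruptible enqueue follows; the Runnable element type and the capacity are placeholders (the diff sizes the real queue as Systems.CPU_CORES * 8192), and unlike the hunk above this variant restores the interrupt flag once the element is enqueued:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedEnqueueSketch {
    // Capacity 1024 is a placeholder for illustration.
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);

    void putUninterruptibly(Runnable req) {
        boolean interrupted = false;
        for (; ; ) {
            try {
                // put() blocks while the queue is full, so a burst of router traffic
                // back-pressures the caller instead of growing an unbounded queue.
                queue.put(req);
                break;
            } catch (InterruptedException e) {
                interrupted = true; // keep retrying; restore the flag once enqueued
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}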
diff --git a/core/src/main/java/kafka/automq/zerozone/RouterOutV2.java b/core/src/main/java/kafka/automq/zerozone/RouterOutV2.java
index 98f8331598..5b44b49c27 100644
--- a/core/src/main/java/kafka/automq/zerozone/RouterOutV2.java
+++ b/core/src/main/java/kafka/automq/zerozone/RouterOutV2.java
@@ -109,6 +109,11 @@ public void handleProduceAppendProxy(ProduceRequestArgs args) {
                 }
                 ZeroZoneMetricsManager.PROXY_REQUEST_LATENCY.record(time.nanoseconds() - startNanos);
             });
+        }).exceptionally(ex -> {
+            LOGGER.error("Exception in processing append proxies", ex);
+            // Return a retriable error so the producer retries the send.
+            responseMap.put(tp, errorPartitionResponse(Errors.LEADER_NOT_AVAILABLE));
+            return null;
         });
         cfList.add(proxyCf);
     }
@@ -144,6 +149,10 @@ interface Proxy {
         void send(ProxyRequest request);
     }
 
+    static ProduceResponse.PartitionResponse errorPartitionResponse(Errors error) {
+        return new ProduceResponse.PartitionResponse(error, -1, -1, -1, -1, Collections.emptyList(), "");
+    }
+
     static class LocalProxy implements Proxy {
         private final NonBlockingLocalRouterHandler localRouterHandler;
 
@@ -341,7 +350,7 @@ public void completeWithUnknownError() {
     }
 
     private void completeWithError(Errors errors) {
-        ProduceResponse.PartitionResponse rst = new ProduceResponse.PartitionResponse(errors, -1, -1, -1, -1, Collections.emptyList(), "");
+        ProduceResponse.PartitionResponse rst = errorPartitionResponse(errors);
         cf.complete(rst);
     }
 }
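
The exceptionally() branch above converts any proxy failure into LEADER_NOT_AVAILABLE, which Kafka producers treat as retriable. A simplified, runnable sketch of that mapping; PartitionResponse and Errors here are stand-ins for the Kafka types, not their real signatures:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ProxyErrorSketch {
    enum Errors { NONE, LEADER_NOT_AVAILABLE }

    record PartitionResponse(Errors error) {
    }

    static PartitionResponse errorPartitionResponse(Errors error) {
        return new PartitionResponse(error);
    }

    // Any failure in the async proxy chain is mapped to a retriable error,
    // so the producer client re-sends the batch instead of failing it.
    static CompletableFuture<Void> guard(String tp, CompletableFuture<Void> proxyCf,
        Map<String, PartitionResponse> responseMap) {
        return proxyCf.exceptionally(ex -> {
            responseMap.put(tp, errorPartitionResponse(Errors.LEADER_NOT_AVAILABLE));
            return null;
        });
    }

    public static void main(String[] args) {
        Map<String, PartitionResponse> responseMap = new ConcurrentHashMap<>();
        CompletableFuture<Void> failing = CompletableFuture.failedFuture(new RuntimeException("proxy down"));
        guard("topic-0", failing, responseMap).join();
        System.out.println(responseMap); // {topic-0=PartitionResponse[error=LEADER_NOT_AVAILABLE]}
    }
}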
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
index 00da4be47b..a53eb68e46 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
@@ -35,11 +35,12 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -74,7 +75,8 @@ public class SnapshotReadCache {
     private final LinkRecordDecoder linkRecordDecoder;
     private final Time time = Time.SYSTEM;
 
-    public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage, LinkRecordDecoder linkRecordDecoder) {
+    public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage,
+        LinkRecordDecoder linkRecordDecoder) {
         activeStreams = CacheBuilder.newBuilder()
             .expireAfterAccess(10, TimeUnit.MINUTES)
             .removalListener((RemovalListener) notification ->
@@ -128,7 +130,8 @@ public synchronized CompletableFuture replay(List object
         return objectReplay.replay(objects);
     }
 
-    public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset, RecordOffset endOffset) {
+    public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset,
+        RecordOffset endOffset) {
         long startNanos = time.nanoseconds();
         return walReplay.replay(confirmWAL, startOffset, endOffset)
             .whenComplete((nil, ex) -> REPLAY_LATENCY.record(time.nanoseconds() - startNanos));
@@ -153,32 +156,69 @@ private void activeStream(long streamId) {
     }
 
     class WalReplay {
+        private static final long TASK_WAITING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
+        private static final int MAX_WAITING_LOAD_TASK_COUNT = 4096;
         // soft limit the inflight memory
-        private final Semaphore inflightLimiter = new Semaphore(Systems.CPU_CORES * 4);
-        private final Queue<WalReplayTask> waitingLoadTasks = new ConcurrentLinkedQueue<>();
+        private final int maxInflightLoadingCount = Systems.CPU_CORES * 4;
+        private final BlockingQueue<WalReplayTask> waitingLoadTasks = new ArrayBlockingQueue<>(MAX_WAITING_LOAD_TASK_COUNT);
         private final Queue<WalReplayTask> loadingTasks = new ConcurrentLinkedQueue<>();
 
         public CompletableFuture<Void> replay(WriteAheadLog wal, RecordOffset startOffset, RecordOffset endOffset) {
-            inflightLimiter.acquireUninterruptibly();
             WalReplayTask task = new WalReplayTask(wal, startOffset, endOffset);
-            waitingLoadTasks.add(task);
+            while (!waitingLoadTasks.offer(task)) {
+                // replay() is never called on the SnapshotReadCache eventLoop, so joining here cannot deadlock.
+                eventLoop.submit(this::clearOverloadedTask).join();
+            }
             eventLoop.submit(this::tryLoad);
-            return task.replayCf.whenComplete((nil, ex) -> inflightLimiter.release());
+            return task.replayCf.whenCompleteAsync((nil, ex) -> tryLoad(), eventLoop);
         }
 
         @EventLoopSafe
         private void tryLoad() {
             for (; ; ) {
-                WalReplayTask task = waitingLoadTasks.poll();
+                if (loadingTasks.size() >= maxInflightLoadingCount) {
+                    break;
+                }
+                WalReplayTask task = waitingLoadTasks.peek();
                 if (task == null) {
                     break;
                 }
+                if (time.nanoseconds() - task.timestampNanos > TASK_WAITING_TIMEOUT_NANOS) {
+                    clearOverloadedTask();
+                    return;
+                }
+                waitingLoadTasks.poll();
                 loadingTasks.add(task);
                 task.run();
                 task.loadCf.whenCompleteAsync((rst, ex) -> tryPutIntoCache(), eventLoop);
             }
         }
 
+        /**
+         * Clears all waiting tasks when the replay system is overloaded.
+         * This is triggered when tasks wait longer than TASK_WAITING_TIMEOUT_NANOS or waitingLoadTasks is full.
+         * All dropped tasks have their futures completed with null, and the affected
+         * nodes are notified to commit their WAL to free up resources.
+         */
+        @EventLoopSafe
+        private void clearOverloadedTask() {
+            // The WalReplay is overloaded, so drain all waiting tasks promptly.
+            Set<Integer> nodeIds = new HashSet<>();
+            int dropCount = 0;
+            for (; ; ) {
+                WalReplayTask task = waitingLoadTasks.poll();
+                if (task == null) {
+                    break;
+                }
+                nodeIds.add(task.wal.metadata().nodeId());
+                task.loadCf.complete(null);
+                task.replayCf.complete(null);
+                dropCount++;
+            }
+            nodeIds.forEach(cacheFreeListener::notifyListener);
+            LOGGER.warn("wal replay is overloaded, dropped {} waiting tasks and requested nodes={} to commit", dropCount, nodeIds);
+        }
+
         @EventLoopSafe
         private void tryPutIntoCache() {
             for (; ; ) {
@@ -195,6 +235,7 @@ private void tryPutIntoCache() {
         }
 
         class WalReplayTask {
+            final long timestampNanos = time.nanoseconds();
             final WriteAheadLog wal;
             final RecordOffset startOffset;
             final RecordOffset endOffset;
@@ -389,9 +430,11 @@ public void onFree(List bounds) {
                 requestCommitNodes.add(streamMetadata.nodeId());
             }
         }
-        listeners.forEach(listener ->
-            requestCommitNodes.forEach(nodeId ->
-                FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER)));
+        requestCommitNodes.forEach(this::notifyListener);
+    }
+
+    public void notifyListener(int nodeId) {
+        listeners.forEach(listener -> FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER));
     }
 
     public void addListener(EventListener listener) {
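
The new WalReplay scheduling above combines three controls: a bounded waiting queue, a cap on concurrently loading tasks, and drop-everything shedding once the head-of-line task is older than TASK_WAITING_TIMEOUT_NANOS. A condensed sketch of that policy with simplified Task and queue types (the real code also completes the dropped tasks' futures and asks the affected nodes to commit their WAL):

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

class ReplayShedSketch {
    static final long TASK_WAITING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
    static final int MAX_INFLIGHT = Runtime.getRuntime().availableProcessors() * 4;

    static class Task {
        final long timestampNanos = System.nanoTime();

        void run() { /* load WAL records asynchronously */ }

        void drop() { /* complete futures with null; ask the node to commit its WAL */ }
    }

    final BlockingQueue<Task> waiting = new ArrayBlockingQueue<>(4096);
    final Queue<Task> loading = new ConcurrentLinkedQueue<>();

    void tryLoad() {
        for (; ; ) {
            if (loading.size() >= MAX_INFLIGHT) {
                break; // inflight cap reached; resume when a load completes
            }
            Task task = waiting.peek();
            if (task == null) {
                break;
            }
            if (System.nanoTime() - task.timestampNanos > TASK_WAITING_TIMEOUT_NANOS) {
                shedAll(); // head-of-line task is stale: shedding beats replaying old WAL
                return;
            }
            waiting.poll();
            loading.add(task);
            task.run();
        }
    }

    void shedAll() {
        Task task;
        while ((task = waiting.poll()) != null) {
            task.drop();
        }
    }
}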