From a0a25df6d4dd57c50a6656f66dee51f1c55f9ead Mon Sep 17 00:00:00 2001 From: John Cormie Date: Fri, 14 Jan 2022 19:57:28 +0000 Subject: [PATCH] Invoke onTransportReady() in a round-robin fashion. Also call onTransportReady() only if isReady() still holds by the time we get to a given Inbound. This dramatically reduces timeouts and improves throughput when flow control has kicked in. This approach is still not completely fair since each ongoing call might consume a different amount of window on its turn, but because of the way Outbound#writeMessageData() and BlockPool already work, everyone gets to send at least 16kb. --- .../grpc/binder/internal/BinderTransport.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java index e07619dca33..219651a8b69 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java @@ -60,6 +60,8 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TimeProvider; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -186,6 +188,9 @@ protected enum TransportState { protected final ConcurrentHashMap> ongoingCalls; + @GuardedBy("this") + private final LinkedHashSet callIdsToNotifyWhenReady = new LinkedHashSet<>(); + @GuardedBy("this") protected Attributes attributes; @@ -529,9 +534,18 @@ final void handleAcknowledgedBytes(long numBytes) { logger.log( Level.FINE, "handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this); - // We're ready again, and need to poke any waiting transactions. - for (Inbound inbound : ongoingCalls.values()) { - inbound.onTransportReady(); + + // The LinkedHashSet contract guarantees that an id already present in this collection will + // not lose its priority if we re-insert it here. + callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet()); + + Iterator i = callIdsToNotifyWhenReady.iterator(); + while (isReady() && i.hasNext()) { + Inbound inbound = ongoingCalls.get(i.next()); + i.remove(); + if (inbound != null) { // Calls can be removed out from under us. + inbound.onTransportReady(); + } } } }