diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java index e07619dca33..219651a8b69 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java @@ -60,6 +60,8 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TimeProvider; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -186,6 +188,9 @@ protected enum TransportState { protected final ConcurrentHashMap> ongoingCalls; + @GuardedBy("this") + private final LinkedHashSet callIdsToNotifyWhenReady = new LinkedHashSet<>(); + @GuardedBy("this") protected Attributes attributes; @@ -529,9 +534,18 @@ final void handleAcknowledgedBytes(long numBytes) { logger.log( Level.FINE, "handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this); - // We're ready again, and need to poke any waiting transactions. - for (Inbound inbound : ongoingCalls.values()) { - inbound.onTransportReady(); + + // The LinkedHashSet contract guarantees that an id already present in this collection will + // not lose its priority if we re-insert it here. + callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet()); + + Iterator i = callIdsToNotifyWhenReady.iterator(); + while (isReady() && i.hasNext()) { + Inbound inbound = ongoingCalls.get(i.next()); + i.remove(); + if (inbound != null) { // Calls can be removed out from under us. + inbound.onTransportReady(); + } } } }