diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java index 1556b838142..cbe64149e15 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java @@ -15,9 +15,6 @@ */ package io.grpc.binder.internal; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - import android.os.IBinder; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.grpc.Attributes; @@ -41,7 +38,9 @@ public final class BinderServerTransport extends BinderTransport implements ServerTransport { private final List streamTracerFactories; - @Nullable private ServerTransportListener serverTransportListener; + + @GuardedBy("this") + private final SimplePromise listenerPromise = new SimplePromise<>(); /** * Constructs a new transport instance. @@ -69,13 +68,8 @@ public BinderServerTransport( * @param serverTransportListener where this transport will report events */ public synchronized void start(ServerTransportListener serverTransportListener) { - checkState(this.serverTransportListener == null, "Already started!"); - this.serverTransportListener = checkNotNull(serverTransportListener, "serverTransportListener"); - if (isShutdown()) { - setState(TransportState.SHUTDOWN_TERMINATED); - notifyTerminated(); - releaseExecutors(); - } else { + this.listenerPromise.set(serverTransportListener); + if (!isShutdown()) { sendSetupTransaction(); // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a callback // it triggers), could have shut us down. @@ -90,11 +84,16 @@ StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) { return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers); } + /** + * Reports a new ServerStream requested by the remote client. + * + *

Precondition: {@link #start(ServerTransportListener)} must already have been called. + */ synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) { if (isShutdown()) { return Status.UNAVAILABLE.withDescription("transport is shutdown"); } else { - serverTransportListener.streamCreated(stream, methodName, headers); + listenerPromise.get().streamCreated(stream, methodName, headers); return Status.OK; } } @@ -108,9 +107,7 @@ void notifyShutdown(Status status) { @Override @GuardedBy("this") void notifyTerminated() { - if (serverTransportListener != null) { - serverTransportListener.transportTerminated(); - } + listenerPromise.runWhenSet(ServerTransportListener::transportTerminated); } @Override diff --git a/binder/src/main/java/io/grpc/binder/internal/SimplePromise.java b/binder/src/main/java/io/grpc/binder/internal/SimplePromise.java new file mode 100644 index 00000000000..c7d227fbf64 --- /dev/null +++ b/binder/src/main/java/io/grpc/binder/internal/SimplePromise.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.binder.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.List; + +/** + * Placeholder for an object that will be provided later. + * + *

Similar to {@link com.google.common.util.concurrent.SettableFuture}, except it cannot fail or + * be cancelled. Most importantly, this class guarantees that {@link Listener}s run one-at-a-time + * and in the same order that they were scheduled. This conveniently matches the expectations of + * most listener interfaces in the io.grpc universe. + * + *

Not safe for concurrent use by multiple threads. Thread-compatible for callers that provide + * synchronization externally. + */ +public class SimplePromise { + private T value; + private List> pendingListeners; // Allocated lazily in the hopes it's never needed. + + /** + * Provides the promised object and runs any pending listeners. + * + * @throws IllegalStateException if this method has already been called + * @throws RuntimeException if some pending listener threw when we tried to run it + */ + public void set(T value) { + checkNotNull(value, "value"); + checkState(this.value == null, "Already set!"); + this.value = value; + if (pendingListeners != null) { + for (Listener listener : pendingListeners) { + listener.notify(value); + } + pendingListeners = null; + } + } + + /** + * Returns the promised object, under the assumption that it's already been set. + * + *

Compared to {@link #runWhenSet(Listener)}, this method may be a more efficient way to access + * the promised value in the case where you somehow know externally that {@link #set(T)} has + * "happened-before" this call. + * + * @throws IllegalStateException if {@link #set(T)} has not yet been called + */ + public T get() { + checkState(value != null, "Not yet set!"); + return value; + } + + /** + * Runs the given listener when this promise is fulfilled, or immediately if already fulfilled. + * + * @throws RuntimeException if already fulfilled and 'listener' threw when we tried to run it + */ + public void runWhenSet(Listener listener) { + if (value != null) { + listener.notify(value); + } else { + if (pendingListeners == null) { + pendingListeners = new ArrayList<>(); + } + pendingListeners.add(listener); + } + } + + /** + * An object that wants to get notified when a SimplePromise has been fulfilled. + */ + public interface Listener { + /** + * Indicates that the associated SimplePromise has been fulfilled with the given `value`. + */ + void notify(T value); + } +} diff --git a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java index b3b99ae34e8..12416922fc9 100644 --- a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java +++ b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java @@ -21,8 +21,10 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; import static org.robolectric.Shadows.shadowOf; +import android.os.DeadObjectException; import android.os.IBinder; import android.os.Looper; import android.os.Parcel; @@ -94,6 +96,27 @@ public void testSetupTransactionFailureReportsMultipleTerminations_b153460678() assertThat(transportListener.isTerminated()).isTrue(); } + @Test + public void testStartAfterShutdownAndIdle() throws Exception { + transport = newBinderServerTransportBuilder().build(); + transport.shutdownNow(Status.UNKNOWN.withDescription("reasons")); + shadowOf(Looper.getMainLooper()).idle(); + transport.start(transportListener); + shadowOf(Looper.getMainLooper()).idle(); + + assertThat(transportListener.isTerminated()).isTrue(); + } + + @Test + public void testStartAfterShutdownNoIdle() throws Exception { + transport = newBinderServerTransportBuilder().build(); + transport.shutdownNow(Status.UNKNOWN.withDescription("reasons")); + transport.start(transportListener); + shadowOf(Looper.getMainLooper()).idle(); + + assertThat(transportListener.isTerminated()).isTrue(); + } + static class BinderServerTransportBuilder { ObjectPool executorServicePool; Attributes attributes; diff --git a/binder/src/test/java/io/grpc/binder/internal/SimplePromiseTest.java b/binder/src/test/java/io/grpc/binder/internal/SimplePromiseTest.java new file mode 100644 index 00000000000..6486ff5e8a1 --- /dev/null +++ b/binder/src/test/java/io/grpc/binder/internal/SimplePromiseTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.binder.internal; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import io.grpc.binder.internal.SimplePromise.Listener; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public final class SimplePromiseTest { + + private static final String FULFILLED_VALUE = "a fulfilled value"; + + @Mock private Listener mockListener1; + @Mock private Listener mockListener2; + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + private SimplePromise promise = new SimplePromise<>(); + + @Before + public void setUp() { + } + + @Test + public void get_beforeFulfilled_throws() { + IllegalStateException e = assertThrows(IllegalStateException.class, () -> promise.get()); + assertThat(e).hasMessageThat().isEqualTo("Not yet set!"); + } + + @Test + public void get_afterFulfilled_returnsValue() { + promise.set(FULFILLED_VALUE); + assertThat(promise.get()).isEqualTo(FULFILLED_VALUE); + } + + @Test + public void set_withNull_throws() { + assertThrows(NullPointerException.class, () -> promise.set(null)); + } + + @Test + public void set_calledTwice_throws() { + promise.set(FULFILLED_VALUE); + IllegalStateException e = + assertThrows(IllegalStateException.class, () -> promise.set("another value")); + assertThat(e).hasMessageThat().isEqualTo("Already set!"); + } + + @Test + public void runWhenSet_beforeFulfill_listenerIsNotifiedUponSet() { + promise.runWhenSet(mockListener1); + + // Should not have been called yet. + verify(mockListener1, never()).notify(FULFILLED_VALUE); + + promise.set(FULFILLED_VALUE); + + // Now it should be called. + verify(mockListener1, times(1)).notify(FULFILLED_VALUE); + } + + @Test + public void runWhenSet_afterSet_listenerIsNotifiedImmediately() { + promise.set(FULFILLED_VALUE); + promise.runWhenSet(mockListener1); + + // Should have been called immediately. + verify(mockListener1, times(1)).notify(FULFILLED_VALUE); + } + + @Test + public void multipleListeners_addedBeforeSet_allNotifiedInOrder() { + promise.runWhenSet(mockListener1); + promise.runWhenSet(mockListener2); + + promise.set(FULFILLED_VALUE); + + InOrder inOrder = inOrder(mockListener1, mockListener2); + inOrder.verify(mockListener1).notify(FULFILLED_VALUE); + inOrder.verify(mockListener2).notify(FULFILLED_VALUE); + } + + @Test + public void listenerThrows_duringSet_propagatesException() { + // A listener that will throw when notified. + Listener throwingListener = + (value) -> { + throw new UnsupportedOperationException("Listener failed"); + }; + + promise.runWhenSet(throwingListener); + + // Fulfilling the promise should now throw the exception from the listener. + UnsupportedOperationException e = + assertThrows(UnsupportedOperationException.class, () -> promise.set(FULFILLED_VALUE)); + assertThat(e).hasMessageThat().isEqualTo("Listener failed"); + } + + @Test + public void listenerThrows_whenAddedAfterSet_propagatesException() { + promise.set(FULFILLED_VALUE); + + // A listener that will throw when notified. + Listener throwingListener = + (value) -> { + throw new UnsupportedOperationException("Listener failed"); + }; + + // Running the listener should throw immediately because the promise is already fulfilled. + UnsupportedOperationException e = + assertThrows( + UnsupportedOperationException.class, () -> promise.runWhenSet(throwingListener)); + assertThat(e).hasMessageThat().isEqualTo("Listener failed"); + } +}