Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package io.grpc.binder.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import android.os.IBinder;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Attributes;
Expand All @@ -41,7 +38,9 @@
public final class BinderServerTransport extends BinderTransport implements ServerTransport {

private final List<ServerStreamTracer.Factory> streamTracerFactories;
@Nullable private ServerTransportListener serverTransportListener;

@GuardedBy("this")
private final SimplePromise<ServerTransportListener> listenerPromise = new SimplePromise<>();

/**
* Constructs a new transport instance.
Expand Down Expand Up @@ -69,13 +68,8 @@ public BinderServerTransport(
* @param serverTransportListener where this transport will report events
*/
public synchronized void start(ServerTransportListener serverTransportListener) {
checkState(this.serverTransportListener == null, "Already started!");
this.serverTransportListener = checkNotNull(serverTransportListener, "serverTransportListener");
if (isShutdown()) {
setState(TransportState.SHUTDOWN_TERMINATED);
notifyTerminated();
releaseExecutors();
} else {
this.listenerPromise.set(serverTransportListener);
if (!isShutdown()) {
sendSetupTransaction();
// Check we're not shutdown again, since a failure inside sendSetupTransaction (or a callback
// it triggers), could have shut us down.
Expand All @@ -90,11 +84,16 @@ StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) {
return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers);
}

/**
* Reports a new ServerStream requested by the remote client.
*
* <p>Precondition: {@link #start(ServerTransportListener)} must already have been called.
*/
synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) {
if (isShutdown()) {
return Status.UNAVAILABLE.withDescription("transport is shutdown");
} else {
serverTransportListener.streamCreated(stream, methodName, headers);
listenerPromise.get().streamCreated(stream, methodName, headers);
return Status.OK;
}
}
Expand All @@ -108,9 +107,7 @@ void notifyShutdown(Status status) {
@Override
@GuardedBy("this")
void notifyTerminated() {
if (serverTransportListener != null) {
serverTransportListener.transportTerminated();
}
listenerPromise.runWhenSet(ServerTransportListener::transportTerminated);
}

@Override
Expand Down
97 changes: 97 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/SimplePromise.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.binder.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.List;

/**
* Placeholder for an object that will be provided later.
*
* <p>Similar to {@link com.google.common.util.concurrent.SettableFuture}, except it cannot fail or
* be cancelled. Most importantly, this class guarantees that {@link Listener}s run one-at-a-time
* and in the same order that they were scheduled. This conveniently matches the expectations of
* most listener interfaces in the io.grpc universe.
*
* <p>Not safe for concurrent use by multiple threads. Thread-compatible for callers that provide
* synchronization externally.
*/
public class SimplePromise<T> {
private T value;
private List<Listener<T>> pendingListeners; // Allocated lazily in the hopes it's never needed.

/**
* Provides the promised object and runs any pending listeners.
*
* @throws IllegalStateException if this method has already been called
* @throws RuntimeException if some pending listener threw when we tried to run it
*/
public void set(T value) {
checkNotNull(value, "value");
checkState(this.value == null, "Already set!");
this.value = value;
if (pendingListeners != null) {
for (Listener<T> listener : pendingListeners) {
listener.notify(value);
}
pendingListeners = null;
}
}

/**
* Returns the promised object, under the assumption that it's already been set.
*
* <p>Compared to {@link #runWhenSet(Listener)}, this method may be a more efficient way to access
* the promised value in the case where you somehow know externally that {@link #set(T)} has
* "happened-before" this call.
*
* @throws IllegalStateException if {@link #set(T)} has not yet been called
*/
public T get() {
checkState(value != null, "Not yet set!");
return value;
}

/**
* Runs the given listener when this promise is fulfilled, or immediately if already fulfilled.
*
* @throws RuntimeException if already fulfilled and 'listener' threw when we tried to run it
*/
public void runWhenSet(Listener<T> listener) {
if (value != null) {
listener.notify(value);
} else {
if (pendingListeners == null) {
pendingListeners = new ArrayList<>();
}
pendingListeners.add(listener);
}
}

/**
* An object that wants to get notified when a SimplePromise has been fulfilled.
*/
public interface Listener<T> {
/**
* Indicates that the associated SimplePromise has been fulfilled with the given `value`.
*/
void notify(T value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.robolectric.Shadows.shadowOf;

import android.os.DeadObjectException;
import android.os.IBinder;
import android.os.Looper;
import android.os.Parcel;
Expand Down Expand Up @@ -94,6 +96,27 @@ public void testSetupTransactionFailureReportsMultipleTerminations_b153460678()
assertThat(transportListener.isTerminated()).isTrue();
}

@Test
public void testStartAfterShutdownAndIdle() throws Exception {
transport = newBinderServerTransportBuilder().build();
transport.shutdownNow(Status.UNKNOWN.withDescription("reasons"));
shadowOf(Looper.getMainLooper()).idle();
transport.start(transportListener);
shadowOf(Looper.getMainLooper()).idle();

assertThat(transportListener.isTerminated()).isTrue();
}

@Test
public void testStartAfterShutdownNoIdle() throws Exception {
transport = newBinderServerTransportBuilder().build();
transport.shutdownNow(Status.UNKNOWN.withDescription("reasons"));
transport.start(transportListener);
shadowOf(Looper.getMainLooper()).idle();

assertThat(transportListener.isTerminated()).isTrue();
}

static class BinderServerTransportBuilder {
ObjectPool<ScheduledExecutorService> executorServicePool;
Attributes attributes;
Expand Down
143 changes: 143 additions & 0 deletions binder/src/test/java/io/grpc/binder/internal/SimplePromiseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.binder.internal;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.grpc.binder.internal.SimplePromise.Listener;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@RunWith(JUnit4.class)
public final class SimplePromiseTest {

private static final String FULFILLED_VALUE = "a fulfilled value";

@Mock private Listener<String> mockListener1;
@Mock private Listener<String> mockListener2;
@Rule public final MockitoRule mocks = MockitoJUnit.rule();

private SimplePromise<String> promise = new SimplePromise<>();

@Before
public void setUp() {
}

@Test
public void get_beforeFulfilled_throws() {
IllegalStateException e = assertThrows(IllegalStateException.class, () -> promise.get());
assertThat(e).hasMessageThat().isEqualTo("Not yet set!");
}

@Test
public void get_afterFulfilled_returnsValue() {
promise.set(FULFILLED_VALUE);
assertThat(promise.get()).isEqualTo(FULFILLED_VALUE);
}

@Test
public void set_withNull_throws() {
assertThrows(NullPointerException.class, () -> promise.set(null));
}

@Test
public void set_calledTwice_throws() {
promise.set(FULFILLED_VALUE);
IllegalStateException e =
assertThrows(IllegalStateException.class, () -> promise.set("another value"));
assertThat(e).hasMessageThat().isEqualTo("Already set!");
}

@Test
public void runWhenSet_beforeFulfill_listenerIsNotifiedUponSet() {
promise.runWhenSet(mockListener1);

// Should not have been called yet.
verify(mockListener1, never()).notify(FULFILLED_VALUE);

promise.set(FULFILLED_VALUE);

// Now it should be called.
verify(mockListener1, times(1)).notify(FULFILLED_VALUE);
}

@Test
public void runWhenSet_afterSet_listenerIsNotifiedImmediately() {
promise.set(FULFILLED_VALUE);
promise.runWhenSet(mockListener1);

// Should have been called immediately.
verify(mockListener1, times(1)).notify(FULFILLED_VALUE);
}

@Test
public void multipleListeners_addedBeforeSet_allNotifiedInOrder() {
promise.runWhenSet(mockListener1);
promise.runWhenSet(mockListener2);

promise.set(FULFILLED_VALUE);

InOrder inOrder = inOrder(mockListener1, mockListener2);
inOrder.verify(mockListener1).notify(FULFILLED_VALUE);
inOrder.verify(mockListener2).notify(FULFILLED_VALUE);
}

@Test
public void listenerThrows_duringSet_propagatesException() {
// A listener that will throw when notified.
Listener<String> throwingListener =
(value) -> {
throw new UnsupportedOperationException("Listener failed");
};

promise.runWhenSet(throwingListener);

// Fulfilling the promise should now throw the exception from the listener.
UnsupportedOperationException e =
assertThrows(UnsupportedOperationException.class, () -> promise.set(FULFILLED_VALUE));
assertThat(e).hasMessageThat().isEqualTo("Listener failed");
}

@Test
public void listenerThrows_whenAddedAfterSet_propagatesException() {
promise.set(FULFILLED_VALUE);

// A listener that will throw when notified.
Listener<String> throwingListener =
(value) -> {
throw new UnsupportedOperationException("Listener failed");
};

// Running the listener should throw immediately because the promise is already fulfilled.
UnsupportedOperationException e =
assertThrows(
UnsupportedOperationException.class, () -> promise.runWhenSet(throwingListener));
assertThat(e).hasMessageThat().isEqualTo("Listener failed");
}
}