Skip to content

Commit

Permalink
Dispatch transact() calls on an Executor when FLAG_ONEWAY would not b…
Browse files Browse the repository at this point in the history
…e respected.

Fixes #8914
  • Loading branch information
jdcormie committed Mar 15, 2022
1 parent 86b74d9 commit c33efa3
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public void socketStats() throws Exception {}
@Override
public void flowControlPushBack() throws Exception {}

@Test
@Ignore("Not yet implemented. See https://github.com/grpc/grpc-java/issues/8931")
@Override
public void serverNotListening() throws Exception {}

@Test
@Ignore("This test isn't appropriate for BinderTransport.")
@Override
Expand Down
90 changes: 35 additions & 55 deletions binder/src/main/java/io/grpc/binder/internal/BinderTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected enum TransportState {
@Nullable
protected Status shutdownStatus;

@Nullable private IBinder outgoingBinder;
@Nullable private OneWayBinderProxy outgoingBinder;

private final FlowController flowController;

Expand Down Expand Up @@ -278,10 +278,10 @@ final void setState(TransportState newState) {
}

@GuardedBy("this")
protected boolean setOutgoingBinder(IBinder binder) {
protected boolean setOutgoingBinder(OneWayBinderProxy binder) {
this.outgoingBinder = binder;
try {
binder.linkToDeath(this, 0);
binder.getDelegate().linkToDeath(this, 0);
return true;
} catch (RemoteException re) {
return false;
Expand Down Expand Up @@ -326,39 +326,30 @@ final void sendSetupTransaction() {
}

@GuardedBy("this")
final void sendSetupTransaction(IBinder iBinder) {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(WIRE_FORMAT_VERSION);
parcel.writeStrongBinder(incomingBinder);
if (!iBinder.transact(SETUP_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY)) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed sending SETUP_TRANSPORT transaction"), true);
}
final void sendSetupTransaction(OneWayBinderProxy iBinder) {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(WIRE_FORMAT_VERSION);
parcel.get().writeStrongBinder(incomingBinder);
iBinder.transact(SETUP_TRANSPORT, parcel);
} catch (RemoteException re) {
shutdownInternal(statusFromRemoteException(re), true);
} finally {
parcel.recycle();
}
}

@GuardedBy("this")
private final void sendShutdownTransaction() {
if (outgoingBinder != null) {
try {
outgoingBinder.unlinkToDeath(this, 0);
outgoingBinder.getDelegate().unlinkToDeath(this, 0);
} catch (NoSuchElementException e) {
// Ignore.
}
Parcel parcel = Parcel.obtain();
try {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
// Send empty flags to avoid a memory leak linked to empty parcels (b/207778694).
parcel.writeInt(0);
outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY);
parcel.get().writeInt(0);
outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel);
} catch (RemoteException re) {
// Ignore.
} finally {
parcel.recycle();
}
}
}
Expand All @@ -369,14 +360,11 @@ protected synchronized void sendPing(int id) throws StatusException {
} else if (outgoingBinder == null) {
throw Status.FAILED_PRECONDITION.withDescription("Transport not ready.").asException();
} else {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(id);
outgoingBinder.transact(PING, parcel, null, IBinder.FLAG_ONEWAY);
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(id);
outgoingBinder.transact(PING, parcel);
} catch (RemoteException re) {
throw statusFromRemoteException(re).asException();
} finally {
parcel.recycle();
}
}
}
Expand All @@ -401,12 +389,10 @@ final void unregisterCall(int callId) {
}
}

final void sendTransaction(int callId, Parcel parcel) throws StatusException {
int dataSize = parcel.dataSize();
final void sendTransaction(int callId, ParcelHolder parcel) throws StatusException {
int dataSize = parcel.get().dataSize();
try {
if (!outgoingBinder.transact(callId, parcel, null, IBinder.FLAG_ONEWAY)) {
throw Status.UNAVAILABLE.withDescription("Failed sending transaction").asException();
}
outgoingBinder.transact(callId, parcel);
} catch (RemoteException re) {
throw statusFromRemoteException(re).asException();
}
Expand All @@ -416,16 +402,13 @@ final void sendTransaction(int callId, Parcel parcel) throws StatusException {
}

final void sendOutOfBandClose(int callId, Status status) {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(0); // Placeholder for flags. Will be filled in below.
int flags = TransactionUtils.writeStatus(parcel, status);
TransactionUtils.fillInFlags(parcel, flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
int flags = TransactionUtils.writeStatus(parcel.get(), status);
TransactionUtils.fillInFlags(parcel.get(), flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
sendTransaction(callId, parcel);
} catch (StatusException e) {
logger.log(Level.WARNING, "Failed sending oob close transaction", e);
} finally {
parcel.recycle();
}
}

Expand Down Expand Up @@ -496,10 +479,12 @@ protected Inbound<?> createInbound(int callId) {
protected void handleSetupTransport(Parcel parcel) {}

@GuardedBy("this")
private final void handlePing(Parcel parcel) {
private final void handlePing(Parcel requestParcel) {
int id = requestParcel.readInt();
if (transportState == TransportState.READY) {
try {
outgoingBinder.transact(PING_RESPONSE, parcel, null, IBinder.FLAG_ONEWAY);
try (ParcelHolder replyParcel = ParcelHolder.obtain()) {
replyParcel.get().writeInt(id);
outgoingBinder.transact(PING_RESPONSE, replyParcel);
} catch (RemoteException re) {
// Ignore.
}
Expand All @@ -510,21 +495,15 @@ private final void handlePing(Parcel parcel) {
protected void handlePingResponse(Parcel parcel) {}

@GuardedBy("this")
private void sendAcknowledgeBytes(IBinder iBinder) {
private void sendAcknowledgeBytes(OneWayBinderProxy iBinder) {
// Send a transaction to acknowledge reception of incoming data.
long n = numIncomingBytes.get();
acknowledgedIncomingBytes = n;
Parcel parcel = Parcel.obtain();
try {
parcel.writeLong(n);
if (!iBinder.transact(ACKNOWLEDGE_BYTES, parcel, null, IBinder.FLAG_ONEWAY)) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed sending ack bytes transaction"), true);
}
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeLong(n);
iBinder.transact(ACKNOWLEDGE_BYTES, parcel);
} catch (RemoteException re) {
shutdownInternal(statusFromRemoteException(re), true);
} finally {
parcel.recycle();
}
}

Expand Down Expand Up @@ -607,7 +586,7 @@ void releaseExecutors() {

@Override
public synchronized void onBound(IBinder binder) {
sendSetupTransaction(binder);
sendSetupTransaction(OneWayBinderProxy.wrap(binder, offloadExecutor));
}

@Override
Expand Down Expand Up @@ -748,7 +727,7 @@ private void checkSecurityPolicy(IBinder binder) {
if (inState(TransportState.SETUP)) {
if (!authorization.isOk()) {
shutdownInternal(authorization, true);
} else if (!setOutgoingBinder(binder)) {
} else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true);
} else {
Expand Down Expand Up @@ -827,7 +806,8 @@ public BinderServerTransport(
IBinder callbackBinder) {
super(executorServicePool, attributes, buildLogId(attributes));
this.streamTracerFactories = streamTracerFactories;
setOutgoingBinder(callbackBinder);
// TODO(jdcormie): Plumb in the Server's executor() and use it here instead.
setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService()));
}

public synchronized void setServerTransportListener(ServerTransportListener serverTransportListener) {
Expand Down
134 changes: 134 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/OneWayBinderProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.grpc.binder.internal;

import android.os.Binder;
import android.os.IBinder;
import android.os.Parcel;
import android.os.RemoteException;
import io.grpc.internal.SerializingExecutor;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Wraps an {@link IBinder} with a safe and uniformly asynchronous transaction API.
*
* <p>The android.os.Binder implementation of {@link IBinder} is problematic for clients that want
* "oneway" transaction semantics because it implements transact() by invoking onTransact() on the
* caller's thread, even when the {@link IBinder#FLAG_ONEWAY} flag is set. Even though this behavior
* is documented, it's surprising and dangerous. Wrap your {@link IBinder}s with an instance of this
* class to ensure the following out-of-process "oneway" semantics are always in effect:
*
* <ul>
* <li>transact() merely enqueues the transaction for processing. It doesn't wait for onTransact()
* to complete.
* <li>transact() may fail for programming errors or transport-layer errors that are immediately
* obvious on the caller's side, but never for an Exception or false return value from
* onTransact().
* <li>onTransact() runs without holding any of the locks held by the thread calling transact().
* <li>onTransact() calls are dispatched one at a time in the same happens-before order as the
* corresponding calls to transact().
* </ul>
*
* <p>NB: One difference that this class can't conceal is that calls to onTransact() are serialized
* per {@link OneWayBinderProxy} instance, not per instance of the wrapped {@link IBinder}. An
* android.os.Binder with in-process callers could still receive concurrent calls to onTransact() on
* different threads if callers used different {@link OneWayBinderProxy} instances or if that Binder
* also had out-of-process callers.
*/
public abstract class OneWayBinderProxy {
private static final Logger logger = Logger.getLogger(OneWayBinderProxy.class.getName());
protected final IBinder delegate;

private OneWayBinderProxy(IBinder iBinder) {
this.delegate = iBinder;
}

/**
* Returns a new instance of {@link OneWayBinderProxy} that wraps {@code iBinder}.
*
* @param iBinder the binder to wrap
* @param executor a non-direct Executor used to dispatch calls to onTransact(), if necessary
* @return a new instance of {@link OneWayBinderProxy}
*/
public static OneWayBinderProxy wrap(IBinder iBinder, Executor executor) {
return (iBinder instanceof Binder)
? new InProcessImpl(iBinder, executor)
: new OutOfProcessImpl(iBinder);
}

/**
* Enqueues a transaction for the wrapped {@link IBinder} with guaranteed "oneway" semantics.
*
* <p>NB: Unlike {@link IBinder#transact}, implementations of this method take ownership of the
* {@code data} Parcel. When this method returns, {@code data} will normally be empty, but callers
* should still unconditionally {@link ParcelHolder#close()} it to avoid a leak in case they or
* the implementation throws before ownership is transferred.
*
* @param code identifies the type of this transaction
* @param data a non-empty container of the Parcel to be sent
* @throws RemoteException if the transaction could not even be queued for dispatch on the server.
* Failures from {@link Binder#onTransact} are *never* reported this way.
*/
public abstract void transact(int code, ParcelHolder data) throws RemoteException;

/**
* Returns the wrapped {@link IBinder} for the purpose of calling methods other than {@link
* IBinder#transact(int, Parcel, Parcel, int)}.
*/
public IBinder getDelegate() {
return delegate;
}

static class OutOfProcessImpl extends OneWayBinderProxy {
OutOfProcessImpl(IBinder iBinder) {
super(iBinder);
}

@Override
public void transact(int code, ParcelHolder data) throws RemoteException {
if (!transactAndRecycleParcel(code, data.release())) {
// This cannot happen (see g/android-binder/c/jM4NvS234Rw) but, just in case, let the caller
// handle it along with all the other possible transport-layer errors.
throw new RemoteException("BinderProxy#transact(" + code + ", FLAG_ONEWAY) returned false");
}
}
}

protected boolean transactAndRecycleParcel(int code, Parcel data) throws RemoteException {
try {
return delegate.transact(code, data, null, IBinder.FLAG_ONEWAY);
} finally {
data.recycle();
}
}

static class InProcessImpl extends OneWayBinderProxy {
private final SerializingExecutor executor;

InProcessImpl(IBinder binder, Executor executor) {
super(binder);
this.executor = new SerializingExecutor(executor);
}

@Override
public void transact(int code, ParcelHolder wrappedParcel) {
// Transfer ownership, taking care to handle any RuntimeException from execute().
Parcel parcel = wrappedParcel.get();
executor.execute(
() -> {
try {
if (!transactAndRecycleParcel(code, parcel)) {
// onTransact() in our same process returned this. Ignore it, just like Android
// would have if the android.os.Binder was in another process.
logger.log(Level.FINEST, "A oneway transaction was not understood - ignoring");
}
} catch (Exception e) {
// onTransact() in our same process threw this. Ignore it, just like Android would
// have if the android.os.Binder was in another process.
logger.log(Level.FINEST, "A oneway transaction threw - ignoring", e);
}
});
wrappedParcel.release();
}
}
}
24 changes: 11 additions & 13 deletions binder/src/main/java/io/grpc/binder/internal/Outbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,14 @@ final void send() throws StatusException {
@GuardedBy("this")
@SuppressWarnings("fallthrough")
protected final void sendInternal() throws StatusException {
Parcel parcel = Parcel.obtain();
int flags = 0;
parcel.writeInt(0); // Placeholder for flags. Will be filled in below.
parcel.writeInt(transactionIndex++);
try {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
int flags = 0;
parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
parcel.get().writeInt(transactionIndex++);
switch (outboundState) {
case INITIAL:
flags |= TransactionUtils.FLAG_PREFIX;
flags |= writePrefix(parcel);
flags |= writePrefix(parcel.get());
onOutboundState(State.PREFIX_SENT);
if (!messageAvailable() && !suffixReady) {
break;
Expand All @@ -239,7 +238,7 @@ protected final void sendInternal() throws StatusException {
InputStream messageStream = peekNextMessage();
if (messageStream != null) {
flags |= TransactionUtils.FLAG_MESSAGE_DATA;
flags |= writeMessageData(parcel, messageStream);
flags |= writeMessageData(parcel.get(), messageStream);
} else {
checkState(suffixReady);
}
Expand All @@ -252,20 +251,19 @@ protected final void sendInternal() throws StatusException {
// Fall-through.
case ALL_MESSAGES_SENT:
flags |= TransactionUtils.FLAG_SUFFIX;
flags |= writeSuffix(parcel);
flags |= writeSuffix(parcel.get());
onOutboundState(State.SUFFIX_SENT);
break;
default:
throw new AssertionError();
}
TransactionUtils.fillInFlags(parcel, flags);
TransactionUtils.fillInFlags(parcel.get(), flags);
int dataSize = parcel.get().dataSize();
transport.sendTransaction(callId, parcel);
statsTraceContext.outboundWireSize(parcel.dataSize());
statsTraceContext.outboundUncompressedSize(parcel.dataSize());
statsTraceContext.outboundWireSize(dataSize);
statsTraceContext.outboundUncompressedSize(dataSize);
} catch (IOException e) {
throw Status.INTERNAL.withCause(e).asException();
} finally {
parcel.recycle();
}
}

Expand Down
Loading

0 comments on commit c33efa3

Please sign in to comment.