diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java b/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java index fe2fd587453..0038a054854 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/internal/BinderClientTransportTest.java @@ -100,7 +100,7 @@ public final class BinderClientTransportTest { .build(); AndroidComponentAddress serverAddress; - BinderTransport.BinderClientTransport transport; + BinderClientTransport transport; BlockingSecurityPolicy blockingSecurityPolicy = new BlockingSecurityPolicy(); private final ObjectPool executorServicePool = @@ -178,7 +178,7 @@ public BinderClientTransportBuilder setPreAuthorizeServer(boolean preAuthorizeSe return this; } - public BinderTransport.BinderClientTransport build() { + public BinderClientTransport build() { return factoryBuilder .buildClientTransportFactory() .newClientTransport(serverAddress, new ClientTransportOptions(), null); @@ -502,8 +502,7 @@ public void testAsyncSecurityPolicyCancelledUponExternalTermination() throws Exc } private static void startAndAwaitReady( - BinderTransport.BinderClientTransport transport, TestTransportListener transportListener) - throws Exception { + BinderClientTransport transport, TestTransportListener transportListener) throws Exception { transport.start(transportListener).run(); transportListener.awaitReady(); } diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java index fc9a383d572..7932cabde89 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java @@ -106,8 +106,7 @@ protected ManagedClientTransport newClientTransport(InternalServer server) { options.setEagAttributes(eagAttrs()); options.setChannelLogger(transportLogger()); - return new BinderTransport.BinderClientTransport( - builder.buildClientTransportFactory(), addr, options); + return new BinderClientTransport(builder.buildClientTransportFactory(), addr, options); } @Test diff --git a/binder/src/main/java/io/grpc/binder/internal/ActiveTransportTracker.java b/binder/src/main/java/io/grpc/binder/internal/ActiveTransportTracker.java index 2bfa9fea4cb..01505bfd509 100644 --- a/binder/src/main/java/io/grpc/binder/internal/ActiveTransportTracker.java +++ b/binder/src/main/java/io/grpc/binder/internal/ActiveTransportTracker.java @@ -11,8 +11,8 @@ import io.grpc.internal.ServerTransportListener; /** - * Tracks which {@link BinderTransport.BinderServerTransport} are currently active and allows - * invoking a {@link Runnable} only once all transports are terminated. + * Tracks which {@link BinderServerTransport} are currently active and allows invoking a {@link + * Runnable} only once all transports are terminated. */ final class ActiveTransportTracker implements ServerListener { private final ServerListener delegate; diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java new file mode 100644 index 00000000000..95bd531aa41 --- /dev/null +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -0,0 +1,441 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.grpc.binder.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.binder.ApiConstants.PRE_AUTH_SERVER_OVERRIDE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import android.content.Context; +import android.content.pm.ServiceInfo; +import android.os.Binder; +import android.os.IBinder; +import android.os.Parcel; +import android.os.Process; +import com.google.common.base.Ticker; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.errorprone.annotations.CheckReturnValue; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientStreamTracer; +import io.grpc.Grpc; +import io.grpc.Internal; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.SecurityLevel; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.binder.AndroidComponentAddress; +import io.grpc.binder.AsyncSecurityPolicy; +import io.grpc.binder.InboundParcelablePolicy; +import io.grpc.binder.SecurityPolicy; +import io.grpc.internal.ClientStream; +import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; +import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.FailingClientStream; +import io.grpc.internal.GrpcAttributes; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.StatsTraceContext; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** Concrete client-side transport implementation. */ +@ThreadSafe +@Internal +public final class BinderClientTransport extends BinderTransport + implements ConnectionClientTransport, Bindable.Observer { + + private final ObjectPool offloadExecutorPool; + private final Executor offloadExecutor; + private final SecurityPolicy securityPolicy; + private final Bindable serviceBinding; + + /** Number of ongoing calls which keep this transport "in-use". */ + private final AtomicInteger numInUseStreams; + + private final long readyTimeoutMillis; + private final PingTracker pingTracker; + private final boolean preAuthorizeServer; + + @Nullable private ManagedClientTransport.Listener clientTransportListener; + + @GuardedBy("this") + private int latestCallId = FIRST_CALL_ID; + + @GuardedBy("this") + private ScheduledFuture readyTimeoutFuture; // != null iff timeout scheduled. + + @GuardedBy("this") + @Nullable + private ListenableFuture authResultFuture; // null before we check auth. + + @GuardedBy("this") + @Nullable + private ListenableFuture preAuthResultFuture; // null before we pre-auth. + + /** + * Constructs a new transport instance. + * + * @param factory parameters common to all a Channel's transports + * @param targetAddress the fully resolved and load-balanced server address + * @param options other parameters that can vary as transports come and go within a Channel + */ + public BinderClientTransport( + BinderClientTransportFactory factory, + AndroidComponentAddress targetAddress, + ClientTransportOptions options) { + super( + factory.scheduledExecutorPool, + buildClientAttributes( + options.getEagAttributes(), + factory.sourceContext, + targetAddress, + factory.inboundParcelablePolicy), + factory.binderDecorator, + buildLogId(factory.sourceContext, targetAddress)); + this.offloadExecutorPool = factory.offloadExecutorPool; + this.securityPolicy = factory.securityPolicy; + this.offloadExecutor = offloadExecutorPool.getObject(); + this.readyTimeoutMillis = factory.readyTimeoutMillis; + Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); + this.preAuthorizeServer = + preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; + numInUseStreams = new AtomicInteger(); + pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); + + serviceBinding = + new ServiceBinding( + factory.mainThreadExecutor, + factory.sourceContext, + factory.channelCredentials, + targetAddress.asBindIntent(), + targetAddress.getTargetUser() != null + ? targetAddress.getTargetUser() + : factory.defaultTargetUserHandle, + factory.bindServiceFlags.toInteger(), + this); + } + + @Override + void releaseExecutors() { + super.releaseExecutors(); + offloadExecutorPool.returnObject(offloadExecutor); + } + + @Override + public synchronized void onBound(IBinder binder) { + sendSetupTransaction(binderDecorator.decorate(OneWayBinderProxy.wrap(binder, offloadExecutor))); + } + + @Override + public synchronized void onUnbound(Status reason) { + shutdownInternal(reason, true); + } + + @CheckReturnValue + @Override + public synchronized Runnable start(Listener clientTransportListener) { + this.clientTransportListener = checkNotNull(clientTransportListener); + return () -> { + synchronized (BinderClientTransport.this) { + if (inState(TransportState.NOT_STARTED)) { + setState(TransportState.SETUP); + try { + if (preAuthorizeServer) { + preAuthorize(serviceBinding.resolve()); + } else { + serviceBinding.bind(); + } + } catch (StatusException e) { + shutdownInternal(e.getStatus(), true); + return; + } + if (readyTimeoutMillis >= 0) { + readyTimeoutFuture = + getScheduledExecutorService() + .schedule( + BinderClientTransport.this::onReadyTimeout, + readyTimeoutMillis, + MILLISECONDS); + } + } + } + }; + } + + @GuardedBy("this") + private void preAuthorize(ServiceInfo serviceInfo) { + // It's unlikely, but the identity/existence of this Service could change by the time we + // actually connect. It doesn't matter though, because: + // - If pre-auth fails (but would succeed against the server's new state), the grpc-core layer + // will eventually retry using a new transport instance that will see the Service's new state. + // - If pre-auth succeeds (but would fail against the server's new state), we might give an + // unauthorized server a chance to run, but the connection will still fail by SecurityPolicy + // check later in handshake. Pre-auth remains effective at mitigating abuse because malware + // can't typically control the exact timing of its installation. + preAuthResultFuture = checkServerAuthorizationAsync(serviceInfo.applicationInfo.uid); + Futures.addCallback( + preAuthResultFuture, + new FutureCallback() { + @Override + public void onSuccess(Status result) { + handlePreAuthResult(result); + } + + @Override + public void onFailure(Throwable t) { + handleAuthResult(t); + } + }, + offloadExecutor); + } + + private synchronized void handlePreAuthResult(Status authorization) { + if (inState(TransportState.SETUP)) { + if (!authorization.isOk()) { + shutdownInternal(authorization, true); + } else { + serviceBinding.bind(); + } + } + } + + private synchronized void onReadyTimeout() { + if (inState(TransportState.SETUP)) { + readyTimeoutFuture = null; + shutdownInternal( + Status.DEADLINE_EXCEEDED.withDescription( + "Connect timeout " + readyTimeoutMillis + "ms lapsed"), + true); + } + } + + @Override + public synchronized ClientStream newStream( + final MethodDescriptor method, + final Metadata headers, + final CallOptions callOptions, + ClientStreamTracer[] tracers) { + if (!inState(TransportState.READY)) { + return newFailingClientStream( + isShutdown() + ? shutdownStatus + : Status.INTERNAL.withDescription("newStream() before transportReady()"), + attributes, + headers, + tracers); + } + + int callId = latestCallId++; + if (latestCallId == LAST_CALL_ID) { + latestCallId = FIRST_CALL_ID; + } + StatsTraceContext statsTraceContext = + StatsTraceContext.newClientContext(tracers, attributes, headers); + Inbound.ClientInbound inbound = + new Inbound.ClientInbound( + this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions)); + if (ongoingCalls.putIfAbsent(callId, inbound) != null) { + Status failure = Status.INTERNAL.withDescription("Clashing call IDs"); + shutdownInternal(failure, true); + return newFailingClientStream(failure, attributes, headers, tracers); + } else { + if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { + clientTransportListener.transportInUse(true); + } + Outbound.ClientOutbound outbound = + new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); + if (method.getType().clientSendsOneMessage()) { + return new SingleMessageClientStream(inbound, outbound, attributes); + } else { + return new MultiMessageClientStream(inbound, outbound, attributes); + } + } + } + + @Override + protected void unregisterInbound(Inbound inbound) { + if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { + clientTransportListener.transportInUse(false); + } + super.unregisterInbound(inbound); + } + + @Override + public void ping(final PingCallback callback, Executor executor) { + pingTracker.startPing(callback, executor); + } + + @Override + public synchronized void shutdown(Status reason) { + checkNotNull(reason, "reason"); + shutdownInternal(reason, false); + } + + @Override + public synchronized void shutdownNow(Status reason) { + checkNotNull(reason, "reason"); + shutdownInternal(reason, true); + } + + @Override + @GuardedBy("this") + void notifyShutdown(Status status) { + clientTransportListener.transportShutdown(status); + } + + @Override + @GuardedBy("this") + void notifyTerminated() { + if (numInUseStreams.getAndSet(0) > 0) { + clientTransportListener.transportInUse(false); + } + if (readyTimeoutFuture != null) { + readyTimeoutFuture.cancel(false); + readyTimeoutFuture = null; + } + if (preAuthResultFuture != null) { + preAuthResultFuture.cancel(false); // No effect if already complete. + } + if (authResultFuture != null) { + authResultFuture.cancel(false); // No effect if already complete. + } + serviceBinding.unbind(); + clientTransportListener.transportTerminated(); + } + + @Override + @GuardedBy("this") + protected void handleSetupTransport(Parcel parcel) { + int remoteUid = Binder.getCallingUid(); + attributes = setSecurityAttrs(attributes, remoteUid); + if (inState(TransportState.SETUP)) { + int version = parcel.readInt(); + IBinder binder = parcel.readStrongBinder(); + if (version != WIRE_FORMAT_VERSION) { + shutdownInternal(Status.UNAVAILABLE.withDescription("Wire format version mismatch"), true); + } else if (binder == null) { + shutdownInternal( + Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true); + } else { + authResultFuture = checkServerAuthorizationAsync(remoteUid); + Futures.addCallback( + authResultFuture, + new FutureCallback() { + @Override + public void onSuccess(Status result) { + handleAuthResult(binder, result); + } + + @Override + public void onFailure(Throwable t) { + handleAuthResult(t); + } + }, + offloadExecutor); + } + } + } + + private ListenableFuture checkServerAuthorizationAsync(int remoteUid) { + return (securityPolicy instanceof AsyncSecurityPolicy) + ? ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(remoteUid) + : Futures.submit(() -> securityPolicy.checkAuthorization(remoteUid), offloadExecutor); + } + + private synchronized void handleAuthResult(IBinder binder, Status authorization) { + if (inState(TransportState.SETUP)) { + if (!authorization.isOk()) { + shutdownInternal(authorization, true); + } else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) { + shutdownInternal( + Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true); + } else { + // Check state again, since a failure inside setOutgoingBinder (or a callback it + // triggers), could have shut us down. + if (!isShutdown()) { + setState(TransportState.READY); + attributes = clientTransportListener.filterTransport(attributes); + clientTransportListener.transportReady(); + if (readyTimeoutFuture != null) { + readyTimeoutFuture.cancel(false); + readyTimeoutFuture = null; + } + } + } + } + } + + private synchronized void handleAuthResult(Throwable t) { + shutdownInternal( + Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); + } + + @GuardedBy("this") + @Override + protected void handlePingResponse(Parcel parcel) { + pingTracker.onPingResponse(parcel.readInt()); + } + + private static ClientStream newFailingClientStream( + Status failure, Attributes attributes, Metadata headers, ClientStreamTracer[] tracers) { + StatsTraceContext statsTraceContext = + StatsTraceContext.newClientContext(tracers, attributes, headers); + statsTraceContext.clientOutboundHeaders(); + return new FailingClientStream(failure, tracers); + } + + private static InternalLogId buildLogId( + Context sourceContext, AndroidComponentAddress targetAddress) { + return InternalLogId.allocate( + BinderClientTransport.class, + sourceContext.getClass().getSimpleName() + "->" + targetAddress); + } + + private static Attributes buildClientAttributes( + Attributes eagAttrs, + Context sourceContext, + AndroidComponentAddress targetAddress, + InboundParcelablePolicy inboundParcelablePolicy) { + return Attributes.newBuilder() + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) // Trust noone for now. + .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, AndroidComponentAddress.forContext(sourceContext)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, targetAddress) + .set(INBOUND_PARCELABLE_POLICY, inboundParcelablePolicy) + .build(); + } + + private static Attributes setSecurityAttrs(Attributes attributes, int uid) { + return attributes.toBuilder() + .set(REMOTE_UID, uid) + .set( + GrpcAttributes.ATTR_SECURITY_LEVEL, + uid == Process.myUid() + ? SecurityLevel.PRIVACY_AND_INTEGRITY + : SecurityLevel.INTEGRITY) // TODO: Have the SecrityPolicy decide this. + .build(); + } +} diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java index 85a0ddd35b7..3f51452c90c 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java @@ -83,12 +83,12 @@ private BinderClientTransportFactory(Builder builder) { } @Override - public BinderTransport.BinderClientTransport newClientTransport( + public BinderClientTransport newClientTransport( SocketAddress addr, ClientTransportOptions options, ChannelLogger channelLogger) { if (closed) { throw new IllegalStateException("The transport factory is closed."); } - return new BinderTransport.BinderClientTransport(this, (AndroidComponentAddress) addr, options); + return new BinderClientTransport(this, (AndroidComponentAddress) addr, options); } @Override diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java index 6b8347390b9..fca8e3d88e1 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java @@ -178,8 +178,8 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) { serverPolicyChecker, checkNotNull(executor, "Not started?")); // Create a new transport and let our listener know about it. - BinderTransport.BinderServerTransport transport = - new BinderTransport.BinderServerTransport( + BinderServerTransport transport = + new BinderServerTransport( executorServicePool, attrsBuilder.build(), streamTracerFactories, diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java new file mode 100644 index 00000000000..1c345249735 --- /dev/null +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServerTransport.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.grpc.binder.internal; + +import android.os.IBinder; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import io.grpc.Attributes; +import io.grpc.Grpc; +import io.grpc.Internal; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerStream; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.StatsTraceContext; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; + +/** Concrete server-side transport implementation. */ +@Internal +public final class BinderServerTransport extends BinderTransport implements ServerTransport { + + private final List streamTracerFactories; + @Nullable private ServerTransportListener serverTransportListener; + + /** + * Constructs a new transport instance. + * + * @param binderDecorator used to decorate 'callbackBinder', for fault injection. + */ + public BinderServerTransport( + ObjectPool executorServicePool, + Attributes attributes, + List streamTracerFactories, + OneWayBinderProxy.Decorator binderDecorator, + IBinder callbackBinder) { + super(executorServicePool, attributes, binderDecorator, buildLogId(attributes)); + this.streamTracerFactories = streamTracerFactories; + // TODO(jdcormie): Plumb in the Server's executor() and use it here instead. + setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService())); + } + + public synchronized void setServerTransportListener( + ServerTransportListener serverTransportListener) { + this.serverTransportListener = serverTransportListener; + if (isShutdown()) { + setState(TransportState.SHUTDOWN_TERMINATED); + notifyTerminated(); + releaseExecutors(); + } else { + sendSetupTransaction(); + // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a callback + // it triggers), could have shut us down. + if (!isShutdown()) { + setState(TransportState.READY); + attributes = serverTransportListener.transportReady(attributes); + } + } + } + + StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) { + return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers); + } + + synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) { + if (isShutdown()) { + return Status.UNAVAILABLE.withDescription("transport is shutdown"); + } else { + serverTransportListener.streamCreated(stream, methodName, headers); + return Status.OK; + } + } + + @Override + @GuardedBy("this") + void notifyShutdown(Status status) { + // Nothing to do. + } + + @Override + @GuardedBy("this") + void notifyTerminated() { + if (serverTransportListener != null) { + serverTransportListener.transportTerminated(); + } + } + + @Override + public synchronized void shutdown() { + shutdownInternal(Status.OK, false); + } + + @Override + public synchronized void shutdownNow(Status reason) { + shutdownInternal(reason, true); + } + + @Override + @Nullable + @GuardedBy("this") + protected Inbound createInbound(int callId) { + return new Inbound.ServerInbound(this, attributes, callId); + } + + private static InternalLogId buildLogId(Attributes attributes) { + return InternalLogId.allocate( + BinderServerTransport.class, "from " + attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + } +} diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java index d87cfb74044..0fe131a0728 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java @@ -19,67 +19,32 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.immediateFuture; -import static io.grpc.binder.ApiConstants.PRE_AUTH_SERVER_OVERRIDE; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import android.content.Context; -import android.content.pm.ServiceInfo; -import android.os.Binder; import android.os.DeadObjectException; import android.os.IBinder; import android.os.Parcel; -import android.os.Process; import android.os.RemoteException; import android.os.TransactionTooLargeException; import androidx.annotation.BinderThread; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Ticker; import com.google.common.base.Verify; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.errorprone.annotations.CheckReturnValue; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.ClientStreamTracer; -import io.grpc.Grpc; import io.grpc.Internal; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.SecurityLevel; -import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.StatusException; -import io.grpc.binder.AndroidComponentAddress; -import io.grpc.binder.AsyncSecurityPolicy; import io.grpc.binder.InboundParcelablePolicy; -import io.grpc.binder.SecurityPolicy; -import io.grpc.internal.ClientStream; -import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; -import io.grpc.internal.ConnectionClientTransport; -import io.grpc.internal.FailingClientStream; -import io.grpc.internal.GrpcAttributes; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ObjectPool; -import io.grpc.internal.ServerStream; -import io.grpc.internal.ServerTransport; -import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.StatsTraceContext; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -169,10 +134,10 @@ public abstract class BinderTransport implements IBinder.DeathRecipient { private static final int RESERVED_TRANSACTIONS = 1000; /** The first call ID we can use. */ - private static final int FIRST_CALL_ID = IBinder.FIRST_CALL_TRANSACTION + RESERVED_TRANSACTIONS; + static final int FIRST_CALL_ID = IBinder.FIRST_CALL_TRANSACTION + RESERVED_TRANSACTIONS; /** The last call ID we can use. */ - private static final int LAST_CALL_ID = IBinder.LAST_CALL_TRANSACTION; + static final int LAST_CALL_ID = IBinder.LAST_CALL_TRANSACTION; /** The states of this transport. */ protected enum TransportState { @@ -218,7 +183,7 @@ protected enum TransportState { // Only read/written on @BinderThread. private long acknowledgedIncomingBytes; - private BinderTransport( + protected BinderTransport( ObjectPool executorServicePool, Attributes attributes, OneWayBinderProxy.Decorator binderDecorator, @@ -559,478 +524,6 @@ final void handleAcknowledgedBytes(long numBytes) { } } - /** Concrete client-side transport implementation. */ - @ThreadSafe - @Internal - public static final class BinderClientTransport extends BinderTransport - implements ConnectionClientTransport, Bindable.Observer { - - private final ObjectPool offloadExecutorPool; - private final Executor offloadExecutor; - private final SecurityPolicy securityPolicy; - private final Bindable serviceBinding; - - /** Number of ongoing calls which keep this transport "in-use". */ - private final AtomicInteger numInUseStreams; - - private final long readyTimeoutMillis; - private final PingTracker pingTracker; - private final boolean preAuthorizeServer; - - @Nullable private ManagedClientTransport.Listener clientTransportListener; - - @GuardedBy("this") - private int latestCallId = FIRST_CALL_ID; - - @GuardedBy("this") - private ScheduledFuture readyTimeoutFuture; // != null iff timeout scheduled. - @GuardedBy("this") - @Nullable private ListenableFuture authResultFuture; // null before we check auth. - - @GuardedBy("this") - @Nullable - private ListenableFuture preAuthResultFuture; // null before we pre-auth. - - /** - * Constructs a new transport instance. - * - * @param factory parameters common to all a Channel's transports - * @param targetAddress the fully resolved and load-balanced server address - * @param options other parameters that can vary as transports come and go within a Channel - */ - public BinderClientTransport( - BinderClientTransportFactory factory, - AndroidComponentAddress targetAddress, - ClientTransportOptions options) { - super( - factory.scheduledExecutorPool, - buildClientAttributes( - options.getEagAttributes(), - factory.sourceContext, - targetAddress, - factory.inboundParcelablePolicy), - factory.binderDecorator, - buildLogId(factory.sourceContext, targetAddress)); - this.offloadExecutorPool = factory.offloadExecutorPool; - this.securityPolicy = factory.securityPolicy; - this.offloadExecutor = offloadExecutorPool.getObject(); - this.readyTimeoutMillis = factory.readyTimeoutMillis; - Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); - this.preAuthorizeServer = - preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; - numInUseStreams = new AtomicInteger(); - pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); - - serviceBinding = - new ServiceBinding( - factory.mainThreadExecutor, - factory.sourceContext, - factory.channelCredentials, - targetAddress.asBindIntent(), - targetAddress.getTargetUser() != null - ? targetAddress.getTargetUser() - : factory.defaultTargetUserHandle, - factory.bindServiceFlags.toInteger(), - this); - } - - @Override - void releaseExecutors() { - super.releaseExecutors(); - offloadExecutorPool.returnObject(offloadExecutor); - } - - @Override - public synchronized void onBound(IBinder binder) { - sendSetupTransaction( - binderDecorator.decorate(OneWayBinderProxy.wrap(binder, offloadExecutor))); - } - - @Override - public synchronized void onUnbound(Status reason) { - shutdownInternal(reason, true); - } - - @CheckReturnValue - @Override - public synchronized Runnable start(ManagedClientTransport.Listener clientTransportListener) { - this.clientTransportListener = checkNotNull(clientTransportListener); - return () -> { - synchronized (BinderClientTransport.this) { - if (inState(TransportState.NOT_STARTED)) { - setState(TransportState.SETUP); - try { - if (preAuthorizeServer) { - preAuthorize(serviceBinding.resolve()); - } else { - serviceBinding.bind(); - } - } catch (StatusException e) { - shutdownInternal(e.getStatus(), true); - return; - } - if (readyTimeoutMillis >= 0) { - readyTimeoutFuture = - getScheduledExecutorService() - .schedule( - BinderClientTransport.this::onReadyTimeout, - readyTimeoutMillis, - MILLISECONDS); - } - } - } - }; - } - - @GuardedBy("this") - private void preAuthorize(ServiceInfo serviceInfo) { - // It's unlikely, but the identity/existence of this Service could change by the time we - // actually connect. It doesn't matter though, because: - // - If pre-auth fails (but would succeed against the server's new state), the grpc-core layer - // will eventually retry using a new transport instance that will see the Service's new state. - // - If pre-auth succeeds (but would fail against the server's new state), we might give an - // unauthorized server a chance to run, but the connection will still fail by SecurityPolicy - // check later in handshake. Pre-auth remains effective at mitigating abuse because malware - // can't typically control the exact timing of its installation. - preAuthResultFuture = checkServerAuthorizationAsync(serviceInfo.applicationInfo.uid); - Futures.addCallback( - preAuthResultFuture, - new FutureCallback() { - @Override - public void onSuccess(Status result) { - handlePreAuthResult(result); - } - - @Override - public void onFailure(Throwable t) { - handleAuthResult(t); - } - }, - offloadExecutor); - } - - private synchronized void handlePreAuthResult(Status authorization) { - if (inState(TransportState.SETUP)) { - if (!authorization.isOk()) { - shutdownInternal(authorization, true); - } else { - serviceBinding.bind(); - } - } - } - - private synchronized void onReadyTimeout() { - if (inState(TransportState.SETUP)) { - readyTimeoutFuture = null; - shutdownInternal( - Status.DEADLINE_EXCEEDED.withDescription( - "Connect timeout " + readyTimeoutMillis + "ms lapsed"), - true); - } - } - - @Override - public synchronized ClientStream newStream( - final MethodDescriptor method, - final Metadata headers, - final CallOptions callOptions, - ClientStreamTracer[] tracers) { - if (!inState(TransportState.READY)) { - return newFailingClientStream( - isShutdown() - ? shutdownStatus - : Status.INTERNAL.withDescription("newStream() before transportReady()"), - attributes, - headers, - tracers); - } - - int callId = latestCallId++; - if (latestCallId == LAST_CALL_ID) { - latestCallId = FIRST_CALL_ID; - } - StatsTraceContext statsTraceContext = - StatsTraceContext.newClientContext(tracers, attributes, headers); - Inbound.ClientInbound inbound = - new Inbound.ClientInbound( - this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions)); - if (ongoingCalls.putIfAbsent(callId, inbound) != null) { - Status failure = Status.INTERNAL.withDescription("Clashing call IDs"); - shutdownInternal(failure, true); - return newFailingClientStream(failure, attributes, headers, tracers); - } else { - if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { - clientTransportListener.transportInUse(true); - } - Outbound.ClientOutbound outbound = - new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); - if (method.getType().clientSendsOneMessage()) { - return new SingleMessageClientStream(inbound, outbound, attributes); - } else { - return new MultiMessageClientStream(inbound, outbound, attributes); - } - } - } - - @Override - protected void unregisterInbound(Inbound inbound) { - if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { - clientTransportListener.transportInUse(false); - } - super.unregisterInbound(inbound); - } - - @Override - public void ping(final PingCallback callback, Executor executor) { - pingTracker.startPing(callback, executor); - } - - @Override - public synchronized void shutdown(Status reason) { - checkNotNull(reason, "reason"); - shutdownInternal(reason, false); - } - - @Override - public synchronized void shutdownNow(Status reason) { - checkNotNull(reason, "reason"); - shutdownInternal(reason, true); - } - - @Override - @GuardedBy("this") - void notifyShutdown(Status status) { - clientTransportListener.transportShutdown(status); - } - - @Override - @GuardedBy("this") - void notifyTerminated() { - if (numInUseStreams.getAndSet(0) > 0) { - clientTransportListener.transportInUse(false); - } - if (readyTimeoutFuture != null) { - readyTimeoutFuture.cancel(false); - readyTimeoutFuture = null; - } - if (preAuthResultFuture != null) { - preAuthResultFuture.cancel(false); // No effect if already complete. - } - if (authResultFuture != null) { - authResultFuture.cancel(false); // No effect if already complete. - } - serviceBinding.unbind(); - clientTransportListener.transportTerminated(); - } - - @Override - @GuardedBy("this") - protected void handleSetupTransport(Parcel parcel) { - int remoteUid = Binder.getCallingUid(); - attributes = setSecurityAttrs(attributes, remoteUid); - if (inState(TransportState.SETUP)) { - int version = parcel.readInt(); - IBinder binder = parcel.readStrongBinder(); - if (version != WIRE_FORMAT_VERSION) { - shutdownInternal( - Status.UNAVAILABLE.withDescription("Wire format version mismatch"), true); - } else if (binder == null) { - shutdownInternal( - Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true); - } else { - authResultFuture = checkServerAuthorizationAsync(remoteUid); - Futures.addCallback( - authResultFuture, - new FutureCallback() { - @Override - public void onSuccess(Status result) { - handleAuthResult(binder, result); - } - - @Override - public void onFailure(Throwable t) { - handleAuthResult(t); - } - }, - offloadExecutor); - } - } - } - - private ListenableFuture checkServerAuthorizationAsync(int remoteUid) { - return (securityPolicy instanceof AsyncSecurityPolicy) - ? ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(remoteUid) - : Futures.submit(() -> securityPolicy.checkAuthorization(remoteUid), offloadExecutor); - } - - private synchronized void handleAuthResult(IBinder binder, Status authorization) { - if (inState(TransportState.SETUP)) { - if (!authorization.isOk()) { - shutdownInternal(authorization, true); - } else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) { - shutdownInternal( - Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true); - } else { - // Check state again, since a failure inside setOutgoingBinder (or a callback it - // triggers), could have shut us down. - if (!isShutdown()) { - setState(TransportState.READY); - attributes = clientTransportListener.filterTransport(attributes); - clientTransportListener.transportReady(); - if (readyTimeoutFuture != null) { - readyTimeoutFuture.cancel(false); - readyTimeoutFuture = null; - } - } - } - } - } - - private synchronized void handleAuthResult(Throwable t) { - shutdownInternal( - Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); - } - - @GuardedBy("this") - @Override - protected void handlePingResponse(Parcel parcel) { - pingTracker.onPingResponse(parcel.readInt()); - } - - private static ClientStream newFailingClientStream( - Status failure, Attributes attributes, Metadata headers, ClientStreamTracer[] tracers) { - StatsTraceContext statsTraceContext = - StatsTraceContext.newClientContext(tracers, attributes, headers); - statsTraceContext.clientOutboundHeaders(); - return new FailingClientStream(failure, tracers); - } - - private static InternalLogId buildLogId( - Context sourceContext, AndroidComponentAddress targetAddress) { - return InternalLogId.allocate( - BinderClientTransport.class, - sourceContext.getClass().getSimpleName() + "->" + targetAddress); - } - - private static Attributes buildClientAttributes( - Attributes eagAttrs, - Context sourceContext, - AndroidComponentAddress targetAddress, - InboundParcelablePolicy inboundParcelablePolicy) { - return Attributes.newBuilder() - .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) // Trust noone for now. - .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, AndroidComponentAddress.forContext(sourceContext)) - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, targetAddress) - .set(INBOUND_PARCELABLE_POLICY, inboundParcelablePolicy) - .build(); - } - - private static Attributes setSecurityAttrs(Attributes attributes, int uid) { - return attributes.toBuilder() - .set(REMOTE_UID, uid) - .set( - GrpcAttributes.ATTR_SECURITY_LEVEL, - uid == Process.myUid() - ? SecurityLevel.PRIVACY_AND_INTEGRITY - : SecurityLevel.INTEGRITY) // TODO: Have the SecrityPolicy decide this. - .build(); - } - } - - /** Concrete server-side transport implementation. */ - @Internal - public static final class BinderServerTransport extends BinderTransport - implements ServerTransport { - - private final List streamTracerFactories; - @Nullable private ServerTransportListener serverTransportListener; - - /** - * Constructs a new transport instance. - * - * @param binderDecorator used to decorate 'callbackBinder', for fault injection. - */ - public BinderServerTransport( - ObjectPool executorServicePool, - Attributes attributes, - List streamTracerFactories, - OneWayBinderProxy.Decorator binderDecorator, - IBinder callbackBinder) { - super(executorServicePool, attributes, binderDecorator, buildLogId(attributes)); - this.streamTracerFactories = streamTracerFactories; - // TODO(jdcormie): Plumb in the Server's executor() and use it here instead. - setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService())); - } - - public synchronized void setServerTransportListener( - ServerTransportListener serverTransportListener) { - this.serverTransportListener = serverTransportListener; - if (isShutdown()) { - setState(TransportState.SHUTDOWN_TERMINATED); - notifyTerminated(); - releaseExecutors(); - } else { - sendSetupTransaction(); - // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a - // callback it triggers), could have shut us down. - if (!isShutdown()) { - setState(TransportState.READY); - attributes = serverTransportListener.transportReady(attributes); - } - } - } - - StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) { - return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers); - } - - synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) { - if (isShutdown()) { - return Status.UNAVAILABLE.withDescription("transport is shutdown"); - } else { - serverTransportListener.streamCreated(stream, methodName, headers); - return Status.OK; - } - } - - @Override - @GuardedBy("this") - void notifyShutdown(Status status) { - // Nothing to do. - } - - @Override - @GuardedBy("this") - void notifyTerminated() { - if (serverTransportListener != null) { - serverTransportListener.transportTerminated(); - } - } - - @Override - public synchronized void shutdown() { - shutdownInternal(Status.OK, false); - } - - @Override - public synchronized void shutdownNow(Status reason) { - shutdownInternal(reason, true); - } - - @Override - @Nullable - @GuardedBy("this") - protected Inbound createInbound(int callId) { - return new Inbound.ServerInbound(this, attributes, callId); - } - - private static InternalLogId buildLogId(Attributes attributes) { - return InternalLogId.allocate( - BinderServerTransport.class, "from " + attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); - } - } - private static void checkTransition(TransportState current, TransportState next) { switch (next) { case SETUP: diff --git a/binder/src/main/java/io/grpc/binder/internal/Inbound.java b/binder/src/main/java/io/grpc/binder/internal/Inbound.java index 50654297c74..9b9dfeef5ce 100644 --- a/binder/src/main/java/io/grpc/binder/internal/Inbound.java +++ b/binder/src/main/java/io/grpc/binder/internal/Inbound.java @@ -610,10 +610,9 @@ protected void deliverCloseAbnormal(Status status) { // Server-side inbound transactions. static final class ServerInbound extends Inbound { - private final BinderTransport.BinderServerTransport serverTransport; + private final BinderServerTransport serverTransport; - ServerInbound( - BinderTransport.BinderServerTransport transport, Attributes attributes, int callId) { + ServerInbound(BinderServerTransport transport, Attributes attributes, int callId) { super(transport, attributes, callId); this.serverTransport = transport; } diff --git a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java index d47106d1d35..e7e73e6d4b0 100644 --- a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java +++ b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java @@ -56,12 +56,12 @@ public final class BinderServerTransportTest { @Mock IBinder mockBinder; - BinderTransport.BinderServerTransport transport; + BinderServerTransport transport; @Before public void setUp() throws Exception { transport = - new BinderTransport.BinderServerTransport( + new BinderServerTransport( new FixedObjectPool<>(executorService), Attributes.EMPTY, ImmutableList.of(), diff --git a/binder/src/testFixtures/java/io/grpc/binder/internal/BinderClientTransportBuilder.java b/binder/src/testFixtures/java/io/grpc/binder/internal/BinderClientTransportBuilder.java index e98daeec3ed..f732ff64663 100644 --- a/binder/src/testFixtures/java/io/grpc/binder/internal/BinderClientTransportBuilder.java +++ b/binder/src/testFixtures/java/io/grpc/binder/internal/BinderClientTransportBuilder.java @@ -24,8 +24,8 @@ import java.net.SocketAddress; /** - * Helps unit tests create {@link BinderTransport.BinderClientTransport} instances without having to - * mention irrelevant details (go/tott/719). + * Helps unit tests create {@link BinderClientTransport} instances without having to mention + * irrelevant details (go/tott/719). */ public class BinderClientTransportBuilder { private BinderClientTransportFactory factory; @@ -54,7 +54,7 @@ public BinderClientTransportBuilder setFactory(BinderClientTransportFactory fact return this; } - public BinderTransport.BinderClientTransport build() { + public BinderClientTransport build() { return factory.newClientTransport( checkNotNull(serverAddress), checkNotNull(options), checkNotNull(channelLogger)); }