From 3d0e7054baeac98e2347093c8a304e3826845d5e Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Tue, 9 Jan 2018 15:17:53 -0800 Subject: [PATCH 1/4] core: install the binary logging client interceptor --- .../main/java/io/grpc/ClientInterceptors.java | 52 ++++ .../java/io/grpc/ForwardingClientCall.java | 36 +-- .../io/grpc/ForwardingClientCallListener.java | 19 +- .../io/grpc/InternalClientInterceptors.java | 36 +++ .../io/grpc/PartialForwardingClientCall.java | 60 +++++ .../PartialForwardingClientCallListener.java | 44 ++++ .../AbstractManagedChannelImplBuilder.java | 2 + .../io/grpc/internal/BinaryLogProvider.java | 86 +++++-- .../io/grpc/internal/ManagedChannelImpl.java | 9 +- .../grpc/internal/BinaryLogProviderTest.java | 226 +++++++++++++++++- .../grpc/internal/ManagedChannelImplTest.java | 73 ++++++ 11 files changed, 574 insertions(+), 69 deletions(-) create mode 100644 core/src/main/java/io/grpc/InternalClientInterceptors.java create mode 100644 core/src/main/java/io/grpc/PartialForwardingClientCall.java create mode 100644 core/src/main/java/io/grpc/PartialForwardingClientCallListener.java diff --git a/core/src/main/java/io/grpc/ClientInterceptors.java b/core/src/main/java/io/grpc/ClientInterceptors.java index 2504494e90b..71175bb7c3a 100644 --- a/core/src/main/java/io/grpc/ClientInterceptors.java +++ b/core/src/main/java/io/grpc/ClientInterceptors.java @@ -17,6 +17,8 @@ package io.grpc; import com.google.common.base.Preconditions; +import io.grpc.MethodDescriptor.Marshaller; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -89,6 +91,56 @@ public static Channel intercept(Channel channel, List ClientInterceptor wrapClientInterceptor( + final ClientInterceptor interceptor, + final Marshaller reqMarshaller, + final Marshaller respMarshaller) { + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, CallOptions callOptions, Channel next) { + final MethodDescriptor wrappedMethod = + method.toBuilder(reqMarshaller, respMarshaller).build(); + final ClientCall wrappedCall = + interceptor.interceptCall(wrappedMethod, callOptions, next); + return new PartialForwardingClientCall() { + @Override + public void start(final Listener responseListener, Metadata headers) { + wrappedCall.start(new PartialForwardingClientCallListener() { + @Override + public void onMessage(WRespT wMessage) { + InputStream bytes = respMarshaller.stream(wMessage); + RespT message = method.getResponseMarshaller().parse(bytes); + responseListener.onMessage(message); + } + + @Override + protected Listener delegate() { + return responseListener; + } + }, headers); + } + + @Override + public void sendMessage(ReqT message) { + InputStream bytes = method.getRequestMarshaller().stream(message); + WReqT wReq = reqMarshaller.parse(bytes); + wrappedCall.sendMessage(wReq); + } + + @Override + protected ClientCall delegate() { + return wrappedCall; + } + }; + } + }; + } + private static class InterceptorChannel extends Channel { private final Channel channel; private final ClientInterceptor interceptor; diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java index 996801be749..4e12c60b49d 100644 --- a/core/src/main/java/io/grpc/ForwardingClientCall.java +++ b/core/src/main/java/io/grpc/ForwardingClientCall.java @@ -16,15 +16,15 @@ package io.grpc; -import javax.annotation.Nullable; - /** * A {@link ClientCall} which forwards all of it's methods to another {@link ClientCall}. */ -public abstract class ForwardingClientCall extends ClientCall { +public abstract class ForwardingClientCall + extends PartialForwardingClientCall { /** * Returns the delegated {@code ClientCall}. */ + @Override protected abstract ClientCall delegate(); @Override @@ -32,41 +32,11 @@ public void start(Listener responseListener, Metadata headers) { delegate().start(responseListener, headers); } - @Override - public void request(int numMessages) { - delegate().request(numMessages); - } - - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) { - delegate().cancel(message, cause); - } - - @Override - public void halfClose() { - delegate().halfClose(); - } - @Override public void sendMessage(ReqT message) { delegate().sendMessage(message); } - @Override - public void setMessageCompression(boolean enabled) { - delegate().setMessageCompression(enabled); - } - - @Override - public boolean isReady() { - return delegate().isReady(); - } - - @Override - public Attributes getAttributes() { - return delegate().getAttributes(); - } - /** * A simplified version of {@link ForwardingClientCall} where subclasses can pass in a {@link * ClientCall} as the delegate. diff --git a/core/src/main/java/io/grpc/ForwardingClientCallListener.java b/core/src/main/java/io/grpc/ForwardingClientCallListener.java index 4df315590fa..40cf2b56f9e 100644 --- a/core/src/main/java/io/grpc/ForwardingClientCallListener.java +++ b/core/src/main/java/io/grpc/ForwardingClientCallListener.java @@ -20,32 +20,19 @@ * A {@link ClientCall.Listener} which forwards all of its methods to another {@link * ClientCall.Listener}. */ -public abstract class ForwardingClientCallListener extends ClientCall.Listener { +public abstract class ForwardingClientCallListener + extends PartialForwardingClientCallListener { /** * Returns the delegated {@code ClientCall.Listener}. */ - protected abstract ClientCall.Listener delegate(); - @Override - public void onHeaders(Metadata headers) { - delegate().onHeaders(headers); - } + protected abstract ClientCall.Listener delegate(); @Override public void onMessage(RespT message) { delegate().onMessage(message); } - @Override - public void onClose(Status status, Metadata trailers) { - delegate().onClose(status, trailers); - } - - @Override - public void onReady() { - delegate().onReady(); - } - /** * A simplified version of {@link ForwardingClientCallListener} where subclasses can pass in a * {@link ClientCall.Listener} as the delegate. diff --git a/core/src/main/java/io/grpc/InternalClientInterceptors.java b/core/src/main/java/io/grpc/InternalClientInterceptors.java new file mode 100644 index 00000000000..b6c26246751 --- /dev/null +++ b/core/src/main/java/io/grpc/InternalClientInterceptors.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import io.grpc.MethodDescriptor.Marshaller; + +/** + * Accessor to internal methods of {@link ServerInterceptors}. + */ +@Internal +public class InternalClientInterceptors { + public static ClientInterceptor wrapClientInterceptor( + final ClientInterceptor wrappedInterceptor, + final Marshaller reqMarshaller, + final Marshaller respMarshaller) { + return ClientInterceptors.wrapClientInterceptor( + wrappedInterceptor, reqMarshaller, respMarshaller); + } + + private InternalClientInterceptors() { + } +} diff --git a/core/src/main/java/io/grpc/PartialForwardingClientCall.java b/core/src/main/java/io/grpc/PartialForwardingClientCall.java new file mode 100644 index 00000000000..0ab21b5b091 --- /dev/null +++ b/core/src/main/java/io/grpc/PartialForwardingClientCall.java @@ -0,0 +1,60 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import javax.annotation.Nullable; + +/** + * A {@link ClientCall} which forwards all of its methods to another {@link ClientCall} which + * may have a different sendMessage() message type. + */ +abstract class PartialForwardingClientCall extends ClientCall { + /** + * Returns the delegated {@code ClientCall}. + */ + protected abstract ClientCall delegate(); + + @Override + public void request(int numMessages) { + delegate().request(numMessages); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + delegate().cancel(message, cause); + } + + @Override + public void halfClose() { + delegate().halfClose(); + } + + @Override + public void setMessageCompression(boolean enabled) { + delegate().setMessageCompression(enabled); + } + + @Override + public boolean isReady() { + return delegate().isReady(); + } + + @Override + public Attributes getAttributes() { + return delegate().getAttributes(); + } +} diff --git a/core/src/main/java/io/grpc/PartialForwardingClientCallListener.java b/core/src/main/java/io/grpc/PartialForwardingClientCallListener.java new file mode 100644 index 00000000000..33e85e6c9de --- /dev/null +++ b/core/src/main/java/io/grpc/PartialForwardingClientCallListener.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * A {@link ClientCall.Listener} which forwards all of its methods to another {@link + * ClientCall.Listener} which may have a different parameterized type than the + * onMessage() message type. + */ +abstract class PartialForwardingClientCallListener extends ClientCall.Listener { + /** + * Returns the delegated {@code ClientCall.Listener}. + */ + protected abstract ClientCall.Listener delegate(); + + @Override + public void onHeaders(Metadata headers) { + delegate().onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + delegate().onClose(status, trailers); + } + + @Override + public void onReady() { + delegate().onReady(); + } +} diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 90a6f2cc37e..46f68a03608 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -129,6 +129,8 @@ public static ManagedChannelBuilder forTarget(String target) { private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; + BinaryLogProvider binlogProvider = BinaryLogProvider.provider(); + /** * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages * larger than this limit is received it will not be processed and the RPC will fail with diff --git a/core/src/main/java/io/grpc/internal/BinaryLogProvider.java b/core/src/main/java/io/grpc/internal/BinaryLogProvider.java index b29cb434e09..dd77fd69e1c 100644 --- a/core/src/main/java/io/grpc/internal/BinaryLogProvider.java +++ b/core/src/main/java/io/grpc/internal/BinaryLogProvider.java @@ -17,8 +17,16 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.InternalClientInterceptors; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; import io.grpc.ServerInterceptor; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -32,17 +40,27 @@ public abstract class BinaryLogProvider { private static final Logger logger = Logger.getLogger(BinaryLogProvider.class.getName()); - private static final BinaryLogProvider NULL_PROVIDER = new NullProvider(); private static final BinaryLogProvider PROVIDER = load(BinaryLogProvider.class.getClassLoader()); + @VisibleForTesting + static final Marshaller IDENTITY_MARSHALLER = new IdentityMarshaller(); + + private final ClientInterceptor binaryLogShim = new BinaryLogShim(); /** - * Always returns a {@code BinaryLogProvider}, even if the provider always returns null - * interceptors. + * Returns a {@code BinaryLogProvider}, or {@code null} if there is no provider. */ + @Nullable public static BinaryLogProvider provider() { return PROVIDER; } + /** + * Wraps a channel to provide binary logging on {@link ClientCall}s as needed. + */ + Channel wrapChannel(Channel channel) { + return ClientInterceptors.intercept(channel, binaryLogShim); + } + @VisibleForTesting static BinaryLogProvider load(ClassLoader classLoader) { try { @@ -50,13 +68,13 @@ static BinaryLogProvider load(ClassLoader classLoader) { } catch (Throwable t) { logger.log( Level.SEVERE, "caught exception loading BinaryLogProvider, will disable binary log", t); - return NULL_PROVIDER; + return null; } } private static BinaryLogProvider loadHelper(ClassLoader classLoader) { if (isAndroid()) { - return NULL_PROVIDER; + return null; } Iterator iter = getCandidatesViaServiceLoader(classLoader).iterator(); @@ -70,7 +88,7 @@ private static BinaryLogProvider loadHelper(ClassLoader classLoader) { } } if (list.isEmpty()) { - return NULL_PROVIDER; + return null; } else { return Collections.max(list, new Comparator() { @Override @@ -94,21 +112,21 @@ private static ServiceLoader getCandidatesViaServiceLoader( /** * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, - * so the interceptor must be reusable across server calls. At runtime, the request and response - * types passed into the interceptor is always {@link java.io.InputStream}. + * so the interceptor must be reusable across calls. At runtime, the request and response + * marshallers are always {@code Marshaller}. * Returns {@code null} if this method is not binary logged. */ @Nullable - public abstract ServerInterceptor getServerInterceptor(String fullMethodName); + protected abstract ServerInterceptor getServerInterceptor(String fullMethodName); /** * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor, - * so the interceptor must be reusable across server calls. At runtime, the request and response - * types passed into the interceptor is always {@link java.io.InputStream}. + * so the interceptor must be reusable across calls. At runtime, the request and response + * marshallers are always {@code Marshaller}. * Returns {@code null} if this method is not binary logged. */ @Nullable - public abstract ClientInterceptor getClientInterceptor(String fullMethodName); + protected abstract ClientInterceptor getClientInterceptor(String fullMethodName); /** * A priority, from 0 to 10 that this provider should be used, taking the current environment into @@ -125,12 +143,12 @@ private static ServiceLoader getCandidatesViaServiceLoader( static final class NullProvider extends BinaryLogProvider { @Nullable @Override - public ServerInterceptor getServerInterceptor(String fullMethodName) { + protected ServerInterceptor getServerInterceptor(String fullMethodName) { return null; } @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { + protected ClientInterceptor getClientInterceptor(String fullMethodName) { return null; } @@ -154,4 +172,44 @@ protected static boolean isAndroid() { return false; } } + + // Creating a named class makes debugging easier + private static final class IdentityMarshaller implements Marshaller { + @Override + public InputStream stream(InputStream value) { + return value; + } + + @Override + public InputStream parse(InputStream stream) { + return stream; + + } + } + + /** + * The pipeline of interceptors is hard coded when the {@link ManagedChannelImpl} is created. + * This shim interceptor should always be installed as a placeholder. When a call starts, + * this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen + * for this particular {@link ClientCall}'s method. + */ + private final class BinaryLogShim implements ClientInterceptor { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + ClientInterceptor binlogInterceptor = getClientInterceptor(method.getFullMethodName()); + if (binlogInterceptor == null) { + return next.newCall(method, callOptions); + } else { + return InternalClientInterceptors + .wrapClientInterceptor( + binlogInterceptor, + IDENTITY_MARSHALLER, + IDENTITY_MARSHALLER) + .interceptCall(method, callOptions, next); + } + } + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 0a31567e127..d067def85a4 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -138,7 +138,8 @@ public final class ManagedChannelImpl /** * We delegate to this channel, so that we can have interceptors as necessary. If there aren't - * any interceptors this will just be {@link RealChannel}. + * any interceptors and the {@link BinaryLogProvider} is {@code null} then this will just be a + * {@link RealChannel}. */ private final Channel interceptorChannel; @Nullable private final String userAgent; @@ -488,7 +489,11 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new this.backoffPolicyProvider = backoffPolicyProvider; this.transportFactory = new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); - this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); + Channel channel = new RealChannel(); + if (builder.binlogProvider != null) { + channel = builder.binlogProvider.wrapChannel(channel); + } + this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) { this.idleTimeoutMillis = builder.idleTimeoutMillis; diff --git a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java index c5964db920e..a06b301d18c 100644 --- a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java +++ b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java @@ -16,11 +16,38 @@ package io.grpc.internal; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.IntegerMarshaller; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.MethodDescriptor.MethodType; import io.grpc.ReplacingClassLoader; import io.grpc.ServerInterceptor; +import io.grpc.StringMarshaller; +import io.grpc.internal.NoopClientCall.NoopClientCallListener; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,10 +57,42 @@ @RunWith(JUnit4.class) public class BinaryLogProviderTest { private final String serviceFile = "META-INF/services/io.grpc.internal.BinaryLogProvider"; + private final Marshaller reqMarshaller = spy(StringMarshaller.INSTANCE); + private final Marshaller respMarshaller = spy(IntegerMarshaller.INSTANCE); + private final MethodDescriptor method = + MethodDescriptor + .newBuilder(reqMarshaller, respMarshaller) + .setFullMethodName("myservice/mymethod") + .setType(MethodType.UNARY) + .setSchemaDescriptor(new Object()) + .setIdempotent(true) + .setSafe(true) + .setSampledToLocalTracing(true) + .build(); + private final List binlogReq = new ArrayList(); + private final List binlogResp = new ArrayList(); + private final TestBinaryLogClientInterceptor clientBinlogInterceptor = + new TestBinaryLogClientInterceptor(); + private final BinaryLogProvider binlogProvider = new BinaryLogProvider() { + @Override + protected ServerInterceptor getServerInterceptor(String fullMethodName) { + return null; + } + + @Override + protected ClientInterceptor getClientInterceptor(String fullMethodName) { + return clientBinlogInterceptor; + } + + @Override + protected int priority() { + return 0; + } + }; @Test public void noProvider() { - assertSame(BinaryLogProvider.NullProvider.class, BinaryLogProvider.provider().getClass()); + assertNull(BinaryLogProvider.provider()); } @Test @@ -47,7 +106,117 @@ public void multipleProvider() { public void unavailableProvider() { ClassLoader cl = new ReplacingClassLoader(getClass().getClassLoader(), serviceFile, "io/grpc/internal/BinaryLogProviderTest-unavailableProvider.txt"); - assertSame(BinaryLogProvider.NullProvider.class, BinaryLogProvider.load(cl).getClass()); + assertNull(BinaryLogProvider.load(cl)); + } + + @Test + public void wrapChannel_methodDescriptor() throws Exception { + final AtomicReference> methodRef = + new AtomicReference>(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor method, CallOptions callOptions) { + methodRef.set(method); + return new NoopClientCall(); + } + + @Override + public String authority() { + throw new UnsupportedOperationException(); + } + }; + Channel wChannel = binlogProvider.wrapChannel(channel); + ClientCall ignoredClientCall = wChannel.newCall(method, CallOptions.DEFAULT); + validateWrappedMethod(methodRef.get()); + } + + @Test + public void wrapChannel_handler() throws Exception { + final List serializedReq = new ArrayList(); + final AtomicReference> listener = + new AtomicReference>(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + public void start(Listener responseListener, Metadata headers) { + listener.set(responseListener); + } + + @Override + public void sendMessage(RequestT message) { + serializedReq.add((InputStream) message); + } + }; + } + + @Override + public String authority() { + throw new UnsupportedOperationException(); + } + }; + Channel wChannel = binlogProvider.wrapChannel(channel); + ClientCall clientCall = wChannel.newCall(method, CallOptions.DEFAULT); + final List observedResponse = new ArrayList(); + clientCall.start( + new NoopClientCallListener() { + @Override + public void onMessage(Integer message) { + observedResponse.add(message); + } + }, + new Metadata()); + + String actualRequest = "hello world"; + assertThat(binlogReq).isEmpty(); + assertThat(serializedReq).isEmpty(); + verify(reqMarshaller, never()).stream(any(String.class)); + clientCall.sendMessage(actualRequest); + // it is unacceptably expensive for the binlog to double parse every logged message + verify(reqMarshaller, times(1)).stream(any(String.class)); + verify(reqMarshaller, never()).parse(any(InputStream.class)); + assertThat(binlogReq).hasSize(1); + assertThat(serializedReq).hasSize(1); + assertEquals( + actualRequest, + StringMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogReq.get(0)))); + assertEquals( + actualRequest, + StringMarshaller.INSTANCE.parse(serializedReq.get(0))); + + int actualResponse = 12345; + assertThat(binlogResp).isEmpty(); + assertThat(observedResponse).isEmpty(); + verify(respMarshaller, never()).parse(any(InputStream.class)); + onClientMessageHelper(listener.get(), IntegerMarshaller.INSTANCE.stream(actualResponse)); + // it is unacceptably expensive for the binlog to double parse every logged message + verify(respMarshaller, times(1)).parse(any(InputStream.class)); + verify(respMarshaller, never()).stream(any(Integer.class)); + assertThat(binlogResp).hasSize(1); + assertThat(observedResponse).hasSize(1); + assertEquals( + actualResponse, + (int) IntegerMarshaller.INSTANCE.parse(new ByteArrayInputStream(binlogResp.get(0)))); + assertEquals(actualResponse, (int) observedResponse.get(0)); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void onClientMessageHelper(ClientCall.Listener listener, Object request) { + listener.onMessage(request); + } + + private void validateWrappedMethod(MethodDescriptor wMethod) { + assertSame(BinaryLogProvider.IDENTITY_MARSHALLER, wMethod.getRequestMarshaller()); + assertSame(BinaryLogProvider.IDENTITY_MARSHALLER, wMethod.getResponseMarshaller()); + assertEquals(method.getType(), wMethod.getType()); + assertEquals(method.getFullMethodName(), wMethod.getFullMethodName()); + assertEquals(method.getSchemaDescriptor(), wMethod.getSchemaDescriptor()); + assertEquals(method.isIdempotent(), wMethod.isIdempotent()); + assertEquals(method.isSafe(), wMethod.isSafe()); + assertEquals(method.isSampledToLocalTracing(), wMethod.isSampledToLocalTracing()); } public static final class Provider0 extends BaseProvider { @@ -77,13 +246,13 @@ public static class BaseProvider extends BinaryLogProvider { @Nullable @Override - public ServerInterceptor getServerInterceptor(String fullMethodName) { + protected ServerInterceptor getServerInterceptor(String fullMethodName) { throw new UnsupportedOperationException(); } @Nullable @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { + protected ClientInterceptor getClientInterceptor(String fullMethodName) { throw new UnsupportedOperationException(); } @@ -92,4 +261,53 @@ protected int priority() { return priority; } } + + private final class TestBinaryLogClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, + CallOptions callOptions, + Channel next) { + assertSame(BinaryLogProvider.IDENTITY_MARSHALLER, method.getRequestMarshaller()); + assertSame(BinaryLogProvider.IDENTITY_MARSHALLER, method.getResponseMarshaller()); + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + delegate().start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + assertTrue(message instanceof InputStream); + try { + byte[] bytes = IoUtils.toByteArray((InputStream) message); + binlogResp.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + RespT dup = method.parseResponse(input); + assertSame(input, dup); + super.onMessage(dup); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, + headers); + } + + @Override + public void sendMessage(ReqT message) { + assertTrue(message instanceof InputStream); + try { + byte[] bytes = IoUtils.toByteArray((InputStream) message); + binlogReq.add(bytes); + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + ReqT dup = method.parseRequest(input); + assertSame(input, dup); + super.sendMessage(dup); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 8e1a5bd0691..b91159ebd09 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -59,6 +59,7 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.Context; import io.grpc.EquivalentAddressGroup; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.IntegerMarshaller; import io.grpc.InternalChannelStats; import io.grpc.InternalInstrumented; @@ -75,10 +76,13 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.NameResolver; import io.grpc.SecurityLevel; +import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.StringMarshaller; import io.grpc.internal.ManagedChannelImpl.ManagedChannelReference; +import io.grpc.internal.NoopClientCall.NoopClientCallListener; import io.grpc.internal.TestUtils.MockClientTransportInfo; +import java.io.InputStream; import java.net.SocketAddress; import java.net.URI; import java.util.ArrayList; @@ -95,6 +99,7 @@ import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -187,6 +192,7 @@ public long currentTimeMillis() { private ObjectPool oobExecutorPool; @Mock private CallCredentials creds; + private BinaryLogProvider binlogProvider = null; private BlockingQueue transports; private ArgumentCaptor streamListenerCaptor = @@ -226,6 +232,7 @@ class Builder extends AbstractManagedChannelImplBuilder { .userAgent(userAgent); builder.executorPool = executorPool; builder.idleTimeoutMillis = idleTimeoutMillis; + builder.binlogProvider = binlogProvider; checkState(channel == null); channel = new ManagedChannelImpl( builder, mockTransportFactory, new FakeBackoffPolicyProvider(), @@ -1872,6 +1879,72 @@ public void channelsAndSubchannels_oob_instrumented_state() throws Exception { assertEquals(SHUTDOWN, getStats(oobChannel).state); } + @Test + public void binaryLogTest() throws Exception { + final List capturedReqs = new ArrayList(); + final class TracingClientInterceptor implements ClientInterceptor { + private final List> interceptedMethods = + new ArrayList>(); + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + interceptedMethods.add(method); + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void sendMessage(ReqT message) { + capturedReqs.add(message); + super.sendMessage(message); + } + }; + } + } + + TracingClientInterceptor userInterceptor = new TracingClientInterceptor(); + binlogProvider = new BinaryLogProvider() { + @Nullable + @Override + protected ServerInterceptor getServerInterceptor(String fullMethodName) { + return null; + } + + @Override + protected ClientInterceptor getClientInterceptor(String fullMethodName) { + return new TracingClientInterceptor(); + } + + @Override + protected int priority() { + return 0; + } + }; + createChannel( + new FakeNameResolverFactory(true), + Collections.singletonList(userInterceptor)); + ClientCall call = + channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); + ClientCall.Listener listener = new NoopClientCallListener(); + call.start(listener, new Metadata()); + assertEquals(1, executor.runDueTasks()); + + String actualRequest = "hello world"; + call.sendMessage(actualRequest); + + // The user supplied interceptor must still operate on the original message types + assertThat(userInterceptor.interceptedMethods).hasSize(1); + assertSame( + method.getRequestMarshaller(), + userInterceptor.interceptedMethods.get(0).getRequestMarshaller()); + assertSame( + method.getResponseMarshaller(), + userInterceptor.interceptedMethods.get(0).getResponseMarshaller()); + + // The binlog interceptor must be closest to the transport + assertThat(capturedReqs).hasSize(2); + // The InputStream is already spent, so just check its type rather than contents + assertEquals(actualRequest, capturedReqs.get(0)); + assertThat(capturedReqs.get(1)).isInstanceOf(InputStream.class); + } private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { @Override From a42c9cfccc3a389436ca6f5074b07baa26ea7ec0 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Tue, 9 Jan 2018 17:00:51 -0800 Subject: [PATCH 2/4] examples: spy() is broken --- examples/src/main/proto/testservice.proto | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 examples/src/main/proto/testservice.proto diff --git a/examples/src/main/proto/testservice.proto b/examples/src/main/proto/testservice.proto new file mode 100644 index 00000000000..8dc48bb3b9b --- /dev/null +++ b/examples/src/main/proto/testservice.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.testservice"; +option java_outer_classname = "TestServiceProto"; +option objc_class_prefix = "HLW"; + +message TestRequest { + +} + +message TestResponse { + +} +service TestService { + rpc TestServerStreaming(TestRequest) returns (stream TestResponse); + rpc TestClientStreaming(stream TestRequest) returns (TestResponse); + rpc TestBidiStreaming(stream TestRequest) returns (stream TestResponse); +} From 5b02a65f0a9c409b23f0025f01fbdb0370332ac0 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Wed, 10 Jan 2018 11:05:01 -0800 Subject: [PATCH 3/4] remove testservice.proto --- examples/src/main/proto/testservice.proto | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 examples/src/main/proto/testservice.proto diff --git a/examples/src/main/proto/testservice.proto b/examples/src/main/proto/testservice.proto deleted file mode 100644 index 8dc48bb3b9b..00000000000 --- a/examples/src/main/proto/testservice.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "io.grpc.examples.testservice"; -option java_outer_classname = "TestServiceProto"; -option objc_class_prefix = "HLW"; - -message TestRequest { - -} - -message TestResponse { - -} -service TestService { - rpc TestServerStreaming(TestRequest) returns (stream TestResponse); - rpc TestClientStreaming(stream TestRequest) returns (TestResponse); - rpc TestBidiStreaming(stream TestRequest) returns (stream TestResponse); -} From 08c9ff4470cdfebb2d33f0815cdfd495a97832e5 Mon Sep 17 00:00:00 2001 From: Spencer Fang Date: Wed, 31 Jan 2018 09:09:57 -0800 Subject: [PATCH 4/4] Review comments and more - TODO for retry - methods should be public, for later provider interface - fix capitalization typo --- .../java/io/grpc/internal/BinaryLogProvider.java | 14 ++++++++------ .../io/grpc/internal/BinaryLogProviderTest.java | 8 ++++---- .../io/grpc/internal/ManagedChannelImplTest.java | 4 ++-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/BinaryLogProvider.java b/core/src/main/java/io/grpc/internal/BinaryLogProvider.java index dd77fd69e1c..c37cd36d29a 100644 --- a/core/src/main/java/io/grpc/internal/BinaryLogProvider.java +++ b/core/src/main/java/io/grpc/internal/BinaryLogProvider.java @@ -113,20 +113,22 @@ private static ServiceLoader getCandidatesViaServiceLoader( /** * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, * so the interceptor must be reusable across calls. At runtime, the request and response - * marshallers are always {@code Marshaller}. + * marshallers are always {@code Marshaller}. * Returns {@code null} if this method is not binary logged. */ + // TODO(zpencer): ensure the interceptor properly handles retries and hedging @Nullable - protected abstract ServerInterceptor getServerInterceptor(String fullMethodName); + public abstract ServerInterceptor getServerInterceptor(String fullMethodName); /** * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor, * so the interceptor must be reusable across calls. At runtime, the request and response - * marshallers are always {@code Marshaller}. + * marshallers are always {@code Marshaller}. * Returns {@code null} if this method is not binary logged. */ + // TODO(zpencer): ensure the interceptor properly handles retries and hedging @Nullable - protected abstract ClientInterceptor getClientInterceptor(String fullMethodName); + public abstract ClientInterceptor getClientInterceptor(String fullMethodName); /** * A priority, from 0 to 10 that this provider should be used, taking the current environment into @@ -143,12 +145,12 @@ private static ServiceLoader getCandidatesViaServiceLoader( static final class NullProvider extends BinaryLogProvider { @Nullable @Override - protected ServerInterceptor getServerInterceptor(String fullMethodName) { + public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; } @Override - protected ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor(String fullMethodName) { return null; } diff --git a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java index a06b301d18c..af22ea3855d 100644 --- a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java +++ b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java @@ -75,12 +75,12 @@ public class BinaryLogProviderTest { new TestBinaryLogClientInterceptor(); private final BinaryLogProvider binlogProvider = new BinaryLogProvider() { @Override - protected ServerInterceptor getServerInterceptor(String fullMethodName) { + public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; } @Override - protected ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor(String fullMethodName) { return clientBinlogInterceptor; } @@ -246,13 +246,13 @@ public static class BaseProvider extends BinaryLogProvider { @Nullable @Override - protected ServerInterceptor getServerInterceptor(String fullMethodName) { + public ServerInterceptor getServerInterceptor(String fullMethodName) { throw new UnsupportedOperationException(); } @Nullable @Override - protected ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor(String fullMethodName) { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index b91159ebd09..b705fa8e20a 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -1904,12 +1904,12 @@ public void sendMessage(ReqT message) { binlogProvider = new BinaryLogProvider() { @Nullable @Override - protected ServerInterceptor getServerInterceptor(String fullMethodName) { + public ServerInterceptor getServerInterceptor(String fullMethodName) { return null; } @Override - protected ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor(String fullMethodName) { return new TracingClientInterceptor(); }