From de21f4f277f1d897b93b732f176ba276aba66002 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 27 Jul 2022 11:29:05 -0700 Subject: [PATCH 1/5] remove logging providers-1 --- .../io/grpc/StaticTestingClassLoader.java | 1 + .../gcp/observability/GcpObservability.java | 20 +- .../observability/LoggingChannelProvider.java | 102 ------ .../observability/LoggingServerProvider.java | 88 ------ .../InternalLoggingChannelInterceptor.java | 2 +- .../InternalLoggingServerInterceptor.java | 2 +- .../gcp/observability/logging/GcpLogSink.java | 34 +- .../observability/GcpObservabilityTest.java | 8 - .../LoggingChannelProviderTest.java | 143 --------- .../LoggingServerProviderTest.java | 143 --------- .../grpc/gcp/observability/LoggingTest.java | 292 +++++++++++------- .../observability/logging/GcpLogSinkTest.java | 55 ++-- 12 files changed, 232 insertions(+), 658 deletions(-) delete mode 100644 gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java delete mode 100644 gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java delete mode 100644 gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java delete mode 100644 gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java diff --git a/context/src/test/java/io/grpc/StaticTestingClassLoader.java b/context/src/test/java/io/grpc/StaticTestingClassLoader.java index 716a887e8da..ce18571349f 100644 --- a/context/src/test/java/io/grpc/StaticTestingClassLoader.java +++ b/context/src/test/java/io/grpc/StaticTestingClassLoader.java @@ -37,6 +37,7 @@ public StaticTestingClassLoader(ClassLoader parent, Pattern classesToDefine) { @Override protected Class findClass(String name) throws ClassNotFoundException { if (!classesToDefine.matcher(name).matches()) { + System.out.println("NF class: " + name); throw new ClassNotFoundException(name); } InputStream is = getResourceAsStream(name.replace('.', '/') + ".class"); diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index d5a608d83b0..c91d5440e6a 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -76,8 +76,6 @@ public static synchronized GcpObservability grpcInit() throws IOException { Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(), observabilityConfig.getFlushMessageCount()); - // TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider - // once ChannelBuilder and ServerBuilder are used LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, observabilityConfig, @@ -97,13 +95,8 @@ static GcpObservability grpcInit( InternalLoggingServerInterceptor.Factory serverInterceptorFactory) throws IOException { if (instance == null) { - instance = - new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory); - LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); - ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config); - instance.setProducer( - new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper), - new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper)); + instance = new GcpObservability(sink, config); + instance.setProducer(channelInterceptorFactory, serverInterceptorFactory); } return instance; } @@ -116,8 +109,6 @@ public void close() { throw new IllegalStateException("GcpObservability already closed!"); } unRegisterStackDriverExporter(); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); sink.close(); instance = null; } @@ -208,13 +199,8 @@ private void unRegisterStackDriverExporter() { private GcpObservability( Sink sink, - ObservabilityConfig config, - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, - InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + ObservabilityConfig config) { this.sink = checkNotNull(sink); this.config = checkNotNull(config); - - LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); - LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java deleted file mode 100644 index 81c3501e1d4..00000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingChannelProvider.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.grpc.ChannelCredentials; -import io.grpc.InternalManagedChannelProvider; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ManagedChannelProvider; -import io.grpc.ManagedChannelRegistry; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Collection; -import java.util.Collections; - -/** A channel provider that injects logging interceptor. */ -final class LoggingChannelProvider extends ManagedChannelProvider { - private final ManagedChannelProvider prevProvider; - private final InternalLoggingChannelInterceptor.Factory clientInterceptorFactory; - - private static LoggingChannelProvider instance; - - private LoggingChannelProvider(InternalLoggingChannelInterceptor.Factory factory) { - prevProvider = ManagedChannelProvider.provider(); - clientInterceptorFactory = factory; - } - - static synchronized void init(InternalLoggingChannelInterceptor.Factory factory) { - if (instance != null) { - throw new IllegalStateException("LoggingChannelProvider already initialized!"); - } - instance = new LoggingChannelProvider(factory); - ManagedChannelRegistry.getDefaultRegistry().register(instance); - } - - static synchronized void shutdown() { - if (instance == null) { - throw new IllegalStateException("LoggingChannelProvider not initialized!"); - } - ManagedChannelRegistry.getDefaultRegistry().deregister(instance); - instance = null; - } - - @Override - protected boolean isAvailable() { - return true; - } - - @Override - protected int priority() { - return 6; - } - - private ManagedChannelBuilder addInterceptor(ManagedChannelBuilder builder) { - return builder.intercept(clientInterceptorFactory.create()); - } - - @Override - protected ManagedChannelBuilder builderForAddress(String name, int port) { - return addInterceptor( - InternalManagedChannelProvider.builderForAddress(prevProvider, name, port)); - } - - @Override - protected ManagedChannelBuilder builderForTarget(String target) { - return addInterceptor(InternalManagedChannelProvider.builderForTarget(prevProvider, target)); - } - - @Override - protected NewChannelBuilderResult newChannelBuilder(String target, ChannelCredentials creds) { - NewChannelBuilderResult result = InternalManagedChannelProvider.newChannelBuilder(prevProvider, - target, creds); - ManagedChannelBuilder builder = result.getChannelBuilder(); - if (builder != null) { - return NewChannelBuilderResult.channelBuilder( - addInterceptor(builder)); - } - checkNotNull(result.getError(), "Expected error to be set!"); - return result; - } - - @Override - protected Collection> getSupportedSocketAddressTypes() { - return Collections.singleton(InetSocketAddress.class); - } -} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java deleted file mode 100644 index 6a4c710795e..00000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/LoggingServerProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.grpc.InternalServerProvider; -import io.grpc.ServerBuilder; -import io.grpc.ServerCredentials; -import io.grpc.ServerProvider; -import io.grpc.ServerRegistry; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; - -/** A server provider that injects the logging interceptor. */ -final class LoggingServerProvider extends ServerProvider { - private final ServerProvider prevProvider; - private final InternalLoggingServerInterceptor.Factory serverInterceptorFactory; - - private static LoggingServerProvider instance; - - private LoggingServerProvider(InternalLoggingServerInterceptor.Factory factory) { - prevProvider = ServerProvider.provider(); - serverInterceptorFactory = factory; - } - - static synchronized void init(InternalLoggingServerInterceptor.Factory factory) { - if (instance != null) { - throw new IllegalStateException("LoggingServerProvider already initialized!"); - } - instance = new LoggingServerProvider(factory); - ServerRegistry.getDefaultRegistry().register(instance); - } - - static synchronized void shutdown() { - if (instance == null) { - throw new IllegalStateException("LoggingServerProvider not initialized!"); - } - ServerRegistry.getDefaultRegistry().deregister(instance); - instance = null; - } - - @Override - protected boolean isAvailable() { - return true; - } - - @Override - protected int priority() { - return 6; - } - - private ServerBuilder addInterceptor(ServerBuilder builder) { - return builder.intercept(serverInterceptorFactory.create()); - } - - @Override - protected ServerBuilder builderForPort(int port) { - return addInterceptor(InternalServerProvider.builderForPort(prevProvider, port)); - } - - @Override - protected NewServerBuilderResult newServerBuilderForPort(int port, ServerCredentials creds) { - ServerProvider.NewServerBuilderResult result = InternalServerProvider.newServerBuilderForPort( - prevProvider, port, - creds); - ServerBuilder builder = result.getServerBuilder(); - if (builder != null) { - return ServerProvider.NewServerBuilderResult.serverBuilder( - addInterceptor(builder)); - } - checkNotNull(result.getError(), "Expected error to be set!"); - return result; - } -} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java index 5c0355dd663..570ea35da62 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -40,7 +40,7 @@ import java.util.logging.Logger; /** - * A logging interceptor for {@code LoggingChannelProvider}. + * A logging client interceptor for Observability. */ @Internal public final class InternalLoggingChannelInterceptor implements ClientInterceptor { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java index e0c5b0bb6a0..3797a744d5f 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java @@ -40,7 +40,7 @@ import java.util.logging.Logger; /** - * A logging interceptor for {@code LoggingServerProvider}. + * A logging server interceptor for Observability. */ @Internal public final class InternalLoggingServerInterceptor implements ServerInterceptor { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 011a333ed9a..1333159ec14 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -54,20 +54,13 @@ public class GcpLogSink implements Sink { = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", "pod_name", "container_name"); private static final long FALLBACK_FLUSH_LIMIT = 100L; + private final String projectId; private final Map customTags; - private final Logging gcpLoggingClient; private final MonitoredResource kubernetesResource; private final Long flushLimit; + private Logging gcpLoggingClient; private long flushCounter; - private static Logging createLoggingClient(String projectId) { - LoggingOptions.Builder builder = LoggingOptions.newBuilder(); - if (!Strings.isNullOrEmpty(projectId)) { - builder.setProjectId(projectId); - } - return builder.build().getService(); - } - /** * Retrieves a single instance of GcpLogSink. * @@ -75,15 +68,7 @@ private static Logging createLoggingClient(String projectId) { */ public GcpLogSink(String destinationProjectId, Map locationTags, Map customTags, Long flushLimit) { - this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags, - customTags, flushLimit); - - } - - @VisibleForTesting - GcpLogSink(Logging client, String destinationProjectId, Map locationTags, - Map customTags, Long flushLimit) { - this.gcpLoggingClient = client; + this.projectId = destinationProjectId; this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.kubernetesResource = getResource(locationTags); this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; @@ -98,8 +83,9 @@ public GcpLogSink(String destinationProjectId, Map locationTags, @Override public void write(GrpcLogRecord logProto) { if (gcpLoggingClient == null) { - logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed."); - return; + synchronized (this) { + gcpLoggingClient = createLoggingClient(); + } } if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) { return; @@ -133,6 +119,14 @@ public void write(GrpcLogRecord logProto) { } } + Logging createLoggingClient() { + LoggingOptions.Builder builder = LoggingOptions.newBuilder(); + if (!Strings.isNullOrEmpty(projectId)) { + builder.setProjectId(projectId); + } + return builder.build().getService(); + } + @VisibleForTesting static Map getCustomTags(Map customTags, Map locationTags, String destinationProjectId) { diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index d494b3c14f1..5c96e47e34b 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -28,13 +28,11 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.InternalGlobalInterceptors; -import io.grpc.ManagedChannelProvider; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.ServerProvider; import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; @@ -85,8 +83,6 @@ public static final class StaticTestingClassInitFinish implements Runnable { @Override public void run() { // TODO(dnvindhya) : Remove usage of Providers on cleaning up Logging*Provider - ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); - ServerProvider prevServerProvider = ServerProvider.provider(); Sink sink = mock(Sink.class); ObservabilityConfig config = mock(ObservabilityConfig.class); InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = @@ -98,16 +94,12 @@ public void run() { GcpObservability observability = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory); - assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); - assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); observability1 = GcpObservability.grpcInit( sink, config, channelInterceptorFactory, serverInterceptorFactory); assertThat(observability1).isSameInstanceAs(observability); observability.close(); verify(sink).close(); - assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); - assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); try { observability1.close(); fail("should have failed for calling close() second time"); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java deleted file mode 100644 index c4337d5d4fd..00000000000 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingChannelProviderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.AdditionalAnswers.delegatesTo; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ManagedChannelProvider; -import io.grpc.MethodDescriptor; -import io.grpc.TlsChannelCredentials; -import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor.FactoryImpl; -import io.grpc.gcp.observability.interceptors.LogHelper; -import io.grpc.testing.TestMethodDescriptors; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatchers; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -@RunWith(JUnit4.class) -public class LoggingChannelProviderTest { - - @Rule - public final MockitoRule mocks = MockitoJUnit.rule(); - - private final MethodDescriptor method = TestMethodDescriptors.voidMethod(); - - @Test - public void initTwiceCausesException() { - ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); - assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - LoggingChannelProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); - try { - LoggingChannelProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - fail("should have failed for calling init() again"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); - } - LoggingChannelProvider.shutdown(); - assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevProvider); - } - - @Test - public void forTarget_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = ManagedChannelBuilder.forTarget("localhost"); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - @Test - public void forAddress_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress("localhost", 80); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - @Test - public void newChannelBuilder_interceptorCalled() { - ClientInterceptor interceptor = mock(ClientInterceptor.class, - delegatesTo(new NoopInterceptor())); - InternalLoggingChannelInterceptor.Factory factory = mock( - InternalLoggingChannelInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingChannelProvider.init(factory); - ManagedChannelBuilder builder = Grpc.newChannelBuilder("localhost", - TlsChannelCredentials.create()); - ManagedChannel channel = builder.build(); - CallOptions callOptions = CallOptions.DEFAULT; - - ClientCall unused = channel.newCall(method, callOptions); - verify(interceptor) - .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); - channel.shutdownNow(); - LoggingChannelProvider.shutdown(); - } - - private static class NoopInterceptor implements ClientInterceptor { - @Override - public ClientCall interceptCall(MethodDescriptor method, - CallOptions callOptions, Channel next) { - return next.newCall(method, callOptions); - } - } -} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java deleted file mode 100644 index a5ec7966da7..00000000000 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingServerProviderTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.AdditionalAnswers.delegatesTo; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.grpc.Grpc; -import io.grpc.InsecureServerCredentials; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.ServerProvider; -import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl; -import io.grpc.gcp.observability.interceptors.LogHelper; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.testing.protobuf.SimpleRequest; -import io.grpc.testing.protobuf.SimpleResponse; -import io.grpc.testing.protobuf.SimpleServiceGrpc; -import java.io.IOException; -import java.util.function.Supplier; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatchers; - -@RunWith(JUnit4.class) -public class LoggingServerProviderTest { - @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - @Test - public void initTwiceCausesException() { - ServerProvider prevProvider = ServerProvider.provider(); - assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); - try { - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper)); - fail("should have failed for calling init() again"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!"); - } - LoggingServerProvider.shutdown(); - assertThat(ServerProvider.provider()).isSameInstanceAs(prevProvider); - } - - @Test - public void forPort_interceptorCalled() throws IOException { - serverBuilder_interceptorCalled(() -> ServerBuilder.forPort(0)); - } - - @Test - public void newServerBuilder_interceptorCalled() throws IOException { - serverBuilder_interceptorCalled( - () -> Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create())); - } - - @SuppressWarnings("unchecked") - private void serverBuilder_interceptorCalled(Supplier> serverBuilderSupplier) - throws IOException { - ServerInterceptor interceptor = - mock(ServerInterceptor.class, delegatesTo(new NoopInterceptor())); - InternalLoggingServerInterceptor.Factory factory = mock( - InternalLoggingServerInterceptor.Factory.class); - when(factory.create()).thenReturn(interceptor); - LoggingServerProvider.init(factory); - Server server = serverBuilderSupplier.get().addService(new SimpleServiceImpl()).build().start(); - int port = cleanupRule.register(server).getPort(); - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext() - .build(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(channel)); - assertThat(unaryRpc("buddy", stub)).isEqualTo("Hello buddy"); - verify(interceptor).interceptCall(any(ServerCall.class), any(Metadata.class), anyCallHandler()); - LoggingServerProvider.shutdown(); - } - - private ServerCallHandler anyCallHandler() { - return ArgumentMatchers.any(); - } - - private static String unaryRpc( - String requestMessage, SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub) { - SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage(requestMessage).build(); - SimpleResponse response = blockingStub.unaryRpc(request); - return response.getResponseMessage(); - } - - private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { - - @Override - public void unaryRpc(SimpleRequest req, StreamObserver responseObserver) { - SimpleResponse response = - SimpleResponse.newBuilder() - .setResponseMessage("Hello " + req.getRequestMessage()) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - } - - private static class NoopInterceptor implements ServerInterceptor { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { - return next.startCall(call, headers); - } - } -} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index 2b41c83fe38..32d946b5598 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -17,6 +17,7 @@ package io.grpc.gcp.observability; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -28,10 +29,11 @@ import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; -import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; @@ -42,8 +44,9 @@ import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; import java.util.Map; -import org.junit.Ignore; -import org.junit.Rule; +// import org.junit.Ignore; +import java.util.regex.Pattern; +import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,10 +55,10 @@ @RunWith(JUnit4.class) public class LoggingTest { - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - private static final String PROJECT_ID = "PROJECT"; + private static final String PROJECT_ID = "vindhyan-gke-dev"; private static final Map locationTags = ImmutableMap.of( "project_id", "PROJECT", "location", "us-central1-c", @@ -67,117 +70,188 @@ public class LoggingTest { "KEY2", "VALUE2"); private static final long flushLimit = 100L; + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader( + getClass().getClassLoader(), + Pattern.compile( + "io\\.grpc\\.[^.]+|" + + "io\\.grpc\\.testing\\.[^.]+|" + + "io\\.grpc\\.internal\\.[^.]+|" + + "io\\.grpc\\.InternalGlobalInterceptors|io\\.grpc\\.GlobalInterceptors|" + + "io\\.grpc\\.gcp\\.observability\\.[^.]+|" + + "io\\.grpc\\.gcp\\.observability\\.interceptors\\.[^.]+|" + + "io\\.grpc\\.gcp\\.observability\\.logging\\.[^.]+|" + + "io\\.grpc\\.gcp\\.observability\\.LoggingTest\\$.*")); + /** - * Cloud logging test using LoggingChannelProvider and LoggingServerProvider. - * - *

Ignoring test, because it calls external CLoud Logging APIs. - * To test cloud logging setup, - * 1. Set up Cloud Logging Auth credentials - * 2. Assign permissions to service account to write logs to project specified by - * variable PROJECT_ID - * 3. Comment @Ignore annotation - *

+ * Cloud logging test. + * + *

Ignoring test, because it calls external CLoud Logging APIs. To test cloud logging setup, 1. + * Set up Cloud Logging Auth credentials 2. Assign permissions to service account to write logs to + * project specified by variable PROJECT_ID 3. Comment @Ignore annotation */ - @Ignore + // @Ignore @Test - public void clientServer_interceptorCalled_logAlways() - throws IOException { - Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); - LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - LoggingServerProvider.init( - new FactoryImpl(spyLogHelper, mockFilterHelper)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); - sink.close(); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logAlways() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassEndtoEndLogging.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test - public void clientServer_interceptorCalled_logNever() throws IOException { - Sink mockSink = mock(GcpLogSink.class); - LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logNeverFilterParams = - FilterParams.create(false, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logNeverFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - LoggingServerProvider.init( - new FactoryImpl(spyLogHelper, mockFilterHelper)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - verifyNoInteractions(spyLogHelper); - verifyNoInteractions(mockSink); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logNever() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogNever.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test - public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException { - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) - .thenReturn(false); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) - .thenReturn(false); - LoggingServerProvider.init( - new FactoryImpl(mockLogHelper, mockFilterHelper2)); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2)); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - // Total number of calls should have been 14 (6 from client and 6 from server) - // Since cancel is not invoked, it will be 12. - // Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2) - // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8 - assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); - LoggingChannelProvider.shutdown(); - LoggingServerProvider.shutdown(); + public void clientServer_interceptorCalled_logFewEvents() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + // UsedReflectively + public static final class StaticTestingClassEndtoEndLogging implements Runnable { + + @Override + public void run() { + try { + Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); + ObservabilityConfig config = mock(ObservabilityConfig.class); + when(config.isEnableCloudLogging()).thenReturn(true); + LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + FilterParams logAlwaysFilterParams = + FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + GcpObservability unused = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory); + + try { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + } catch (IOException ioe) { + fail("Encountered exception: " + ioe); + } + + assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); + } catch (IOException e) { + fail("Encountered exception: " + e); + } + } + } + + public static final class StaticTestingClassLogNever implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + when(config.isEnableCloudLogging()).thenReturn(true); + LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + FilterParams logNeverFilterParams = + FilterParams.create(false, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logNeverFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + try (GcpObservability unused = GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory) ) { + Server server = ServerBuilder.forPort(0).addService( + new LoggingTestHelper.SimpleServiceImpl()).build().start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + verifyNoInteractions(spyLogHelper); + verifyNoInteractions(mockSink); + } catch (IOException e) { + fail("Encountered exception: " + e); + } + } + } + + public static final class StaticTestingClassLogFewEvents implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + when(config.isEnableCloudLogging()).thenReturn(true); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + .thenReturn(false); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + .thenReturn(false); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + + try (GcpObservability observability = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Total number of calls should have been 14 (6 from client and 6 from server) + // Since cancel is not invoked, it will be 12. + // Request message(Total count:2 (1 from client and 1 from server) and Response + // message(count:2) + // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = + // 8 + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); + } catch (IOException e) { + fail("Encountered exception: " + e); + } + } } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 5c50679b531..f34bb4c3675 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -45,6 +46,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -98,32 +100,31 @@ public class GcpLogSinkTest { .putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build()) .build(); private Logging mockLogging; + private GcpLogSink spySink; @Before public void setUp() { mockLogging = mock(Logging.class); + spySink = spy(new GcpLogSink(destProjectName, locationTags, + customTags, flushLimit)); + Mockito.doReturn(mockLogging).when(spySink).createLoggingClient(); } @Test public void createSink() { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - assertThat(mockSink).isInstanceOf(GcpLogSink.class); + assertThat(spySink).isInstanceOf(GcpLogSink.class); } @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - mockSink.write(logProto); + spySink.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); - System.out.println(entry); assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); assertThat(entry.getLogName()).isEqualTo(expectedLogName); } @@ -133,10 +134,8 @@ public void verifyWrite() throws Exception { @Test @SuppressWarnings("unchecked") public void verifyWriteWithTags() { - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); - mockSink.write(logProto); + spySink.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -157,9 +156,10 @@ public void verifyWriteWithTags() { public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - emptyCustomTags, flushLimit); - mockSink.write(logProto); + GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, + emptyCustomTags, flushLimit)); + Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); + spySink2.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -178,9 +178,10 @@ public void emptyCustomTags_setSourceProject() { String destinationProjectId = "DESTINATION_PROJECT"; Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags, destinationProjectId); - GcpLogSink mockSink = new GcpLogSink(mockLogging, destinationProjectId, locationTags, - emptyCustomTags, flushLimit); - mockSink.write(logProto); + GcpLogSink spySink2 = spy(new GcpLogSink(destinationProjectId, locationTags, + emptyCustomTags, flushLimit)); + Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); + spySink2.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -195,24 +196,26 @@ public void emptyCustomTags_setSourceProject() { @Test public void verifyFlush() { long lowerFlushLimit = 2L; - GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, lowerFlushLimit); - mockSink.write(logProto); + GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, + customTags, lowerFlushLimit)); + Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); + spySink2.write(logProto); verify(mockLogging, never()).flush(); - mockSink.write(logProto); + spySink2.write(logProto); verify(mockLogging, times(1)).flush(); - mockSink.write(logProto); - mockSink.write(logProto); + spySink2.write(logProto); + spySink2.write(logProto); verify(mockLogging, times(2)).flush(); } @Test public void verifyClose() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, flushLimit); - mockSink.write(logProto); + GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, + customTags, flushLimit)); + Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); + spySink2.write(logProto); verify(mockLogging, times(1)).write(anyIterable()); - mockSink.close(); + spySink2.close(); verify(mockLogging).close(); verifyNoMoreInteractions(mockLogging); } From e421a997b11cd51aecb50397f2513c9f04dea162 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Tue, 2 Aug 2022 13:46:09 -0700 Subject: [PATCH 2/5] added sample e2e tests for metrics and traces --- .../gcp/observability/GcpObservability.java | 6 +- .../grpc/gcp/observability/LoggingTest.java | 310 ++++++++---------- .../grpc/gcp/observability/MetricsTest.java | 128 ++++++++ .../io/grpc/gcp/observability/TracesTest.java | 137 ++++++++ .../observability/logging/GcpLogSinkTest.java | 2 +- 5 files changed, 399 insertions(+), 184 deletions(-) create mode 100644 gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java create mode 100644 gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index c91d5440e6a..f8dae8300e3 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -34,6 +34,7 @@ import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.Sink; import io.grpc.internal.TimeProvider; +import io.opencensus.common.Duration; import io.opencensus.contrib.grpc.metrics.RpcViews; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter; @@ -55,6 +56,7 @@ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class GcpObservability implements AutoCloseable { private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); + private static final int METRICS_EXPORT_INTERVAL = 30; private static GcpObservability instance = null; private final Sink sink; private final ObservabilityConfig config; @@ -136,7 +138,8 @@ private void setProducer( clientInterceptors, serverInterceptors, tracerFactories); } - private void registerStackDriverExporter(String projectId, Map customTags) + @VisibleForTesting + void registerStackDriverExporter(String projectId, Map customTags) throws IOException { if (config.isEnableCloudMonitoring()) { RpcViews.registerAllGrpcViews(); @@ -151,6 +154,7 @@ private void registerStackDriverExporter(String projectId, Map c e -> LabelValue.create(e.getValue()))); statsConfigurationBuilder.setConstantLabels(constantLabels); } + statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0)); StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); metricsEnabled = true; } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index 32d946b5598..83482362bf9 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -17,7 +17,6 @@ package io.grpc.gcp.observability; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -29,7 +28,6 @@ import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; -import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; @@ -44,21 +42,21 @@ import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; import java.util.Map; -// import org.junit.Ignore; -import java.util.regex.Pattern; -import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +//TODO(dnvindhya): Update tests to use StaticTestingClassLoader to test GlobalInterceptors usage @RunWith(JUnit4.class) public class LoggingTest { - @ClassRule - public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - private static final String PROJECT_ID = "vindhyan-gke-dev"; + private static final String PROJECT_ID = "PROJECT"; private static final Map locationTags = ImmutableMap.of( "project_id", "PROJECT", "location", "us-central1-c", @@ -70,188 +68,136 @@ public class LoggingTest { "KEY2", "VALUE2"); private static final long flushLimit = 100L; - private final StaticTestingClassLoader classLoader = - new StaticTestingClassLoader( - getClass().getClassLoader(), - Pattern.compile( - "io\\.grpc\\.[^.]+|" - + "io\\.grpc\\.testing\\.[^.]+|" - + "io\\.grpc\\.internal\\.[^.]+|" - + "io\\.grpc\\.InternalGlobalInterceptors|io\\.grpc\\.GlobalInterceptors|" - + "io\\.grpc\\.gcp\\.observability\\.[^.]+|" - + "io\\.grpc\\.gcp\\.observability\\.interceptors\\.[^.]+|" - + "io\\.grpc\\.gcp\\.observability\\.logging\\.[^.]+|" - + "io\\.grpc\\.gcp\\.observability\\.LoggingTest\\$.*")); - /** - * Cloud logging test. + * Cloud logging test using GlobalInterceptors. * - *

Ignoring test, because it calls external CLoud Logging APIs. To test cloud logging setup, 1. - * Set up Cloud Logging Auth credentials 2. Assign permissions to service account to write logs to - * project specified by variable PROJECT_ID 3. Comment @Ignore annotation + *

Ignoring test, because it calls external Cloud Logging APIs. + * To test cloud logging setup locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write logs to project specified by + * variable PROJECT_ID + * 3. Comment @Ignore annotation + *

*/ - // @Ignore + @Ignore @Test - public void clientServer_interceptorCalled_logAlways() throws Exception { - Class runnable = - classLoader.loadClass(LoggingTest.StaticTestingClassEndtoEndLogging.class.getName()); - ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + public void clientServer_interceptorCalled_logAlways() + throws IOException { + Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = + FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); + + GcpObservability unused = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory); + Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) + .build().start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); + sink.close(); } + @Ignore @Test - public void clientServer_interceptorCalled_logNever() throws Exception { - Class runnable = - classLoader.loadClass(LoggingTest.StaticTestingClassLogNever.class.getName()); - ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + public void clientServer_interceptorCalled_logNever() throws IOException { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logNeverFilterParams = + FilterParams.create(false, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logNeverFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); + + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory); + Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) + .build().start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + verifyNoInteractions(spyLogHelper); + verifyNoInteractions(mockSink); + observability.close(); } @Test - public void clientServer_interceptorCalled_logFewEvents() throws Exception { - Class runnable = - classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName()); - ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); - } - - // UsedReflectively - public static final class StaticTestingClassEndtoEndLogging implements Runnable { - - @Override - public void run() { - try { - Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); - ObservabilityConfig config = mock(ObservabilityConfig.class); - when(config.isEnableCloudLogging()).thenReturn(true); - LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - GcpObservability unused = - GcpObservability.grpcInit( - sink, config, channelInterceptorFactory, serverInterceptorFactory); - - try { - Server server = - ServerBuilder.forPort(0) - .addService(new LoggingTestHelper.SimpleServiceImpl()) - .build() - .start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub( - cleanupRule.register( - ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - } catch (IOException ioe) { - fail("Encountered exception: " + ioe); - } - - assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); - } catch (IOException e) { - fail("Encountered exception: " + e); - } - } - } - - public static final class StaticTestingClassLogNever implements Runnable { - - @Override - public void run() { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig config = mock(ObservabilityConfig.class); - when(config.isEnableCloudLogging()).thenReturn(true); - LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - FilterParams logNeverFilterParams = - FilterParams.create(false, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logNeverFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - - try (GcpObservability unused = GcpObservability.grpcInit( - mockSink, config, channelInterceptorFactory, serverInterceptorFactory) ) { - Server server = ServerBuilder.forPort(0).addService( - new LoggingTestHelper.SimpleServiceImpl()).build().start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - verifyNoInteractions(spyLogHelper); - verifyNoInteractions(mockSink); - } catch (IOException e) { - fail("Encountered exception: " + e); - } - } - } - - public static final class StaticTestingClassLogFewEvents implements Runnable { - - @Override - public void run() { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig config = mock(ObservabilityConfig.class); - when(config.isEnableCloudLogging()).thenReturn(true); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); - FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); - when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) - .thenReturn(false); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) - .thenReturn(false); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); - - try (GcpObservability observability = - GcpObservability.grpcInit( - mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { - Server server = - ServerBuilder.forPort(0) - .addService(new LoggingTestHelper.SimpleServiceImpl()) - .build() - .start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub( - cleanupRule.register( - ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - // Total number of calls should have been 14 (6 from client and 6 from server) - // Since cancel is not invoked, it will be 12. - // Request message(Total count:2 (1 from client and 1 from server) and Response - // message(count:2) - // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = - // 8 - assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); - } catch (IOException e) { - fail("Encountered exception: " + e); - } - } + public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = + FilterParams.create(true, 0, 0); + when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + .thenReturn(false); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + .thenReturn(false); + + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory); + Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) + .build().start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Total number of calls should have been 14 (6 from client and 6 from server) + // Since cancel is not invoked, it will be 12. + // Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2) + // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8 + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); + observability.close(); } -} +} \ No newline at end of file diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java new file mode 100644 index 00000000000..3d879ee8b1d --- /dev/null +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.monitoring.v3.MetricServiceClient; +import com.google.cloud.monitoring.v3.MetricServiceClient.ListTimeSeriesPagedResponse; +import com.google.monitoring.v3.ListTimeSeriesRequest; +import com.google.monitoring.v3.ProjectName; +import com.google.monitoring.v3.TimeInterval; +import com.google.monitoring.v3.TimeSeries; +import com.google.protobuf.util.Timestamps; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.gcp.observability.logging.GcpLogSink; +import io.grpc.gcp.observability.logging.Sink; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MetricsTest { + + private static final String PROJECT_ID = "PROJECT"; + private String customTagKey = "Version"; + private String customTagValue = + String.format("C67J9A-%s", String.valueOf(System.currentTimeMillis())); + private Map customTags = Collections.singletonMap(customTagKey, customTagValue); + + @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + /** + * Cloud Monitoring test using GlobalInterceptors. + * + *

Ignoring test, because it calls external Cloud Monitoring APIs. To test cloud monitoring + * setup locally, 1. Set up Cloud auth credentials 2. Assign permissions to service account to + * write metrics to project specified by variable PROJECT_ID 3. Comment @Ignore annotation + */ + @Ignore + @Test + public void testMetricsExporter() throws IOException, InterruptedException { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + + when(mockConfig.isEnableCloudMonitoring()).thenReturn(true); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, customTags); + + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)).isEqualTo("Hello buddy"); + // Adding sleep to ensure metrics are exported before querying cloud monitoring backend + TimeUnit.SECONDS.sleep(40); + + // This checks Cloud monitoring for the new metrics that was just exported. + MetricServiceClient metricServiceClient = MetricServiceClient.create(); + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + TimeInterval interval = + TimeInterval.newBuilder() + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .build(); + // Timeseries data + String metricsFilter = + String.format( + "metric.type=\"custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs\"" + + " AND metric.labels.grpc_client_method=\"grpc.testing.SimpleService/UnaryRpc\"" + + " AND metric.labels.%s=%s", + customTagKey, customTagValue); + ListTimeSeriesRequest metricsRequest = + ListTimeSeriesRequest.newBuilder() + .setName(ProjectName.of(PROJECT_ID).toString()) + .setFilter(metricsFilter) + .setInterval(interval) + .build(); + ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(metricsRequest); + assertThat(response.iterateAll()).isNotEmpty(); + for (TimeSeries ts : response.iterateAll()) { + assertThat(ts.getPoints(0).getValue().getInt64Value()).isEqualTo(1); + } + observability.close(); + } +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java new file mode 100644 index 00000000000..a1900b05ffa --- /dev/null +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java @@ -0,0 +1,137 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.gcp.observability; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.trace.v1.TraceServiceClient; +import com.google.cloud.trace.v1.TraceServiceClient.ListTracesPagedResponse; +import com.google.devtools.cloudtrace.v1.GetTraceRequest; +import com.google.devtools.cloudtrace.v1.ListTracesRequest; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; +import com.google.protobuf.util.Timestamps; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.gcp.observability.logging.GcpLogSink; +import io.grpc.gcp.observability.logging.Sink; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import io.opencensus.trace.samplers.Samplers; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TracesTest { + + private static final String PROJECT_ID = "PROJECT"; + private String customTagKey = "service"; + private String customTagValue = + String.format("payment-%s", String.valueOf(System.currentTimeMillis())); + private Map customTags = Collections.singletonMap(customTagKey, customTagValue); + + @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + /** + * Cloud Trace test using GlobalInterceptors. + * + *

Ignoring test, because it calls external Cloud Tracing APIs. To test cloud trace setup + * locally, 1. Set up Cloud auth credentials 2. Assign permissions to service account to write + * traces to project specified by variable PROJECT_ID 3. Comment @Ignore annotation + */ + @Ignore + @Test + public void testTracesExporter() throws IOException, InterruptedException { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); + + when(mockConfig.isEnableCloudTracing()).thenReturn(true); + when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample()); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, customTags); + + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)).isEqualTo("Hello buddy"); + // Adding sleep to ensure traces are exported before querying cloud monitoring backend + TimeUnit.SECONDS.sleep(5); + + TraceServiceClient traceServiceClient = TraceServiceClient.create(); + String traceFilter = + String.format("span:Sent.grpc.testing.SimpleService +%s:%s", customTagKey, customTagValue); + String traceOrder = "start"; + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + ListTracesRequest traceRequest = + ListTracesRequest.newBuilder() + .setProjectId(PROJECT_ID) + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .setFilter(traceFilter) + .setOrderBy(traceOrder) + .build(); + ListTracesPagedResponse traceResponse = traceServiceClient.listTraces(traceRequest); + assertThat(traceResponse.iterateAll()).isNotEmpty(); + List traceIdList = new ArrayList<>(); + for (Trace t : traceResponse.iterateAll()) { + traceIdList.add(t.getTraceId()); + } + + for (String traceId : traceIdList) { + // This checks Cloud trace for the new trace that was just created. + GetTraceRequest getTraceRequest = + GetTraceRequest.newBuilder().setProjectId(PROJECT_ID).setTraceId(traceId).build(); + Trace trace = traceServiceClient.getTrace(getTraceRequest); + assertThat(trace.getSpansList()).hasSize(3); + for (TraceSpan span : trace.getSpansList()) { + assertThat(span.getName()).contains("grpc.testing.SimpleService.UnaryRpc"); + assertThat(span.getLabelsMap().get(customTagKey)).isEqualTo(customTagValue); + } + } + observability.close(); + } +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index f34bb4c3675..fd974ae7c79 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -67,7 +67,7 @@ public class GcpLogSinkTest { private static final Map customTags = ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); private static final long flushLimit = 10L; - // gRPC is expected to alway use this log name when reporting to GCP cloud logging. + // gRPC is expected to always use this log name when reporting to GCP cloud logging. private static final String expectedLogName = "microservices.googleapis.com%2Fobservability%2Fgrpc"; private final long seqId = 1; From 5d7f02c3bf058f4fde01afd686649e6dbbaeeb0b Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Tue, 2 Aug 2022 13:54:30 -0700 Subject: [PATCH 3/5] removed code added for debugging --- context/src/test/java/io/grpc/StaticTestingClassLoader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/context/src/test/java/io/grpc/StaticTestingClassLoader.java b/context/src/test/java/io/grpc/StaticTestingClassLoader.java index ce18571349f..716a887e8da 100644 --- a/context/src/test/java/io/grpc/StaticTestingClassLoader.java +++ b/context/src/test/java/io/grpc/StaticTestingClassLoader.java @@ -37,7 +37,6 @@ public StaticTestingClassLoader(ClassLoader parent, Pattern classesToDefine) { @Override protected Class findClass(String name) throws ClassNotFoundException { if (!classesToDefine.matcher(name).matches()) { - System.out.println("NF class: " + name); throw new ClassNotFoundException(name); } InputStream is = getResourceAsStream(name.replace('.', '/') + ".class"); From 84802e77597f5b13b09d627ff00af5ba076bef34 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Wed, 3 Aug 2022 18:14:20 -0700 Subject: [PATCH 4/5] addressed comments(1) --- .../gcp/observability/logging/GcpLogSink.java | 15 +- .../observability/GcpObservabilityTest.java | 1 - .../grpc/gcp/observability/LoggingTest.java | 282 ++++++++++-------- .../grpc/gcp/observability/MetricsTest.java | 153 ++++++---- .../io/grpc/gcp/observability/TracesTest.java | 158 ++++++---- .../observability/logging/GcpLogSinkTest.java | 91 +++--- 6 files changed, 403 insertions(+), 297 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 1333159ec14..9b198250afb 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -58,9 +58,18 @@ public class GcpLogSink implements Sink { private final Map customTags; private final MonitoredResource kubernetesResource; private final Long flushLimit; - private Logging gcpLoggingClient; + /** Lazily initialize cloud logging client to avoid circular initialization. Because cloud + * logging APIs also uses gRPC. */ + private volatile Logging gcpLoggingClient; private long flushCounter; + @VisibleForTesting + GcpLogSink(Logging loggingClient, String destinationProjectId, Map locationTags, + Map customTags, Long flushLimit) { + this(destinationProjectId, locationTags, customTags, flushLimit); + this.gcpLoggingClient = loggingClient; + } + /** * Retrieves a single instance of GcpLogSink. * @@ -84,7 +93,9 @@ public GcpLogSink(String destinationProjectId, Map locationTags, public void write(GrpcLogRecord logProto) { if (gcpLoggingClient == null) { synchronized (this) { - gcpLoggingClient = createLoggingClient(); + if (gcpLoggingClient == null) { + gcpLoggingClient = createLoggingClient(); + } } } if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) { diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java index 5c96e47e34b..97e9031631b 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/GcpObservabilityTest.java @@ -82,7 +82,6 @@ public static final class StaticTestingClassInitFinish implements Runnable { @Override public void run() { - // TODO(dnvindhya) : Remove usage of Providers on cleaning up Logging*Provider Sink sink = mock(Sink.class); ObservabilityConfig config = mock(ObservabilityConfig.class); InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index 83482362bf9..9204af29229 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -28,6 +28,7 @@ import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; @@ -42,19 +43,19 @@ import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; import java.util.Map; +import java.util.regex.Pattern; +import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; -//TODO(dnvindhya): Update tests to use StaticTestingClassLoader to test GlobalInterceptors usage @RunWith(JUnit4.class) public class LoggingTest { - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private static final String PROJECT_ID = "PROJECT"; private static final Map locationTags = ImmutableMap.of( @@ -66,7 +67,10 @@ public class LoggingTest { private static final Map customTags = ImmutableMap.of( "KEY1", "Value1", "KEY2", "VALUE2"); - private static final long flushLimit = 100L; + private static final long FLUSH_LIMIT = 100L; + + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*")); /** * Cloud logging test using GlobalInterceptors. @@ -77,127 +81,169 @@ public class LoggingTest { * 2. Assign permissions to service account to write logs to project specified by * variable PROJECT_ID * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. *

*/ @Ignore @Test - public void clientServer_interceptorCalled_logAlways() - throws IOException { - Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); - ObservabilityConfig config = mock(ObservabilityConfig.class); - LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - - when(config.isEnableCloudLogging()).thenReturn(true); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - - GcpObservability unused = - GcpObservability.grpcInit( - sink, config, channelInterceptorFactory, serverInterceptorFactory); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); - sink.close(); + public void clientServer_interceptorCalled_logAlways() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassEndtoEndLogging.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } - @Ignore @Test - public void clientServer_interceptorCalled_logNever() throws IOException { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig config = mock(ObservabilityConfig.class); - LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); - ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); - - when(config.isEnableCloudLogging()).thenReturn(true); - FilterParams logNeverFilterParams = - FilterParams.create(false, 0, 0); - when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logNeverFilterParams); - when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) - .thenReturn(true); - - GcpObservability observability = - GcpObservability.grpcInit( - mockSink, config, channelInterceptorFactory, serverInterceptorFactory); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - verifyNoInteractions(spyLogHelper); - verifyNoInteractions(mockSink); - observability.close(); + public void clientServer_interceptorCalled_logNever() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogNever.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); } @Test - public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig config = mock(ObservabilityConfig.class); - LogHelper mockLogHelper = mock(LogHelper.class); - ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); - InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = - new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); - InternalLoggingServerInterceptor.Factory serverInterceptorFactory = - new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); - - when(config.isEnableCloudLogging()).thenReturn(true); - FilterParams logAlwaysFilterParams = - FilterParams.create(true, 0, 0); - when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) - .thenReturn(logAlwaysFilterParams); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) - .thenReturn(true); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) - .thenReturn(false); - when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) - .thenReturn(false); - - GcpObservability observability = - GcpObservability.grpcInit( - mockSink, config, channelInterceptorFactory, serverInterceptorFactory); - Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) - .build().start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) - .isEqualTo("Hello buddy"); - // Total number of calls should have been 14 (6 from client and 6 from server) - // Since cancel is not invoked, it will be 12. - // Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2) - // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8 - assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); - observability.close(); + public void clientServer_interceptorCalled_logFewEvents() throws Exception { + Class runnable = + classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + // UsedReflectively + public static final class StaticTestingClassEndtoEndLogging implements Runnable { + + @Override + public void run() { + Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, FLUSH_LIMIT); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); + + try (GcpObservability unused = + GcpObservability.grpcInit( + sink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging", e); + } + } + } + + public static final class StaticTestingClassLogNever implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logNeverFilterParams = FilterParams.create(false, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logNeverFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); + + try (GcpObservability unused = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + verifyNoInteractions(spyLogHelper); + verifyNoInteractions(mockSink); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging event filter", e); + } + } + } + + public static final class StaticTestingClassLogFewEvents implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig config = mock(ObservabilityConfig.class); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2); + + when(config.isEnableCloudLogging()).thenReturn(true); + FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + .thenReturn(false); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + .thenReturn(false); + + try (GcpObservability observability = + GcpObservability.grpcInit( + mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) { + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Total number of calls should have been 14 (6 from client and 6 from server) + // Since cancel is not invoked, it will be 12. + // Request message(Total count:2 (1 from client and 1 from server) and Response + // message(count:2) + // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) + // = 8 + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); + } catch (IOException e) { + throw new AssertionError("Exception while testing logging event filter", e); + } + } } -} \ No newline at end of file +} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java index 3d879ee8b1d..23c7821c28c 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java @@ -30,6 +30,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.logging.GcpLogSink; @@ -40,8 +41,9 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,80 +51,109 @@ @RunWith(JUnit4.class) public class MetricsTest { + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private static final String PROJECT_ID = "PROJECT"; - private String customTagKey = "Version"; - private String customTagValue = + private static final String TEST_CLIENT_METHOD = "grpc.testing.SimpleService/UnaryRpc"; + private static final String CUSTOM_TAG_KEY = "Version"; + private static final String CUSTOM_TAG_VALUE = String.format("C67J9A-%s", String.valueOf(System.currentTimeMillis())); - private Map customTags = Collections.singletonMap(customTagKey, customTagValue); + private static final Map customTags = Collections.singletonMap(CUSTOM_TAG_KEY, + CUSTOM_TAG_VALUE); - @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), + Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*")); /** - * Cloud Monitoring test using GlobalInterceptors. + * End to end cloud monitoring test. * *

Ignoring test, because it calls external Cloud Monitoring APIs. To test cloud monitoring - * setup locally, 1. Set up Cloud auth credentials 2. Assign permissions to service account to - * write metrics to project specified by variable PROJECT_ID 3. Comment @Ignore annotation + * setup locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write metrics to project specified by variable + * PROJECT_ID + * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. */ @Ignore @Test - public void testMetricsExporter() throws IOException, InterruptedException { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); - InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = - mock(InternalLoggingChannelInterceptor.Factory.class); - InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = - mock(InternalLoggingServerInterceptor.Factory.class); + public void testMetricsExporter() throws Exception { + Class runnable = + classLoader.loadClass(MetricsTest.StaticTestingClassTestMetricsExporter.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } + + public static final class StaticTestingClassTestMetricsExporter implements Runnable { + + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); - when(mockConfig.isEnableCloudMonitoring()).thenReturn(true); - when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + when(mockConfig.isEnableCloudMonitoring()).thenReturn(true); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); - GcpObservability observability = - GcpObservability.grpcInit( - mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); - observability.registerStackDriverExporter(PROJECT_ID, customTags); + try { + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, customTags); - Server server = - ServerBuilder.forPort(0) - .addService(new LoggingTestHelper.SimpleServiceImpl()) - .build() - .start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub( - cleanupRule.register( - ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)).isEqualTo("Hello buddy"); - // Adding sleep to ensure metrics are exported before querying cloud monitoring backend - TimeUnit.SECONDS.sleep(40); + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Adding sleep to ensure metrics are exported before querying cloud monitoring backend + TimeUnit.SECONDS.sleep(40); - // This checks Cloud monitoring for the new metrics that was just exported. - MetricServiceClient metricServiceClient = MetricServiceClient.create(); - // Restrict time to last 1 minute - long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); - TimeInterval interval = - TimeInterval.newBuilder() - .setStartTime(Timestamps.fromMillis(startMillis)) - .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) - .build(); - // Timeseries data - String metricsFilter = - String.format( - "metric.type=\"custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs\"" - + " AND metric.labels.grpc_client_method=\"grpc.testing.SimpleService/UnaryRpc\"" - + " AND metric.labels.%s=%s", - customTagKey, customTagValue); - ListTimeSeriesRequest metricsRequest = - ListTimeSeriesRequest.newBuilder() - .setName(ProjectName.of(PROJECT_ID).toString()) - .setFilter(metricsFilter) - .setInterval(interval) - .build(); - ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(metricsRequest); - assertThat(response.iterateAll()).isNotEmpty(); - for (TimeSeries ts : response.iterateAll()) { - assertThat(ts.getPoints(0).getValue().getInt64Value()).isEqualTo(1); + // This checks Cloud monitoring for the new metrics that was just exported. + MetricServiceClient metricServiceClient = MetricServiceClient.create(); + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + TimeInterval interval = + TimeInterval.newBuilder() + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .build(); + // Timeseries data + String metricsFilter = + String.format( + "metric.type=\"custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs\"" + + " AND metric.labels.grpc_client_method=\"%s\"" + + " AND metric.labels.%s=%s", + TEST_CLIENT_METHOD, CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); + ListTimeSeriesRequest metricsRequest = + ListTimeSeriesRequest.newBuilder() + .setName(ProjectName.of(PROJECT_ID).toString()) + .setFilter(metricsFilter) + .setInterval(interval) + .build(); + ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(metricsRequest); + assertThat(response.iterateAll()).isNotEmpty(); + for (TimeSeries ts : response.iterateAll()) { + assertThat(ts.getMetric().getLabelsMap().get("grpc_client_method")) + .isEqualTo(TEST_CLIENT_METHOD); + assertThat(ts.getMetric().getLabelsMap().get("grpc_client_status")).isEqualTo("OK"); + assertThat(ts.getPoints(0).getValue().getInt64Value()).isEqualTo(1); + } + observability.close(); + } catch (IOException | InterruptedException e) { + throw new AssertionError("Exception while testing metrics", e); + } } - observability.close(); } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java index a1900b05ffa..fb534dea584 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java @@ -30,6 +30,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.StaticTestingClassLoader; import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.gcp.observability.logging.GcpLogSink; @@ -43,8 +44,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.junit.ClassRule; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,86 +54,112 @@ @RunWith(JUnit4.class) public class TracesTest { + @ClassRule + public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private static final String PROJECT_ID = "PROJECT"; - private String customTagKey = "service"; - private String customTagValue = + private static final String CUSTOM_TAG_KEY = "service"; + private static final String CUSTOM_TAG_VALUE = String.format("payment-%s", String.valueOf(System.currentTimeMillis())); - private Map customTags = Collections.singletonMap(customTagKey, customTagValue); + private static final Map customTags = + Collections.singletonMap(CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); - @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private final StaticTestingClassLoader classLoader = + new StaticTestingClassLoader(getClass().getClassLoader(), + Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*")); /** - * Cloud Trace test using GlobalInterceptors. + * End to end cloud trace test. * *

Ignoring test, because it calls external Cloud Tracing APIs. To test cloud trace setup - * locally, 1. Set up Cloud auth credentials 2. Assign permissions to service account to write - * traces to project specified by variable PROJECT_ID 3. Comment @Ignore annotation + * locally, + * 1. Set up Cloud auth credentials + * 2. Assign permissions to service account to write traces to project specified by variable + * PROJECT_ID + * 3. Comment @Ignore annotation + * 4. This test is expected to pass when ran with above setup. This has been verified manually. */ @Ignore @Test - public void testTracesExporter() throws IOException, InterruptedException { - Sink mockSink = mock(GcpLogSink.class); - ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); - InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = - mock(InternalLoggingChannelInterceptor.Factory.class); - InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = - mock(InternalLoggingServerInterceptor.Factory.class); + public void testTracesExporter() throws Exception { + Class runnable = + classLoader.loadClass(TracesTest.StaticTestingClassTestTracesExporter.class.getName()); + ((Runnable) runnable.getDeclaredConstructor().newInstance()).run(); + } - when(mockConfig.isEnableCloudTracing()).thenReturn(true); - when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample()); - when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); + public static final class StaticTestingClassTestTracesExporter implements Runnable { - GcpObservability observability = - GcpObservability.grpcInit( - mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); - observability.registerStackDriverExporter(PROJECT_ID, customTags); + @Override + public void run() { + Sink mockSink = mock(GcpLogSink.class); + ObservabilityConfig mockConfig = mock(ObservabilityConfig.class); + InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory = + mock(InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory = + mock(InternalLoggingServerInterceptor.Factory.class); - Server server = - ServerBuilder.forPort(0) - .addService(new LoggingTestHelper.SimpleServiceImpl()) - .build() - .start(); - int port = cleanupRule.register(server).getPort(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub( - cleanupRule.register( - ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); - assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)).isEqualTo("Hello buddy"); - // Adding sleep to ensure traces are exported before querying cloud monitoring backend - TimeUnit.SECONDS.sleep(5); + when(mockConfig.isEnableCloudTracing()).thenReturn(true); + when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample()); + when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID); - TraceServiceClient traceServiceClient = TraceServiceClient.create(); - String traceFilter = - String.format("span:Sent.grpc.testing.SimpleService +%s:%s", customTagKey, customTagValue); - String traceOrder = "start"; - // Restrict time to last 1 minute - long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); - ListTracesRequest traceRequest = - ListTracesRequest.newBuilder() - .setProjectId(PROJECT_ID) - .setStartTime(Timestamps.fromMillis(startMillis)) - .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) - .setFilter(traceFilter) - .setOrderBy(traceOrder) - .build(); - ListTracesPagedResponse traceResponse = traceServiceClient.listTraces(traceRequest); - assertThat(traceResponse.iterateAll()).isNotEmpty(); - List traceIdList = new ArrayList<>(); - for (Trace t : traceResponse.iterateAll()) { - traceIdList.add(t.getTraceId()); - } + try { + GcpObservability observability = + GcpObservability.grpcInit( + mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); + observability.registerStackDriverExporter(PROJECT_ID, customTags); + + Server server = + ServerBuilder.forPort(0) + .addService(new LoggingTestHelper.SimpleServiceImpl()) + .build() + .start(); + int port = cleanupRule.register(server).getPort(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub( + cleanupRule.register( + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Adding sleep to ensure traces are exported before querying cloud tracing backend + TimeUnit.SECONDS.sleep(10); + + TraceServiceClient traceServiceClient = TraceServiceClient.create(); + String traceFilter = + String.format( + "span:Sent.grpc.testing.SimpleService +%s:%s", CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); + String traceOrder = "start"; + // Restrict time to last 1 minute + long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000); + ListTracesRequest traceRequest = + ListTracesRequest.newBuilder() + .setProjectId(PROJECT_ID) + .setStartTime(Timestamps.fromMillis(startMillis)) + .setEndTime(Timestamps.fromMillis(System.currentTimeMillis())) + .setFilter(traceFilter) + .setOrderBy(traceOrder) + .build(); + ListTracesPagedResponse traceResponse = traceServiceClient.listTraces(traceRequest); + assertThat(traceResponse.iterateAll()).isNotEmpty(); + List traceIdList = new ArrayList<>(); + for (Trace t : traceResponse.iterateAll()) { + traceIdList.add(t.getTraceId()); + } - for (String traceId : traceIdList) { - // This checks Cloud trace for the new trace that was just created. - GetTraceRequest getTraceRequest = - GetTraceRequest.newBuilder().setProjectId(PROJECT_ID).setTraceId(traceId).build(); - Trace trace = traceServiceClient.getTrace(getTraceRequest); - assertThat(trace.getSpansList()).hasSize(3); - for (TraceSpan span : trace.getSpansList()) { - assertThat(span.getName()).contains("grpc.testing.SimpleService.UnaryRpc"); - assertThat(span.getLabelsMap().get(customTagKey)).isEqualTo(customTagValue); + for (String traceId : traceIdList) { + // This checks Cloud trace for the new trace that was just created. + GetTraceRequest getTraceRequest = + GetTraceRequest.newBuilder().setProjectId(PROJECT_ID).setTraceId(traceId).build(); + Trace trace = traceServiceClient.getTrace(getTraceRequest); + assertThat(trace.getSpansList()).hasSize(3); + for (TraceSpan span : trace.getSpansList()) { + assertThat(span.getName()).contains("grpc.testing.SimpleService.UnaryRpc"); + assertThat(span.getLabelsMap().get(CUSTOM_TAG_KEY)).isEqualTo(CUSTOM_TAG_VALUE); + } + } + observability.close(); + } catch (IOException | InterruptedException e) { + throw new AssertionError("Exception while testing traces", e); } } - observability.close(); } } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index fd974ae7c79..1cdefae542a 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -18,9 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyIterable; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -40,13 +38,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -66,18 +63,18 @@ public class GcpLogSinkTest { "pod_name", "app1-6c7c58f897-n92c5"); private static final Map customTags = ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); - private static final long flushLimit = 10L; + private static final long FLUSH_LIMIT = 10L; // gRPC is expected to always use this log name when reporting to GCP cloud logging. private static final String expectedLogName = "microservices.googleapis.com%2Fobservability%2Fgrpc"; - private final long seqId = 1; - private final String destProjectName = "PROJECT"; - private final String serviceName = "service"; - private final String methodName = "method"; - private final String authority = "authority"; - private final Duration timeout = Durations.fromMillis(1234); - private final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; - private final GrpcLogRecord logProto = GrpcLogRecord.newBuilder() + private static final long seqId = 1; + private static final String destProjectName = "PROJECT"; + private static final String serviceName = "service"; + private static final String methodName = "method"; + private static final String authority = "authority"; + private static final Duration timeout = Durations.fromMillis(1234); + private static final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder() .setSequenceId(seqId) .setServiceName(serviceName) .setMethodName(methodName) @@ -87,7 +84,7 @@ public class GcpLogSinkTest { .setEventLogger(EventLogger.LOGGER_CLIENT) .setRpcId(rpcId) .build(); - private final Struct expectedStructLogProto = Struct.newBuilder() + private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder() .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build()) .putFields("service_name", Value.newBuilder().setStringValue(serviceName).build()) .putFields("method_name", Value.newBuilder().setStringValue(methodName).build()) @@ -99,33 +96,29 @@ public class GcpLogSinkTest { String.valueOf(EventLogger.LOGGER_CLIENT)).build()) .putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build()) .build(); + @Mock private Logging mockLogging; - private GcpLogSink spySink; - - @Before - public void setUp() { - mockLogging = mock(Logging.class); - spySink = spy(new GcpLogSink(destProjectName, locationTags, - customTags, flushLimit)); - Mockito.doReturn(mockLogging).when(spySink).createLoggingClient(); - } @Test public void createSink() { - assertThat(spySink).isInstanceOf(GcpLogSink.class); + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, FLUSH_LIMIT); + assertThat(sink).isInstanceOf(Sink.class); } @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - spySink.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); assertThat(entry.getLogName()).isEqualTo(expectedLogName); } verifyNoMoreInteractions(mockLogging); @@ -134,8 +127,10 @@ public void verifyWrite() throws Exception { @Test @SuppressWarnings("unchecked") public void verifyWriteWithTags() { + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, FLUSH_LIMIT); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); - spySink.write(logProto); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -145,7 +140,7 @@ public void verifyWriteWithTags() { LogEntry entry = it.next(); assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource); assertThat(entry.getLabels()).isEqualTo(customTags); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); assertThat(entry.getLogName()).isEqualTo(expectedLogName); } verifyNoMoreInteractions(mockLogging); @@ -156,10 +151,9 @@ public void verifyWriteWithTags() { public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); - GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, - emptyCustomTags, flushLimit)); - Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); - spySink2.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + emptyCustomTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -167,7 +161,7 @@ public void emptyCustomTags_labelsNotSet() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getLabels()).isEqualTo(expectedEmptyLabels); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); } } @@ -178,10 +172,9 @@ public void emptyCustomTags_setSourceProject() { String destinationProjectId = "DESTINATION_PROJECT"; Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags, destinationProjectId); - GcpLogSink spySink2 = spy(new GcpLogSink(destinationProjectId, locationTags, - emptyCustomTags, flushLimit)); - Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); - spySink2.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, locationTags, + emptyCustomTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); @@ -189,33 +182,31 @@ public void emptyCustomTags_setSourceProject() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getLabels()).isEqualTo(expectedLabels); - assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); } } @Test public void verifyFlush() { long lowerFlushLimit = 2L; - GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, - customTags, lowerFlushLimit)); - Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); - spySink2.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, lowerFlushLimit); + sink.write(LOG_PROTO); verify(mockLogging, never()).flush(); - spySink2.write(logProto); + sink.write(LOG_PROTO); verify(mockLogging, times(1)).flush(); - spySink2.write(logProto); - spySink2.write(logProto); + sink.write(LOG_PROTO); + sink.write(LOG_PROTO); verify(mockLogging, times(2)).flush(); } @Test public void verifyClose() throws Exception { - GcpLogSink spySink2 = spy(new GcpLogSink(destProjectName, locationTags, - customTags, flushLimit)); - Mockito.doReturn(mockLogging).when(spySink2).createLoggingClient(); - spySink2.write(logProto); + GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, FLUSH_LIMIT); + sink.write(LOG_PROTO); verify(mockLogging, times(1)).write(anyIterable()); - spySink2.close(); + sink.close(); verify(mockLogging).close(); verifyNoMoreInteractions(mockLogging); } From 31a531554dd7a4b8334190df8cb5babac565fbc9 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Mon, 8 Aug 2022 11:10:51 -0700 Subject: [PATCH 5/5] addressed comments(2) --- .../gcp/observability/GcpObservability.java | 2 + .../InternalLoggingChannelInterceptor.java | 1 + .../InternalLoggingServerInterceptor.java | 1 + .../grpc/gcp/observability/LoggingTest.java | 7 +- .../grpc/gcp/observability/MetricsTest.java | 4 +- .../io/grpc/gcp/observability/TracesTest.java | 4 +- .../observability/logging/GcpLogSinkTest.java | 81 +++++++++---------- 7 files changed, 49 insertions(+), 51 deletions(-) diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index f8dae8300e3..e077e12a2a6 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -116,6 +116,8 @@ public void close() { } } + // TODO(dnvindhya): Remove InterceptorFactory and replace with respective + // interceptors private void setProducer( InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java index 570ea35da62..81e0a9819af 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -51,6 +51,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto private final LogHelper helper; private final ConfigFilterHelper filterHelper; + // TODO(dnvindhya): Remove factory and use interceptors directly public interface Factory { ClientInterceptor create(); } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java index 3797a744d5f..112a1c067b1 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java @@ -51,6 +51,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor private final LogHelper helper; private final ConfigFilterHelper filterHelper; + // TODO(dnvindhya): Remove factory and use interceptors directly public interface Factory { ServerInterceptor create(); } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index 9204af29229..b4dea62c047 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -42,7 +42,6 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; -import java.util.Map; import java.util.regex.Pattern; import org.junit.ClassRule; import org.junit.Ignore; @@ -58,13 +57,13 @@ public class LoggingTest { public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private static final String PROJECT_ID = "PROJECT"; - private static final Map locationTags = ImmutableMap.of( + private static final ImmutableMap LOCATION_TAGS = ImmutableMap.of( "project_id", "PROJECT", "location", "us-central1-c", "cluster_name", "grpc-observability-cluster", "namespace_name", "default" , "pod_name", "app1-6c7c58f897-n92c5"); - private static final Map customTags = ImmutableMap.of( + private static final ImmutableMap CUSTOM_TAGS = ImmutableMap.of( "KEY1", "Value1", "KEY2", "VALUE2"); private static final long FLUSH_LIMIT = 100L; @@ -111,7 +110,7 @@ public static final class StaticTestingClassEndtoEndLogging implements Runnable @Override public void run() { - Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, FLUSH_LIMIT); + Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT); ObservabilityConfig config = mock(ObservabilityConfig.class); LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java index 23c7821c28c..f967b99fbcb 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/MetricsTest.java @@ -59,7 +59,7 @@ public class MetricsTest { private static final String CUSTOM_TAG_KEY = "Version"; private static final String CUSTOM_TAG_VALUE = String.format("C67J9A-%s", String.valueOf(System.currentTimeMillis())); - private static final Map customTags = Collections.singletonMap(CUSTOM_TAG_KEY, + private static final Map CUSTOM_TAGS = Collections.singletonMap(CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); private final StaticTestingClassLoader classLoader = @@ -103,7 +103,7 @@ public void run() { GcpObservability observability = GcpObservability.grpcInit( mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); - observability.registerStackDriverExporter(PROJECT_ID, customTags); + observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS); Server server = ServerBuilder.forPort(0) diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java index fb534dea584..ec759827737 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/TracesTest.java @@ -61,7 +61,7 @@ public class TracesTest { private static final String CUSTOM_TAG_KEY = "service"; private static final String CUSTOM_TAG_VALUE = String.format("payment-%s", String.valueOf(System.currentTimeMillis())); - private static final Map customTags = + private static final Map CUSTOM_TAGS = Collections.singletonMap(CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE); private final StaticTestingClassLoader classLoader = @@ -106,7 +106,7 @@ public void run() { GcpObservability observability = GcpObservability.grpcInit( mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory); - observability.registerStackDriverExporter(PROJECT_ID, customTags); + observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS); Server server = ServerBuilder.forPort(0) diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 1cdefae542a..031313c7304 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -56,61 +56,56 @@ public class GcpLogSinkTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - private static final Map locationTags = ImmutableMap.of("project_id", "PROJECT", + private static final ImmutableMap LOCATION_TAGS = + ImmutableMap.of("project_id", "PROJECT", "location", "us-central1-c", "cluster_name", "grpc-observability-cluster", "namespace_name", "default" , "pod_name", "app1-6c7c58f897-n92c5"); - private static final Map customTags = ImmutableMap.of("KEY1", "Value1", + private static final ImmutableMap CUSTOM_TAGS = + ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); private static final long FLUSH_LIMIT = 10L; // gRPC is expected to always use this log name when reporting to GCP cloud logging. - private static final String expectedLogName = + private static final String EXPECTED_LOG_NAME = "microservices.googleapis.com%2Fobservability%2Fgrpc"; - private static final long seqId = 1; - private static final String destProjectName = "PROJECT"; - private static final String serviceName = "service"; - private static final String methodName = "method"; - private static final String authority = "authority"; - private static final Duration timeout = Durations.fromMillis(1234); - private static final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + private static final long SEQ_ID = 1; + private static final String DEST_PROJECT_NAME = "PROJECT"; + private static final String SERVICE_NAME = "service"; + private static final String METHOD_NAME = "method"; + private static final String AUTHORITY = "authority"; + private static final Duration TIMEOUT = Durations.fromMillis(1234); + private static final String RPC_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f"; private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder() - .setSequenceId(seqId) - .setServiceName(serviceName) - .setMethodName(methodName) - .setAuthority(authority) - .setTimeout(timeout) + .setSequenceId(SEQ_ID) + .setServiceName(SERVICE_NAME) + .setMethodName(METHOD_NAME) + .setAuthority(AUTHORITY) + .setTimeout(TIMEOUT) .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setEventLogger(EventLogger.LOGGER_CLIENT) - .setRpcId(rpcId) + .setRpcId(RPC_ID) .build(); private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder() - .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build()) - .putFields("service_name", Value.newBuilder().setStringValue(serviceName).build()) - .putFields("method_name", Value.newBuilder().setStringValue(methodName).build()) - .putFields("authority", Value.newBuilder().setStringValue(authority).build()) + .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build()) + .putFields("service_name", Value.newBuilder().setStringValue(SERVICE_NAME).build()) + .putFields("method_name", Value.newBuilder().setStringValue(METHOD_NAME).build()) + .putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build()) .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) .putFields("event_type", Value.newBuilder().setStringValue( String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build()) .putFields("event_logger", Value.newBuilder().setStringValue( String.valueOf(EventLogger.LOGGER_CLIENT)).build()) - .putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build()) + .putFields("rpc_id", Value.newBuilder().setStringValue(RPC_ID).build()) .build(); @Mock private Logging mockLogging; - @Test - public void createSink() { - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, FLUSH_LIMIT); - assertThat(sink).isInstanceOf(Sink.class); - } - @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, FLUSH_LIMIT); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -119,7 +114,7 @@ public void verifyWrite() throws Exception { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); - assertThat(entry.getLogName()).isEqualTo(expectedLogName); + assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME); } verifyNoMoreInteractions(mockLogging); } @@ -127,9 +122,9 @@ public void verifyWrite() throws Exception { @Test @SuppressWarnings("unchecked") public void verifyWriteWithTags() { - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, FLUSH_LIMIT); - MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); + MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS); sink.write(LOG_PROTO); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -139,9 +134,9 @@ public void verifyWriteWithTags() { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource); - assertThat(entry.getLabels()).isEqualTo(customTags); + assertThat(entry.getLabels()).isEqualTo(CUSTOM_TAGS); assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO); - assertThat(entry.getLogName()).isEqualTo(expectedLogName); + assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME); } verifyNoMoreInteractions(mockLogging); } @@ -151,7 +146,7 @@ public void verifyWriteWithTags() { public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, emptyCustomTags, FLUSH_LIMIT); sink.write(LOG_PROTO); @@ -170,9 +165,9 @@ public void emptyCustomTags_labelsNotSet() { public void emptyCustomTags_setSourceProject() { Map emptyCustomTags = null; String destinationProjectId = "DESTINATION_PROJECT"; - Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags, + Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS, destinationProjectId); - GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, locationTags, + GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS, emptyCustomTags, FLUSH_LIMIT); sink.write(LOG_PROTO); @@ -189,8 +184,8 @@ public void emptyCustomTags_setSourceProject() { @Test public void verifyFlush() { long lowerFlushLimit = 2L; - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, lowerFlushLimit); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, lowerFlushLimit); sink.write(LOG_PROTO); verify(mockLogging, never()).flush(); sink.write(LOG_PROTO); @@ -202,8 +197,8 @@ public void verifyFlush() { @Test public void verifyClose() throws Exception { - GcpLogSink sink = new GcpLogSink(mockLogging, destProjectName, locationTags, - customTags, FLUSH_LIMIT); + GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, + CUSTOM_TAGS, FLUSH_LIMIT); sink.write(LOG_PROTO); verify(mockLogging, times(1)).write(anyIterable()); sink.close();