From 2d7302d4fdfdea3394e4db148a120b927e8815cf Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Mon, 21 Mar 2022 16:05:43 -0700 Subject: [PATCH] observability: integrate globalTags and configuration into rest of observability (#9000) * observability: integrate globalTags and configuration into rest of observability wire observabilityConfig and globalTags into Observability and make these available to the channel and server interceptors and specifically to the LogHelper. Also separate globalTags into custom-tags and location-tags as required by the log-helper --- .../grpc/observability/GlobalLoggingTags.java | 26 ++-- .../observability/LoggingChannelProvider.java | 2 +- .../observability/LoggingServerProvider.java | 2 +- .../io/grpc/observability/Observability.java | 66 ++++++---- .../observability/ObservabilityConfig.java | 118 +++-------------- .../ObservabilityConfigImpl.java | 124 ++++++++++++++++++ .../InternalLoggingChannelInterceptor.java | 13 +- .../InternalLoggingServerInterceptor.java | 14 +- .../observability/interceptors/LogHelper.java | 12 +- .../LoggingChannelProviderTest.java | 13 +- .../LoggingServerProviderTest.java | 10 +- .../io/grpc/observability/LoggingTest.java | 8 +- ....java => ObservabilityConfigImplTest.java} | 4 +- .../grpc/observability/ObservabilityTest.java | 40 ++++-- ...InternalLoggingChannelInterceptorTest.java | 2 +- .../InternalLoggingServerInterceptorTest.java | 2 +- .../interceptors/LogHelperTest.java | 14 +- 17 files changed, 286 insertions(+), 184 deletions(-) create mode 100644 observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java rename observability/src/test/java/io/grpc/observability/{ObservabilityConfigTest.java => ObservabilityConfigImplTest.java} (97%) diff --git a/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java b/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java index fb5452ecdab..cc1efe41211 100644 --- a/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java +++ b/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java @@ -39,16 +39,23 @@ final class GlobalLoggingTags { private static final Logger logger = Logger.getLogger(GlobalLoggingTags.class.getName()); private static final String ENV_KEY_PREFIX = "GRPC_OBSERVABILITY_"; - private final Map tags; + private final Map locationTags; + private final Map customTags; GlobalLoggingTags() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - populate(builder); - tags = builder.build(); + ImmutableMap.Builder locationTagsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder customTagsBuilder = ImmutableMap.builder(); + populate(locationTagsBuilder, customTagsBuilder); + locationTags = locationTagsBuilder.build(); + customTags = customTagsBuilder.build(); } - Map getTags() { - return tags; + Map getLocationTags() { + return locationTags; + } + + Map getCustomTags() { + return customTags; } @VisibleForTesting @@ -139,10 +146,11 @@ static void populateFromMap(Map map, }); } - static void populate(ImmutableMap.Builder customTags) { + static void populate(ImmutableMap.Builder locationTags, + ImmutableMap.Builder customTags) { populateFromEnvironmentVars(customTags); - populateFromMetadataServer(customTags); - populateFromKubernetesValues(customTags, + populateFromMetadataServer(locationTags); + populateFromKubernetesValues(locationTags, "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "/etc/hostname", "/proc/self/cgroup"); } diff --git a/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java b/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java index 5011068c176..10eb64fa5ee 100644 --- a/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java +++ b/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java @@ -45,7 +45,7 @@ static synchronized void init(InternalLoggingChannelInterceptor.Factory factory) ManagedChannelRegistry.getDefaultRegistry().register(instance); } - static synchronized void finish() { + static synchronized void shutdown() { if (instance == null) { throw new IllegalStateException("LoggingChannelProvider not initialized!"); } diff --git a/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java b/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java index 5277bcf572b..4931a1f7f96 100644 --- a/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java +++ b/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java @@ -45,7 +45,7 @@ static synchronized void init(InternalLoggingServerInterceptor.Factory factory) ServerRegistry.getDefaultRegistry().register(instance); } - static synchronized void finish() { + static synchronized void shutdown() { if (instance == null) { throw new IllegalStateException("LoggingServerProvider not initialized!"); } diff --git a/observability/src/main/java/io/grpc/observability/Observability.java b/observability/src/main/java/io/grpc/observability/Observability.java index 617da68a5c7..f5039ce390e 100644 --- a/observability/src/main/java/io/grpc/observability/Observability.java +++ b/observability/src/main/java/io/grpc/observability/Observability.java @@ -16,49 +16,71 @@ package io.grpc.observability; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; import io.grpc.ExperimentalApi; import io.grpc.ManagedChannelProvider.ProviderNotFoundException; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; +import java.io.IOException; /** The main class for gRPC Observability features. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class Observability { - private static boolean initialized = false; - private static final String PROJECT_ID = "PROJECT"; + private static Observability instance = null; + private final Sink sink; /** * Initialize grpc-observability. * * @throws ProviderNotFoundException if no underlying channel/server provider is available. */ - public static synchronized void grpcInit() { - if (initialized) { - throw new IllegalStateException("Observability already initialized!"); + public static synchronized Observability grpcInit() throws IOException { + if (instance == null) { + GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); + ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); + Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId()); + instance = grpcInit(sink, + new InternalLoggingChannelInterceptor.FactoryImpl(sink, + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig), + new InternalLoggingServerInterceptor.FactoryImpl(sink, + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig)); + } + return instance; + } + + @VisibleForTesting static Observability grpcInit(Sink sink, + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + if (instance == null) { + instance = new Observability(sink, channelInterceptorFactory, serverInterceptorFactory); } - // TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId - Sink sink = new GcpLogSink(PROJECT_ID); - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink)); - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(sink)); - // TODO(sanjaypujare): initialize customTags map - initialized = true; + return instance; } - /** Un-initialize or finish grpc-observability. */ - // TODO(sanjaypujare): Once Observability is made into Singleton object, - // close() on sink will be called as part of grpcFinish() - public static synchronized void grpcFinish() { - if (!initialized) { - throw new IllegalStateException("Observability not initialized!"); + /** Un-initialize/shutdown grpc-observability. */ + public void grpcShutdown() { + synchronized (Observability.class) { + if (instance == null) { + throw new IllegalStateException("Observability already shutdown!"); + } + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); + sink.close(); + instance = null; } - LoggingChannelProvider.finish(); - LoggingServerProvider.finish(); - // TODO(sanjaypujare): finish customTags map - initialized = false; } - private Observability() { + private Observability(Sink sink, + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + this.sink = checkNotNull(sink); + LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); + LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } } diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java index 29cb371eb4d..61c5f90835b 100644 --- a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java @@ -16,31 +16,29 @@ package io.grpc.observability; -import static com.google.common.base.Preconditions.checkArgument; - -import io.grpc.internal.JsonParser; -import io.grpc.internal.JsonUtil; -import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; -import java.io.IOException; -import java.util.List; -import java.util.Map; -/** gRPC Observability configuration processor. */ -final class ObservabilityConfig { - private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; +public interface ObservabilityConfig { + /** Is Cloud Logging enabled. */ + boolean isEnableCloudLogging(); + + /** Get destination project ID - where logs will go. */ + String getDestinationProjectId(); + + /** Get filters set for logging. */ + LogFilter[] getLogFilters(); - private boolean enableCloudLogging = true; - private String destinationProjectId = null; - private LogFilter[] logFilters; - private GrpcLogRecord.EventType[] eventTypes; + /** Get event types to log. */ + EventType[] getEventTypes(); - /** POJO for representing a filter used in configuration. */ - static class LogFilter { + /** + * POJO for representing a filter used in configuration. + */ + public static class LogFilter { /** Pattern indicating which service/method to log. */ public final String pattern; - /** Number of bytes of each header to log. */ + /** Number of bytes of each header to log. */ public final Integer headerBytes; /** Number of bytes of each header to log. */ @@ -52,88 +50,4 @@ static class LogFilter { this.messageBytes = messageBytes; } } - - static ObservabilityConfig getInstance() throws IOException { - ObservabilityConfig config = new ObservabilityConfig(); - config.parse(System.getenv(CONFIG_ENV_VAR_NAME)); - return config; - } - - @SuppressWarnings("unchecked") - void parse(String config) throws IOException { - checkArgument(config != null, CONFIG_ENV_VAR_NAME + " value is null!"); - parseLoggingConfig( - JsonUtil.getObject((Map) JsonParser.parse(config), "logging_config")); - } - - private void parseLoggingConfig(Map loggingConfig) { - if (loggingConfig != null) { - Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging"); - if (value != null) { - enableCloudLogging = value; - } - destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id"); - List rawList = JsonUtil.getList(loggingConfig, "log_filters"); - if (rawList != null) { - List> jsonLogFilters = JsonUtil.checkObjectList(rawList); - this.logFilters = new LogFilter[jsonLogFilters.size()]; - for (int i = 0; i < jsonLogFilters.size(); i++) { - this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i)); - } - } - rawList = JsonUtil.getList(loggingConfig, "event_types"); - if (rawList != null) { - List jsonEventTypes = JsonUtil.checkStringList(rawList); - this.eventTypes = new GrpcLogRecord.EventType[jsonEventTypes.size()]; - for (int i = 0; i < jsonEventTypes.size(); i++) { - this.eventTypes[i] = convertEventType(jsonEventTypes.get(i)); - } - } - } - } - - private GrpcLogRecord.EventType convertEventType(String val) { - switch (val) { - case "GRPC_CALL_UNKNOWN": - return GrpcLogRecord.EventType.GRPC_CALL_UNKNOWN; - case "GRPC_CALL_REQUEST_HEADER": - return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_HEADER; - case "GRPC_CALL_RESPONSE_HEADER": - return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_HEADER; - case"GRPC_CALL_REQUEST_MESSAGE": - return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_MESSAGE; - case "GRPC_CALL_RESPONSE_MESSAGE": - return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_MESSAGE; - case "GRPC_CALL_TRAILER": - return GrpcLogRecord.EventType.GRPC_CALL_TRAILER; - case "GRPC_CALL_HALF_CLOSE": - return GrpcLogRecord.EventType.GRPC_CALL_HALF_CLOSE; - case "GRPC_CALL_CANCEL": - return GrpcLogRecord.EventType.GRPC_CALL_CANCEL; - default: - throw new IllegalArgumentException("Unknown event type value:" + val); - } - } - - private LogFilter parseJsonLogFilter(Map logFilterMap) { - return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), - JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"), - JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes")); - } - - public boolean isEnableCloudLogging() { - return enableCloudLogging; - } - - public String getDestinationProjectId() { - return destinationProjectId; - } - - public LogFilter[] getLogFilters() { - return logFilters; - } - - public EventType[] getEventTypes() { - return eventTypes; - } } diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java new file mode 100644 index 00000000000..86e60ed0bc4 --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java @@ -0,0 +1,124 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.grpc.internal.JsonParser; +import io.grpc.internal.JsonUtil; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** gRPC Observability configuration processor. */ +final class ObservabilityConfigImpl implements ObservabilityConfig { + private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; + + private boolean enableCloudLogging = true; + private String destinationProjectId = null; + private LogFilter[] logFilters; + private EventType[] eventTypes; + + static ObservabilityConfigImpl getInstance() throws IOException { + ObservabilityConfigImpl config = new ObservabilityConfigImpl(); + config.parse(System.getenv(CONFIG_ENV_VAR_NAME)); + return config; + } + + @SuppressWarnings("unchecked") + void parse(String config) throws IOException { + checkArgument(config != null, CONFIG_ENV_VAR_NAME + " value is null!"); + parseLoggingConfig( + JsonUtil.getObject((Map) JsonParser.parse(config), "logging_config")); + } + + private void parseLoggingConfig(Map loggingConfig) { + if (loggingConfig != null) { + Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging"); + if (value != null) { + enableCloudLogging = value; + } + destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id"); + List rawList = JsonUtil.getList(loggingConfig, "log_filters"); + if (rawList != null) { + List> jsonLogFilters = JsonUtil.checkObjectList(rawList); + this.logFilters = new LogFilter[jsonLogFilters.size()]; + for (int i = 0; i < jsonLogFilters.size(); i++) { + this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i)); + } + } + rawList = JsonUtil.getList(loggingConfig, "event_types"); + if (rawList != null) { + List jsonEventTypes = JsonUtil.checkStringList(rawList); + this.eventTypes = new EventType[jsonEventTypes.size()]; + for (int i = 0; i < jsonEventTypes.size(); i++) { + this.eventTypes[i] = convertEventType(jsonEventTypes.get(i)); + } + } + } + } + + private EventType convertEventType(String val) { + switch (val) { + case "GRPC_CALL_UNKNOWN": + return EventType.GRPC_CALL_UNKNOWN; + case "GRPC_CALL_REQUEST_HEADER": + return EventType.GRPC_CALL_REQUEST_HEADER; + case "GRPC_CALL_RESPONSE_HEADER": + return EventType.GRPC_CALL_RESPONSE_HEADER; + case"GRPC_CALL_REQUEST_MESSAGE": + return EventType.GRPC_CALL_REQUEST_MESSAGE; + case "GRPC_CALL_RESPONSE_MESSAGE": + return EventType.GRPC_CALL_RESPONSE_MESSAGE; + case "GRPC_CALL_TRAILER": + return EventType.GRPC_CALL_TRAILER; + case "GRPC_CALL_HALF_CLOSE": + return EventType.GRPC_CALL_HALF_CLOSE; + case "GRPC_CALL_CANCEL": + return EventType.GRPC_CALL_CANCEL; + default: + throw new IllegalArgumentException("Unknown event type value:" + val); + } + } + + private LogFilter parseJsonLogFilter(Map logFilterMap) { + return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), + JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"), + JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes")); + } + + @Override + public boolean isEnableCloudLogging() { + return enableCloudLogging; + } + + @Override + public String getDestinationProjectId() { + return destinationProjectId; + } + + @Override + public LogFilter[] getLogFilters() { + return logFilters; + } + + @Override + public EventType[] getEventTypes() { + return eventTypes; + } +} diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java index 63d5280584d..6c4ee63f5c4 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -31,9 +31,11 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -58,13 +60,12 @@ public static class FactoryImpl implements Factory { private final Sink sink; private final LogHelper helper; - static LogHelper createLogHelper(Sink sink, TimeProvider provider) { - return new LogHelper(sink, provider); - } - - public FactoryImpl(Sink sink) { + /** Create the {@link Factory} we need to create our {@link ClientInterceptor}s. */ + public FactoryImpl(Sink sink, Map locationTags, Map customTags, + ObservabilityConfig observabilityConfig) { this.sink = sink; - this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, + observabilityConfig); } @Override diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java index 8d3337057c6..c9df0f65c43 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java @@ -29,10 +29,12 @@ import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.net.SocketAddress; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,13 +57,13 @@ public static class FactoryImpl implements Factory { private final Sink sink; private final LogHelper helper; - static LogHelper createLogHelper(Sink sink, TimeProvider provider) { - return new LogHelper(sink, provider); - } - - public FactoryImpl(Sink sink) { + /** Create the {@link Factory} we need to create our {@link ServerInterceptor}s. */ + public FactoryImpl(Sink sink, Map locationTags, + Map customTags, + ObservabilityConfig observabilityConfig) { this.sink = sink; - this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, + observabilityConfig); } @Override diff --git a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java index d454c84c984..f4e705b4700 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java @@ -30,6 +30,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; @@ -41,6 +42,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -59,10 +61,18 @@ class LogHelper { private final Sink sink; private final TimeProvider timeProvider; + // TODO(DNvindhya) remove unused annotation once the following 2 are actually used + @SuppressWarnings({"unused"}) private final Map locationTags; + @SuppressWarnings({"unused"}) private final Map customTags; + @SuppressWarnings({"unused"}) private final ObservabilityConfig observabilityConfig; - LogHelper(Sink sink, TimeProvider timeProvider) { + LogHelper(Sink sink, TimeProvider timeProvider, Map locationTags, + Map customTags, ObservabilityConfig observabilityConfig) { this.sink = sink; this.timeProvider = timeProvider; + this.locationTags = locationTags; + this.customTags = customTags; + this.observabilityConfig = observabilityConfig; } /** diff --git a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java index 31e1268262e..b8de06cf56d 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java @@ -59,16 +59,17 @@ public void initTwiceCausesException() { ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); Sink mockSink = mock(GcpLogSink.class); - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); + LoggingChannelProvider.init( + new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); try { LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); + new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); } - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevProvider); } @@ -88,7 +89,7 @@ public void forTarget_interceptorCalled() { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } @Test @@ -107,7 +108,7 @@ public void forAddress_interceptorCalled() { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } @Test @@ -127,7 +128,7 @@ public void newChannelBuilder_interceptorCalled() { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } private static class NoopInterceptor implements ClientInterceptor { diff --git a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java index 21e86ac1827..8f837f44f4d 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java @@ -60,15 +60,17 @@ public void initTwiceCausesException() { ServerProvider prevProvider = ServerProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class); Sink mockSink = mock(GcpLogSink.class); - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(mockSink)); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); try { - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(mockSink)); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!"); } - LoggingServerProvider.finish(); + LoggingServerProvider.shutdown(); assertThat(ServerProvider.provider()).isSameInstanceAs(prevProvider); } @@ -100,7 +102,7 @@ private void serverBuilder_interceptorCalled(Supplier> serverBu cleanupRule.register(channel)); assertThat(unaryRpc("buddy", stub)).isEqualTo("Hello buddy"); verify(interceptor).interceptCall(any(ServerCall.class), any(Metadata.class), anyCallHandler()); - LoggingServerProvider.finish(); + LoggingServerProvider.shutdown(); } private ServerCallHandler anyCallHandler() { diff --git a/observability/src/test/java/io/grpc/observability/LoggingTest.java b/observability/src/test/java/io/grpc/observability/LoggingTest.java index bceb9bb31d4..8c2a0a69653 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingTest.java @@ -52,12 +52,12 @@ public void clientServer_interceptorCalled() throws IOException { Sink sink = new GcpLogSink(PROJECT_ID); LoggingServerProvider.init( - new InternalLoggingServerInterceptor.FactoryImpl(sink)); + new InternalLoggingServerInterceptor.FactoryImpl(sink, null, null, null)); Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) .build().start(); int port = cleanupRule.register(server).getPort(); LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(sink)); + new InternalLoggingChannelInterceptor.FactoryImpl(sink, null, null, null)); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port) .usePlaintext().build(); SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( @@ -65,7 +65,7 @@ public void clientServer_interceptorCalled() assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) .isEqualTo("Hello buddy"); sink.close(); - LoggingChannelProvider.finish(); - LoggingServerProvider.finish(); + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); } } diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java similarity index 97% rename from observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java rename to observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java index f4147f0b62f..ee290317481 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java @@ -29,7 +29,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class ObservabilityConfigTest { +public class ObservabilityConfigImplTest { private static final String EVENT_TYPES = "{\n" + " \"logging_config\": {\n" + " \"enable_cloud_logging\": false,\n" @@ -66,7 +66,7 @@ public class ObservabilityConfigTest { + " \"enable_cloud_logging\": false\n" + " }\n" + "}"; - ObservabilityConfig observabilityConfig = new ObservabilityConfig(); + ObservabilityConfigImpl observabilityConfig = new ObservabilityConfigImpl(); @Test public void nullConfig() throws IOException { diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java index 37fadb6a004..79273cd04ba 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java @@ -18,7 +18,14 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import io.grpc.ManagedChannelProvider; +import io.grpc.ServerProvider; +import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.observability.logging.Sink; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -28,19 +35,30 @@ public class ObservabilityTest { @Test public void initFinish() { - Observability.grpcInit(); - try { - Observability.grpcInit(); - fail("should have failed for calling grpcInit() again"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("Observability already initialized!"); - } - Observability.grpcFinish(); + ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); + ServerProvider prevServerProvider = ServerProvider.provider(); + Sink sink = mock(Sink.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = mock( + InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = mock( + InternalLoggingServerInterceptor.Factory.class); + Observability observability = Observability.grpcInit(sink, channelInterceptorFactory, + serverInterceptorFactory); + assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); + assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); + Observability observability1 = Observability.grpcInit(sink, channelInterceptorFactory, + serverInterceptorFactory); + assertThat(observability1).isSameInstanceAs(observability); + + observability.grpcShutdown(); + verify(sink).close(); + assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); + assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); try { - Observability.grpcFinish(); - fail("should have failed for calling grpcFinish() on uninitialized"); + observability.grpcShutdown(); + fail("should have failed for calling grpcShutdown() second time"); } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("Observability not initialized!"); + assertThat(e).hasMessageThat().contains("Observability already shutdown!"); } } } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java index 6ddb2abc320..ae145575665 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java @@ -86,7 +86,7 @@ public class InternalLoggingChannelInterceptorTest { @Before public void setup() throws Exception { - factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink); + factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null); interceptedListener = new AtomicReference<>(); actualClientInitial = new AtomicReference<>(); actualRequest = new AtomicReference<>(); diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java index c567d76335b..17813f0a27f 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java @@ -83,7 +83,7 @@ public class InternalLoggingServerInterceptorTest { @Before @SuppressWarnings("unchecked") public void setup() throws Exception { - factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink); + factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null); interceptedLoggingCall = new AtomicReference<>(); mockListener = mock(ServerCall.Listener.class); actualServerInitial = new AtomicReference<>(); diff --git a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java index 7998021d80a..ec5ed6d3fdb 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java @@ -35,6 +35,7 @@ import io.grpc.MethodDescriptor.Marshaller; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.interceptors.LogHelper.PayloadBuilder; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; @@ -53,6 +54,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -101,16 +103,14 @@ public class LogHelperTest { private final Sink sink = mock(GcpLogSink.class); private final Timestamp timestamp = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); - private final TimeProvider timeProvider = new TimeProvider() { - @Override - public long currentTimeNanos() { - return TimeUnit.SECONDS.toNanos(9876) + 54321; - } - }; + private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321; + @SuppressWarnings("unchecked") private final Map locationTags = mock(Map.class); + @SuppressWarnings("unchecked") private final Map customTags = mock(Map.class); + private final ObservabilityConfig observabilityConfig = mock(ObservabilityConfig.class); private final LogHelper logHelper = new LogHelper( sink, - timeProvider); + timeProvider, locationTags, customTags, observabilityConfig); @Before public void setUp() throws Exception {