diff --git a/platform-grpc-service-framework/build.gradle.kts b/platform-grpc-service-framework/build.gradle.kts index eb757a2..0dd6b60 100644 --- a/platform-grpc-service-framework/build.gradle.kts +++ b/platform-grpc-service-framework/build.gradle.kts @@ -10,13 +10,14 @@ dependencies { api(platform("io.grpc:grpc-bom:1.47.0")) api("io.grpc:grpc-api") api("io.grpc:grpc-services") - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.9.1") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.10.0") api("com.typesafe:config:1.4.2") api(project(":service-framework-spi")) annotationProcessor("org.projectlombok:lombok:1.18.24") compileOnly("org.projectlombok:lombok:1.18.24") + implementation(project(":platform-metrics")) implementation("org.slf4j:slf4j-api:1.7.36") - implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.9.1") + implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.10.0") } diff --git a/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/ConsolidatedGrpcPlatformServiceContainer.java b/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/ConsolidatedGrpcPlatformServiceContainer.java index 97746ac..c0fab1f 100644 --- a/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/ConsolidatedGrpcPlatformServiceContainer.java +++ b/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/ConsolidatedGrpcPlatformServiceContainer.java @@ -21,12 +21,6 @@ public ConsolidatedGrpcPlatformServiceContainer(ConfigClient configClient) { super(configClient); } - @Override - protected InProcessGrpcChannelRegistry buildChannelRegistry() { - return new InProcessGrpcChannelRegistry( - this.getAuthorityInProcessOverrideMap(this.getInProcessServerName())); - } - @Override protected GrpcServiceContainerEnvironment buildContainerEnvironment( InProcessGrpcChannelRegistry channelRegistry, HealthStatusManager healthStatusManager) { @@ -63,8 +57,10 @@ protected Collection getAuthoritiesToTreatAsInProcess() { return Collections.emptySet(); } - private Map getAuthorityInProcessOverrideMap(String inProcessName) { + protected Map getAuthorityInProcessOverrideMap() { return this.getAuthoritiesToTreatAsInProcess().stream() - .collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> inProcessName)); + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), unused -> this.getInProcessServerName())); } } diff --git a/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/GrpcPlatformServiceContainer.java b/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/GrpcPlatformServiceContainer.java index 34de8e5..c3167eb 100644 --- a/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/GrpcPlatformServiceContainer.java +++ b/platform-grpc-service-framework/src/main/java/org/hypertrace/core/serviceframework/grpc/GrpcPlatformServiceContainer.java @@ -12,8 +12,10 @@ import io.grpc.health.v1.HealthGrpc.HealthBlockingStub; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; +import io.micrometer.core.instrument.binder.grpc.MetricCollectingClientInterceptor; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -23,13 +25,17 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; + +import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor; import lombok.Value; import lombok.extern.slf4j.Slf4j; +import org.hypertrace.core.grpcutils.client.GrpcRegistryConfig; import org.hypertrace.core.grpcutils.client.InProcessGrpcChannelRegistry; import org.hypertrace.core.grpcutils.server.InterceptorUtil; import org.hypertrace.core.grpcutils.server.ServerManagementUtil; import org.hypertrace.core.serviceframework.PlatformService; import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle.State; @Slf4j @@ -56,6 +62,8 @@ protected void doInit() { .collect(Collectors.toUnmodifiableMap(Function.identity(), this::initializeBuilder)); final ServerBuilder inProcessServerBuilder = InProcessServerBuilder.forName(this.getInProcessServerName()) + .intercept( + new MetricCollectingServerInterceptor(PlatformMetricsRegistry.getMeterRegistry())) .addService(this.healthStatusManager.getHealthService()); final GrpcServiceContainerEnvironment serviceContainerEnvironment = this.buildContainerEnvironment(this.grpcChannelRegistry, this.healthStatusManager); @@ -186,7 +194,12 @@ public boolean healthCheck() { } protected InProcessGrpcChannelRegistry buildChannelRegistry() { - return new InProcessGrpcChannelRegistry(); + return new InProcessGrpcChannelRegistry( + this.getAuthorityInProcessOverrideMap(), + GrpcRegistryConfig.builder() + .defaultInterceptor( + new MetricCollectingClientInterceptor(PlatformMetricsRegistry.getMeterRegistry())) + .build()); } protected String getInProcessServerName() { @@ -211,6 +224,10 @@ protected void registerManagedPeriodicTask(PlatformPeriodicTaskDefinition period } } + protected Map getAuthorityInProcessOverrideMap() { + return Collections.emptyMap(); + } + protected abstract List getServerDefinitions(); protected abstract GrpcServiceContainerEnvironment buildContainerEnvironment( @@ -222,6 +239,9 @@ private ServerBuilder initializeBuilder(GrpcPlatformServerDefinition serverDe if (serverDefinition.getMaxInboundMessageSize() > 0) { builder.maxInboundMessageSize(serverDefinition.getMaxInboundMessageSize()); } + // add micrometer-grpc interceptor to collect server metrics. + builder.intercept( + new MetricCollectingServerInterceptor(PlatformMetricsRegistry.getMeterRegistry())); serverDefinition.getServerInterceptors().forEach(builder::intercept); return builder; diff --git a/platform-http-service-framework/build.gradle.kts b/platform-http-service-framework/build.gradle.kts index 072e141..43d975d 100644 --- a/platform-http-service-framework/build.gradle.kts +++ b/platform-http-service-framework/build.gradle.kts @@ -5,7 +5,7 @@ plugins { dependencies { api(project(":platform-service-framework")) - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.9.1") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.10.0") api("com.typesafe:config:1.4.2") api("javax.servlet:javax.servlet-api:4.0.1") api("com.google.inject:guice:5.1.0") diff --git a/platform-metrics/src/main/java/org/hypertrace/core/serviceframework/metrics/PlatformMetricsRegistry.java b/platform-metrics/src/main/java/org/hypertrace/core/serviceframework/metrics/PlatformMetricsRegistry.java index 2eef97d..f79ee32 100644 --- a/platform-metrics/src/main/java/org/hypertrace/core/serviceframework/metrics/PlatformMetricsRegistry.java +++ b/platform-metrics/src/main/java/org/hypertrace/core/serviceframework/metrics/PlatformMetricsRegistry.java @@ -12,6 +12,7 @@ import com.typesafe.config.Config; import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics; import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics; +import io.micrometer.common.util.StringUtils; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; @@ -33,7 +34,6 @@ import io.micrometer.core.instrument.logging.LoggingMeterRegistry; import io.micrometer.core.instrument.logging.LoggingRegistryConfig; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.micrometer.core.instrument.util.StringUtils; import io.micrometer.core.lang.NonNull; import io.micrometer.core.lang.Nullable; import io.micrometer.prometheus.PrometheusConfig; @@ -42,6 +42,7 @@ import io.prometheus.client.dropwizard.DropwizardExports; import io.prometheus.client.exporter.PushGateway; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -98,20 +99,19 @@ public class PlatformMetricsRegistry { }}; private static boolean isInit = false; - private static final Set DEFAULT_TAGS = new HashSet<>(); /** * Main MetricMeter registry, with which all the metrics should be registered. We use * a {@link CompositeMeterRegistry} here so that we can even registry multiple registries * like Prometheus and Logging registries, if needed. */ - private static final CompositeMeterRegistry METER_REGISTRY = new CompositeMeterRegistry(); + private static CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry(); private static void initPrometheusReporter(int reportInterval) { LOGGER.info("Trying to init PrometheusReporter"); // Add Prometheus registry to the composite registry. - METER_REGISTRY.add(new PrometheusMeterRegistry(new PrometheusConfig() { + meterRegistry.add(new PrometheusMeterRegistry(new PrometheusConfig() { @Override @NonNull public Duration step() { @@ -138,7 +138,7 @@ private static void initConsoleMetricsReporter(final int reportIntervalSec) { private static void initLoggingMetricsReporter(int reportIntervalSec) { LOGGER.info("Initializing the logging metric reporter."); - METER_REGISTRY.add(new LoggingMeterRegistry(new LoggingRegistryConfig() { + meterRegistry.add(new LoggingMeterRegistry(new LoggingRegistryConfig() { @Override @NonNull public Duration step() { @@ -156,7 +156,7 @@ public String get(String key) { private static void initTestingMetricsReporter() { LOGGER.info("Initializing the testing metric reporter."); - METER_REGISTRY.add(new SimpleMeterRegistry()); + meterRegistry.add(new SimpleMeterRegistry()); } private static void initPrometheusPushGatewayReporter(String serviceName, @@ -170,7 +170,7 @@ private static void initPrometheusPushGatewayReporter(String serviceName, throw new IllegalArgumentException("pushUrlAddress configuration is not specified."); } - METER_REGISTRY.add(new PrometheusPushMeterRegistry( + meterRegistry.add(new PrometheusPushMeterRegistry( new PrometheusPushRegistryConfig() { @Override public String jobName() { @@ -261,19 +261,21 @@ public synchronized static void initMetricsRegistry(String serviceName, Config c } LOGGER.info("Setting default tags for all metrics to: {}", defaultTags); - defaultTags.forEach((key, value) -> DEFAULT_TAGS.add(new ImmutableTag(key, value))); + defaultTags.forEach((key, value) -> { + meterRegistry.config().commonTags(List.of((new ImmutableTag(key, value)))); + }); // Register different metrics with the registry. - new ClassLoaderMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new JvmGcMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new ProcessorMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new JvmThreadMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new JvmMemoryMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new UptimeMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); - new Log4j2Metrics(DEFAULT_TAGS).bindTo(METER_REGISTRY); + new ClassLoaderMetrics().bindTo(meterRegistry); + new JvmGcMetrics().bindTo(meterRegistry); + new ProcessorMetrics().bindTo(meterRegistry); + new JvmThreadMetrics().bindTo(meterRegistry); + new JvmMemoryMetrics().bindTo(meterRegistry); + new UptimeMetrics().bindTo(meterRegistry); + new Log4j2Metrics().bindTo(meterRegistry); - new ProcessMemoryMetrics().bindTo(METER_REGISTRY); - new ProcessThreadMetrics().bindTo(METER_REGISTRY); + new ProcessMemoryMetrics().bindTo(meterRegistry); + new ProcessThreadMetrics().bindTo(meterRegistry); for (String key : DEFAULT_METRIC_SET.keySet()) { METRIC_REGISTRY @@ -302,7 +304,7 @@ public static void register(String metricName, Metric metric) { * See https://micrometer.io/docs/concepts#_counters for more details on the Counter. */ public static Counter registerCounter(String name, Map tags) { - return METER_REGISTRY.counter(name, addDefaultTags(tags)); + return meterRegistry.counter(name, toIterable(tags)); } /** @@ -326,11 +328,12 @@ public static Timer registerTimer(String name, Map tags) { public static Timer registerTimer(String name, Map tags, boolean histogram) { Timer.Builder builder = Timer.builder(name) .publishPercentiles(0.5, 0.95, 0.99) - .tags(addDefaultTags(tags)); + .tags(toIterable(tags)); + if (histogram) { builder = builder.publishPercentileHistogram(); } - return builder.register(METER_REGISTRY); + return builder.register(meterRegistry); } /** @@ -341,7 +344,7 @@ public static Timer registerTimer(String name, Map tags, boolean * See https://micrometer.io/docs/concepts#_gauges for more details on the Gauges. */ public static T registerGauge(String name, Map tags, T number) { - Gauge.builder(name, number, Number::doubleValue).tags(addDefaultTags(tags)).strongReference(true).register(METER_REGISTRY); + Gauge.builder(name, number, Number::doubleValue).tags(toIterable(tags)).strongReference(true).register(meterRegistry); return number; } @@ -372,11 +375,11 @@ public static DistributionSummary registerDistributionSummary(String name, Map tags, boolean histogram) { DistributionSummary.Builder builder = DistributionSummary.builder(name) .publishPercentiles(0.5, 0.95, 0.99) - .tags(addDefaultTags(tags)); + .tags(toIterable(tags)); if (histogram) { builder = builder.publishPercentileHistogram(); } - return builder.register(METER_REGISTRY); + return builder.register(meterRegistry); } /** @@ -384,7 +387,7 @@ public static DistributionSummary registerDistributionSummary(String name, * cacheName for the given guavaCache */ public static void registerCache(String cacheName, Cache guavaCache, Map tags) { - GuavaCacheMetrics.monitor(METER_REGISTRY, guavaCache, cacheName, addDefaultTags(tags)); + GuavaCacheMetrics.monitor(meterRegistry, guavaCache, cacheName, toIterable(tags)); } @@ -397,16 +400,16 @@ public static void registerCache(String cacheName, Cache guavaCache */ public static void monitorExecutorService(String name, ExecutorService executorService, @Nullable Map tags) { - new ExecutorServiceMetrics(executorService, name, addDefaultTags(tags)).bindTo(METER_REGISTRY); + new ExecutorServiceMetrics(executorService, name, toIterable(tags)).bindTo(meterRegistry); } - private static Iterable addDefaultTags(Map tags) { - if (tags == null || tags.isEmpty()) { - return DEFAULT_TAGS; + private static Iterable toIterable(Map tags) { + List newTags = new ArrayList<>(); + + if (tags != null) { + tags.forEach((k, v) -> newTags.add(new ImmutableTag(k, v))); } - Set newTags = new HashSet<>(DEFAULT_TAGS); - tags.forEach((k, v) -> newTags.add(new ImmutableTag(k, v))); return newTags; } @@ -415,22 +418,22 @@ public static MetricRegistry getMetricRegistry() { } public static MeterRegistry getMeterRegistry() { - return METER_REGISTRY; + return meterRegistry; } public static synchronized void stop() { stopConsoleMetricsReporter(); METRIC_REGISTRY.getNames().forEach(METRIC_REGISTRY::remove); - DEFAULT_TAGS.clear(); /* For each meter registry in this composite, it will call the close function */ - METER_REGISTRY.getRegistries().forEach(MeterRegistry::close); - METER_REGISTRY.forEachMeter(METER_REGISTRY::remove); - METER_REGISTRY.getRegistries().forEach(MeterRegistry::clear); - Set registries = new HashSet<>(METER_REGISTRY.getRegistries()); - registries.forEach(METER_REGISTRY::remove); + meterRegistry.getRegistries().forEach(MeterRegistry::close); + meterRegistry.forEachMeter(meterRegistry::remove); + meterRegistry.getRegistries().forEach(MeterRegistry::clear); + Set registries = new HashSet<>(meterRegistry.getRegistries()); + registries.forEach(meterRegistry::remove); registries.clear(); CollectorRegistry.defaultRegistry.clear(); + meterRegistry = new CompositeMeterRegistry(); isInit = false; }