Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions platform-grpc-service-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ dependencies {
api(platform("io.grpc:grpc-bom:1.47.0"))
api("io.grpc:grpc-api")
api("io.grpc:grpc-services")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.9.1")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.10.0")
api("com.typesafe:config:1.4.2")
api(project(":service-framework-spi"))

annotationProcessor("org.projectlombok:lombok:1.18.24")
compileOnly("org.projectlombok:lombok:1.18.24")

implementation(project(":platform-metrics"))
implementation("org.slf4j:slf4j-api:1.7.36")
implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.9.1")
implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.10.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ public ConsolidatedGrpcPlatformServiceContainer(ConfigClient configClient) {
super(configClient);
}

@Override
protected InProcessGrpcChannelRegistry buildChannelRegistry() {
return new InProcessGrpcChannelRegistry(
this.getAuthorityInProcessOverrideMap(this.getInProcessServerName()));
}

@Override
protected GrpcServiceContainerEnvironment buildContainerEnvironment(
InProcessGrpcChannelRegistry channelRegistry, HealthStatusManager healthStatusManager) {
Expand Down Expand Up @@ -63,8 +57,10 @@ protected Collection<String> getAuthoritiesToTreatAsInProcess() {
return Collections.emptySet();
}

private Map<String, String> getAuthorityInProcessOverrideMap(String inProcessName) {
protected Map<String, String> getAuthorityInProcessOverrideMap() {
return this.getAuthoritiesToTreatAsInProcess().stream()
.collect(Collectors.toUnmodifiableMap(Function.identity(), unused -> inProcessName));
.collect(
Collectors.toUnmodifiableMap(
Function.identity(), unused -> this.getInProcessServerName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import io.grpc.health.v1.HealthGrpc.HealthBlockingStub;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.micrometer.core.instrument.binder.grpc.MetricCollectingClientInterceptor;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -23,13 +25,17 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.grpcutils.client.GrpcRegistryConfig;
import org.hypertrace.core.grpcutils.client.InProcessGrpcChannelRegistry;
import org.hypertrace.core.grpcutils.server.InterceptorUtil;
import org.hypertrace.core.grpcutils.server.ServerManagementUtil;
import org.hypertrace.core.serviceframework.PlatformService;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.serviceframework.spi.PlatformServiceLifecycle.State;

@Slf4j
Expand All @@ -56,6 +62,8 @@ protected void doInit() {
.collect(Collectors.toUnmodifiableMap(Function.identity(), this::initializeBuilder));
final ServerBuilder<?> inProcessServerBuilder =
InProcessServerBuilder.forName(this.getInProcessServerName())
.intercept(
new MetricCollectingServerInterceptor(PlatformMetricsRegistry.getMeterRegistry()))
.addService(this.healthStatusManager.getHealthService());
final GrpcServiceContainerEnvironment serviceContainerEnvironment =
this.buildContainerEnvironment(this.grpcChannelRegistry, this.healthStatusManager);
Expand Down Expand Up @@ -186,7 +194,12 @@ public boolean healthCheck() {
}

protected InProcessGrpcChannelRegistry buildChannelRegistry() {
return new InProcessGrpcChannelRegistry();
return new InProcessGrpcChannelRegistry(
this.getAuthorityInProcessOverrideMap(),
GrpcRegistryConfig.builder()
.defaultInterceptor(
new MetricCollectingClientInterceptor(PlatformMetricsRegistry.getMeterRegistry()))
.build());
}

protected String getInProcessServerName() {
Expand All @@ -211,6 +224,10 @@ protected void registerManagedPeriodicTask(PlatformPeriodicTaskDefinition period
}
}

protected Map<String, String> getAuthorityInProcessOverrideMap() {
return Collections.emptyMap();
}

protected abstract List<GrpcPlatformServerDefinition> getServerDefinitions();

protected abstract GrpcServiceContainerEnvironment buildContainerEnvironment(
Expand All @@ -222,6 +239,9 @@ private ServerBuilder<?> initializeBuilder(GrpcPlatformServerDefinition serverDe
if (serverDefinition.getMaxInboundMessageSize() > 0) {
builder.maxInboundMessageSize(serverDefinition.getMaxInboundMessageSize());
}
// add micrometer-grpc interceptor to collect server metrics.
builder.intercept(
new MetricCollectingServerInterceptor(PlatformMetricsRegistry.getMeterRegistry()));

serverDefinition.getServerInterceptors().forEach(builder::intercept);
return builder;
Expand Down
2 changes: 1 addition & 1 deletion platform-http-service-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {

dependencies {
api(project(":platform-service-framework"))
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.9.1")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.10.0")
api("com.typesafe:config:1.4.2")
api("javax.servlet:javax.servlet-api:4.0.1")
api("com.google.inject:guice:5.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.typesafe.config.Config;
import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics;
import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics;
import io.micrometer.common.util.StringUtils;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
Expand All @@ -33,7 +34,6 @@
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
import io.micrometer.core.instrument.logging.LoggingRegistryConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.instrument.util.StringUtils;
import io.micrometer.core.lang.NonNull;
import io.micrometer.core.lang.Nullable;
import io.micrometer.prometheus.PrometheusConfig;
Expand All @@ -42,6 +42,7 @@
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.PushGateway;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -98,20 +99,19 @@ public class PlatformMetricsRegistry {

}};
private static boolean isInit = false;
private static final Set<Tag> DEFAULT_TAGS = new HashSet<>();

/**
* Main MetricMeter registry, with which all the metrics should be registered. We use
* a {@link CompositeMeterRegistry} here so that we can even registry multiple registries
* like Prometheus and Logging registries, if needed.
*/
private static final CompositeMeterRegistry METER_REGISTRY = new CompositeMeterRegistry();
private static CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();

private static void initPrometheusReporter(int reportInterval) {
LOGGER.info("Trying to init PrometheusReporter");

// Add Prometheus registry to the composite registry.
METER_REGISTRY.add(new PrometheusMeterRegistry(new PrometheusConfig() {
meterRegistry.add(new PrometheusMeterRegistry(new PrometheusConfig() {
@Override
@NonNull
public Duration step() {
Expand All @@ -138,7 +138,7 @@ private static void initConsoleMetricsReporter(final int reportIntervalSec) {
private static void initLoggingMetricsReporter(int reportIntervalSec) {
LOGGER.info("Initializing the logging metric reporter.");

METER_REGISTRY.add(new LoggingMeterRegistry(new LoggingRegistryConfig() {
meterRegistry.add(new LoggingMeterRegistry(new LoggingRegistryConfig() {
@Override
@NonNull
public Duration step() {
Expand All @@ -156,7 +156,7 @@ public String get(String key) {
private static void initTestingMetricsReporter() {
LOGGER.info("Initializing the testing metric reporter.");

METER_REGISTRY.add(new SimpleMeterRegistry());
meterRegistry.add(new SimpleMeterRegistry());
}

private static void initPrometheusPushGatewayReporter(String serviceName,
Expand All @@ -170,7 +170,7 @@ private static void initPrometheusPushGatewayReporter(String serviceName,
throw new IllegalArgumentException("pushUrlAddress configuration is not specified.");
}

METER_REGISTRY.add(new PrometheusPushMeterRegistry(
meterRegistry.add(new PrometheusPushMeterRegistry(
new PrometheusPushRegistryConfig() {
@Override
public String jobName() {
Expand Down Expand Up @@ -261,19 +261,21 @@ public synchronized static void initMetricsRegistry(String serviceName, Config c
}

LOGGER.info("Setting default tags for all metrics to: {}", defaultTags);
defaultTags.forEach((key, value) -> DEFAULT_TAGS.add(new ImmutableTag(key, value)));
defaultTags.forEach((key, value) -> {
meterRegistry.config().commonTags(List.of((new ImmutableTag(key, value))));
});

// Register different metrics with the registry.
new ClassLoaderMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new JvmGcMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new ProcessorMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new JvmThreadMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new JvmMemoryMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new UptimeMetrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new Log4j2Metrics(DEFAULT_TAGS).bindTo(METER_REGISTRY);
new ClassLoaderMetrics().bindTo(meterRegistry);
new JvmGcMetrics().bindTo(meterRegistry);
new ProcessorMetrics().bindTo(meterRegistry);
new JvmThreadMetrics().bindTo(meterRegistry);
new JvmMemoryMetrics().bindTo(meterRegistry);
new UptimeMetrics().bindTo(meterRegistry);
new Log4j2Metrics().bindTo(meterRegistry);

new ProcessMemoryMetrics().bindTo(METER_REGISTRY);
new ProcessThreadMetrics().bindTo(METER_REGISTRY);
new ProcessMemoryMetrics().bindTo(meterRegistry);
new ProcessThreadMetrics().bindTo(meterRegistry);

for (String key : DEFAULT_METRIC_SET.keySet()) {
METRIC_REGISTRY
Expand Down Expand Up @@ -302,7 +304,7 @@ public static void register(String metricName, Metric metric) {
* See https://micrometer.io/docs/concepts#_counters for more details on the Counter.
*/
public static Counter registerCounter(String name, Map<String, String> tags) {
return METER_REGISTRY.counter(name, addDefaultTags(tags));
return meterRegistry.counter(name, toIterable(tags));
}

/**
Expand All @@ -326,11 +328,12 @@ public static Timer registerTimer(String name, Map<String, String> tags) {
public static Timer registerTimer(String name, Map<String, String> tags, boolean histogram) {
Timer.Builder builder = Timer.builder(name)
.publishPercentiles(0.5, 0.95, 0.99)
.tags(addDefaultTags(tags));
.tags(toIterable(tags));

if (histogram) {
builder = builder.publishPercentileHistogram();
}
return builder.register(METER_REGISTRY);
return builder.register(meterRegistry);
}

/**
Expand All @@ -341,7 +344,7 @@ public static Timer registerTimer(String name, Map<String, String> tags, boolean
* See https://micrometer.io/docs/concepts#_gauges for more details on the Gauges.
*/
public static <T extends Number> T registerGauge(String name, Map<String, String> tags, T number) {
Gauge.builder(name, number, Number::doubleValue).tags(addDefaultTags(tags)).strongReference(true).register(METER_REGISTRY);
Gauge.builder(name, number, Number::doubleValue).tags(toIterable(tags)).strongReference(true).register(meterRegistry);
return number;
}

Expand Down Expand Up @@ -372,19 +375,19 @@ public static DistributionSummary registerDistributionSummary(String name,
Map<String, String> tags, boolean histogram) {
DistributionSummary.Builder builder = DistributionSummary.builder(name)
.publishPercentiles(0.5, 0.95, 0.99)
.tags(addDefaultTags(tags));
.tags(toIterable(tags));
if (histogram) {
builder = builder.publishPercentileHistogram();
}
return builder.register(METER_REGISTRY);
return builder.register(meterRegistry);
}

/**
* Registers metrics for GuavaCaches using micrometer's GuavaCacheMetrics under the given
* cacheName for the given guavaCache
*/
public static <K, V> void registerCache(String cacheName, Cache<K, V> guavaCache, Map<String, String> tags) {
GuavaCacheMetrics.monitor(METER_REGISTRY, guavaCache, cacheName, addDefaultTags(tags));
GuavaCacheMetrics.monitor(meterRegistry, guavaCache, cacheName, toIterable(tags));

}

Expand All @@ -397,16 +400,16 @@ public static <K, V> void registerCache(String cacheName, Cache<K, V> guavaCache
*/
public static void monitorExecutorService(String name, ExecutorService executorService,
@Nullable Map<String, String> tags) {
new ExecutorServiceMetrics(executorService, name, addDefaultTags(tags)).bindTo(METER_REGISTRY);
new ExecutorServiceMetrics(executorService, name, toIterable(tags)).bindTo(meterRegistry);
}

private static Iterable<Tag> addDefaultTags(Map<String, String> tags) {
if (tags == null || tags.isEmpty()) {
return DEFAULT_TAGS;
private static Iterable<Tag> toIterable(Map<String, String> tags) {
List<Tag> newTags = new ArrayList<>();

if (tags != null) {
tags.forEach((k, v) -> newTags.add(new ImmutableTag(k, v)));
}

Set<Tag> newTags = new HashSet<>(DEFAULT_TAGS);
tags.forEach((k, v) -> newTags.add(new ImmutableTag(k, v)));
return newTags;
}

Expand All @@ -415,22 +418,22 @@ public static MetricRegistry getMetricRegistry() {
}

public static MeterRegistry getMeterRegistry() {
return METER_REGISTRY;
return meterRegistry;
}

public static synchronized void stop() {
stopConsoleMetricsReporter();
METRIC_REGISTRY.getNames().forEach(METRIC_REGISTRY::remove);

DEFAULT_TAGS.clear();
/* For each meter registry in this composite, it will call the close function */
METER_REGISTRY.getRegistries().forEach(MeterRegistry::close);
METER_REGISTRY.forEachMeter(METER_REGISTRY::remove);
METER_REGISTRY.getRegistries().forEach(MeterRegistry::clear);
Set<MeterRegistry> registries = new HashSet<>(METER_REGISTRY.getRegistries());
registries.forEach(METER_REGISTRY::remove);
meterRegistry.getRegistries().forEach(MeterRegistry::close);
meterRegistry.forEachMeter(meterRegistry::remove);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - a lot of this code - 430-434 I believe - is redundant now that we're just replacing the whole registry. We don't need to go through and clear stuff out any more.

meterRegistry.getRegistries().forEach(MeterRegistry::clear);
Set<MeterRegistry> registries = new HashSet<>(meterRegistry.getRegistries());
registries.forEach(meterRegistry::remove);
registries.clear();
CollectorRegistry.defaultRegistry.clear();
meterRegistry = new CompositeMeterRegistry();
isInit = false;
}

Expand Down