diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts
index c587865b3..e9711609b 100644
--- a/hypertrace-ingester/build.gradle.kts
+++ b/hypertrace-ingester/build.gradle.kts
@@ -26,8 +26,8 @@ hypertraceDocker {
 dependencies {
   implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
-  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
-  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
   implementation("org.hypertrace.core.datamodel:data-model:0.1.19")
   implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1")
   implementation("com.typesafe:config:1.4.1")
@@ -41,6 +41,7 @@ dependencies {
   implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher"))
   implementation(project(":hypertrace-view-generator:hypertrace-view-generator"))
   implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor"))
+  implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter"))

   testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
   testImplementation("org.mockito:mockito-core:3.8.0")
@@ -81,7 +82,11 @@ tasks.register("copyServiceConfigs") {
       createCopySpec("hypertrace-metrics-processor",
           "hypertrace-metrics-processor",
           "main",
-          "common")
+          "common"),
+      createCopySpec("hypertrace-metrics-exporter",
+          "hypertrace-metrics-exporter",
+          "main",
+          "common")
   ).into("./build/resources/main/configs/")
 }
@@ -137,7 +142,11 @@ tasks.register("copyServiceConfigsTest") {
       createCopySpec("hypertrace-metrics-processor",
           "hypertrace-metrics-processor",
           "test",
-          "hypertrace-metrics-processor")
+          "hypertrace-metrics-processor"),
+      createCopySpec("hypertrace-metrics-exporter",
+          "hypertrace-metrics-exporter",
+          "test",
+          "hypertrace-metrics-exporter")
   ).into("./build/resources/test/configs/")
 }

diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java
index 78e6afa58..3151c8979 100644
--- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java
+++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java
@@ -17,6 +17,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils;
 import org.hypertrace.core.spannormalizer.SpanNormalizer;
 import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher;
+import org.hypertrace.metrics.exporter.MetricsExporterService;
 import org.hypertrace.metrics.processor.MetricsProcessor;
 import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
 import org.slf4j.Logger;
@@ -30,9 +31,12 @@ public class HypertraceIngester extends KafkaStreamsApp {
   private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config";
   private Map<String, Pair<String, KafkaStreamsApp>> jobNameToSubTopology = new HashMap<>();
+  MetricsExporterService metricsExporterService;

   public HypertraceIngester(ConfigClient configClient) {
     super(configClient);
+    metricsExporterService = new MetricsExporterService(configClient);
+    metricsExporterService.setConfig(getSubJobConfig("hypertrace-metrics-exporter"));
   }

   private KafkaStreamsApp getSubTopologyInstance(String name) {
@@ -118,6 +122,24 @@ public List<String> getOutputTopics(Map<String, Object> properties) {
     return outputTopics.stream().collect(Collectors.toList());
   }
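 
+  // Lifecycle note: the overrides below chain the embedded exporter onto the
+  // ingester's own hooks, so a single deployable drives both the streams
+  // topology and the Prometheus-facing exporter.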
+  @Override
+  protected void doInit() {
+    super.doInit();
+    this.metricsExporterService.doInit();
+  }
+
+  @Override
+  protected void doStart() {
+    super.doStart();
+    this.metricsExporterService.doStart();
+  }
+
+  @Override
+  protected void doStop() {
+    super.doStop();
+    this.metricsExporterService.doStop();
+  }
+
   private List<String> getSubTopologiesNames(Map<String, Object> properties) {
     return getJobConfig(properties).getStringList("sub.topology.names");
   }
diff --git a/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/build.gradle.kts
new file mode 100644
index 000000000..612342977
--- /dev/null
+++ b/hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,3 @@
+subprojects {
+  group = "org.hypertrace.metrics.exporter"
+}
\ No newline at end of file
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts
new file mode 100644
index 000000000..7ff01cc1a
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/build.gradle.kts
@@ -0,0 +1,50 @@
+plugins {
+  java
+  application
+  jacoco
+  id("org.hypertrace.docker-java-application-plugin")
+  id("org.hypertrace.docker-publish-plugin")
+  id("org.hypertrace.jacoco-report-plugin")
+}
+
+application {
+  mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
+}
+
+hypertraceDocker {
+  defaultImage {
+    javaApplication {
+      serviceName.set("${project.name}")
+      adminPort.set(8099)
+    }
+  }
+}
+
+tasks.test {
+  useJUnitPlatform()
+}
+
+dependencies {
+  // common and framework
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
+
+  // open telemetry
+  implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.7.0-alpha")
+  // TODO: Upgrade opentelemetry-exporter-prometheus to the 1.8.0 release when available
+  // to include the timestamp-related changes in
+  // https://github.com/open-telemetry/opentelemetry-java/pull/3700
+  // For now, the exported timestamp will be the current timestamp.
+ implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.7.0-alpha") + + // open telemetry proto + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + + // kafka + implementation("org.apache.kafka:kafka-clients:2.6.0") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java new file mode 100644 index 000000000..511686bfe --- /dev/null +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/MetricsExporterService.java @@ -0,0 +1,62 @@ +package org.hypertrace.metrics.exporter; + +import com.typesafe.config.Config; +import io.opentelemetry.exporter.prometheus.PrometheusCollector; +import org.hypertrace.core.serviceframework.PlatformService; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.metrics.exporter.consumer.MetricsKafkaConsumer; +import org.hypertrace.metrics.exporter.producer.InMemoryMetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsExporterService extends PlatformService { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporterService.class); + + private Config config; + private MetricsKafkaConsumer metricsKafkaConsumer; + private Thread metricsConsumerThread; + private InMemoryMetricsProducer inMemoryMetricsProducer; + + public MetricsExporterService(ConfigClient configClient) { + super(configClient); + } + + public void setConfig(Config config) { + this.config = config; + } + + @Override + public void doInit() { + config = (config != null) ? config : getAppConfig(); + inMemoryMetricsProducer = new InMemoryMetricsProducer(config); + metricsKafkaConsumer = new MetricsKafkaConsumer(config, inMemoryMetricsProducer); + metricsConsumerThread = new Thread(metricsKafkaConsumer); + // TODO: Upgrade opentelemetry-exporter-prometheus to 1.8.0 release when available + // to include time stamp related changes + // https://github.com/open-telemetry/opentelemetry-java/pull/3700 + // For now, the exported time stamp will be the current time stamp. 
+    PrometheusCollector.create().apply(inMemoryMetricsProducer);
+  }
+
+  @Override
+  public void doStart() {
+    // A Thread object cannot be restarted, so create a fresh one on each start.
+    metricsConsumerThread = new Thread(metricsKafkaConsumer);
+    metricsConsumerThread.start();
+    try {
+      // join() blocks doStart() until the consumer thread exits, keeping this
+      // service running for as long as the consumer does.
+      metricsConsumerThread.join();
+    } catch (InterruptedException e) {
+      LOGGER.error("Metrics consumer thread was interrupted:", e);
+    }
+  }
+
+  @Override
+  public void doStop() {
+    metricsConsumerThread.interrupt();
+    metricsKafkaConsumer.close();
+  }
+
+  @Override
+  public boolean healthCheck() {
+    return true;
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java
new file mode 100644
index 000000000..a6395498a
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/consumer/MetricsKafkaConsumer.java
@@ -0,0 +1,115 @@
+package org.hypertrace.metrics.exporter.consumer;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.typesafe.config.Config;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.hypertrace.metrics.exporter.producer.InMemoryMetricsProducer;
+import org.hypertrace.metrics.exporter.utils.OtlpProtoToMetricDataConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsKafkaConsumer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsKafkaConsumer.class);
+
+  private static final int CONSUMER_POLL_TIMEOUT_MS = 100;
+  private static final int QUEUE_WAIT_TIME_MS = 500;
+  private static final int WAIT_TIME_MS = 100;
+
+  private static final String KAFKA_CONFIG_KEY = "kafka.config";
+  private static final String INPUT_TOPIC_KEY = "input.topic";
+
+  private final KafkaConsumer<byte[], byte[]> consumer;
+  private final InMemoryMetricsProducer inMemoryMetricsProducer;
+
+  public MetricsKafkaConsumer(Config config, InMemoryMetricsProducer inMemoryMetricsProducer) {
+    consumer = new KafkaConsumer<>(prepareProperties(config.getConfig(KAFKA_CONFIG_KEY)));
+    consumer.subscribe(Collections.singletonList(config.getString(INPUT_TOPIC_KEY)));
+    this.inMemoryMetricsProducer = inMemoryMetricsProducer;
+  }
+
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      List<ResourceMetrics> resourceMetrics = new ArrayList<>();
+
+      ConsumerRecords<byte[], byte[]> records =
+          consumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
+
+      records.forEach(
+          record -> {
+            try {
+              resourceMetrics.add(ResourceMetrics.parseFrom(record.value()));
+            } catch (InvalidProtocolBufferException e) {
+              LOGGER.warn("Skipping record due to error", e);
+            }
+          });
+
+      resourceMetrics.forEach(
+          rm -> {
+            try {
+              List<MetricData> metricData = OtlpProtoToMetricDataConverter.toMetricData(rm);
+              boolean result = false;
+              while (!result) {
+                result = inMemoryMetricsProducer.addMetricData(metricData);
+                if (!result) {
+                  waitForMillis(QUEUE_WAIT_TIME_MS);
+                }
+              }
+            } catch (Exception e) {
+              LOGGER.debug("Skipping the resource metrics due to error", e);
+            }
+          });
+
+      waitForMillis(WAIT_TIME_MS);
+    }
+  }
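+
+  // Note on the retry loop above: when the bounded buffer is full,
+  // addMetricData() returns false and the batch is retried after
+  // QUEUE_WAIT_TIME_MS, so parsed batches are not dropped (though with
+  // auto-commit enabled their offsets may already be committed).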
+
+  public void close() {
+    consumer.close();
+  }
+
+  private Properties prepareProperties(Config config) {
+    Map<String, Object> overrideProperties = new HashMap<>();
+    config.entrySet().stream()
+        .forEach(e -> overrideProperties.put(e.getKey(), config.getString(e.getKey())));
+
+    Map<String, Object> baseProperties = getBaseProperties();
+    overrideProperties.forEach(baseProperties::put);
+
+    Properties properties = new Properties();
+    properties.putAll(baseProperties);
+    return properties;
+  }
+
+  private Map<String, Object> getBaseProperties() {
+    Map<String, Object> baseProperties = new HashMap<>();
+    baseProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "hypertrace-metrics-exporter");
+    baseProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    baseProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+    baseProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+    baseProperties.put(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    baseProperties.put(
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    return baseProperties;
+  }
+
+  private void waitForMillis(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.debug("Consumer thread was interrupted while waiting");
+    }
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java
new file mode 100644
index 000000000..34c6aedd9
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducer.java
@@ -0,0 +1,47 @@
+package org.hypertrace.metrics.exporter.producer;
+
+import com.typesafe.config.Config;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricProducer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+public class InMemoryMetricsProducer implements MetricProducer {
+
+  private static final String BUFFER_CONFIG_KEY = "buffer.config";
+  private static final String MAX_QUEUE_SIZE = "max.queue.size";
+  private static final String MAX_BATCH_SIZE = "max.batch.size";
+
+  private BlockingQueue<MetricData> metricDataQueue;
+  private int maxQueueSize;
+  private int maxBatchSize;
+
+  public InMemoryMetricsProducer(Config config) {
+    maxQueueSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_QUEUE_SIZE);
+    maxBatchSize = config.getConfig(BUFFER_CONFIG_KEY).getInt(MAX_BATCH_SIZE);
+    this.metricDataQueue = new ArrayBlockingQueue<>(maxQueueSize);
+  }
+
+  public boolean addMetricData(List<MetricData> metricData) {
+    if (this.metricDataQueue.size() + metricData.size() > maxQueueSize) {
+      return false;
+    }
+
+    for (MetricData md : metricData) {
+      this.metricDataQueue.offer(md);
+    }
+
+    return true;
+  }
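+
+  // Drain side: the Prometheus collector calls collectAllMetrics() on each
+  // scrape; it returns at most max.batch.size points, so a full queue is
+  // emptied over several scrapes rather than in one large response.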
+  @Override
+  public Collection<MetricData> collectAllMetrics() {
+    List<MetricData> metricData = new ArrayList<>();
+    while (metricData.size() < maxBatchSize && this.metricDataQueue.peek() != null) {
+      metricData.add(this.metricDataQueue.poll());
+    }
+    return metricData;
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java
new file mode 100644
index 000000000..7f7d6b499
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverter.java
@@ -0,0 +1,137 @@
+package org.hypertrace.metrics.exporter.utils;
+
+import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.Gauge;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoubleGaugeData;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.DoubleSumData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.resources.Resource;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class OtlpProtoToMetricDataConverter {
+
+  private static Resource toResource(io.opentelemetry.proto.resource.v1.Resource otlpResource) {
+    return Resource.create(toAttributes(otlpResource.getAttributesList()));
+  }
+
+  private static InstrumentationLibraryInfo toInstrumentationLibraryInfo(
+      InstrumentationLibrary otlpInstrumentationLibraryInfo) {
+    return InstrumentationLibraryInfo.create(
+        otlpInstrumentationLibraryInfo.getName(), otlpInstrumentationLibraryInfo.getVersion());
+  }
+
+  private static Attributes toAttributes(List<KeyValue> keyValues) {
+    AttributesBuilder attributesBuilder = Attributes.builder();
+    keyValues.forEach(
+        keyValue -> {
+          attributesBuilder.put(keyValue.getKey(), keyValue.getValue().getStringValue());
+        });
+    return attributesBuilder.build();
+  }
+
+  private static List<DoublePointData> toDoublePointData(List<NumberDataPoint> numberDataPoints) {
+    return numberDataPoints.stream()
+        .map(
+            numberDataPoint ->
+                DoublePointData.create(
+                    numberDataPoint.getStartTimeUnixNano(),
+                    numberDataPoint.getTimeUnixNano(),
+                    toAttributes(numberDataPoint.getAttributesList()),
+                    numberDataPoint.hasAsInt()
+                        ? numberDataPoint.getAsInt()
+                        : numberDataPoint.getAsDouble()))
+        .collect(Collectors.toList());
+  }
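+
+  // Both OTLP number kinds funnel into DoublePointData above, so integer
+  // points are widened to double; values beyond 2^53 would lose precision,
+  // which seems an acceptable trade-off for these counters and gauges.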
+
+  private static AggregationTemporality toAggregationTemporality(
+      io.opentelemetry.proto.metrics.v1.AggregationTemporality aggregationTemporality) {
+    switch (aggregationTemporality) {
+      case AGGREGATION_TEMPORALITY_DELTA:
+        return AggregationTemporality.DELTA;
+      default:
+        return AggregationTemporality.CUMULATIVE;
+    }
+  }
+
+  private static MetricData toGaugeMetricData(
+      Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) {
+    Gauge gaugeMetric = metric.getGauge();
+
+    DoubleGaugeData data =
+        DoubleGaugeData.create(toDoublePointData(gaugeMetric.getDataPointsList()));
+
+    return MetricData.createDoubleGauge(
+        resource,
+        instrumentationLibraryInfo,
+        metric.getName(),
+        metric.getDescription(),
+        metric.getUnit(),
+        data);
+  }
+
+  private static MetricData toSumMetricData(
+      Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) {
+    Sum sumMetric = metric.getSum();
+
+    DoubleSumData doubleSumData =
+        DoubleSumData.create(
+            sumMetric.getIsMonotonic(),
+            toAggregationTemporality(sumMetric.getAggregationTemporality()),
+            toDoublePointData(sumMetric.getDataPointsList()));
+
+    return MetricData.createDoubleSum(
+        resource,
+        instrumentationLibraryInfo,
+        metric.getName(),
+        metric.getDescription(),
+        metric.getUnit(),
+        doubleSumData);
+  }
+
+  private static MetricData toMetricData(
+      Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, Metric metric) {
+    switch (metric.getDataCase()) {
+      case GAUGE:
+        return toGaugeMetricData(resource, instrumentationLibraryInfo, metric);
+      case SUM:
+        return toSumMetricData(resource, instrumentationLibraryInfo, metric);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported metric type: %s", metric.getDataCase()));
+    }
+  }
+
+  public static List<MetricData> toMetricData(ResourceMetrics resourceMetrics) {
+    List<MetricData> metricData = new ArrayList<>();
+    Resource resource = toResource(resourceMetrics.getResource());
+    resourceMetrics
+        .getInstrumentationLibraryMetricsList()
+        .forEach(
+            instrumentationLibraryMetrics -> {
+              InstrumentationLibraryInfo instrumentationLibraryInfo =
+                  toInstrumentationLibraryInfo(
+                      instrumentationLibraryMetrics.getInstrumentationLibrary());
+              instrumentationLibraryMetrics
+                  .getMetricsList()
+                  .forEach(
+                      metric ->
+                          metricData.add(
+                              toMetricData(resource, instrumentationLibraryInfo, metric)));
+            });
+    return metricData;
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf
new file mode 100644
index 000000000..16a968d23
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf
@@ -0,0 +1,24 @@
+service.name = hypertrace-metrics-exporter
+service.admin.port = 8099
+
+main.class = org.hypertrace.metrics.exporter.MetricsExporterService
+
+input.topic = "enriched-otlp-metrics"
+
+buffer.config {
+  max.queue.size = 5000
+  max.batch.size = 1000
+}
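+
+# Sizing note: max.queue.size bounds exporter memory, while max.batch.size caps
+# how many points a single scrape drains; with the defaults above a full queue
+# empties in five scrapes.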
+
+kafka.config = {
+  application.id = hypertrace-metrics-exporter-job
+  bootstrap.servers = "localhost:9092"
+  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
+}
+
+logger.names = ["file"]
+logger.file.dir = "/var/logs/metrics-generator"
+
+metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterService
+metrics.reporter.names = ["prometheus"]
+metrics.reportInterval = 60
\ No newline at end of file
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties
new file mode 100644
index 000000000..d91bc7bfe
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/log4j2.properties
@@ -0,0 +1,23 @@
+status=error
+name=PropertiesConfig
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.type=RollingFile
+appender.rolling.name=ROLLING_FILE
+appender.rolling.fileName=${sys:service.name:-service}.log
+appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=3600
+appender.rolling.policies.time.modulate=true
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=20MB
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=5
+rootLogger.level=INFO
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.rolling.ref=ROLLING_FILE
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java
new file mode 100644
index 000000000..52d081e4c
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/producer/InMemoryMetricsProducerTest.java
@@ -0,0 +1,72 @@
+package org.hypertrace.metrics.exporter.producer;
+
+import static org.hypertrace.metrics.exporter.utils.ResourceMetricsUtils.prepareMetric;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.List;
+import org.hypertrace.metrics.exporter.utils.OtlpProtoToMetricDataConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class InMemoryMetricsProducerTest {
+
+  private InMemoryMetricsProducer underTest;
+
+  @BeforeEach
+  public void setUp() {
+    Config config =
+        ConfigFactory.parseURL(
+            getClass()
+                .getClassLoader()
+                .getResource("configs/hypertrace-metrics-exporter/application.conf"));
+
+    underTest = new InMemoryMetricsProducer(config);
+  }
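+
+  // The test config pins max.queue.size = 2 and max.batch.size = 1, so two
+  // single-point batches fill the queue, a third is rejected, and each
+  // collect call drains exactly one point.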
+  @Test
+  public void testAddMetricDataAndCollectData() {
+    // insert the first batch (one gauge point)
+    ResourceMetrics resourceMetrics = prepareMetric("int_num_calls", "number of calls", 1, "Gauge");
+    List<MetricData> inMetricData = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics);
+
+    Assertions.assertTrue(underTest.addMetricData(inMetricData));
+
+    // insert the second batch - still fits, since the queue capacity is 2
+    ResourceMetrics resourceMetrics1 =
+        prepareMetric("double_num_calls", "number of calls", 2.5, "Gauge");
+    List<MetricData> inMetricData1 = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1);
+
+    Assertions.assertTrue(underTest.addMetricData(inMetricData1));
+
+    // the third batch exceeds max.queue.size = 2, so it is rejected
+    ResourceMetrics resourceMetrics2 =
+        prepareMetric("double_num_calls", "number of calls", 3.5, "Gauge");
+    List<MetricData> inMetricData2 = OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics2);
+
+    Assertions.assertFalse(underTest.addMetricData(inMetricData2));
+
+    // now drain the queue one point per call (max.batch.size = 1)
+    List<MetricData> outData = (List<MetricData>) underTest.collectAllMetrics();
+    Assertions.assertEquals(1, outData.size());
+    Assertions.assertEquals(inMetricData.get(0), outData.get(0));
+
+    outData = (List<MetricData>) underTest.collectAllMetrics();
+    Assertions.assertEquals(1, outData.size());
+    Assertions.assertEquals(inMetricData1.get(0), outData.get(0));
+
+    outData = (List<MetricData>) underTest.collectAllMetrics();
+    Assertions.assertEquals(0, outData.size());
+
+    // reinsert the third batch now that there is room again
+    Assertions.assertTrue(underTest.addMetricData(inMetricData2));
+    outData = (List<MetricData>) underTest.collectAllMetrics();
+    Assertions.assertEquals(1, outData.size());
+    Assertions.assertEquals(inMetricData2.get(0), outData.get(0));
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java
new file mode 100644
index 000000000..fc1c74c1a
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/OtlpProtoToMetricDataConverterTest.java
@@ -0,0 +1,100 @@
+package org.hypertrace.metrics.exporter.utils;
+
+import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
+import static org.hypertrace.metrics.exporter.utils.ResourceMetricsUtils.prepareMetric;
+
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.List;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class OtlpProtoToMetricDataConverterTest {
+
+  @Test
+  public void testGaugeMetricData() {
+    // test for int values
+    ResourceMetrics resourceMetrics =
+        prepareMetric("int_num_calls", "number of calls", 10, "Gauge");
+
+    List<MetricData> underTestMetricData =
+        OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics);
+
+    Assertions.assertEquals(1, underTestMetricData.size());
+    Assertions.assertEquals("int_num_calls", underTestMetricData.get(0).getName());
+    Assertions.assertEquals(1, underTestMetricData.get(0).getDoubleGaugeData().getPoints().size());
+    underTestMetricData
+        .get(0)
+        .getDoubleGaugeData()
+        .getPoints()
+        .forEach(
+            dpd -> {
+              Assertions.assertEquals(10.0, dpd.getValue());
+            });
+
+    // test for double values
+    ResourceMetrics resourceMetrics1 =
+        prepareMetric("double_num_calls", "number of calls", 5.5, "Gauge");
+
+    List<MetricData> underTestMetricData1 =
+        OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1);
+
+    Assertions.assertEquals(1, underTestMetricData1.size());
+    Assertions.assertEquals("double_num_calls", underTestMetricData1.get(0).getName());
+    Assertions.assertEquals(1, underTestMetricData1.get(0).getDoubleGaugeData().getPoints().size());
+    underTestMetricData1
+        .get(0)
+        .getDoubleGaugeData()
+        .getPoints()
+        .forEach(
+            dpd -> {
+              Assertions.assertEquals(5.5, dpd.getValue());
+            });
+  }
+
+  @Test
+  public void testSumMetricData() {
+    // test for int values
+    ResourceMetrics resourceMetrics = prepareMetric("int_sum_calls", "number of calls", 10, "Sum");
+
+    List<MetricData> underTestMetricData =
+        OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics);
+
+    Assertions.assertEquals(1, underTestMetricData.size());
+    Assertions.assertEquals("int_sum_calls", underTestMetricData.get(0).getName());
+    Assertions.assertEquals(1, underTestMetricData.get(0).getDoubleSumData().getPoints().size());
+    underTestMetricData
+        .get(0)
+        .getDoubleSumData()
+        .getPoints()
+        .forEach(
+            dpd -> {
+              Assertions.assertEquals(10.0, dpd.getValue());
+            });
+    Assertions.assertFalse(underTestMetricData.get(0).getDoubleSumData().isMonotonic());
+    Assertions.assertEquals(
+        DELTA, underTestMetricData.get(0).getDoubleSumData().getAggregationTemporality());
+
+    // test for double values
+    ResourceMetrics resourceMetrics1 =
+        prepareMetric("double_sum_calls", "number of calls", 10.5, "Sum");
+
+    List<MetricData> underTestMetricData1 =
+        OtlpProtoToMetricDataConverter.toMetricData(resourceMetrics1);
+
+    Assertions.assertEquals(1, underTestMetricData1.size());
+    Assertions.assertEquals("double_sum_calls", underTestMetricData1.get(0).getName());
+    Assertions.assertEquals(1, underTestMetricData1.get(0).getDoubleSumData().getPoints().size());
+    underTestMetricData1
+        .get(0)
+        .getDoubleSumData()
+        .getPoints()
+        .forEach(
+            dpd -> {
+              Assertions.assertEquals(10.5, dpd.getValue());
+            });
+    Assertions.assertFalse(underTestMetricData1.get(0).getDoubleSumData().isMonotonic());
+    Assertions.assertEquals(
+        DELTA, underTestMetricData1.get(0).getDoubleSumData().getAggregationTemporality());
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java
new file mode 100644
index 000000000..6ce0b9fea
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/java/org/hypertrace/metrics/exporter/utils/ResourceMetricsUtils.java
@@ -0,0 +1,125 @@
+package org.hypertrace.metrics.exporter.utils;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Gauge;
+import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.proto.resource.v1.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ResourceMetricsUtils {
+  private static Resource prepareResource() {
+    return Resource.newBuilder()
+        .addAttributes(
+            io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
+                .setKey("Service")
+                .setValue(
+                    AnyValue.newBuilder().setStringValue("hypertrace-metrics-exporter").build())
+                .build())
+        .build();
+  }
+
+  private static NumberDataPoint prepareNumberDataPoint(Number value) {
+    List<KeyValue> attributes =
+        toAttributes(
+            Map.of(
+                "tenant_id", "__default",
+                "service_id", "1234",
+                "api_id", "4567"));
+
+    NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder();
+    numberDataPointBuilder.addAllAttributes(attributes);
+    numberDataPointBuilder.setTimeUnixNano(
+        TimeUnit.NANOSECONDS.convert(
+            1634119810000L /* 2021-10-13T10:10:10Z */, TimeUnit.MILLISECONDS));
+
+    if (value instanceof Integer) {
+      numberDataPointBuilder.setAsInt(value.intValue());
+    } else {
+      numberDataPointBuilder.setAsDouble(value.doubleValue());
+    }
+
+    return numberDataPointBuilder.build();
+  }
+
+  private static List<KeyValue> toAttributes(Map<String, String> labels) {
+    List<KeyValue> attributes =
+        labels.entrySet().stream()
+            .map(
+                k -> {
+                  io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder =
+                      io.opentelemetry.proto.common.v1.KeyValue.newBuilder();
+                  keyValueBuilder.setKey(k.getKey());
+                  String value = k.getValue() != null ? k.getValue() : "";
+                  keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build());
+                  return keyValueBuilder.build();
+                })
+            .collect(Collectors.toList());
+    return attributes;
+  }
+
+  private static Metric prepareGaugeMetric(String metricName, String metricDesc, Number value) {
+    io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
+        io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
+    metricBuilder.setName(metricName);
+    metricBuilder.setDescription(metricDesc);
+    metricBuilder.setUnit("1");
+
+    NumberDataPoint numberDataPoint = prepareNumberDataPoint(value);
+
+    Gauge.Builder gaugeBuilder = Gauge.newBuilder();
+    gaugeBuilder.addDataPoints(numberDataPoint);
+    metricBuilder.setGauge(gaugeBuilder.build());
+    return metricBuilder.build();
+  }
+
+  private static Metric prepareSumMetric(String metricName, String metricDesc, Number value) {
+    io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
+        io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
+    metricBuilder.setName(metricName);
+    metricBuilder.setDescription(metricDesc);
+    metricBuilder.setUnit("1");
+
+    NumberDataPoint numberDataPoint = prepareNumberDataPoint(value);
+
+    Sum.Builder sumBuilder = Sum.newBuilder();
+    sumBuilder.addDataPoints(numberDataPoint);
+    sumBuilder.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA);
+    sumBuilder.setIsMonotonic(false);
+    metricBuilder.setSum(sumBuilder.build());
+    return metricBuilder.build();
+  }
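+
+  // Single entry point for the tests: builds a one-point OTLP ResourceMetrics
+  // of the requested kind ("Gauge" here; anything else yields a non-monotonic
+  // delta Sum) around the supplied value.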
+  public static ResourceMetrics prepareMetric(
+      String metricName, String metricDesc, Number value, String type) {
+
+    ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder();
+    resourceMetricsBuilder.setResource(prepareResource());
+
+    Metric metric;
+    if (type.equals("Gauge")) {
+      metric = prepareGaugeMetric(metricName, metricDesc, value);
+    } else {
+      metric = prepareSumMetric(metricName, metricDesc, value);
+    }
+
+    resourceMetricsBuilder.addInstrumentationLibraryMetrics(
+        InstrumentationLibraryMetrics.newBuilder()
+            .addMetrics(metric)
+            .setInstrumentationLibrary(
+                InstrumentationLibrary.newBuilder().setName("Generated").build())
+            .build());
+
+    return resourceMetricsBuilder.build();
+  }
+}
diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf
new file mode 100644
index 000000000..227d90013
--- /dev/null
+++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/test/resources/configs/hypertrace-metrics-exporter/application.conf
@@ -0,0 +1,24 @@
+service.name = hypertrace-metrics-exporter
+service.admin.port = 8099
+
+main.class = org.hypertrace.metrics.exporter.MetricsExporterService
+
+input.topic = "enriched-otlp-metrics"
+
+buffer.config {
+  max.queue.size = 2
+  max.batch.size = 1
+}
+
+kafka.config = {
+  application.id = hypertrace-metrics-exporter-job
+  bootstrap.servers = "localhost:9092"
+  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
+}
+
+logger.names = ["file"]
+logger.file.dir = "/var/logs/metrics-generator"
+
+metrics.reporter.prefix = org.hypertrace.metrics.exporter.MetricsExporterService
+metrics.reporter.names = ["prometheus"]
+metrics.reportInterval = 60
\ No newline at end of file
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
index 0d318f4d0..60b1e068e 100644
--- a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
@@ -29,8 +29,8 @@ dependencies {
   implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))

   // frameworks
-  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
-  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
   implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

   // open telemetry proto
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
index 3585a30f1..ba7755c19 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
@@ -36,8 +36,8 @@ tasks.test {
 dependencies {
   implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl"))
   implementation("org.hypertrace.core.datamodel:data-model:0.1.19")
-  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
-  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
   implementation("org.hypertrace.entity.service:entity-service-client:0.8.5")
   implementation("com.typesafe:config:1.4.1")
diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
index 6332c209f..7293d48fa 100644
--- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
+++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
@@ -39,8 +39,8 @@ dependencies {
   }
   implementation(project(":span-normalizer:span-normalizer-api"))
   implementation("org.hypertrace.core.datamodel:data-model:0.1.19")
-  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
-  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
   implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
   implementation("com.typesafe:config:1.4.1")
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 571660d40..2e19348e2 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -36,6 +36,7 @@ include("span-normalizer:span-normalizer-constants")

 // metrics pipeline
 include("hypertrace-metrics-processor:hypertrace-metrics-processor")
+include("hypertrace-metrics-exporter:hypertrace-metrics-exporter")

 // utils
 include("semantic-convention-utils")
diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts
index 0f02e9fd3..d7dff757a 100644
--- a/span-normalizer/span-normalizer/build.gradle.kts
+++ b/span-normalizer/span-normalizer/build.gradle.kts
@@ -35,8 +35,8 @@ dependencies {
   implementation(project(":semantic-convention-utils"))

   implementation("org.hypertrace.core.datamodel:data-model:0.1.19")
-  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
-  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
+  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30")
+  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30")
   implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

   // Required for the GRPC clients.