diff --git a/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/build.gradle.kts new file mode 100644 index 000000000..b1720497c --- /dev/null +++ b/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.generator" +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts new file mode 100644 index 000000000..be6b04334 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -0,0 +1,27 @@ +import com.google.protobuf.gradle.protobuf +import com.google.protobuf.gradle.protoc + +plugins { + `java-library` + id("com.google.protobuf") version "0.8.15" +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.17.3" + } +} + +sourceSets { + main { + java { + srcDirs("src/main/java", "build/generated/source/proto/main/java") + } + } +} + +dependencies { + implementation("com.google.protobuf:protobuf-java:3.17.3") + implementation("org.apache.kafka:kafka-clients:6.0.1-ccs") + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java new file mode 100644 index 000000000..bbd471b65 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java @@ -0,0 +1,50 @@ +package org.hypertrace.metrics.generator.api.v1.serde; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import 
package org.hypertrace.metrics.generator.api.v1.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;

/**
 * Kafka {@link Serde} for {@link MetricIdentity} protobuf messages.
 *
 * <p>Serialization is the raw protobuf wire format ({@code toByteArray()} /
 * {@code parseFrom(byte[])}). Stateless and thread-safe: {@code configure} and
 * {@code close} are no-ops.
 */
public class MetricIdentitySerde implements Serde<MetricIdentity> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<MetricIdentity> serializer() {
    return new MetricIdentitySerializer();
  }

  @Override
  public Deserializer<MetricIdentity> deserializer() {
    return new MetricIdentityDeserializer();
  }

  private static class MetricIdentitySerializer implements Serializer<MetricIdentity> {
    @Override
    public byte[] serialize(String topic, MetricIdentity data) {
      // protobuf toByteArray() declares no checked exception; the previous
      // catch-all that returned null silently dropped records. Only guard
      // against a null tombstone, which Kafka represents as a null payload.
      return data == null ? null : data.toByteArray();
    }
  }

  private static class MetricIdentityDeserializer implements Deserializer<MetricIdentity> {
    @Override
    public MetricIdentity deserialize(String topic, byte[] data) {
      try {
        return MetricIdentity.parseFrom(data);
      } catch (InvalidProtocolBufferException e) {
        // Corrupt payloads are non-recoverable here; surface to the streams
        // framework's deserialization error handler.
        throw new RuntimeException(e);
      }
    }
  }
}
package org.hypertrace.metrics.generator.api.v1.serde;

import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka {@link Serde} for OTLP {@link ResourceMetrics} protobuf messages.
 *
 * <p>Serialization is the raw protobuf wire format. Stateless and thread-safe:
 * {@code configure} and {@code close} are no-ops.
 */
public class OtlpMetricsSerde implements Serde<ResourceMetrics> {
  private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsSerde.class);

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<ResourceMetrics> serializer() {
    return new OtlpMetricsSerializer();
  }

  @Override
  public Deserializer<ResourceMetrics> deserializer() {
    return new OtlpMetricsDeserializer();
  }

  private static class OtlpMetricsSerializer implements Serializer<ResourceMetrics> {
    @Override
    public byte[] serialize(String topic, ResourceMetrics data) {
      // toByteArray() declares no checked exception; the previous catch-all
      // that logged and returned null silently dropped records. Only a null
      // tombstone maps to a null payload.
      return data == null ? null : data.toByteArray();
    }
  }

  private static class OtlpMetricsDeserializer implements Deserializer<ResourceMetrics> {
    @Override
    public ResourceMetrics deserialize(String topic, byte[] data) {
      try {
        // Note: the previous per-record LOGGER.info here would flood logs on
        // the consume hot path; removed.
        return ResourceMetrics.parseFrom(data);
      } catch (InvalidProtocolBufferException e) {
        LOGGER.error("Failed to deserialize ResourceMetrics from topic {}", topic, e);
        throw new RuntimeException(e);
      }
    }
  }
}
b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,50 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // common and framework + implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api")) + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry proto + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + + // all constraints + constraints { + implementation("org.glassfish.jersey.core:jersey-common:2.34") { + because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637") + } + } + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("org.junit-pioneer:junit-pioneer:1.3.8") + testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs") +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java new file mode 100644 index 000000000..7cda72cf1 --- /dev/null +++ 
package org.hypertrace.metrics.generator;

import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.metrics.generator.api.v1.Metric;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * One-shot wall-clock punctuator that emits the aggregated value for a single
 * {@link MetricIdentity} as an OTLP {@link ResourceMetrics} gauge, then removes
 * the identity from both state stores.
 *
 * <p>Lifecycle: the scheduler wires the {@link Cancellable} back via
 * {@link #setCancellable(Cancellable)} after {@code context.schedule(...)};
 * {@link #punctuate(long)} cancels itself first because Kafka Streams
 * punctuators otherwise repeat on their interval.
 */
public class MetricEmitPunctuator implements Punctuator {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetricEmitPunctuator.class);

  private static final String RESOURCE_KEY_SERVICE = "service";
  private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator";
  private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View";

  private final MetricIdentity key;
  private final ProcessorContext context;
  private final KeyValueStore<MetricIdentity, Long> metricIdentityStore;
  private final KeyValueStore<MetricIdentity, Metric> metricStore;
  private final To outputTopicProducer;
  // Set after scheduling; must be non-null before punctuate() fires.
  private Cancellable cancellable;

  public MetricEmitPunctuator(
      MetricIdentity key,
      ProcessorContext context,
      KeyValueStore<MetricIdentity, Long> metricIdentityStore,
      KeyValueStore<MetricIdentity, Metric> metricStore,
      To outputTopicProducer) {
    this.key = key;
    this.context = context;
    this.metricIdentityStore = metricIdentityStore;
    this.metricStore = metricStore;
    this.outputTopicProducer = outputTopicProducer;
  }

  public void setCancellable(Cancellable cancellable) {
    this.cancellable = cancellable;
  }

  @Override
  public void punctuate(long timestamp) {
    // Always cancel first, else the punctuator gets re-scheduled automatically.
    cancellable.cancel();

    Long value = metricIdentityStore.get(this.key);
    if (value != null) {
      long diff = timestamp - this.key.getTimestampMillis();
      LOGGER.debug("Metrics with key:{} is emitted after duration {}", this.key, diff);

      Metric metric = metricStore.get(this.key);
      // Remove both entries so a restart does not re-emit this identity.
      metricIdentityStore.delete(this.key);
      metricStore.delete(this.key);

      ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric);
      // Key is null; the downstream Produced uses a ByteArray key serde.
      context.forward(null, resourceMetrics, outputTopicProducer);
    } else {
      LOGGER.debug("The value for metrics with key:{} is null", this.key);
    }
  }

  /**
   * Wraps one aggregated counter value into an OTLP gauge data point, stamped
   * with the identity's (millisecond) window timestamp converted to nanos.
   */
  private ResourceMetrics convertToResourceMetric(
      MetricIdentity metricIdentity, Long value, Metric metric) {
    ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder();
    resourceMetricsBuilder.setResource(
        Resource.newBuilder()
            .addAttributes(
                KeyValue.newBuilder()
                    .setKey(RESOURCE_KEY_SERVICE)
                    .setValue(
                        AnyValue.newBuilder().setStringValue(RESOURCE_KEY_SERVICE_VALUE).build())
                    .build()));

    io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder =
        io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
    metricBuilder.setName(metric.getName());
    metricBuilder.setDescription(metric.getDescription());
    metricBuilder.setUnit(metric.getUnit());

    NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder();
    numberDataPointBuilder.addAllAttributes(toAttributes(metric.getAttributesMap()));
    numberDataPointBuilder.setTimeUnixNano(
        TimeUnit.NANOSECONDS.convert(metricIdentity.getTimestampMillis(), TimeUnit.MILLISECONDS));
    numberDataPointBuilder.setAsInt(value);

    Gauge.Builder gaugeBuilder = Gauge.newBuilder();
    gaugeBuilder.addDataPoints(numberDataPointBuilder.build());
    metricBuilder.setGauge(gaugeBuilder.build());

    resourceMetricsBuilder.addInstrumentationLibraryMetrics(
        InstrumentationLibraryMetrics.newBuilder()
            .addMetrics(metricBuilder.build())
            .setInstrumentationLibrary(
                InstrumentationLibrary.newBuilder().setName(INSTRUMENTATION_LIB_NAME).build())
            .build());

    return resourceMetricsBuilder.build();
  }

  /** Converts metric attribute labels to OTLP KeyValue pairs; null values map to "". */
  private List<KeyValue> toAttributes(Map<String, String> labels) {
    return labels.entrySet().stream()
        .map(
            entry -> {
              String value = entry.getValue() != null ? entry.getValue() : "";
              return KeyValue.newBuilder()
                  .setKey(entry.getKey())
                  .setValue(AnyValue.newBuilder().setStringValue(value).build())
                  .build();
            })
        .collect(Collectors.toList());
  }
}
package org.hypertrace.metrics.generator;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.LongSerde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.metrics.generator.api.v1.Metric;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;
import org.hypertrace.metrics.generator.api.v1.serde.MetricIdentitySerde;
import org.hypertrace.metrics.generator.api.v1.serde.MetricSerde;
import org.hypertrace.metrics.generator.api.v1.serde.OtlpMetricsSerde;
import org.hypertrace.viewgenerator.api.RawServiceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Streams app that aggregates {@link RawServiceView} events into OTLP
 * metrics: input topic -> {@link MetricsProcessor} (backed by two persistent
 * state stores) -> OTLP output topic.
 */
public class MetricsGenerator extends KafkaStreamsApp {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class);
  public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
  public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
  public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config";
  // State store holding identity -> aggregated call count.
  public static final String METRICS_IDENTITY_STORE = "metric-identity-store";
  // State store holding identity -> metric descriptor. NOTE(review): mixed-case
  // "-Store" suffix looks like a typo, but it is a persisted store name --
  // changing it would orphan existing RocksDB state, so it is kept as-is.
  public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store";
  public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";

  public MetricsGenerator(ConfigClient configClient) {
    super(configClient);
  }

  /**
   * Builds the topology: consumes RawServiceView from the configured input
   * topic, registers the two aggregation stores, and writes OTLP
   * ResourceMetrics (byte[] key) to the output topic.
   */
  @Override
  public StreamsBuilder buildTopology(
      Map<String, Object> streamsProperties,
      StreamsBuilder streamsBuilder,
      Map<String, KStream<?, ?>> inputStreams) {

    Config jobConfig = getJobConfig(streamsProperties);
    String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
    String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

    @SuppressWarnings("unchecked")
    KStream<String, RawServiceView> inputStream =
        (KStream<String, RawServiceView>) inputStreams.get(inputTopic);
    if (inputStream == null) {
      // Null value serde -> fall back to the configured default value serde.
      inputStream =
          streamsBuilder.stream(
              inputTopic, Consumed.with(Serdes.String(), (Serde<RawServiceView>) null));
      inputStreams.put(inputTopic, inputStream);
    }

    StoreBuilder<KeyValueStore<MetricIdentity, Long>> metricIdentityStoreBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE),
                new MetricIdentitySerde(),
                new LongSerde())
            .withCachingEnabled();

    StoreBuilder<KeyValueStore<MetricIdentity, Metric>> metricIdentityToValueStoreBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE),
                new MetricIdentitySerde(),
                new MetricSerde())
            .withCachingEnabled();

    streamsBuilder.addStateStore(metricIdentityStoreBuilder);
    streamsBuilder.addStateStore(metricIdentityToValueStoreBuilder);

    // Named sink so MetricsProcessor can forward via To.child(OUTPUT_TOPIC_PRODUCER).
    Produced<byte[], ResourceMetrics> outputTopicProducer =
        Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()).withName(OUTPUT_TOPIC_PRODUCER);

    inputStream
        .transform(
            MetricsProcessor::new,
            Named.as(MetricsProcessor.class.getSimpleName()),
            METRICS_IDENTITY_STORE,
            METRICS_IDENTITY_VALUE_STORE)
        .to(outputTopic, outputTopicProducer);

    return streamsBuilder;
  }

  @Override
  public String getJobConfigKey() {
    return METRICS_GENERATOR_JOB_CONFIG;
  }

  @Override
  public Logger getLogger() {
    return LOGGER;
  }

  @Override
  public List<String> getInputTopics(Map<String, Object> properties) {
    return List.of(getJobConfig(properties).getString(INPUT_TOPIC_CONFIG_KEY));
  }

  @Override
  public List<String> getOutputTopics(Map<String, Object> properties) {
    return List.of(getJobConfig(properties).getString(OUTPUT_TOPIC_CONFIG_KEY));
  }

  private Config getJobConfig(Map<String, Object> properties) {
    return (Config) properties.get(getJobConfigKey());
  }

  // Removed unused private helpers defaultValueSerde/defaultKeySerde (dead
  // code; they also pulled in an otherwise-unused StreamsConfig dependency).
}
package org.hypertrace.metrics.generator;

import static java.util.stream.Collectors.joining;
import static org.hypertrace.metrics.generator.MetricsConstants.API_ID;
import static org.hypertrace.metrics.generator.MetricsConstants.API_NAME;
import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS;
import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS_DESCRIPTION;
import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS_UNIT;
import static org.hypertrace.metrics.generator.MetricsConstants.PROTOCOL_NAME;
import static org.hypertrace.metrics.generator.MetricsConstants.SERVICE_ID_ATTR;
import static org.hypertrace.metrics.generator.MetricsConstants.SERVICE_NAME_ATTR;
import static org.hypertrace.metrics.generator.MetricsConstants.STATUS_CODE;
import static org.hypertrace.metrics.generator.MetricsConstants.TENANT_ID_ATTR;
import static org.hypertrace.metrics.generator.MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG;
import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.metrics.generator.api.v1.Metric;
import org.hypertrace.metrics.generator.api.v1.MetricIdentity;
import org.hypertrace.viewgenerator.api.RawServiceView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Aggregates {@link RawServiceView} events into per-window call counts keyed by
 * a {@link MetricIdentity} (window timestamp + stable attribute hash). Counts
 * accumulate in a state store; a one-shot {@link MetricEmitPunctuator} per
 * identity emits the OTLP metric after {@code metric.emit.waitTimeMs}. Emission
 * happens only via the punctuator, so {@link #transform} always returns null.
 */
public class MetricsProcessor
    implements Transformer<String, RawServiceView, KeyValue<byte[], ResourceMetrics>> {

  private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class);

  private static final String DELIMITER = ":";
  private static final String METRIC_AGGREGATION_TIME_MS = "metric.aggregation.timeMs";
  private static final String METRIC_EMIT_WAIT_TIME_MS = "metric.emit.waitTimeMs";

  private ProcessorContext context;
  private KeyValueStore<MetricIdentity, Long> metricsIdentityStore;
  private KeyValueStore<MetricIdentity, Metric> metricsStore;
  private long metricAggregationTimeMs;
  private long metricEmitWaitTimeMs;
  private To outputTopicProducer;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.context = context;
    this.metricsIdentityStore =
        (KeyValueStore<MetricIdentity, Long>)
            context.getStateStore(MetricsGenerator.METRICS_IDENTITY_STORE);
    this.metricsStore =
        (KeyValueStore<MetricIdentity, Metric>)
            context.getStateStore(MetricsGenerator.METRICS_IDENTITY_VALUE_STORE);
    this.outputTopicProducer = To.child(OUTPUT_TOPIC_PRODUCER);

    Config jobConfig = (Config) context.appConfigs().get(METRICS_GENERATOR_JOB_CONFIG);
    this.metricAggregationTimeMs = jobConfig.getLong(METRIC_AGGREGATION_TIME_MS);
    this.metricEmitWaitTimeMs = jobConfig.getLong(METRIC_EMIT_WAIT_TIME_MS);

    // Identities restored from the state store after a restart still need
    // their emit punctuators re-scheduled.
    restorePunctuators();
  }

  @Override
  public KeyValue<byte[], ResourceMetrics> transform(String key, RawServiceView value) {
    // Build the metric's attribute set from the view row.
    Map<String, String> attributes = new HashMap<>();
    attributes.put(TENANT_ID_ATTR, value.getTenantId());
    attributes.put(SERVICE_ID_ATTR, value.getServiceId());
    attributes.put(SERVICE_NAME_ATTR, value.getServiceName());
    attributes.put(API_ID, value.getApiId());
    attributes.put(API_NAME, value.getApiName());
    if (value.getProtocolName() != null) {
      attributes.put(PROTOCOL_NAME, value.getProtocolName());
    }
    if (value.getStatusCode() != null) {
      attributes.put(STATUS_CODE, value.getStatusCode());
    }

    Metric metric =
        Metric.newBuilder()
            .setName(METRIC_NUM_CALLS)
            .setDescription(METRIC_NUM_CALLS_DESCRIPTION)
            .setUnit(METRIC_NUM_CALLS_UNIT)
            .putAllAttributes(attributes)
            .build();

    // Metric identity = (window-aligned timestamp, stable key over name+attrs).
    Instant instant =
        Instant.ofEpochMilli(value.getStartTimeMillis())
            .plusMillis(metricAggregationTimeMs)
            .truncatedTo(ChronoUnit.SECONDS);

    MetricIdentity metricsIdentity =
        MetricIdentity.newBuilder()
            .setTimestampMillis(instant.toEpochMilli())
            .setMetricKey(generateKey(metric))
            .build();

    Long preValue = metricsIdentityStore.get(metricsIdentity);
    if (preValue == null) {
      // First event for this identity: seed the stores and schedule its emit.
      metricsIdentityStore.put(metricsIdentity, (long) value.getNumCalls());
      metricsStore.put(metricsIdentity, metric);
      schedulePunctuator(metricsIdentity);
    } else {
      metricsIdentityStore.put(metricsIdentity, preValue + (long) value.getNumCalls());
    }

    // Output is produced exclusively by the punctuators.
    return null;
  }

  @Override
  public void close() {}

  /**
   * Derives a deterministic UUID key from the metric name and attributes.
   *
   * <p>Entries are sorted by attribute name because protobuf map iteration
   * order is unspecified; without sorting, the same logical metric could hash
   * to different identities and be aggregated in separate buckets. UTF-8 is
   * used explicitly so the key does not depend on the platform charset.
   */
  private String generateKey(Metric metric) {
    String attributesStr =
        metric.getAttributesMap().entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(Object::toString)
            .collect(joining(DELIMITER));
    String id = String.join(DELIMITER, metric.getName(), attributesStr);
    return UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8)).toString();
  }

  /** Re-schedules an emit punctuator for every identity persisted in the store. */
  private void restorePunctuators() {
    long count = 0;
    Instant start = Instant.now();
    try (KeyValueIterator<MetricIdentity, Long> it = metricsIdentityStore.all()) {
      while (it.hasNext()) {
        schedulePunctuator(it.next().key);
        count++;
      }
      logger.info(
          "Restored=[{}] punctuators, Duration=[{}]",
          count,
          Duration.between(start, Instant.now()));
    }
  }

  /** Schedules a one-shot wall-clock punctuator that emits and self-cancels. */
  private void schedulePunctuator(MetricIdentity key) {
    MetricEmitPunctuator punctuator =
        new MetricEmitPunctuator(
            key, context, metricsIdentityStore, metricsStore, outputTopicProducer);

    Cancellable cancellable =
        context.schedule(
            Duration.ofMillis(metricEmitWaitTimeMs), PunctuationType.WALL_CLOCK_TIME, punctuator);

    punctuator.setCancellable(cancellable);

    logger.debug(
        "Scheduled a punctuator to emit trace for key=[{}] to run after [{}] ms",
        key,
        metricEmitWaitTimeMs);
  }
}
metrics-generator-from-raw-service-view-events-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB 
+appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java new file mode 100644 index 000000000..f14fdbbde --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -0,0 +1,164 @@ +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.hypertrace.core.serviceframework.config.ConfigClientFactory; +import org.hypertrace.metrics.generator.MetricsGenerator; +import org.hypertrace.metrics.generator.api.v1.serde.OtlpMetricsSerde; +import org.hypertrace.viewgenerator.api.RawServiceView; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junitpioneer.jupiter.SetEnvironmentVariable; + 
+public class MetricsGeneratorTest { + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "hypertrace-metrics-generator") + public void testMetricsGenerator(@TempDir Path tempDir) { + + MetricsGenerator underTest = new MetricsGenerator(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass() + .getClassLoader() + .getResource("configs/hypertrace-metrics-generator/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + File file = tempDir.resolve("state").toFile(); + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TestInputTopic inputTopic = + td.createInputTopic( + config.getString(MetricsGenerator.INPUT_TOPIC_CONFIG_KEY), + Serdes.String().serializer(), + defaultValueSerde.serializer()); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(MetricsGenerator.OUTPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().deserializer(), + new OtlpMetricsSerde().deserializer()); + + String tenantId = "tenant1"; + + // create 3 rows within 30 secs interval + RawServiceView row1 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .setSpanId(ByteBuffer.wrap("span-1".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + 
.setServiceName("Cart Service") + .setStartTimeMillis(1636982920000L) + .setEndTimeMillis(1636982920015L) + .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") + .build(); + + RawServiceView row2 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) + .setSpanId(ByteBuffer.wrap("span-2".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920200L) + .setEndTimeMillis(1636982920215L) + .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") + .build(); + + RawServiceView row3 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) + .setSpanId(ByteBuffer.wrap("span-3".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920400L) + .setEndTimeMillis(1636982920415L) + .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") + .build(); + + // pipe the data + inputTopic.pipeInput(null, row1); + inputTopic.pipeInput(null, row2); + inputTopic.pipeInput(null, row3); + + // advance clock < 30s + td.advanceWallClockTime(Duration.ofMillis(200)); + assertTrue(outputTopic.isEmpty()); + + // advance to > 15s + td.advanceWallClockTime(Duration.ofSeconds(17)); + ResourceMetrics resourceMetrics = (ResourceMetrics) outputTopic.readValue(); + + // assert for basics + assertNotNull(resourceMetrics); + Assertions.assertNotNull(resourceMetrics.getResource()); + + Assertions.assertEquals(1, resourceMetrics.getInstrumentationLibraryMetricsCount()); + Assertions.assertEquals( + 1, resourceMetrics.getInstrumentationLibraryMetrics(0).getMetricsCount()); + + Assertions.assertEquals( + "num_calls", 
resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getName()); + + // assert that num_calls is 3 + Gauge outGauge = resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getGauge(); + Assertions.assertNotNull(outGauge); + Assertions.assertEquals(1, outGauge.getDataPointsCount()); + Assertions.assertEquals( + 1636982925000L, + TimeUnit.MILLISECONDS.convert( + outGauge.getDataPoints(0).getTimeUnixNano(), TimeUnit.NANOSECONDS)); + Assertions.assertEquals(3L, outGauge.getDataPoints(0).getAsInt()); + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf new file mode 100644 index 000000000..5aac99d62 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf @@ -0,0 +1,31 @@ +service.name = hypertrace-metrics-generator +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.generator.MetricsGenerator + +input.topic = "raw-service-view-events" +output.topic = "otlp-metrics" +input.class = org.hypertrace.viewgenerator.api.RawServiceView +precreate.topics = false + +metric.aggregation.timeMs = 5000 +metric.emit.waitTimeMs = 15000 + +kafka.streams.config = { + application.id = metrics-generator-from-raw-service-view-events-job + num.stream.threads = 1 + bootstrap.servers = "localhost:9092" + schema.registry.url = "mock://localhost:8081" + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = "__default" +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git 
a/settings.gradle.kts b/settings.gradle.kts index 2e19348e2..ca7db4f21 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,6 +37,8 @@ include("span-normalizer:span-normalizer-constants") // metrics pipeline include("hypertrace-metrics-processor:hypertrace-metrics-processor") include("hypertrace-metrics-exporter:hypertrace-metrics-exporter") +include("hypertrace-metrics-generator:hypertrace-metrics-generator-api") +include("hypertrace-metrics-generator:hypertrace-metrics-generator") // utils include("semantic-convention-utils")