From e1abeceb10cd43ce8f7236ae0397cb84edc8f8cc Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 12 Nov 2021 18:32:49 +0530 Subject: [PATCH 01/11] feat: add metrics generator for num_calls --- hypertrace-metrics-generator/build.gradle.kts | 3 + .../build.gradle.kts | 9 + .../src/main/avro/metric-identity.avdl | 7 + .../src/main/avro/metric.avdl | 9 + .../build.gradle.kts | 42 +++++ .../generator/MetricEmitPunctuator.java | 137 +++++++++++++++ .../metrics/generator/MetricsGenerator.java | 124 ++++++++++++++ .../metrics/generator/MetricsProcessor.java | 158 ++++++++++++++++++ .../metrics/generator/OtlpMetricsSerde.java | 55 ++++++ .../resources/configs/common/application.conf | 34 ++++ .../src/main/resources/log4j2.properties | 23 +++ settings.gradle.kts | 2 + 12 files changed, 603 insertions(+) create mode 100644 hypertrace-metrics-generator/build.gradle.kts create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties diff --git a/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/build.gradle.kts new file mode 100644 index 000000000..b1720497c --- /dev/null +++ b/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.generator" +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts new file mode 100644 index 000000000..a512a70f7 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -0,0 +1,9 @@ +plugins { + `java-library` + id("org.hypertrace.publish-plugin") + id("org.hypertrace.avro-plugin") +} + +dependencies { + api("org.apache.avro:avro:1.10.2") +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl new file mode 100644 index 000000000..52325295e --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl @@ -0,0 +1,7 @@ +@namespace("org.hypertrace.metrics.generator.api") +protocol MetricIdentityProtocol { + record MetricIdentity { + long timestamp_millis = 0; + union {null, string} metric_key = null; + } +} \ No newline at end of file diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl new file mode 100644 index 000000000..60bc8310b --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl @@ -0,0 +1,9 @@ +@namespace("org.hypertrace.metrics.generator.api") +protocol MetricProtocol { + record Metric { + union {null, string} name = null; + map attributes = {}; + union {null, string} description = null; + union {null, string} unit = null; + } +} \ No newline at end of file diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts new file mode 100644 index 000000000..136b5ddcd --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -0,0 +1,42 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // common and framework + implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api")) + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry proto + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + + // test + testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("com.google.code.gson:gson:2.8.7") +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java new file mode 100644 index 000000000..6cba45b2f --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java @@ -0,0 +1,137 @@ +package org.hypertrace.metrics.generator; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.resource.v1.Resource; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.state.KeyValueStore; +import org.hypertrace.metrics.generator.api.Metric; +import org.hypertrace.metrics.generator.api.MetricIdentity; + +public class MetricEmitPunctuator implements Punctuator { + private static final String RESOURCE_KEY_SERVICE = "service"; + private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator"; + private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View"; + + private MetricIdentity key; + private ProcessorContext context; + private KeyValueStore metricIdentityStore; + private KeyValueStore metricStore; + private long groupingWindowTimeoutMs; + private Cancellable cancellable; + private To outputTopicProducer; + + public MetricEmitPunctuator( + MetricIdentity key, + ProcessorContext context, + KeyValueStore metricIdentityStore, + KeyValueStore metricStore, + long groupingWindowTimeoutMs, + To outputTopicProducer) { + this.key = key; + this.context = context; + this.metricIdentityStore = metricIdentityStore; + this.metricStore = metricStore; + this.groupingWindowTimeoutMs = groupingWindowTimeoutMs; + this.outputTopicProducer = outputTopicProducer; + } + + public void setCancellable(Cancellable cancellable) { + this.cancellable = cancellable; + } + + @Override + public void punctuate(long timestamp) { + Instant startTime = Instant.now(); + // always cancel the punctuator else it will get re-scheduled automatically + cancellable.cancel(); + + // read the value from a key + Long value = metricIdentityStore.get(this.key); + if (value != null) { + long diff = timestamp - this.key.getTimestampMillis(); + if (diff > groupingWindowTimeoutMs) { + Metric metric = metricStore.get(this.key); + metricIdentityStore.delete(this.key); + metricStore.delete(this.key); + // convert to Resource Metrics + ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric); + context.forward(null, resourceMetrics, outputTopicProducer); + } else { + long duration = Math.max(1000, diff); + cancellable = + context.schedule(Duration.ofMillis(duration), PunctuationType.WALL_CLOCK_TIME, this); + } + } + } + + private ResourceMetrics convertToResourceMetric( + MetricIdentity metricIdentity, Long value, Metric metric) { + ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder(); + resourceMetricsBuilder.setResource( + Resource.newBuilder() + .addAttributes( + io.opentelemetry.proto.common.v1.KeyValue.newBuilder() + .setKey(RESOURCE_KEY_SERVICE) + .setValue( + AnyValue.newBuilder().setStringValue(RESOURCE_KEY_SERVICE_VALUE).build()) + .build())); + + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metric.getName()); + metricBuilder.setDescription(metric.getDescription()); + metricBuilder.setUnit(metric.getUnit()); + + NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder(); + List attributes = toAttributes(metric.getAttributes()); + numberDataPointBuilder.addAllAttributes(attributes); + numberDataPointBuilder.setTimeUnixNano( + TimeUnit.NANOSECONDS.convert(metricIdentity.getTimestampMillis(), TimeUnit.MILLISECONDS)); + numberDataPointBuilder.setAsInt(value); + + Gauge.Builder gaugeBuilder = Gauge.newBuilder(); + gaugeBuilder.addDataPoints(numberDataPointBuilder.build()); + metricBuilder.setGauge(gaugeBuilder.build()); + + resourceMetricsBuilder.addInstrumentationLibraryMetrics( + InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metricBuilder.build()) + .setInstrumentationLibrary( + InstrumentationLibrary.newBuilder().setName(INSTRUMENTATION_LIB_NAME).build()) + .build()); + + return resourceMetricsBuilder.build(); + } + + private List toAttributes(Map labels) { + List attributes = + labels.entrySet().stream() + .map( + k -> { + io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder = + io.opentelemetry.proto.common.v1.KeyValue.newBuilder(); + keyValueBuilder.setKey(k.getKey()); + String value = k.getValue() != null ? k.getValue() : ""; + keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build()); + return keyValueBuilder.build(); + }) + .collect(Collectors.toList()); + return attributes; + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java new file mode 100644 index 000000000..e33e712bb --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java @@ -0,0 +1,124 @@ +package org.hypertrace.metrics.generator; + +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.LongSerde; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.metrics.generator.api.Metric; +import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.hypertrace.viewgenerator.api.RawServiceView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsGenerator extends KafkaStreamsApp { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class); + private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + private static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config"; + public static final String METRICS_IDENTITY_STORE = "metric-identity-store"; + public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store"; + public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer"; + + public MetricsGenerator(ConfigClient configClient) { + super(configClient); + } + + @Override + public StreamsBuilder buildTopology( + Map streamsProperties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { + + Config jobConfig = getJobConfig(streamsProperties); + String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); + String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); + + KStream inputStream = + (KStream) inputStreams.get(inputTopic); + if (inputStream == null) { + inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), (Serde) null)); + inputStreams.put(inputTopic, inputStream); + } + + // Retrieve the default value serde defined in config and use it + Serde valueSerde = defaultValueSerde(streamsProperties); + Serde keySerde = defaultKeySerde(streamsProperties); + + StoreBuilder> metricIdentityStoreBuilder = + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE), keySerde, new LongSerde()) + .withCachingEnabled(); + + StoreBuilder> metricIdentityToValueStoreBuilder = + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE), keySerde, valueSerde) + .withCachingEnabled(); + + streamsBuilder.addStateStore(metricIdentityStoreBuilder); + streamsBuilder.addStateStore(metricIdentityToValueStoreBuilder); + + Produced outputTopicProducer = + Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()); + outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER); + + inputStream + .transform( + MetricsProcessor::new, + Named.as(MetricsProcessor.class.getSimpleName()), + METRICS_IDENTITY_STORE, + METRICS_IDENTITY_VALUE_STORE) + .to(outputTopic, outputTopicProducer); + + return streamsBuilder; + } + + @Override + public String getJobConfigKey() { + return METRICS_GENERATOR_JOB_CONFIG; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public List getInputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); + } + + @Override + public List getOutputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); + } + + private Config getJobConfig(Map properties) { + return (Config) properties.get(getJobConfigKey()); + } + + private Serde defaultValueSerde(Map properties) { + StreamsConfig config = new StreamsConfig(properties); + return config.defaultValueSerde(); + } + + private Serde defaultKeySerde(Map properties) { + StreamsConfig config = new StreamsConfig(properties); + return config.defaultKeySerde(); + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java new file mode 100644 index 000000000..1052b820a --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -0,0 +1,158 @@ +package org.hypertrace.metrics.generator; + +import static java.util.stream.Collectors.joining; +import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.hypertrace.metrics.generator.api.Metric; +import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.hypertrace.viewgenerator.api.RawServiceView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsProcessor + implements Transformer> { + + private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class); + + private static final String TENANT_ID_ATTR = "tenant_id"; + private static final String SERVICE_ID_ATTR = "service_id"; + private static final String SERVICE_NAME_ATTR = "service_name"; + private static final String API_ID = "api_id"; + private static final String API_NAME = "api_name"; + private static final String CONSUMER_ID_ATTR = "consumer_id"; + private static final String METRIC_NUM_CALLS = "num_calls"; + private static final String METRIC_NUM_CALLS_DESCRIPTION = "num of calls"; + private static final String METRIC_NUM_CALLS_UNIT = "1"; + private static final String DELIMITER = ":"; + + private ProcessorContext context; + private KeyValueStore metricsIdentityStore; + private KeyValueStore metricsStore; + private long groupingWindowTimeoutMs = 30000L; + private To outputTopicProducer; + + @Override + public void init(ProcessorContext context) { + + this.context = context; + this.metricsIdentityStore = + (KeyValueStore) + context.getStateStore(MetricsGenerator.METRICS_IDENTITY_STORE); + this.metricsStore = + (KeyValueStore) + context.getStateStore(MetricsGenerator.METRICS_IDENTITY_VALUE_STORE); + this.outputTopicProducer = To.child(OUTPUT_TOPIC_PRODUCER); + restorePunctuators(); + } + + @Override + public KeyValue transform(String key, RawServiceView value) { + + // create a metricX + Map attributes = new HashMap<>(); + attributes.put(TENANT_ID_ATTR, value.getTenantId()); + attributes.put(CONSUMER_ID_ATTR, "1"); + attributes.put(SERVICE_ID_ATTR, value.getServiceId()); + attributes.put(SERVICE_NAME_ATTR, value.getServiceName()); + attributes.put(API_ID, value.getApiId()); + attributes.put(API_NAME, value.getApiName()); + + Metric metric = + Metric.newBuilder() + .setName(METRIC_NUM_CALLS) + .setDescription(METRIC_NUM_CALLS_DESCRIPTION) + .setUnit(METRIC_NUM_CALLS_UNIT) + .setAttributes(attributes) + .build(); + + // create metricX identity (timestamp, metric_key) + Instant instant = + Instant.ofEpochMilli(value.getStartTimeMillis()) + .plusSeconds(1) + .truncatedTo(ChronoUnit.SECONDS); + + MetricIdentity metricsIdentity = + MetricIdentity.newBuilder() + .setTimestampMillis(instant.toEpochMilli()) + .setMetricKey(generateKey(metric)) + .build(); + + // update the cache + Long preValue = metricsIdentityStore.get(metricsIdentity); + if (preValue == null) { + // first entry + metricsIdentityStore.put(metricsIdentity, 1L); + metricsStore.put(metricsIdentity, metric); + + // schedule a punctuator + schedulePunctuator(metricsIdentity); + } else { + metricsIdentityStore.put(metricsIdentity, preValue + 1); + } + + return null; + } + + @Override + public void close() {} + + private String generateKey(Metric metric) { + String attributesStr = + metric.getAttributes().entrySet().stream() + .map(Object::toString) + .collect(joining(DELIMITER)); + String id = String.join(DELIMITER, metric.getName(), attributesStr); + return UUID.nameUUIDFromBytes(id.getBytes()).toString(); + } + + private void restorePunctuators() { + long count = 0; + Instant start = Instant.now(); + try (KeyValueIterator it = metricsIdentityStore.all()) { + while (it.hasNext()) { + schedulePunctuator(it.next().key); + count++; + } + logger.info( + "Restored=[{}] punctuators, Duration=[{}]", + count, + Duration.between(start, Instant.now())); + } + } + + private void schedulePunctuator(MetricIdentity key) { + MetricEmitPunctuator punctuator = + new MetricEmitPunctuator( + key, + context, + metricsIdentityStore, + metricsStore, + groupingWindowTimeoutMs, + outputTopicProducer); + Cancellable cancellable = + context.schedule( + Duration.ofMillis(groupingWindowTimeoutMs), + PunctuationType.WALL_CLOCK_TIME, + punctuator); + punctuator.setCancellable(cancellable); + logger.debug( + "Scheduled a punctuator to emit trace for key=[{}] to run after [{}] ms", + key, + groupingWindowTimeoutMs); + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java new file mode 100644 index 000000000..8a6c63137 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java @@ -0,0 +1,55 @@ +package org.hypertrace.metrics.generator; + +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OtlpMetricsSerde implements Serde { + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsSerde.class); + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new OtlpMetricsSerde.Ser(); + } + + @Override + public Deserializer deserializer() { + return new OtlpMetricsSerde.De(); + } + + public static class Ser implements Serializer { + @Override + public byte[] serialize(String topic, ResourceMetrics data) { + try { + return data.toByteArray(); + } catch (Exception e) { + LOGGER.error("serialization error:", e); + } + return null; + } + } + + public static class De implements Deserializer { + @Override + public ResourceMetrics deserialize(String topic, byte[] data) { + try { + LOGGER.info("deserialize:{}", OtlpMetricsSerde.class.getName()); + return ResourceMetrics.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + LOGGER.error("error:", e); + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..eb8caac59 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf @@ -0,0 +1,34 @@ +service.name = hypertrace-metrics-generator +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.generator.MetricsGenerator + +input.topic = "raw-service-view-events" +output.topic = "otlp-metrics" +input.class = org.hypertrace.viewgenerator.api.RawServiceView +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-generator-from-raw-service-view-events-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/settings.gradle.kts b/settings.gradle.kts index 2e19348e2..ca7db4f21 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,6 +37,8 @@ include("span-normalizer:span-normalizer-constants") // metrics pipeline include("hypertrace-metrics-processor:hypertrace-metrics-processor") include("hypertrace-metrics-exporter:hypertrace-metrics-exporter") +include("hypertrace-metrics-generator:hypertrace-metrics-generator-api") +include("hypertrace-metrics-generator:hypertrace-metrics-generator") // utils include("semantic-convention-utils") From 64d4ffccd191b8230c1347e9f83d5bb6b7323a81 Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 12 Nov 2021 19:12:29 +0530 Subject: [PATCH 02/11] fix the avro compatibility issue --- .../hypertrace-metrics-generator-api/build.gradle.kts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts index a512a70f7..945f24b3c 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -7,3 +7,8 @@ plugins { dependencies { api("org.apache.avro:avro:1.10.2") } + +hypertraceAvro { + previousArtifact.set("${project.group}/${project.name}:latest.release") + relocatedToArtifact.set("${project.group}/${project.name}:latest.release") +} \ No newline at end of file From 6f83dcf03b03ad355c0f50b8c1441c5ce04d197f Mon Sep 17 00:00:00 2001 From: Ronak Date: Tue, 16 Nov 2021 09:32:46 +0530 Subject: [PATCH 03/11] test: add topology test --- .../build.gradle.kts | 7 +- .../build.gradle.kts | 3 +- .../metrics/generator/MetricsGenerator.java | 6 +- .../src/test/java/MetricsGeneratorTest.java | 117 ++++++++++++++++++ .../application.conf | 34 +++++ 5 files changed, 159 insertions(+), 8 deletions(-) create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts index 945f24b3c..23c35f121 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -8,7 +8,6 @@ dependencies { api("org.apache.avro:avro:1.10.2") } -hypertraceAvro { - previousArtifact.set("${project.group}/${project.name}:latest.release") - relocatedToArtifact.set("${project.group}/${project.name}:latest.release") -} \ No newline at end of file +tasks.named("avroCompatibilityCheck") { + enabled = false +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts index 136b5ddcd..c468abced 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -38,5 +38,6 @@ dependencies { // test testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") - testImplementation("com.google.code.gson:gson:2.8.7") + testImplementation("org.junit-pioneer:junit-pioneer:1.3.8") + testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs") } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java index e33e712bb..4aa6d8e7a 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java @@ -26,9 +26,9 @@ public class MetricsGenerator extends KafkaStreamsApp { private static final Logger LOGGER = LoggerFactory.getLogger(MetricsGenerator.class); - private static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; - private static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; - private static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config"; + public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config"; public static final String METRICS_IDENTITY_STORE = "metric-identity-store"; public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store"; public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer"; diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java new file mode 100644 index 000000000..696da18a2 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -0,0 +1,117 @@ +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.hypertrace.core.serviceframework.config.ConfigClientFactory; +import org.hypertrace.metrics.generator.MetricsGenerator; +import org.hypertrace.metrics.generator.OtlpMetricsSerde; +import org.hypertrace.viewgenerator.api.RawServiceView; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junitpioneer.jupiter.SetEnvironmentVariable; + +public class MetricsGeneratorTest { + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "hypertrace-metrics-generator") + public void testMetricsGenerator(@TempDir Path tempDir) { + + MetricsGenerator underTest = new MetricsGenerator(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/hypertrace-metrics-generator/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + File file = tempDir.resolve("state").toFile(); + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TestInputTopic inputTopic = + td.createInputTopic( + config.getString(MetricsGenerator.INPUT_TOPIC_CONFIG_KEY), + Serdes.String().serializer(), + defaultValueSerde.serializer()); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(MetricsGenerator.OUTPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().deserializer(), + new OtlpMetricsSerde().deserializer()); + + String tenantId = "tenant1"; + + // create 3 rows within 30 secs interval + RawServiceView row1 = RawServiceView.newBuilder() + .setTenantId(tenantId) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920000L) + .build(); + + RawServiceView row2 = RawServiceView.newBuilder() + .setTenantId(tenantId) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920200L) + .build(); + + RawServiceView row3 = RawServiceView.newBuilder() + .setTenantId(tenantId) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920400L) + .build(); + + // pipe the data + inputTopic.pipeInput(null, row1); + inputTopic.pipeInput(null, row2); + inputTopic.pipeInput(null, row3); + + // advance clock < 30s + td.advanceWallClockTime(Duration.ofMillis(200)); + assertTrue(outputTopic.isEmpty()); + + // advance to > 30s + td.advanceWallClockTime(Duration.ofSeconds(32)); + ResourceMetrics resourceMetrics = (ResourceMetrics) outputTopic.readValue(); + assertNotNull(resourceMetrics); + + // expect the num_call count is 3 + } + +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf new file mode 100644 index 000000000..eb8caac59 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf @@ -0,0 +1,34 @@ +service.name = hypertrace-metrics-generator +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.generator.MetricsGenerator + +input.topic = "raw-service-view-events" +output.topic = "otlp-metrics" +input.class = org.hypertrace.viewgenerator.api.RawServiceView +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-generator-from-raw-service-view-events-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} + value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-generator" + +metrics.reporter.prefix = org.hypertrace.metrics.generator.MetricsGenerator +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 From a791f4db8cc8137322a02ed0df569a8f06403258 Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 17 Nov 2021 19:52:33 +0530 Subject: [PATCH 04/11] test: add the topology tests and fixed it --- .../src/test/java/MetricsGeneratorTest.java | 73 ++++++++++++------- .../application.conf | 12 +-- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java index 696da18a2..40172131a 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -6,6 +6,7 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import java.io.File; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; @@ -34,7 +35,9 @@ public void testMetricsGenerator(@TempDir Path tempDir) { MetricsGenerator underTest = new MetricsGenerator(ConfigClientFactory.getClient()); Config config = ConfigFactory.parseURL( - getClass().getClassLoader().getResource("configs/hypertrace-metrics-generator/application.conf")); + getClass() + .getClassLoader() + .getResource("configs/hypertrace-metrics-generator/application.conf")); Map baseProps = underTest.getBaseStreamsConfig(); Map streamsProps = underTest.getStreamsConfig(config); @@ -70,32 +73,47 @@ public void testMetricsGenerator(@TempDir Path tempDir) { String tenantId = "tenant1"; // create 3 rows within 30 secs interval - RawServiceView row1 = RawServiceView.newBuilder() - .setTenantId(tenantId) - .setApiId("api-1234") - .setApiName("GET /api/v1/name") - .setServiceId("svc-1234") - .setServiceName("Cart Service") - .setStartTimeMillis(1636982920000L) - .build(); - - RawServiceView row2 = RawServiceView.newBuilder() - .setTenantId(tenantId) - .setApiId("api-1234") - .setApiName("GET /api/v1/name") - .setServiceId("svc-1234") - .setServiceName("Cart Service") - .setStartTimeMillis(1636982920200L) - .build(); - - RawServiceView row3 = RawServiceView.newBuilder() - .setTenantId(tenantId) - .setApiId("api-1234") - .setApiName("GET /api/v1/name") - .setServiceId("svc-1234") - .setServiceName("Cart Service") - .setStartTimeMillis(1636982920400L) - .build(); + RawServiceView row1 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .setSpanId(ByteBuffer.wrap("span-1".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920000L) + .setEndTimeMillis(1636982920015L) + .setDurationMillis(15L) + .build(); + + RawServiceView row2 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) + .setSpanId(ByteBuffer.wrap("span-2".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920200L) + .setEndTimeMillis(1636982920215L) + .setDurationMillis(15L) + .build(); + + RawServiceView row3 = + RawServiceView.newBuilder() + .setTenantId(tenantId) + .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) + .setSpanId(ByteBuffer.wrap("span-3".getBytes())) + .setApiId("api-1234") + .setApiName("GET /api/v1/name") + .setServiceId("svc-1234") + .setServiceName("Cart Service") + .setStartTimeMillis(1636982920400L) + .setEndTimeMillis(1636982920415L) + .setDurationMillis(15L) + .build(); // pipe the data inputTopic.pipeInput(null, row1); @@ -113,5 +131,4 @@ public void testMetricsGenerator(@TempDir Path tempDir) { // expect the num_call count is 3 } - } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf index eb8caac59..e67df715e 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf @@ -7,23 +7,17 @@ input.topic = "raw-service-view-events" output.topic = "otlp-metrics" input.class = org.hypertrace.viewgenerator.api.RawServiceView precreate.topics = false -precreate.topics = ${?PRE_CREATE_TOPICS} kafka.streams.config = { application.id = metrics-generator-from-raw-service-view-events-job - num.stream.threads = 2 - num.stream.threads = ${?NUM_STREAM_THREADS} - + num.stream.threads = 1 bootstrap.servers = "localhost:9092" - bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} - - schema.registry.url = "http://localhost:8081" - schema.registry.url = ${?SCHEMA_REGISTRY_URL} + schema.registry.url = "mock://localhost:8081" value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" } processor { - defaultTenantId = ${?DEFAULT_TENANT_ID} + defaultTenantId = "__default" } logger.names = ["file"] From 12b62d3aae2e1ac538faf734639e669b12cbe55c Mon Sep 17 00:00:00 2001 From: Ronak Date: Wed, 17 Nov 2021 22:22:48 +0530 Subject: [PATCH 05/11] adds more assertions --- .../src/test/java/MetricsGeneratorTest.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java index 40172131a..c3e236a28 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -4,6 +4,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.opentelemetry.proto.metrics.v1.Gauge; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import java.io.File; import java.nio.ByteBuffer; @@ -12,6 +13,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -23,6 +25,7 @@ import org.hypertrace.metrics.generator.MetricsGenerator; import org.hypertrace.metrics.generator.OtlpMetricsSerde; import org.hypertrace.viewgenerator.api.RawServiceView; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -127,8 +130,29 @@ public void testMetricsGenerator(@TempDir Path tempDir) { // advance to > 30s td.advanceWallClockTime(Duration.ofSeconds(32)); ResourceMetrics resourceMetrics = (ResourceMetrics) outputTopic.readValue(); - assertNotNull(resourceMetrics); - // expect the num_call count is 3 + // assert for basics + assertNotNull(resourceMetrics); + Assertions.assertNotNull(resourceMetrics.getResource()); + + Assertions.assertEquals(1, resourceMetrics.getInstrumentationLibraryMetricsCount()); + Assertions.assertEquals( + 1, resourceMetrics.getInstrumentationLibraryMetrics(0).getMetricsCount()); + + Assertions.assertEquals( + "num_calls", + resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getName()); + + + // assert that num_calls is 3 + Gauge outGauge = + resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getGauge(); + Assertions.assertNotNull(outGauge); + Assertions.assertEquals(1, outGauge.getDataPointsCount()); + Assertions.assertEquals( + 1636982921000L, + TimeUnit.MILLISECONDS.convert( + outGauge.getDataPoints(0).getTimeUnixNano(), TimeUnit.NANOSECONDS)); + Assertions.assertEquals(3L, outGauge.getDataPoints(0).getAsInt()); } } From 4274c3e78628e0ed4ffd7f83523c44bbb171837c Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 18 Nov 2021 15:35:29 +0530 Subject: [PATCH 06/11] disabling avro compatibility check, and adding configs for time outs --- .github/workflows/pr-build.yml | 2 +- .../generator/MetricEmitPunctuator.java | 30 +++++++++--------- .../metrics/generator/MetricsProcessor.java | 31 ++++++++++++------- .../resources/configs/common/application.conf | 3 ++ .../src/test/java/MetricsGeneratorTest.java | 13 +++----- .../application.conf | 3 ++ 6 files changed, 46 insertions(+), 36 deletions(-) diff --git a/.github/workflows/pr-build.yml b/.github/workflows/pr-build.yml index bef81c3fe..09a85c544 100644 --- a/.github/workflows/pr-build.yml +++ b/.github/workflows/pr-build.yml @@ -40,7 +40,7 @@ jobs: - name: Build with Gradle uses: hypertrace/github-actions/gradle@main with: - args: build dockerBuildImages + args: build -x :hypertrace-metrics-generator:hypertrace-metrics-generator-api:avroCompatibilityCheck dockerBuildImages validate-helm-charts: runs-on: ubuntu-20.04 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java index 6cba45b2f..a22cb7a3c 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java @@ -8,22 +8,23 @@ import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.resource.v1.Resource; -import java.time.Duration; -import java.time.Instant; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.metrics.generator.api.Metric; import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MetricEmitPunctuator implements Punctuator { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricEmitPunctuator.class); + private static final String RESOURCE_KEY_SERVICE = "service"; private static final String RESOURCE_KEY_SERVICE_VALUE = "metrics-generator"; private static final String INSTRUMENTATION_LIB_NAME = "Generated-From-View"; @@ -57,7 +58,6 @@ public void setCancellable(Cancellable cancellable) { @Override public void punctuate(long timestamp) { - Instant startTime = Instant.now(); // always cancel the punctuator else it will get re-scheduled automatically cancellable.cancel(); @@ -65,18 +65,16 @@ public void punctuate(long timestamp) { Long value = metricIdentityStore.get(this.key); if (value != null) { long diff = timestamp - this.key.getTimestampMillis(); - if (diff > groupingWindowTimeoutMs) { - Metric metric = metricStore.get(this.key); - metricIdentityStore.delete(this.key); - metricStore.delete(this.key); - // convert to Resource Metrics - ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric); - context.forward(null, resourceMetrics, outputTopicProducer); - } else { - long duration = Math.max(1000, diff); - cancellable = - context.schedule(Duration.ofMillis(duration), PunctuationType.WALL_CLOCK_TIME, this); - } + LOGGER.debug("Metrics with key:{} is emitted after duration {}", this.key, diff); + + Metric metric = metricStore.get(this.key); + metricIdentityStore.delete(this.key); + metricStore.delete(this.key); + // convert to Resource Metrics + ResourceMetrics resourceMetrics = convertToResourceMetric(this.key, value, metric); + context.forward(null, resourceMetrics, outputTopicProducer); + } else { + LOGGER.debug("The value for metrics with key:{} is null", this.key); } } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java index 1052b820a..d455fc757 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -1,8 +1,10 @@ package org.hypertrace.metrics.generator; import static java.util.stream.Collectors.joining; +import static org.hypertrace.metrics.generator.MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG; import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER; +import com.typesafe.config.Config; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import java.time.Duration; import java.time.Instant; @@ -39,11 +41,14 @@ public class MetricsProcessor private static final String METRIC_NUM_CALLS_DESCRIPTION = "num of calls"; private static final String METRIC_NUM_CALLS_UNIT = "1"; private static final String DELIMITER = ":"; + private static final String METRIC_AGGREGATION_TIME_MS = "metric.aggregation.timeMs"; + private static final String METRIC_EMIT_WAIT_TIME_MS = "metric.emit.waitTimeMs"; private ProcessorContext context; private KeyValueStore metricsIdentityStore; private KeyValueStore metricsStore; - private long groupingWindowTimeoutMs = 30000L; + private long metricAggregationTimeMs; + private long metricEmitWaitTimeMs; private To outputTopicProducer; @Override @@ -57,16 +62,19 @@ public void init(ProcessorContext context) { (KeyValueStore) context.getStateStore(MetricsGenerator.METRICS_IDENTITY_VALUE_STORE); this.outputTopicProducer = To.child(OUTPUT_TOPIC_PRODUCER); + + Config jobConfig = (Config) (context.appConfigs().get(METRICS_GENERATOR_JOB_CONFIG)); + this.metricAggregationTimeMs = jobConfig.getLong(METRIC_AGGREGATION_TIME_MS); + this.metricEmitWaitTimeMs = jobConfig.getLong(METRIC_EMIT_WAIT_TIME_MS); + restorePunctuators(); } @Override public KeyValue transform(String key, RawServiceView value) { - - // create a metricX + // construct metric attributes & metric Map attributes = new HashMap<>(); attributes.put(TENANT_ID_ATTR, value.getTenantId()); - attributes.put(CONSUMER_ID_ATTR, "1"); attributes.put(SERVICE_ID_ATTR, value.getServiceId()); attributes.put(SERVICE_NAME_ATTR, value.getServiceName()); attributes.put(API_ID, value.getApiId()); @@ -80,10 +88,10 @@ public KeyValue transform(String key, RawServiceView va .setAttributes(attributes) .build(); - // create metricX identity (timestamp, metric_key) + // create metrics identity (timestamp, metric_key) Instant instant = Instant.ofEpochMilli(value.getStartTimeMillis()) - .plusSeconds(1) + .plusMillis(metricAggregationTimeMs) .truncatedTo(ChronoUnit.SECONDS); MetricIdentity metricsIdentity = @@ -142,17 +150,18 @@ private void schedulePunctuator(MetricIdentity key) { context, metricsIdentityStore, metricsStore, - groupingWindowTimeoutMs, + metricEmitWaitTimeMs, outputTopicProducer); + Cancellable cancellable = context.schedule( - Duration.ofMillis(groupingWindowTimeoutMs), - PunctuationType.WALL_CLOCK_TIME, - punctuator); + Duration.ofMillis(metricEmitWaitTimeMs), PunctuationType.WALL_CLOCK_TIME, punctuator); + punctuator.setCancellable(cancellable); + logger.debug( "Scheduled a punctuator to emit trace for key=[{}] to run after [{}] ms", key, - groupingWindowTimeoutMs); + metricEmitWaitTimeMs); } } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf index eb8caac59..dc1336237 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf @@ -9,6 +9,9 @@ input.class = org.hypertrace.viewgenerator.api.RawServiceView precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} +metric.aggregation.timeMs = 5000 +metric.emit.waitTimeMs = 15000 + kafka.streams.config = { application.id = metrics-generator-from-raw-service-view-events-job num.stream.threads = 2 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java index c3e236a28..c62c4e21c 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -127,8 +127,8 @@ public void testMetricsGenerator(@TempDir Path tempDir) { td.advanceWallClockTime(Duration.ofMillis(200)); assertTrue(outputTopic.isEmpty()); - // advance to > 30s - td.advanceWallClockTime(Duration.ofSeconds(32)); + // advance to > 15s + td.advanceWallClockTime(Duration.ofSeconds(17)); ResourceMetrics resourceMetrics = (ResourceMetrics) outputTopic.readValue(); // assert for basics @@ -140,17 +140,14 @@ public void testMetricsGenerator(@TempDir Path tempDir) { 1, resourceMetrics.getInstrumentationLibraryMetrics(0).getMetricsCount()); Assertions.assertEquals( - "num_calls", - resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getName()); - + "num_calls", resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getName()); // assert that num_calls is 3 - Gauge outGauge = - resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getGauge(); + Gauge outGauge = resourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getGauge(); Assertions.assertNotNull(outGauge); Assertions.assertEquals(1, outGauge.getDataPointsCount()); Assertions.assertEquals( - 1636982921000L, + 1636982925000L, TimeUnit.MILLISECONDS.convert( outGauge.getDataPoints(0).getTimeUnixNano(), TimeUnit.NANOSECONDS)); Assertions.assertEquals(3L, outGauge.getDataPoints(0).getAsInt()); diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf index e67df715e..5aac99d62 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/resources/configs/hypertrace-metrics-generator/application.conf @@ -8,6 +8,9 @@ output.topic = "otlp-metrics" input.class = org.hypertrace.viewgenerator.api.RawServiceView precreate.topics = false +metric.aggregation.timeMs = 5000 +metric.emit.waitTimeMs = 15000 + kafka.streams.config = { application.id = metrics-generator-from-raw-service-view-events-job num.stream.threads = 1 From 6d23be0b7cccc749785a2d6ae0773a542c1841c5 Mon Sep 17 00:00:00 2001 From: Ronak Date: Thu, 18 Nov 2021 15:45:01 +0530 Subject: [PATCH 07/11] removed un-used variable from emitter --- .../hypertrace/metrics/generator/MetricEmitPunctuator.java | 3 --- .../org/hypertrace/metrics/generator/MetricsProcessor.java | 7 +------ 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java index a22cb7a3c..4d9808316 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java @@ -33,7 +33,6 @@ public class MetricEmitPunctuator implements Punctuator { private ProcessorContext context; private KeyValueStore metricIdentityStore; private KeyValueStore metricStore; - private long groupingWindowTimeoutMs; private Cancellable cancellable; private To outputTopicProducer; @@ -42,13 +41,11 @@ public MetricEmitPunctuator( ProcessorContext context, KeyValueStore metricIdentityStore, KeyValueStore metricStore, - long groupingWindowTimeoutMs, To outputTopicProducer) { this.key = key; this.context = context; this.metricIdentityStore = metricIdentityStore; this.metricStore = metricStore; - this.groupingWindowTimeoutMs = groupingWindowTimeoutMs; this.outputTopicProducer = outputTopicProducer; } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java index d455fc757..d0a19be90 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -146,12 +146,7 @@ private void restorePunctuators() { private void schedulePunctuator(MetricIdentity key) { MetricEmitPunctuator punctuator = new MetricEmitPunctuator( - key, - context, - metricsIdentityStore, - metricsStore, - metricEmitWaitTimeMs, - outputTopicProducer); + key, context, metricsIdentityStore, metricsStore, outputTopicProducer); Cancellable cancellable = context.schedule( From 55ddb93d2e700e321332491a057d1da7b0b985fd Mon Sep 17 00:00:00 2001 From: Ronak Date: Mon, 22 Nov 2021 22:26:18 +0530 Subject: [PATCH 08/11] updated service framework library --- .../hypertrace-metrics-generator/build.gradle.kts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts index c468abced..28958ff43 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -28,8 +28,8 @@ dependencies { // common and framework implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator-api")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.31") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.31") implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") // open telemetry proto From b012d7a211675a17619f35c095bd6364e498aa94 Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 26 Nov 2021 13:04:41 +0530 Subject: [PATCH 09/11] handling snyk issues --- .../hypertrace-metrics-generator-api/build.gradle.kts | 11 ++++++++++- .../hypertrace-metrics-generator/build.gradle.kts | 7 +++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts index 23c35f121..e7dd69e8a 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -5,7 +5,16 @@ plugins { } dependencies { - api("org.apache.avro:avro:1.10.2") + api("org.apache.avro:avro:1.11.0") + + constraints { + implementation("org.apache.commons:commons-compress:1.21") { + because("https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316638, " + + "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316639, " + + "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316640, " + + "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316641") + } + } } tasks.named("avroCompatibilityCheck") { diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts index 28958ff43..2ea44f87d 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts @@ -35,6 +35,13 @@ dependencies { // open telemetry proto implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + // all constraints + constraints { + implementation("org.glassfish.jersey.core:jersey-common:2.34") { + because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637") + } + } + // test testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") From 557750397abdad2f07154a27951d240e56d2837f Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 26 Nov 2021 16:07:36 +0530 Subject: [PATCH 10/11] addressed review comments --- .../metrics/generator/MetricsConstants.java | 14 +++++++++ .../metrics/generator/MetricsProcessor.java | 29 ++++++++++++------- .../src/test/java/MetricsGeneratorTest.java | 9 ++++++ 3 files changed, 41 insertions(+), 11 deletions(-) create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsConstants.java diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsConstants.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsConstants.java new file mode 100644 index 000000000..fa062c769 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsConstants.java @@ -0,0 +1,14 @@ +package org.hypertrace.metrics.generator; + +public class MetricsConstants { + public static final String TENANT_ID_ATTR = "tenant_id"; + public static final String SERVICE_ID_ATTR = "service_id"; + public static final String SERVICE_NAME_ATTR = "service_name"; + public static final String API_ID = "api_id"; + public static final String API_NAME = "api_name"; + public static final String PROTOCOL_NAME = "protocol_name"; + public static final String STATUS_CODE = "status_code"; + public static final String METRIC_NUM_CALLS = "num_calls"; + public static final String METRIC_NUM_CALLS_DESCRIPTION = "num of calls"; + public static final String METRIC_NUM_CALLS_UNIT = "1"; +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java index d0a19be90..b8ffcabf1 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -1,6 +1,16 @@ package org.hypertrace.metrics.generator; import static java.util.stream.Collectors.joining; +import static org.hypertrace.metrics.generator.MetricsConstants.API_ID; +import static org.hypertrace.metrics.generator.MetricsConstants.API_NAME; +import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS; +import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS_DESCRIPTION; +import static org.hypertrace.metrics.generator.MetricsConstants.METRIC_NUM_CALLS_UNIT; +import static org.hypertrace.metrics.generator.MetricsConstants.PROTOCOL_NAME; +import static org.hypertrace.metrics.generator.MetricsConstants.SERVICE_ID_ATTR; +import static org.hypertrace.metrics.generator.MetricsConstants.SERVICE_NAME_ATTR; +import static org.hypertrace.metrics.generator.MetricsConstants.STATUS_CODE; +import static org.hypertrace.metrics.generator.MetricsConstants.TENANT_ID_ATTR; import static org.hypertrace.metrics.generator.MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG; import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER; @@ -31,15 +41,6 @@ public class MetricsProcessor private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class); - private static final String TENANT_ID_ATTR = "tenant_id"; - private static final String SERVICE_ID_ATTR = "service_id"; - private static final String SERVICE_NAME_ATTR = "service_name"; - private static final String API_ID = "api_id"; - private static final String API_NAME = "api_name"; - private static final String CONSUMER_ID_ATTR = "consumer_id"; - private static final String METRIC_NUM_CALLS = "num_calls"; - private static final String METRIC_NUM_CALLS_DESCRIPTION = "num of calls"; - private static final String METRIC_NUM_CALLS_UNIT = "1"; private static final String DELIMITER = ":"; private static final String METRIC_AGGREGATION_TIME_MS = "metric.aggregation.timeMs"; private static final String METRIC_EMIT_WAIT_TIME_MS = "metric.emit.waitTimeMs"; @@ -79,6 +80,12 @@ public KeyValue transform(String key, RawServiceView va attributes.put(SERVICE_NAME_ATTR, value.getServiceName()); attributes.put(API_ID, value.getApiId()); attributes.put(API_NAME, value.getApiName()); + if (value.getProtocolName() != null) { + attributes.put(PROTOCOL_NAME, value.getProtocolName()); + } + if (value.getStatusCode() != null) { + attributes.put(STATUS_CODE, value.getStatusCode()); + } Metric metric = Metric.newBuilder() @@ -104,13 +111,13 @@ public KeyValue transform(String key, RawServiceView va Long preValue = metricsIdentityStore.get(metricsIdentity); if (preValue == null) { // first entry - metricsIdentityStore.put(metricsIdentity, 1L); + metricsIdentityStore.put(metricsIdentity, (long) value.getNumCalls()); metricsStore.put(metricsIdentity, metric); // schedule a punctuator schedulePunctuator(metricsIdentity); } else { - metricsIdentityStore.put(metricsIdentity, preValue + 1); + metricsIdentityStore.put(metricsIdentity, preValue + (long) value.getNumCalls()); } return null; diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java index c62c4e21c..90655367a 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -88,6 +88,9 @@ public void testMetricsGenerator(@TempDir Path tempDir) { .setStartTimeMillis(1636982920000L) .setEndTimeMillis(1636982920015L) .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") .build(); RawServiceView row2 = @@ -102,6 +105,9 @@ public void testMetricsGenerator(@TempDir Path tempDir) { .setStartTimeMillis(1636982920200L) .setEndTimeMillis(1636982920215L) .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") .build(); RawServiceView row3 = @@ -116,6 +122,9 @@ public void testMetricsGenerator(@TempDir Path tempDir) { .setStartTimeMillis(1636982920400L) .setEndTimeMillis(1636982920415L) .setDurationMillis(15L) + .setNumCalls(1) + .setProtocolName("HTTP") + .setStatusCode("200") .build(); // pipe the data From b7b4a840673cd9b2fb6af7f0151fc1fd17c31b27 Mon Sep 17 00:00:00 2001 From: Ronak Date: Fri, 26 Nov 2021 20:57:51 +0530 Subject: [PATCH 11/11] refactor: instead of using avro, changed to proto --- .../build.gradle.kts | 29 ++++++----- .../src/main/avro/metric-identity.avdl | 7 --- .../src/main/avro/metric.avdl | 9 ---- .../api/v1/serde/MetricIdentitySerde.java | 50 +++++++++++++++++++ .../generator/api/v1/serde/MetricSerde.java | 49 ++++++++++++++++++ .../api/v1/serde}/OtlpMetricsSerde.java | 10 ++-- .../src/main/proto/metric.proto | 18 +++++++ .../generator/MetricEmitPunctuator.java | 4 +- .../metrics/generator/MetricsGenerator.java | 19 ++++--- .../metrics/generator/MetricsProcessor.java | 6 +-- .../src/test/java/MetricsGeneratorTest.java | 2 +- 11 files changed, 156 insertions(+), 47 deletions(-) delete mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl delete mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricSerde.java rename hypertrace-metrics-generator/{hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator => hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde}/OtlpMetricsSerde.java (79%) create mode 100644 hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/proto/metric.proto diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts index e7dd69e8a..be6b04334 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/build.gradle.kts @@ -1,22 +1,27 @@ +import com.google.protobuf.gradle.protobuf +import com.google.protobuf.gradle.protoc + plugins { `java-library` - id("org.hypertrace.publish-plugin") - id("org.hypertrace.avro-plugin") + id("com.google.protobuf") version "0.8.15" } -dependencies { - api("org.apache.avro:avro:1.11.0") +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.17.3" + } +} - constraints { - implementation("org.apache.commons:commons-compress:1.21") { - because("https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316638, " + - "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316639, " + - "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316640, " + - "https://snyk.io/vuln/SNYK-JAVA-ORGAPACHECOMMONS-1316641") +sourceSets { + main { + java { + srcDirs("src/main/java", "build/generated/source/proto/main/java") } } } -tasks.named("avroCompatibilityCheck") { - enabled = false +dependencies { + implementation("com.google.protobuf:protobuf-java:3.17.3") + implementation("org.apache.kafka:kafka-clients:6.0.1-ccs") + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") } diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl deleted file mode 100644 index 52325295e..000000000 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric-identity.avdl +++ /dev/null @@ -1,7 +0,0 @@ -@namespace("org.hypertrace.metrics.generator.api") -protocol MetricIdentityProtocol { - record MetricIdentity { - long timestamp_millis = 0; - union {null, string} metric_key = null; - } -} \ No newline at end of file diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl deleted file mode 100644 index 60bc8310b..000000000 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/avro/metric.avdl +++ /dev/null @@ -1,9 +0,0 @@ -@namespace("org.hypertrace.metrics.generator.api") -protocol MetricProtocol { - record Metric { - union {null, string} name = null; - map attributes = {}; - union {null, string} description = null; - union {null, string} unit = null; - } -} \ No newline at end of file diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java new file mode 100644 index 000000000..bbd471b65 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricIdentitySerde.java @@ -0,0 +1,50 @@ +package org.hypertrace.metrics.generator.api.v1.serde; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.hypertrace.metrics.generator.api.v1.MetricIdentity; + +public class MetricIdentitySerde implements Serde { + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new MetricIdentitySerde.MetricIdentitySerializer(); + } + + @Override + public Deserializer deserializer() { + return new MetricIdentitySerde.MetricIdentityDeserializer(); + } + + private static class MetricIdentitySerializer implements Serializer { + @Override + public byte[] serialize(String topic, MetricIdentity data) { + try { + return data.toByteArray(); + } catch (Exception e) { + // ignore error + } + return null; + } + } + + private static class MetricIdentityDeserializer implements Deserializer { + @Override + public MetricIdentity deserialize(String topic, byte[] data) { + try { + return MetricIdentity.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricSerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricSerde.java new file mode 100644 index 000000000..7acb2bb41 --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/MetricSerde.java @@ -0,0 +1,49 @@ +package org.hypertrace.metrics.generator.api.v1.serde; + +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.hypertrace.metrics.generator.api.v1.Metric; + +public class MetricSerde implements Serde { + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new MetricSerde.MetricSerializer(); + } + + @Override + public Deserializer deserializer() { + return new MetricSerde.MetricDeserializer(); + } + + private static class MetricSerializer implements Serializer { + @Override + public byte[] serialize(String topic, Metric data) { + try { + return data.toByteArray(); + } catch (Exception e) { + // ignore error + } + return null; + } + } + + private static class MetricDeserializer implements Deserializer { + @Override + public Metric deserialize(String topic, byte[] data) { + try { + return Metric.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/OtlpMetricsSerde.java similarity index 79% rename from hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java rename to hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/OtlpMetricsSerde.java index 8a6c63137..5c64d98e8 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/OtlpMetricsSerde.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/java/org/hypertrace/metrics/generator/api/v1/serde/OtlpMetricsSerde.java @@ -1,4 +1,4 @@ -package org.hypertrace.metrics.generator; +package org.hypertrace.metrics.generator.api.v1.serde; import com.google.protobuf.InvalidProtocolBufferException; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; @@ -20,15 +20,15 @@ public void close() {} @Override public Serializer serializer() { - return new OtlpMetricsSerde.Ser(); + return new OtlpMetricsSerde.OtlpMetricsSerializer(); } @Override public Deserializer deserializer() { - return new OtlpMetricsSerde.De(); + return new OtlpMetricsSerde.OtlpMetricsDeserializer(); } - public static class Ser implements Serializer { + private static class OtlpMetricsSerializer implements Serializer { @Override public byte[] serialize(String topic, ResourceMetrics data) { try { @@ -40,7 +40,7 @@ public byte[] serialize(String topic, ResourceMetrics data) { } } - public static class De implements Deserializer { + private static class OtlpMetricsDeserializer implements Deserializer { @Override public ResourceMetrics deserialize(String topic, byte[] data) { try { diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/proto/metric.proto b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/proto/metric.proto new file mode 100644 index 000000000..713ac41ba --- /dev/null +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator-api/src/main/proto/metric.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +option java_multiple_files = true; + +package org.hypertrace.metrics.generator.api.v1; + + +message MetricIdentity { + sfixed64 timestamp_millis = 1; + string metric_key = 2; +} + +message Metric { + string name = 1; + map attributes = 2; + string description = 3; + string unit = 4; +} diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java index 4d9808316..7cda72cf1 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricEmitPunctuator.java @@ -17,8 +17,8 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; -import org.hypertrace.metrics.generator.api.Metric; -import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.hypertrace.metrics.generator.api.v1.Metric; +import org.hypertrace.metrics.generator.api.v1.MetricIdentity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java index 4aa6d8e7a..e85cf4ab3 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java @@ -18,8 +18,11 @@ import org.apache.kafka.streams.state.Stores; import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; import org.hypertrace.core.serviceframework.config.ConfigClient; -import org.hypertrace.metrics.generator.api.Metric; -import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.hypertrace.metrics.generator.api.v1.Metric; +import org.hypertrace.metrics.generator.api.v1.MetricIdentity; +import org.hypertrace.metrics.generator.api.v1.serde.MetricIdentitySerde; +import org.hypertrace.metrics.generator.api.v1.serde.MetricSerde; +import org.hypertrace.metrics.generator.api.v1.serde.OtlpMetricsSerde; import org.hypertrace.viewgenerator.api.RawServiceView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +57,18 @@ public StreamsBuilder buildTopology( inputStreams.put(inputTopic, inputStream); } - // Retrieve the default value serde defined in config and use it - Serde valueSerde = defaultValueSerde(streamsProperties); - Serde keySerde = defaultKeySerde(streamsProperties); - StoreBuilder> metricIdentityStoreBuilder = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE), keySerde, new LongSerde()) + Stores.persistentKeyValueStore(METRICS_IDENTITY_STORE), + new MetricIdentitySerde(), + new LongSerde()) .withCachingEnabled(); StoreBuilder> metricIdentityToValueStoreBuilder = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE), keySerde, valueSerde) + Stores.persistentKeyValueStore(METRICS_IDENTITY_VALUE_STORE), + new MetricIdentitySerde(), + new MetricSerde()) .withCachingEnabled(); streamsBuilder.addStateStore(metricIdentityStoreBuilder); diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java index b8ffcabf1..ccfa78b16 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -30,8 +30,8 @@ import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.hypertrace.metrics.generator.api.Metric; -import org.hypertrace.metrics.generator.api.MetricIdentity; +import org.hypertrace.metrics.generator.api.v1.Metric; +import org.hypertrace.metrics.generator.api.v1.MetricIdentity; import org.hypertrace.viewgenerator.api.RawServiceView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +92,7 @@ public KeyValue transform(String key, RawServiceView va .setName(METRIC_NUM_CALLS) .setDescription(METRIC_NUM_CALLS_DESCRIPTION) .setUnit(METRIC_NUM_CALLS_UNIT) - .setAttributes(attributes) + .putAllAttributes(attributes) .build(); // create metrics identity (timestamp, metric_key) diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java index 90655367a..f14fdbbde 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/test/java/MetricsGeneratorTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; import org.hypertrace.metrics.generator.MetricsGenerator; -import org.hypertrace.metrics.generator.OtlpMetricsSerde; +import org.hypertrace.metrics.generator.api.v1.serde.OtlpMetricsSerde; import org.hypertrace.viewgenerator.api.RawServiceView; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test;