feat: adds metrics processor component and topology test #270
Changes from 3 commits
log4j2.properties (new file)
@@ -0,0 +1,29 @@
status = error
name = PropertiesConfig

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n

appender.rolling.type = RollingFile
appender.rolling.name = ROLLING_FILE
appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log
appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 3600
appender.rolling.policies.time.modulate = true
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 20MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5

rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.rolling.ref = ROLLING_FILE
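Note: the ${sys:service.name:-hypertrace-ingester} lookup resolves the service.name system property and falls back to hypertrace-ingester when the property is unset, so any service that reuses this configuration writes to its own log file.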
build.gradle.kts (new file, parent module)
@@ -0,0 +1,3 @@
subprojects {
  group = "org.hypertrace.metrics.processor"
}
build.gradle.kts (new file, service module)
@@ -0,0 +1,47 @@
plugins {
  java
  application
  jacoco
  id("org.hypertrace.docker-java-application-plugin")
  id("org.hypertrace.docker-publish-plugin")
  id("org.hypertrace.jacoco-report-plugin")
}

application {
  mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
  defaultImage {
    javaApplication {
      serviceName.set("${project.name}")
      adminPort.set(8099)
    }
  }
}

tasks.test {
  useJUnitPlatform()
}

dependencies {
  // internal projects
  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))

  // frameworks
  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

  // serde (todo)
  // implementation("io.confluent:kafka-streams-protobuf-serde:6.0.1")

  // open telemetry proto
  implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

  // test
  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
  testImplementation("org.mockito:mockito-core:3.8.0")
  testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
  testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
MetricsEnricher.java (new file)
@@ -0,0 +1,22 @@
package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetricsEnricher
    implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {

  @Override
  public void init(ProcessorContext context) {}

  @Override
  public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
    // noop enricher for now
    return new KeyValue<>(key, value);
  }

  @Override
  public void close() {}
}

Review comment: In the first iteration, we will be extracting metrics from traces, and they will already have enriched attributes as labels. We will enhance this later; for now, metrics are just passed through. If required, we can check whether the metrics contain an API id and a service id, and drop them if not.
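A possible shape for that check, as a hedged sketch; this is not part of the PR, and the attribute keys ("service.id", "api.id") and the choice to inspect resource attributes are assumptions about the enrichment schema.

package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;

class MetricsFilterSketch {
  // Hypothetical attribute keys; the real enriched label names may differ.
  private static final String SERVICE_ID_ATTR = "service.id";
  private static final String API_ID_ATTR = "api.id";

  // True if the resource carries both ids; a future enricher could drop
  // the ResourceMetrics otherwise, as suggested in the comment above.
  static boolean hasRequiredIds(ResourceMetrics metrics) {
    boolean hasServiceId = false;
    boolean hasApiId = false;
    for (KeyValue attr : metrics.getResource().getAttributesList()) {
      hasServiceId |= SERVICE_ID_ATTR.equals(attr.getKey());
      hasApiId |= API_ID_ATTR.equals(attr.getKey());
    }
    return hasServiceId && hasApiId;
  }
}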
MetricsNormalizer.java (new file)
@@ -0,0 +1,22 @@
package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetricsNormalizer
    implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {

  @Override
  public void init(ProcessorContext context) {}

  @Override
  public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
    // noop normalizer for now
    return new KeyValue<>(key, value);
  }

  @Override
  public void close() {}
}
MetricsProcessor.java (new file)
@@ -0,0 +1,80 @@
package org.hypertrace.metrics.processor;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsProcessor extends KafkaStreamsApp {
  private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class);
  public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
  public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
  private static final String METRICS_PROCESSOR_JOB_CONFIG = "metrics-processor-job-config";

  public MetricsProcessor(ConfigClient configClient) {
    super(configClient);
  }

  @Override
  public StreamsBuilder buildTopology(
      Map<String, Object> streamsProperties,
      StreamsBuilder streamsBuilder,
      Map<String, KStream<?, ?>> inputStreams) {

    Config jobConfig = getJobConfig(streamsProperties);
    String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
    String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

    // input stream; reuse the stream if another job already consumes this topic
    KStream<byte[], ResourceMetrics> inputStream =
        (KStream<byte[], ResourceMetrics>) inputStreams.get(inputTopic);
    if (inputStream == null) {
      inputStream =
          streamsBuilder.stream(
              inputTopic, Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde()));
      inputStreams.put(inputTopic, inputStream);
    }

    inputStream
        .transform(MetricsNormalizer::new)
        .transform(MetricsEnricher::new)
        .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()));

    return streamsBuilder;
  }

  @Override
  public String getJobConfigKey() {
    return METRICS_PROCESSOR_JOB_CONFIG;
  }

  @Override
  public Logger getLogger() {
    return logger;
  }

  @Override
  public List<String> getInputTopics(Map<String, Object> properties) {
    Config jobConfig = getJobConfig(properties);
    return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
  }

  @Override
  public List<String> getOutputTopics(Map<String, Object> properties) {
    Config jobConfig = getJobConfig(properties);
    return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
  }

  private Config getJobConfig(Map<String, Object> properties) {
    return (Config) properties.get(getJobConfigKey());
  }
}
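The PR title mentions a topology test. Below is a minimal sketch of such a test using kafka-streams-test-utils (already a test dependency above); it wires the two transformers and the serde directly instead of going through KafkaStreamsApp, and the topic names mirror application.conf. The class name and wiring are illustrative assumptions, not the PR's actual test.

package org.hypertrace.metrics.processor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Properties;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.jupiter.api.Test;

class MetricsProcessorTopologySketchTest {

  @Test
  void passesMetricsThroughUnchanged() {
    // Rebuild the normalize -> enrich -> output pipeline from buildTopology,
    // wired directly here rather than through the service framework.
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("otlp-metrics", Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde()))
        .transform(MetricsNormalizer::new)
        .transform(MetricsEnricher::new)
        .to("enriched-otlp-metrics", Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()));

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-processor-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

    try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
      TestInputTopic<byte[], ResourceMetrics> input =
          driver.createInputTopic(
              "otlp-metrics", new ByteArraySerializer(), new OtlpMetricsSerde.Ser());
      TestOutputTopic<byte[], ResourceMetrics> output =
          driver.createOutputTopic(
              "enriched-otlp-metrics", new ByteArrayDeserializer(), new OtlpMetricsSerde.De());

      ResourceMetrics metrics = ResourceMetrics.newBuilder().build();
      input.pipeInput(new byte[] {1}, metrics);

      // Both transformers are currently no-ops, so the record is unchanged.
      assertEquals(metrics, output.readValue());
    }
  }
}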
OtlpMetricsSerde.java (new file)
@@ -0,0 +1,45 @@
package org.hypertrace.metrics.processor;

import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class OtlpMetricsSerde implements Serde<ResourceMetrics> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<ResourceMetrics> serializer() {
    return new OtlpMetricsSerde.Ser();
  }

  @Override
  public Deserializer<ResourceMetrics> deserializer() {
    return new OtlpMetricsSerde.De();
  }

  public static class Ser implements Serializer<ResourceMetrics> {
    @Override
    public byte[] serialize(String topic, ResourceMetrics data) {
      return data.toByteArray();
    }
  }

  public static class De implements Deserializer<ResourceMetrics> {
    @Override
    public ResourceMetrics deserialize(String topic, byte[] data) {
      try {
        return ResourceMetrics.parseFrom(data);
      } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException(e);
      }
    }
  }
}

Review comment: Assuming an exception handler is part of the config that can deal with any exceptions?
Reply: Yes, this will be handled by the default deserialization exception handler, LogAndContinueExceptionHandler, so the exception will just be logged and such messages will be dropped.
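A sketch of making the handler mentioned in the reply explicit. Whether kafka-streams-framework already sets this default is an assumption to verify; the property itself is standard Kafka Streams configuration.

package org.hypertrace.metrics.processor;

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

class DeserializationHandlerSketch {
  // Makes the handler explicit: records that OtlpMetricsSerde.De cannot
  // parse are logged and skipped instead of crashing the stream thread.
  static Properties withLogAndContinue(Properties props) {
    props.put(
        StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);
    return props;
  }
}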
application.conf (new file)
@@ -0,0 +1,34 @@
service.name = hypertrace-metrics-processor
service.admin.port = 8099

main.class = org.hypertrace.metrics.processor.MetricsProcessor

input.topic = "otlp-metrics"
output.topic = "enriched-otlp-metrics"

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}

kafka.streams.config = {
  application.id = metrics-processor-job
  num.stream.threads = 2
  num.stream.threads = ${?NUM_STREAM_THREADS}

  bootstrap.servers = "localhost:9092"
  bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}

  schema.registry.url = "http://localhost:8081"
  schema.registry.url = ${?SCHEMA_REGISTRY_URL}
  value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
}

processor {
  defaultTenantId = ${?DEFAULT_TENANT_ID}
}

logger.names = ["file"]
logger.file.dir = "/var/logs/metrics-processor"

metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor
metrics.reporter.names = ["prometheus"]
metrics.reportInterval = 60

Review comment (on value.subject.name.strategy): Why is it TopicRecordNameStrategy? Are there going to be different message types sent on the topic?
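For context: TopicRecordNameStrategy derives the schema registry subject as <topic>-<fully qualified record name>, which allows records of more than one type to share a single topic; a topic carrying only one record type would usually use the default TopicNameStrategy instead.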
Review comment: Don't you need this?
Reply: Currently, I am using the internally defined OtlpMetricsSerde. I was facing some issues with this, so I am working on it as a follow-up item. For now, I removed this.