feat: adds metrics processor component and topology test (#270)
* feat: adds metrics processor component and topology test

* removed unused kstream component

* removed unused logger and API

* address comments
kotharironak committed Oct 14, 2021
1 parent 31cf9fd commit 896081c
Showing 15 changed files with 560 additions and 11 deletions.
49 changes: 41 additions & 8 deletions hypertrace-ingester/build.gradle.kts
@@ -40,6 +40,7 @@ dependencies {
implementation(project(":raw-spans-grouper:raw-spans-grouper"))
implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher"))
implementation(project(":hypertrace-view-generator:hypertrace-view-generator"))
implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor"))

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
@@ -61,10 +62,26 @@ tasks.processResources {

tasks.register<Copy>("copyServiceConfigs") {
with(
createCopySpec("span-normalizer", "span-normalizer", "main", "common"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common")
createCopySpec("span-normalizer",
"span-normalizer",
"main",
"common"),
createCopySpec("raw-spans-grouper",
"raw-spans-grouper",
"main",
"common"),
createCopySpec("hypertrace-trace-enricher",
"hypertrace-trace-enricher",
"main",
"common"),
createCopySpec("hypertrace-view-generator",
"hypertrace-view-generator",
"main",
"common"),
createCopySpec("hypertrace-metrics-processor",
"hypertrace-metrics-processor",
"main",
"common")
).into("./build/resources/main/configs/")
}

@@ -101,10 +118,26 @@ tasks.test {

tasks.register<Copy>("copyServiceConfigsTest") {
with(
createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator")
createCopySpec("span-normalizer",
"span-normalizer",
"test",
"span-normalizer"),
createCopySpec("raw-spans-grouper",
"raw-spans-grouper",
"test",
"raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher",
"hypertrace-trace-enricher",
"test",
"hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator",
"hypertrace-view-generator",
"test",
"hypertrace-view-generator"),
createCopySpec("hypertrace-metrics-processor",
"hypertrace-metrics-processor",
"test",
"hypertrace-metrics-processor")
).into("./build/resources/test/configs/")
}

@@ -17,6 +17,7 @@
import org.hypertrace.core.serviceframework.config.ConfigUtils;
import org.hypertrace.core.spannormalizer.SpanNormalizer;
import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher;
+import org.hypertrace.metrics.processor.MetricsProcessor;
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +50,9 @@ private KafkaStreamsApp getSubTopologyInstance(String name) {
case "all-views":
kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-processor":
kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient());
break;
default:
throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name));
}
@@ -3,7 +3,13 @@ main.class = org.hypertrace.ingester.HypertraceIngester
service.name = hypertrace-ingester
service.admin.port = 8099

sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"]
sub.topology.names = [
"span-normalizer",
"raw-spans-grouper",
"hypertrace-trace-enricher",
"all-views",
"hypertrace-metrics-processor"
]

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}
29 changes: 29 additions & 0 deletions hypertrace-ingester/src/main/resources/log4j2.properties
@@ -0,0 +1,29 @@
status = error
name = PropertiesConfig

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n

appender.rolling.type = RollingFile
appender.rolling.name = ROLLING_FILE
appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log
appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 3600
appender.rolling.policies.time.modulate = true
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 20MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5

rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.rolling.ref = ROLLING_FILE



3 changes: 3 additions & 0 deletions hypertrace-metrics-processor/build.gradle.kts
@@ -0,0 +1,3 @@
subprojects {
group = "org.hypertrace.metrics.processor"
}
@@ -0,0 +1,44 @@
plugins {
java
application
jacoco
id("org.hypertrace.docker-java-application-plugin")
id("org.hypertrace.docker-publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

application {
mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

hypertraceDocker {
defaultImage {
javaApplication {
serviceName.set("${project.name}")
adminPort.set(8099)
}
}
}

tasks.test {
useJUnitPlatform()
}

dependencies {
// internal projects
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))

// frameworks
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

// open telemetry proto
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

// test
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
@@ -0,0 +1,22 @@
package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetricsEnricher
implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {

@Override
public void init(ProcessorContext context) {}

@Override
public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
// noop enricher for now
return new KeyValue<>(key, value);
}

@Override
public void close() {}
}
@@ -0,0 +1,22 @@
package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class MetricsNormalizer
implements Transformer<byte[], ResourceMetrics, KeyValue<byte[], ResourceMetrics>> {

@Override
public void init(ProcessorContext context) {}

@Override
public KeyValue<byte[], ResourceMetrics> transform(byte[] key, ResourceMetrics value) {
// noop normalizer for now
return new KeyValue<>(key, value);
}

@Override
public void close() {}
}
@@ -0,0 +1,80 @@
package org.hypertrace.metrics.processor;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsProcessor extends KafkaStreamsApp {
private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class);
public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic";
public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic";
private static final String METRICS_PROCESSOR_JOB_CONFIG = "metrics-processor-job-config";

public MetricsProcessor(ConfigClient configClient) {
super(configClient);
}

@Override
public StreamsBuilder buildTopology(
Map<String, Object> streamsProperties,
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {

Config jobConfig = getJobConfig(streamsProperties);
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

// input stream
KStream<byte[], ResourceMetrics> inputStream =
(KStream<byte[], ResourceMetrics>) inputStreams.get(inputTopic);
if (inputStream == null) {
inputStream =
streamsBuilder.stream(
inputTopic, Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde()));
inputStreams.put(inputTopic, inputStream);
}

inputStream
.transform(MetricsNormalizer::new)
.transform(MetricsEnricher::new)
.to(outputTopic, Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()));

return streamsBuilder;
}

@Override
public String getJobConfigKey() {
return METRICS_PROCESSOR_JOB_CONFIG;
}

@Override
public Logger getLogger() {
return logger;
}

@Override
public List<String> getInputTopics(Map<String, Object> properties) {
Config jobConfig = getJobConfig(properties);
return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY));
}

@Override
public List<String> getOutputTopics(Map<String, Object> properties) {
Config jobConfig = getJobConfig(properties);
return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
}

private Config getJobConfig(Map<String, Object> properties) {
return (Config) properties.get(getJobConfigKey());
}
}
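
Editor's note: the PR title mentions a topology test. Below is a minimal sketch of how such a test could drive this pass-through topology with kafka-streams-test-utils (declared above as a test dependency). The harness class, topic names, and properties are illustrative assumptions, not the actual test shipped in this commit.

package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

class MetricsProcessorTopologySketch {

  public static void main(String[] args) {
    // Rebuild the same shape as MetricsProcessor#buildTopology:
    // input -> normalize -> enrich -> output.
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("otlp-metrics", Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde()))
        .transform(MetricsNormalizer::new)
        .transform(MetricsEnricher::new)
        .to("enriched-otlp-metrics", Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()));

    Properties props = new Properties();
    props.put("application.id", "metrics-processor-topology-test");
    props.put("bootstrap.servers", "dummy:9092"); // never contacted by the test driver

    try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
      TestInputTopic<byte[], ResourceMetrics> input =
          driver.createInputTopic(
              "otlp-metrics", Serdes.ByteArray().serializer(), new OtlpMetricsSerde.Ser());
      TestOutputTopic<byte[], ResourceMetrics> output =
          driver.createOutputTopic(
              "enriched-otlp-metrics",
              Serdes.ByteArray().deserializer(),
              new OtlpMetricsSerde.De());

      ResourceMetrics metrics = ResourceMetrics.newBuilder().build();
      input.pipeInput("test-key".getBytes(), metrics);

      // Both transformers are currently noops, so the record passes through unchanged.
      System.out.println(output.readValue().equals(metrics)); // expected: true
    }
  }
}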
@@ -0,0 +1,45 @@
package org.hypertrace.metrics.processor;

import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class OtlpMetricsSerde implements Serde<ResourceMetrics> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public void close() {}

@Override
public Serializer<ResourceMetrics> serializer() {
return new OtlpMetricsSerde.Ser();
}

@Override
public Deserializer<ResourceMetrics> deserializer() {
return new OtlpMetricsSerde.De();
}

public static class Ser implements Serializer<ResourceMetrics> {
@Override
public byte[] serialize(String topic, ResourceMetrics data) {
return data.toByteArray();
}
}

public static class De implements Deserializer<ResourceMetrics> {
@Override
public ResourceMetrics deserialize(String topic, byte[] data) {
try {
return ResourceMetrics.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
}
}
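
Editor's note: for illustration, a hypothetical round-trip through the serde ("otlp-metrics" is an arbitrary topic name; both halves ignore the topic argument):

package org.hypertrace.metrics.processor;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;

class OtlpMetricsSerdeRoundTrip {
  public static void main(String[] args) {
    ResourceMetrics original = ResourceMetrics.newBuilder().build();
    byte[] bytes = new OtlpMetricsSerde.Ser().serialize("otlp-metrics", original);
    ResourceMetrics decoded = new OtlpMetricsSerde.De().deserialize("otlp-metrics", bytes);
    System.out.println(original.equals(decoded)); // true: protobuf messages compare by value
  }
}

One caveat worth noting: De.deserialize passes the payload straight to ResourceMetrics.parseFrom, so a null payload (for example a tombstone record) would throw rather than return null, which Kafka's Deserializer contract normally permits.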
@@ -0,0 +1,33 @@
service.name = hypertrace-metrics-processor
service.admin.port = 8099

main.class = org.hypertrace.metrics.processor.MetricsProcessor

input.topic = "otlp-metrics"
output.topic = "enriched-otlp-metrics"

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}

kafka.streams.config = {
application.id = metrics-processor-job
num.stream.threads = 2
num.stream.threads = ${?NUM_STREAM_THREADS}

bootstrap.servers = "localhost:9092"
bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}

schema.registry.url = "http://localhost:8081"
schema.registry.url = ${?SCHEMA_REGISTRY_URL}
}

processor {
defaultTenantId = ${?DEFAULT_TENANT_ID}
}

logger.names = ["file"]
logger.file.dir = "/var/logs/metrics-processor"

metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor
metrics.reporter.names = ["prometheus"]
metrics.reportInterval = 60
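
Editor's note: the repeated keys above (precreate.topics, num.stream.threads, bootstrap.servers, schema.registry.url) rely on HOCON's optional substitution: an assignment whose ${?ENV_VAR} reference is unset simply disappears during resolution, leaving the earlier default in place. A small hypothetical demonstration using the Typesafe Config API that MetricsProcessor already depends on:

package org.hypertrace.metrics.processor;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

class HoconOverrideSketch {
  public static void main(String[] args) {
    // With PRE_CREATE_TOPICS unset in the environment, the second assignment
    // vanishes during resolution and the default of false survives.
    Config config =
        ConfigFactory.parseString(
                "precreate.topics = false\nprecreate.topics = ${?PRE_CREATE_TOPICS}")
            .resolve();
    System.out.println(config.getBoolean("precreate.topics"));
  }
}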