Skip to content

Commit a4e8fa4

Browse files
grouper/enricher produces/consumes keyed input (#314)
* grouper/enricher produces/consumes keyed input * fixed the failing test due serde config Co-authored-by: Ronak <ronak@traceable.ai>
1 parent ef63989 commit a4e8fa4

File tree

13 files changed

+72
-52
lines changed

13 files changed

+72
-52
lines changed

hypertrace-ingester/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ hypertraceDocker {
2525
}
2626

2727
dependencies {
28-
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
28+
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
2929
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
3030
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
3131
implementation("org.hypertrace.core.datamodel:data-model:0.1.20")

hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ dependencies {
3030
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
3131
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
3232
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
33-
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
33+
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
3434

3535
// open telemetry proto
3636
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ dependencies {
3131
// frameworks
3232
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
3333
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
34-
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
34+
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
3535

3636
// open telemetry proto
3737
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ tasks.test {
3030

3131
dependencies {
3232
implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl"))
33+
implementation(project(":span-normalizer:span-normalizer-api"))
3334
implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
3435
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
3536
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
3637
implementation("org.hypertrace.entity.service:entity-service-client:0.8.5")
3738

3839
implementation("com.typesafe:config:1.4.1")
39-
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
40+
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
4041
constraints {
4142
runtimeOnly("io.netty:netty-codec-http2:4.1.71.Final")
4243
runtimeOnly("io.netty:netty-handler-proxy:4.1.71.Final")

hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
import org.apache.kafka.streams.kstream.Transformer;
1818
import org.apache.kafka.streams.processor.ProcessorContext;
1919
import org.hypertrace.core.datamodel.StructuredTrace;
20+
import org.hypertrace.core.spannormalizer.TraceIdentity;
2021
import org.hypertrace.traceenricher.enrichment.EnrichmentProcessor;
2122
import org.hypertrace.traceenricher.enrichment.EnrichmentRegistry;
2223
import org.hypertrace.traceenricher.enrichment.clients.DefaultClientRegistry;
2324

2425
public class StructuredTraceEnrichProcessor
25-
implements Transformer<String, StructuredTrace, KeyValue<String, StructuredTrace>> {
26+
implements Transformer<TraceIdentity, StructuredTrace, KeyValue<String, StructuredTrace>> {
2627

2728
private static EnrichmentProcessor processor = null;
2829
private DefaultClientRegistry clientRegistry;
@@ -53,7 +54,7 @@ public void init(ProcessorContext context) {
5354
}
5455

5556
@Override
56-
public KeyValue<String, StructuredTrace> transform(String key, StructuredTrace value) {
57+
public KeyValue<String, StructuredTrace> transform(TraceIdentity key, StructuredTrace value) {
5758
processor.process(value);
5859
return new KeyValue<>(null, value);
5960
}

hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/TraceEnricher.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.hypertrace.core.datamodel.StructuredTrace;
1717
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
1818
import org.hypertrace.core.serviceframework.config.ConfigClient;
19+
import org.hypertrace.core.spannormalizer.TraceIdentity;
1920
import org.slf4j.Logger;
2021
import org.slf4j.LoggerFactory;
2122

@@ -37,10 +38,10 @@ public StreamsBuilder buildTopology(
3738
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
3839
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
3940

40-
KStream<String, StructuredTrace> inputStream =
41-
(KStream<String, StructuredTrace>) inputStreams.get(inputTopic);
41+
KStream<TraceIdentity, StructuredTrace> inputStream =
42+
(KStream<TraceIdentity, StructuredTrace>) inputStreams.get(inputTopic);
4243
if (inputStream == null) {
43-
inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), null));
44+
inputStream = streamsBuilder.stream(inputTopic, Consumed.with(null, null));
4445
inputStreams.put(inputTopic, inputStream);
4546
}
4647

hypertrace-trace-enricher/hypertrace-trace-enricher/src/test/java/HypertraceTraceEnricherTest.java

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import static org.junit.jupiter.api.Assertions.assertEquals;
2+
13
import com.typesafe.config.Config;
24
import com.typesafe.config.ConfigFactory;
35
import java.nio.ByteBuffer;
@@ -7,18 +9,15 @@
79
import java.util.Properties;
810
import org.apache.kafka.common.serialization.Serde;
911
import org.apache.kafka.common.serialization.Serdes;
10-
import org.apache.kafka.streams.StreamsBuilder;
11-
import org.apache.kafka.streams.TestInputTopic;
12-
import org.apache.kafka.streams.TestOutputTopic;
13-
import org.apache.kafka.streams.TopologyTestDriver;
12+
import org.apache.kafka.streams.*;
1413
import org.hypertrace.core.datamodel.Attributes;
1514
import org.hypertrace.core.datamodel.StructuredTrace;
1615
import org.hypertrace.core.datamodel.shared.HexUtils;
1716
import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;
1817
import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
18+
import org.hypertrace.core.spannormalizer.TraceIdentity;
1919
import org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants;
2020
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
21-
import org.junit.jupiter.api.Assertions;
2221
import org.junit.jupiter.api.BeforeEach;
2322
import org.junit.jupiter.api.Test;
2423
import org.junitpioneer.jupiter.SetEnvironmentVariable;
@@ -64,48 +63,64 @@ public void testTraceEnricherTopology() {
6463
// create topology test driver for trace-enricher
6564
TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), props);
6665

66+
Serde<TraceIdentity> traceKeySerde = new AvroSerde<>();
6767
Serde<StructuredTrace> htStructuredTraceSerde = new AvroSerde<>();
6868

6969
// create input topic for HT-model StructuredTrace
70-
TestInputTopic<String, StructuredTrace> inputTopic =
70+
TestInputTopic<TraceIdentity, StructuredTrace> inputTopic =
7171
topologyTestDriver.createInputTopic(
7272
config.getString(StructuredTraceEnricherConstants.INPUT_TOPIC_CONFIG_KEY),
73-
Serdes.String().serializer(),
73+
traceKeySerde.serializer(),
7474
htStructuredTraceSerde.serializer());
7575

7676
// create output topic for closed-model StructuredTrace
77-
TestOutputTopic outputTopic =
77+
TestOutputTopic<String, StructuredTrace> outputTopic =
7878
topologyTestDriver.createOutputTopic(
7979
config.getString(StructuredTraceEnricherConstants.OUTPUT_TOPIC_CONFIG_KEY),
8080
Serdes.String().deserializer(),
8181
htStructuredTraceSerde.deserializer());
8282

8383
// create instance of HT-model StructuredTrace
84-
StructuredTrace htStructuredTrace = createHTStructuredTrace("customer1", "1234");
84+
KeyValue<TraceIdentity, StructuredTrace> traceKVPair1 =
85+
createHTStructuredTrace("customer1", "1234");
86+
KeyValue<TraceIdentity, StructuredTrace> traceKVPair2 =
87+
createHTStructuredTrace("customer2", "5678");
8588

8689
// Write an input record into input topic
87-
inputTopic.pipeInput(htStructuredTrace);
90+
inputTopic.pipeInput(traceKVPair1.key, traceKVPair1.value);
91+
// enricher should be able to handle null keys as well for backward compatibility
92+
inputTopic.pipeInput(null, traceKVPair2.value);
8893

8994
// Read the output record from output topic
90-
StructuredTrace structuredTrace = (StructuredTrace) outputTopic.readValue();
95+
StructuredTrace structuredTrace1 = outputTopic.readValue();
96+
StructuredTrace structuredTrace2 = outputTopic.readValue();
9197

92-
Assertions.assertEquals(
93-
HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace.getTraceId()));
98+
assertEquals(
99+
HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace1.getTraceId()));
100+
assertEquals(
101+
HexUtils.getHex("5678".getBytes()), HexUtils.getHex(structuredTrace2.getTraceId()));
94102
}
95103

96-
private org.hypertrace.core.datamodel.StructuredTrace createHTStructuredTrace(
104+
private KeyValue<TraceIdentity, StructuredTrace> createHTStructuredTrace(
97105
String customerId, String traceId) {
98-
return StructuredTrace.newBuilder()
99-
.setCustomerId(customerId)
100-
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
101-
.setStartTimeMillis(System.currentTimeMillis() - 10000)
102-
.setEndTimeMillis(System.currentTimeMillis())
103-
.setAttributes(Attributes.newBuilder().build())
104-
.setEntityList(new ArrayList<>())
105-
.setEntityEdgeList(new ArrayList<>())
106-
.setEventEdgeList(new ArrayList<>())
107-
.setEntityEventEdgeList(new ArrayList<>())
108-
.setEventList(new ArrayList<>())
109-
.build();
106+
TraceIdentity traceKey =
107+
TraceIdentity.newBuilder()
108+
.setTenantId(customerId)
109+
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
110+
.build();
111+
StructuredTrace trace =
112+
StructuredTrace.newBuilder()
113+
.setCustomerId(customerId)
114+
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
115+
.setStartTimeMillis(System.currentTimeMillis() - 10000)
116+
.setEndTimeMillis(System.currentTimeMillis())
117+
.setAttributes(Attributes.newBuilder().build())
118+
.setEntityList(new ArrayList<>())
119+
.setEntityEdgeList(new ArrayList<>())
120+
.setEventEdgeList(new ArrayList<>())
121+
.setEntityEventEdgeList(new ArrayList<>())
122+
.setEventList(new ArrayList<>())
123+
.build();
124+
return KeyValue.pair(traceKey, trace);
110125
}
111126
}

raw-spans-grouper/raw-spans-grouper/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ dependencies {
3737
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
3838
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
3939

40-
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
40+
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
4141
implementation("com.typesafe:config:1.4.1")
4242
implementation("de.javakaffee:kryo-serializers:0.45")
4343
implementation("io.confluent:kafka-avro-serializer:5.5.0")

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import org.apache.kafka.common.serialization.Serde;
14-
import org.apache.kafka.common.serialization.Serdes;
1514
import org.apache.kafka.streams.StreamsBuilder;
1615
import org.apache.kafka.streams.StreamsConfig;
1716
import org.apache.kafka.streams.kstream.KStream;
@@ -73,7 +72,7 @@ public StreamsBuilder buildTopology(
7372
streamsBuilder.addStateStore(spanStoreBuilder);
7473
streamsBuilder.addStateStore(traceStateStoreBuilder);
7574

76-
Produced<String, StructuredTrace> outputTopicProducer = Produced.with(Serdes.String(), null);
75+
Produced<TraceIdentity, StructuredTrace> outputTopicProducer = Produced.with(null, null);
7776
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);
7877

7978
inputStream

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} time to accept spans.
5252
*/
5353
public class RawSpansProcessor
54-
implements Transformer<TraceIdentity, RawSpan, KeyValue<String, StructuredTrace>> {
54+
implements Transformer<TraceIdentity, RawSpan, KeyValue<TraceIdentity, StructuredTrace>> {
5555

5656
private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
5757
private static final String PROCESSING_LATENCY_TIMER =
@@ -109,7 +109,7 @@ public void init(ProcessorContext context) {
109109
restorePunctuators();
110110
}
111111

112-
public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
112+
public KeyValue<TraceIdentity, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
113113
Instant start = Instant.now();
114114
long currentTimeMs = System.currentTimeMillis();
115115

0 commit comments

Comments
 (0)