From c404b3279ac21986e5d840dd2c95f5dfc8d1c3fc Mon Sep 17 00:00:00 2001
From: Laxman Ch
Date: Mon, 28 Mar 2022 17:03:15 +0530
Subject: [PATCH 1/2] grouper/enricher produces/consumes keyed input

---
 hypertrace-ingester/build.gradle.kts | 2 +-
 .../build.gradle.kts | 2 +-
 .../build.gradle.kts | 2 +-
 .../build.gradle.kts | 3 +-
 .../StructuredTraceEnrichProcessor.java | 5 +-
 .../trace/enricher/TraceEnricher.java | 7 +-
 .../java/HypertraceTraceEnricherTest.java | 67 ++++++++++++-------
 .../raw-spans-grouper/build.gradle.kts | 2 +-
 .../core/rawspansgrouper/RawSpansGrouper.java | 3 +-
 .../rawspansgrouper/RawSpansProcessor.java | 4 +-
 .../rawspansgrouper/TraceEmitPunctuator.java | 2 +-
 .../rawspansgrouper/RawSpansGrouperTest.java | 20 +++---
 .../span-normalizer/build.gradle.kts | 2 +-
 13 files changed, 71 insertions(+), 50 deletions(-)

diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts
index 6057dc487..297d7408d 100644
--- a/hypertrace-ingester/build.gradle.kts
+++ b/hypertrace-ingester/build.gradle.kts
@@ -25,7 +25,7 @@ hypertraceDocker {
 }
 
 dependencies {
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
   implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts
index 711e1371a..9d742186f 100644
--- a/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts
+++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/build.gradle.kts
@@ -30,7 +30,7 @@ dependencies {
   implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
 
   // open telemetry proto
   implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")
diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
index eade074f6..feb0fbac2 100644
--- a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
+++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts
@@ -31,7 +31,7 @@ dependencies {
   // frameworks
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
 
   // open telemetry proto
   implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
index 943f53156..20f7f53c3 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/build.gradle.kts
@@ -30,13 +30,14 @@ tasks.test {
 
 dependencies {
   implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl"))
+  implementation(project(":span-normalizer:span-normalizer-api"))
   implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
   implementation("org.hypertrace.entity.service:entity-service-client:0.8.5")
   implementation("com.typesafe:config:1.4.1")
 
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
   constraints {
     runtimeOnly("io.netty:netty-codec-http2:4.1.71.Final")
     runtimeOnly("io.netty:netty-handler-proxy:4.1.71.Final")
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java
index b3b0c72c1..8408ca24c 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/StructuredTraceEnrichProcessor.java
@@ -17,12 +17,13 @@
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.hypertrace.core.datamodel.StructuredTrace;
+import org.hypertrace.core.spannormalizer.TraceIdentity;
 import org.hypertrace.traceenricher.enrichment.EnrichmentProcessor;
 import org.hypertrace.traceenricher.enrichment.EnrichmentRegistry;
 import org.hypertrace.traceenricher.enrichment.clients.DefaultClientRegistry;
 
 public class StructuredTraceEnrichProcessor
-    implements Transformer<String, StructuredTrace, KeyValue<String, StructuredTrace>> {
+    implements Transformer<TraceIdentity, StructuredTrace, KeyValue<String, StructuredTrace>> {
 
   private static EnrichmentProcessor processor = null;
   private DefaultClientRegistry clientRegistry;
@@ -53,7 +54,7 @@ public void init(ProcessorContext context) {
   }
 
   @Override
-  public KeyValue<String, StructuredTrace> transform(String key, StructuredTrace value) {
+  public KeyValue<String, StructuredTrace> transform(TraceIdentity key, StructuredTrace value) {
     processor.process(value);
     return new KeyValue<>(null, value);
   }
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/TraceEnricher.java b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/TraceEnricher.java
index 7dbc3eff7..3fd23cd28 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/TraceEnricher.java
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/main/java/org/hypertrace/traceenricher/trace/enricher/TraceEnricher.java
@@ -16,6 +16,7 @@
 import org.hypertrace.core.datamodel.StructuredTrace;
 import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
 import org.hypertrace.core.serviceframework.config.ConfigClient;
+import org.hypertrace.core.spannormalizer.TraceIdentity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +38,10 @@ public StreamsBuilder buildTopology(
     String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
     String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
 
-    KStream<String, StructuredTrace> inputStream =
-        (KStream<String, StructuredTrace>) inputStreams.get(inputTopic);
+    KStream<TraceIdentity, StructuredTrace> inputStream =
+        (KStream<TraceIdentity, StructuredTrace>) inputStreams.get(inputTopic);
     if (inputStream == null) {
-      inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), null));
+      inputStream = streamsBuilder.stream(inputTopic, Consumed.with(null, null));
       inputStreams.put(inputTopic, inputStream);
     }
 
diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/test/java/HypertraceTraceEnricherTest.java b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/test/java/HypertraceTraceEnricherTest.java
index 541671ea8..e5ea8170d 100644
--- a/hypertrace-trace-enricher/hypertrace-trace-enricher/src/test/java/HypertraceTraceEnricherTest.java
+++ b/hypertrace-trace-enricher/hypertrace-trace-enricher/src/test/java/HypertraceTraceEnricherTest.java
@@ -1,3 +1,5 @@
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.nio.ByteBuffer;
@@ -7,18 +9,15 @@
 import java.util.Properties;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.*;
 import org.hypertrace.core.datamodel.Attributes;
 import org.hypertrace.core.datamodel.StructuredTrace;
 import org.hypertrace.core.datamodel.shared.HexUtils;
 import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;
 import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
+import org.hypertrace.core.spannormalizer.TraceIdentity;
 import org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants;
 import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junitpioneer.jupiter.SetEnvironmentVariable;
@@ -64,48 +63,64 @@ public void testTraceEnricherTopology() {
     // create topology test driver for trace-enricher
     TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), props);
 
+    Serde<TraceIdentity> traceKeySerde = new AvroSerde<>();
     Serde<StructuredTrace> htStructuredTraceSerde = new AvroSerde<>();
 
     // create input topic for HT-model StructuredTrace
-    TestInputTopic<String, StructuredTrace> inputTopic =
+    TestInputTopic<TraceIdentity, StructuredTrace> inputTopic =
         topologyTestDriver.createInputTopic(
             config.getString(StructuredTraceEnricherConstants.INPUT_TOPIC_CONFIG_KEY),
-            Serdes.String().serializer(),
+            traceKeySerde.serializer(),
            htStructuredTraceSerde.serializer());
 
     // create output topic for closed-model StructuredTrace
-    TestOutputTopic outputTopic =
+    TestOutputTopic<String, StructuredTrace> outputTopic =
        topologyTestDriver.createOutputTopic(
            config.getString(StructuredTraceEnricherConstants.OUTPUT_TOPIC_CONFIG_KEY),
            Serdes.String().deserializer(),
            htStructuredTraceSerde.deserializer());
 
     // create instance of HT-model StructuredTrace
-    StructuredTrace htStructuredTrace = createHTStructuredTrace("customer1", "1234");
+    KeyValue<TraceIdentity, StructuredTrace> traceKVPair1 =
+        createHTStructuredTrace("customer1", "1234");
+    KeyValue<TraceIdentity, StructuredTrace> traceKVPair2 =
+        createHTStructuredTrace("customer2", "5678");
 
     // Write an input record into input topic
-    inputTopic.pipeInput(htStructuredTrace);
+    inputTopic.pipeInput(traceKVPair1.key, traceKVPair1.value);
+    // enricher should be able to handle null keys as well for backward compatibility
+    inputTopic.pipeInput(null, traceKVPair2.value);
 
     // Read the output record from output topic
-    StructuredTrace structuredTrace = (StructuredTrace) outputTopic.readValue();
+    StructuredTrace structuredTrace1 = outputTopic.readValue();
+    StructuredTrace structuredTrace2 = outputTopic.readValue();
 
-    Assertions.assertEquals(
-        HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace.getTraceId()));
+    assertEquals(
+        HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace1.getTraceId()));
+    assertEquals(
+        HexUtils.getHex("5678".getBytes()), HexUtils.getHex(structuredTrace2.getTraceId()));
   }
 
-  private org.hypertrace.core.datamodel.StructuredTrace createHTStructuredTrace(
+  private KeyValue<TraceIdentity, StructuredTrace> createHTStructuredTrace(
       String customerId, String traceId) {
-    return StructuredTrace.newBuilder()
-        .setCustomerId(customerId)
-        .setTraceId(ByteBuffer.wrap(traceId.getBytes()))
-        .setStartTimeMillis(System.currentTimeMillis() - 10000)
-        .setEndTimeMillis(System.currentTimeMillis())
-        .setAttributes(Attributes.newBuilder().build())
-        .setEntityList(new ArrayList<>())
-        .setEntityEdgeList(new ArrayList<>())
-        .setEventEdgeList(new ArrayList<>())
-        .setEntityEventEdgeList(new ArrayList<>())
-        .setEventList(new ArrayList<>())
-        .build();
+    TraceIdentity traceKey =
+        TraceIdentity.newBuilder()
+            .setTenantId(customerId)
+            .setTraceId(ByteBuffer.wrap(traceId.getBytes()))
+            .build();
+    StructuredTrace trace =
+        StructuredTrace.newBuilder()
+            .setCustomerId(customerId)
+            .setTraceId(ByteBuffer.wrap(traceId.getBytes()))
+            .setStartTimeMillis(System.currentTimeMillis() - 10000)
+            .setEndTimeMillis(System.currentTimeMillis())
+            .setAttributes(Attributes.newBuilder().build())
+            .setEntityList(new ArrayList<>())
+            .setEntityEdgeList(new ArrayList<>())
+            .setEventEdgeList(new ArrayList<>())
+            .setEntityEventEdgeList(new ArrayList<>())
+            .setEventList(new ArrayList<>())
+            .build();
+    return KeyValue.pair(traceKey, trace);
   }
 }
diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
index 545c15579..789fe6238 100644
--- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
+++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts
@@ -37,7 +37,7 @@ dependencies {
 
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
   implementation("com.typesafe:config:1.4.1")
   implementation("de.javakaffee:kryo-serializers:0.45")
   implementation("io.confluent:kafka-avro-serializer:5.5.0")
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
index 78ffe7406..f9701184b 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java
@@ -11,7 +11,6 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
@@ -73,7 +72,7 @@ public StreamsBuilder buildTopology(
     streamsBuilder.addStateStore(spanStoreBuilder);
     streamsBuilder.addStateStore(traceStateStoreBuilder);
 
-    Produced<String, StructuredTrace> outputTopicProducer = Produced.with(Serdes.String(), null);
+    Produced<TraceIdentity, StructuredTrace> outputTopicProducer = Produced.with(null, null);
     outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);
 
     inputStream
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
index c0a944c5b..388765fcf 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java
@@ -51,7 +51,7 @@
  * will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} time to accept spans.
  */
 public class RawSpansProcessor
-    implements Transformer<TraceIdentity, RawSpan, KeyValue<String, StructuredTrace>> {
+    implements Transformer<TraceIdentity, RawSpan, KeyValue<TraceIdentity, StructuredTrace>> {
 
   private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
   private static final String PROCESSING_LATENCY_TIMER =
@@ -109,7 +109,7 @@ public void init(ProcessorContext context) {
     restorePunctuators();
   }
 
-  public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
+  public KeyValue<TraceIdentity, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
     Instant start = Instant.now();
     long currentTimeMs = System.currentTimeMillis();
 
diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
index 2c2872357..c0de2c057 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java
@@ -214,7 +214,7 @@ public void punctuate(long timestamp) {
                       PUNCTUATE_LATENCY_TIMER, Map.of("tenantId", k)))
           .record(Duration.between(startTime, Instant.now()).toMillis(), TimeUnit.MILLISECONDS);
 
-      context.forward(null, trace, outputTopicProducer);
+      context.forward(key, trace, outputTopicProducer);
     } else {
       // implies spans for the trace have arrived within the last 'sessionTimeoutMs' interval
       // so the session inactivity window is extended from the last timestamp
diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
index f93123e41..4e6c7f2e4 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.test.TestRecord;
 import org.hypertrace.core.datamodel.Event;
 import org.hypertrace.core.datamodel.RawSpan;
 import org.hypertrace.core.datamodel.StructuredTrace;
@@ -69,7 +70,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
             traceIdentitySerde.serializer(),
             defaultValueSerde.serializer());
 
-    TestOutputTopic<String, StructuredTrace> outputTopic =
+    TestOutputTopic<TraceIdentity, StructuredTrace> outputTopic =
         td.createOutputTopic(
             config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY),
            Serdes.String().deserializer(),
@@ -214,7 +215,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     td.advanceWallClockTime(Duration.ofSeconds(32));
 
     // trace1 should have 2 span span1, span2
-    StructuredTrace trace = (StructuredTrace) outputTopic.readValue();
+    StructuredTrace trace = outputTopic.readValue();
     assertEquals(2, trace.getEventList().size());
     Set<String> traceEventIds =
         trace.getEventList().stream()
@@ -224,7 +225,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     assertTrue(traceEventIds.contains("event-2"));
 
     // trace2 should have 1 span span3
-    trace = (StructuredTrace) outputTopic.readValue();
+    trace = outputTopic.readValue();
     assertEquals(1, trace.getEventList().size());
     assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));
 
@@ -235,12 +236,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     td.advanceWallClockTime(Duration.ofSeconds(35));
 
     // trace1 should have 1 span i.e. span3
-    trace = (StructuredTrace) outputTopic.readValue();
+    trace = outputTopic.readValue();
     assertEquals(1, trace.getEventList().size());
     assertEquals("event-3", new String(trace.getEventList().get(0).getEventId().array()));
 
     // trace2 should have 1 span i.e. span4
-    trace = (StructuredTrace) outputTopic.readValue();
+    trace = outputTopic.readValue();
     assertEquals(1, trace.getEventList().size());
     assertEquals("event-5", new String(trace.getEventList().get(0).getEventId().array()));
 
@@ -253,7 +254,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     td.advanceWallClockTime(Duration.ofSeconds(35));
 
     // trace should be truncated with 5 spans
-    trace = (StructuredTrace) outputTopic.readValue();
+    trace = outputTopic.readValue();
     assertEquals(5, trace.getEventList().size());
 
     // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only
@@ -267,8 +268,11 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18);
     inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19);
     td.advanceWallClockTime(Duration.ofSeconds(35));
-    trace = (StructuredTrace) outputTopic.readValue();
-    assertEquals(6, trace.getEventList().size());
+
+    TestRecord<TraceIdentity, StructuredTrace> testRecord = outputTopic.readRecord();
+
+    assertEquals(tenant2, testRecord.getKey().getTenantId());
+    assertEquals(6, testRecord.getValue().getEventList().size());
   }
 
   private Event createEvent(String eventId, String tenantId) {
diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts
index a1815d7a7..25d5deca3 100644
--- a/span-normalizer/span-normalizer/build.gradle.kts
+++ b/span-normalizer/span-normalizer/build.gradle.kts
@@ -37,7 +37,7 @@ dependencies {
   implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
   implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
   implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
-  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
+  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
   implementation("org.hypertrace.config.service:span-processing-config-service-api:0.1.27")
   implementation("org.hypertrace.config.service:config-utils:0.1.30")
   implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.7.1")

From feae75dce5d6e8edb31b7c10adaf31c4a8eaf079 Mon Sep 17 00:00:00 2001
From: Ronak
Date: Mon, 28 Mar 2022 17:19:12 +0530
Subject: [PATCH 2/2] fixed the failing test due to serde config

---
 .../hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
index 4e6c7f2e4..3d96e9b53 100644
--- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
+++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java
@@ -16,7 +16,6 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
@@ -73,7 +72,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
     TestOutputTopic<TraceIdentity, StructuredTrace> outputTopic =
         td.createOutputTopic(
             config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY),
-            Serdes.String().deserializer(),
+            traceIdentitySerde.deserializer(),
            defaultValueSerde.deserializer());
 
     String tenantId = "tenant1";