2 changes: 1 addition & 1 deletion hypertrace-ingester/build.gradle.kts
@@ -25,7 +25,7 @@ hypertraceDocker {
}

dependencies {
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
@@ -30,7 +30,7 @@ dependencies {
implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")

// open telemetry proto
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")
@@ -31,7 +31,7 @@ dependencies {
// frameworks
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")

// open telemetry proto
implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")
@@ -30,13 +30,14 @@ tasks.test {

dependencies {
implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher-impl"))
implementation(project(":span-normalizer:span-normalizer-api"))
implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.entity.service:entity-service-client:0.8.5")

implementation("com.typesafe:config:1.4.1")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
constraints {
runtimeOnly("io.netty:netty-codec-http2:4.1.71.Final")
runtimeOnly("io.netty:netty-handler-proxy:4.1.71.Final")
@@ -17,12 +17,13 @@
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.traceenricher.enrichment.EnrichmentProcessor;
import org.hypertrace.traceenricher.enrichment.EnrichmentRegistry;
import org.hypertrace.traceenricher.enrichment.clients.DefaultClientRegistry;

public class StructuredTraceEnrichProcessor
implements Transformer<String, StructuredTrace, KeyValue<String, StructuredTrace>> {
implements Transformer<TraceIdentity, StructuredTrace, KeyValue<String, StructuredTrace>> {

private static EnrichmentProcessor processor = null;
private DefaultClientRegistry clientRegistry;
@@ -53,7 +54,7 @@ public void init(ProcessorContext context) {
}

@Override
public KeyValue<String, StructuredTrace> transform(String key, StructuredTrace value) {
public KeyValue<String, StructuredTrace> transform(TraceIdentity key, StructuredTrace value) {
processor.process(value);
return new KeyValue<>(null, value);
}
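Side note, not part of the diff: the enricher's transformer now consumes the TraceIdentity keys emitted by raw-spans-grouper, but it still returns a null key, so the keying of the enriched output topic is unchanged. A minimal sketch of how the re-typed transformer plugs into a topology (the wiring, class, and topic names are hypothetical; default serdes are assumed to be configured):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.spannormalizer.TraceIdentity;

class EnricherWiringSketch {
  KStream<String, StructuredTrace> wire(StreamsBuilder builder, String inTopic, String outTopic) {
    // Keys are TraceIdentity on the way in; the default serdes handle (de)serialization.
    KStream<TraceIdentity, StructuredTrace> input = builder.stream(inTopic);
    // transform() re-keys from TraceIdentity to String; the processor currently emits null keys.
    KStream<String, StructuredTrace> enriched =
        input.transform(StructuredTraceEnrichProcessor::new);
    enriched.to(outTopic);
    return enriched;
  }
}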
@@ -16,6 +16,7 @@
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,10 +38,10 @@ public StreamsBuilder buildTopology(
String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY);
String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);

KStream<String, StructuredTrace> inputStream =
(KStream<String, StructuredTrace>) inputStreams.get(inputTopic);
KStream<TraceIdentity, StructuredTrace> inputStream =
(KStream<TraceIdentity, StructuredTrace>) inputStreams.get(inputTopic);
if (inputStream == null) {
inputStream = streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), null));
inputStream = streamsBuilder.stream(inputTopic, Consumed.with(null, null));
inputStreams.put(inputTopic, inputStream);
}

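Aside: passing null to Consumed.with here (and to Produced.with in the grouper below) makes Kafka Streams fall back to the application's default key/value serdes, so the Avro serdes need to be declared only once. A minimal sketch of that mechanism, assuming the framework's AvroSerde is registered as the default (the property wiring is illustrative, not necessarily the framework's actual config path):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;

class DefaultSerdeSketch {
  static Properties streamsProps() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hypertrace-trace-enricher"); // hypothetical id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    // With these defaults set, Consumed.with(null, null) and Produced.with(null, null)
    // resolve to AvroSerde for both key and value.
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, AvroSerde.class.getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AvroSerde.class.getName());
    return props;
  }
}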
@@ -1,3 +1,5 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.nio.ByteBuffer;
@@ -7,18 +9,15 @@
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.*;
Review thread on this line:

Contributor: Is it possible to be specific? Or was this auto-suggested, replacing the imports with a * import?

Contributor (author): Done by Spotless, I guess; it prefers a wildcard when the number of classes used exceeds a threshold.

Contributor: Must be done by the IDE. AFAIK Spotless doesn't make this change.

Contributor: I guess we need to set a high number like 999 as the import class count to be optimized with * in IntelliJ.

import org.hypertrace.core.datamodel.Attributes;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.shared.HexUtils;
import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde;
import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants;
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
@@ -64,48 +63,64 @@ public void testTraceEnricherTopology() {
// create topology test driver for trace-enricher
TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), props);

Serde<TraceIdentity> traceKeySerde = new AvroSerde<>();
Serde<StructuredTrace> htStructuredTraceSerde = new AvroSerde<>();

// create input topic for HT-model StructuredTrace
TestInputTopic<String, StructuredTrace> inputTopic =
TestInputTopic<TraceIdentity, StructuredTrace> inputTopic =
topologyTestDriver.createInputTopic(
config.getString(StructuredTraceEnricherConstants.INPUT_TOPIC_CONFIG_KEY),
Serdes.String().serializer(),
traceKeySerde.serializer(),
htStructuredTraceSerde.serializer());

// create output topic for closed-model StructuredTrace
TestOutputTopic outputTopic =
TestOutputTopic<String, StructuredTrace> outputTopic =
topologyTestDriver.createOutputTopic(
config.getString(StructuredTraceEnricherConstants.OUTPUT_TOPIC_CONFIG_KEY),
Serdes.String().deserializer(),
htStructuredTraceSerde.deserializer());

// create instance of HT-model StructuredTrace
StructuredTrace htStructuredTrace = createHTStructuredTrace("customer1", "1234");
KeyValue<TraceIdentity, StructuredTrace> traceKVPair1 =
createHTStructuredTrace("customer1", "1234");
KeyValue<TraceIdentity, StructuredTrace> traceKVPair2 =
createHTStructuredTrace("customer2", "5678");

// Write an input record into input topic
inputTopic.pipeInput(htStructuredTrace);
inputTopic.pipeInput(traceKVPair1.key, traceKVPair1.value);
// enricher should be able to handle null keys as well for backward compatibility
inputTopic.pipeInput(null, traceKVPair2.value);

// Read the output record from output topic
StructuredTrace structuredTrace = (StructuredTrace) outputTopic.readValue();
StructuredTrace structuredTrace1 = outputTopic.readValue();
StructuredTrace structuredTrace2 = outputTopic.readValue();

Assertions.assertEquals(
HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace.getTraceId()));
assertEquals(
HexUtils.getHex("1234".getBytes()), HexUtils.getHex(structuredTrace1.getTraceId()));
assertEquals(
HexUtils.getHex("5678".getBytes()), HexUtils.getHex(structuredTrace2.getTraceId()));
}

private org.hypertrace.core.datamodel.StructuredTrace createHTStructuredTrace(
private KeyValue<TraceIdentity, StructuredTrace> createHTStructuredTrace(
String customerId, String traceId) {
return StructuredTrace.newBuilder()
.setCustomerId(customerId)
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
.setStartTimeMillis(System.currentTimeMillis() - 10000)
.setEndTimeMillis(System.currentTimeMillis())
.setAttributes(Attributes.newBuilder().build())
.setEntityList(new ArrayList<>())
.setEntityEdgeList(new ArrayList<>())
.setEventEdgeList(new ArrayList<>())
.setEntityEventEdgeList(new ArrayList<>())
.setEventList(new ArrayList<>())
.build();
TraceIdentity traceKey =
TraceIdentity.newBuilder()
.setTenantId(customerId)
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
.build();
StructuredTrace trace =
StructuredTrace.newBuilder()
.setCustomerId(customerId)
.setTraceId(ByteBuffer.wrap(traceId.getBytes()))
.setStartTimeMillis(System.currentTimeMillis() - 10000)
.setEndTimeMillis(System.currentTimeMillis())
.setAttributes(Attributes.newBuilder().build())
.setEntityList(new ArrayList<>())
.setEntityEdgeList(new ArrayList<>())
.setEventEdgeList(new ArrayList<>())
.setEntityEventEdgeList(new ArrayList<>())
.setEventList(new ArrayList<>())
.build();
return KeyValue.pair(traceKey, trace);
}
}
2 changes: 1 addition & 1 deletion raw-spans-grouper/raw-spans-grouper/build.gradle.kts
@@ -37,7 +37,7 @@ dependencies {
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")

implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
implementation("com.typesafe:config:1.4.1")
implementation("de.javakaffee:kryo-serializers:0.45")
implementation("io.confluent:kafka-avro-serializer:5.5.0")
@@ -11,7 +11,6 @@
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
@@ -73,7 +72,7 @@ public StreamsBuilder buildTopology(
streamsBuilder.addStateStore(spanStoreBuilder);
streamsBuilder.addStateStore(traceStateStoreBuilder);

Produced<String, StructuredTrace> outputTopicProducer = Produced.with(Serdes.String(), null);
Produced<TraceIdentity, StructuredTrace> outputTopicProducer = Produced.with(null, null);
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);

inputStream
@@ -51,7 +51,7 @@
* will get an additional {@link RawSpansProcessor#groupingWindowTimeoutMs} time to accept spans.
*/
public class RawSpansProcessor
implements Transformer<TraceIdentity, RawSpan, KeyValue<String, StructuredTrace>> {
implements Transformer<TraceIdentity, RawSpan, KeyValue<TraceIdentity, StructuredTrace>> {

private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
private static final String PROCESSING_LATENCY_TIMER =
@@ -109,7 +109,7 @@ public void init(ProcessorContext context) {
restorePunctuators();
}

public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
public KeyValue<TraceIdentity, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
Instant start = Instant.now();
long currentTimeMs = System.currentTimeMillis();

@@ -214,7 +214,7 @@ public void punctuate(long timestamp) {
PUNCTUATE_LATENCY_TIMER, Map.of("tenantId", k)))
.record(Duration.between(startTime, Instant.now()).toMillis(), TimeUnit.MILLISECONDS);

context.forward(null, trace, outputTopicProducer);
context.forward(key, trace, outputTopicProducer);
} else {
// implies spans for the trace have arrived within the last 'sessionTimeoutMs' interval
// so the session inactivity window is extended from the last timestamp
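Aside: forwarding the real TraceIdentity key here, instead of null, is what keys the grouper's output topic by tenant and trace id, matching the typed key the trace-enricher now consumes. For illustration, building such a key with the Avro builder the tests use (tenant and trace id values are made up):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.hypertrace.core.spannormalizer.TraceIdentity;

class TraceIdentityKeySketch {
  static TraceIdentity exampleKey() {
    return TraceIdentity.newBuilder()
        .setTenantId("tenant1")
        .setTraceId(ByteBuffer.wrap("trace-1".getBytes(StandardCharsets.UTF_8)))
        .build();
  }
}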
@@ -16,12 +16,12 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
@@ -69,10 +69,10 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
traceIdentitySerde.serializer(),
defaultValueSerde.serializer());

TestOutputTopic outputTopic =
TestOutputTopic<TraceIdentity, StructuredTrace> outputTopic =
td.createOutputTopic(
config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY),
Serdes.String().deserializer(),
traceIdentitySerde.deserializer(),
defaultValueSerde.deserializer());

String tenantId = "tenant1";
@@ -214,7 +214,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
td.advanceWallClockTime(Duration.ofSeconds(32));

// trace1 should have 2 spans: span1, span2
StructuredTrace trace = (StructuredTrace) outputTopic.readValue();
StructuredTrace trace = outputTopic.readValue();
assertEquals(2, trace.getEventList().size());
Set<String> traceEventIds =
trace.getEventList().stream()
@@ -224,7 +224,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
assertTrue(traceEventIds.contains("event-2"));

// trace2 should have 1 span: span3
trace = (StructuredTrace) outputTopic.readValue();
trace = outputTopic.readValue();
assertEquals(1, trace.getEventList().size());
assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));

@@ -235,12 +235,12 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
td.advanceWallClockTime(Duration.ofSeconds(35));

// trace1 should have 1 span, i.e. span3
trace = (StructuredTrace) outputTopic.readValue();
trace = outputTopic.readValue();
assertEquals(1, trace.getEventList().size());
assertEquals("event-3", new String(trace.getEventList().get(0).getEventId().array()));

// trace2 should have 1 span, i.e. span4
trace = (StructuredTrace) outputTopic.readValue();
trace = outputTopic.readValue();
assertEquals(1, trace.getEventList().size());
assertEquals("event-5", new String(trace.getEventList().get(0).getEventId().array()));

@@ -253,7 +253,7 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
td.advanceWallClockTime(Duration.ofSeconds(35));

// trace should be truncated to 5 spans
trace = (StructuredTrace) outputTopic.readValue();
trace = outputTopic.readValue();
assertEquals(5, trace.getEventList().size());

// input 8 spans of trace-4 for tenant2; as the global upper limit applies, it will emit only
@@ -267,8 +267,11 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18);
inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19);
td.advanceWallClockTime(Duration.ofSeconds(35));
trace = (StructuredTrace) outputTopic.readValue();
assertEquals(6, trace.getEventList().size());

TestRecord<TraceIdentity, StructuredTrace> testRecord = outputTopic.readRecord();

assertEquals(tenant2, testRecord.getKey().getTenantId());
assertEquals(6, testRecord.getValue().getEventList().size());
}

private Event createEvent(String eventId, String tenantId) {
2 changes: 1 addition & 1 deletion span-normalizer/span-normalizer/build.gradle.kts
@@ -37,7 +37,7 @@ dependencies {
implementation("org.hypertrace.core.datamodel:data-model:0.1.20")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.33")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.33")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.23")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.25")
implementation("org.hypertrace.config.service:span-processing-config-service-api:0.1.27")
implementation("org.hypertrace.config.service:config-utils:0.1.30")
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.7.1")