Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions span-normalizer/helm/templates/span-normalizer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ data:
{{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }}
rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }}
{{- end }}

{{- if hasKey .Values.spanNormalizerConfig.processor "excludeLogsTenantIds" }}
excludeLogsTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeLogsTenantIds | toJson }}
{{- end }}
}
{{- end }}

Expand Down
1 change: 1 addition & 0 deletions span-normalizer/span-normalizer/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
testImplementation("org.junit-pioneer:junit-pioneer:1.3.8")
testImplementation("org.mockito:mockito-core:3.8.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.hypertrace.core.spannormalizer.jaeger;

import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Timestamps;
import com.typesafe.config.Config;
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import io.micrometer.core.instrument.Counter;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -27,24 +31,31 @@ public class JaegerSpanToLogRecordsTransformer

private static final Logger LOG =
LoggerFactory.getLogger(JaegerSpanToLogRecordsTransformer.class);
private static final String TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG = "processor.excludeLogsTenantIds";

private static final String VALID_SPAN_WITH_LOGS_RECEIVED_COUNT =
"hypertrace.reported.span.with.logs.processed";

private static final ConcurrentMap<String, Counter> tenantToSpanWithLogsReceivedCount =
new ConcurrentHashMap<>();

private List<String> tenantIdsToExclude;

@Override
public void init(ProcessorContext context) {
// no-op
Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG);
this.tenantIdsToExclude =
jobConfig.hasPath(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG)
? jobConfig.getStringList(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG)
: Collections.emptyList();
}

@Override
public KeyValue<String, LogEvents> transform(byte[] key, PreProcessedSpan preProcessedSpan) {
try {
Span value = preProcessedSpan.getSpan();
String tenantId = preProcessedSpan.getTenantId();
if (value.getLogsCount() == 0) {
if (value.getLogsCount() == 0 || tenantIdsToExclude.contains(tenantId)) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,84 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.typesafe.config.ConfigFactory;
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.hypertrace.core.datamodel.LogEvents;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class JaegerSpanToLogRecordsTransformerTest {

@Test
void testBuildLogEventRecords() {
Span span =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("1".getBytes()))
.setTraceId(ByteString.copyFrom("trace-1".getBytes()))
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr("SERVICE_NAME")
.build())
.addLogs(
Log.newBuilder()
.setTimestamp(Timestamp.newBuilder().setSeconds(5).build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("e1")
.setVStr("some event detail")
.build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("e2")
.setVStr("some event detail")
.build()))
.addLogs(
Log.newBuilder()
.setTimestamp(Timestamp.newBuilder().setSeconds(10).build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("z2")
.setVStr("some event detail")
.build()))
.build();

LogEvents logEvents =
new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(span, "tenant");
new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(getTestSpan(), "tenant");
Assertions.assertEquals(2, logEvents.getLogEvents().size());
Assertions.assertEquals(
2, logEvents.getLogEvents().get(0).getAttributes().getAttributeMap().size());
Assertions.assertEquals(
1, logEvents.getLogEvents().get(1).getAttributes().getAttributeMap().size());
}

@Test
void testDropLogEventRecords() {
Map<String, Object> configs = new HashMap<>();
configs.putAll(
Map.of(
"processor",
Map.of("tenantIdTagKey", "tenant-key", "excludeLogsTenantIds", List.of("tenant-1"))));

ProcessorContext processorContext = Mockito.mock(ProcessorContext.class);
Mockito.when(processorContext.appConfigs())
.thenReturn(Map.of("span-normalizer-job-config", ConfigFactory.parseMap(configs)));
JaegerSpanToLogRecordsTransformer jaegerSpanToLogRecordsTransformer =
new JaegerSpanToLogRecordsTransformer();
jaegerSpanToLogRecordsTransformer.init(processorContext);
KeyValue<String, LogEvents> keyValue =
jaegerSpanToLogRecordsTransformer.transform(
null, new PreProcessedSpan("tenant-1", getTestSpan()));
Assertions.assertNull(keyValue);
}

private Span getTestSpan() {
return Span.newBuilder()
.setSpanId(ByteString.copyFrom("1".getBytes()))
.setTraceId(ByteString.copyFrom("trace-1".getBytes()))
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr("SERVICE_NAME")
.setKey("")
.build())
.addLogs(
Log.newBuilder()
.setTimestamp(Timestamp.newBuilder().setSeconds(5).build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("e1")
.setVStr("some event detail")
.build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("e2")
.setVStr("some event detail")
.build()))
.addLogs(
Log.newBuilder()
.setTimestamp(Timestamp.newBuilder().setSeconds(10).build())
.addFields(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("z2")
.setVStr("some event detail")
.build()))
.build();
}
}