diff --git a/span-normalizer/helm/templates/span-normalizer-config.yaml b/span-normalizer/helm/templates/span-normalizer-config.yaml index d5fd1e3f1..18f167c46 100644 --- a/span-normalizer/helm/templates/span-normalizer-config.yaml +++ b/span-normalizer/helm/templates/span-normalizer-config.yaml @@ -60,6 +60,10 @@ data: {{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }} rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }} {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "excludeLogsTenantIds" }} + excludeLogsTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeLogsTenantIds | toJson }} + {{- end }} } {{- end }} diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts index 485bd4ba5..2b2c92b3f 100644 --- a/span-normalizer/span-normalizer/build.gradle.kts +++ b/span-normalizer/span-normalizer/build.gradle.kts @@ -66,5 +66,6 @@ dependencies { testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") testImplementation("org.junit-pioneer:junit-pioneer:1.3.8") + testImplementation("org.mockito:mockito-core:3.8.0") testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs") } diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java index fe6622594..c7e57d7a3 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java @@ -1,11 +1,15 @@ package org.hypertrace.core.spannormalizer.jaeger; +import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; + import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.util.Timestamps; +import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; import io.micrometer.core.instrument.Counter; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,6 +31,7 @@ public class JaegerSpanToLogRecordsTransformer private static final Logger LOG = LoggerFactory.getLogger(JaegerSpanToLogRecordsTransformer.class); + private static final String TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG = "processor.excludeLogsTenantIds"; private static final String VALID_SPAN_WITH_LOGS_RECEIVED_COUNT = "hypertrace.reported.span.with.logs.processed"; @@ -34,9 +39,15 @@ public class JaegerSpanToLogRecordsTransformer private static final ConcurrentMap tenantToSpanWithLogsReceivedCount = new ConcurrentHashMap<>(); + private List tenantIdsToExclude; + @Override public void init(ProcessorContext context) { - // no-op + Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG); + this.tenantIdsToExclude = + jobConfig.hasPath(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG) + ? jobConfig.getStringList(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG) + : Collections.emptyList(); } @Override @@ -44,7 +55,7 @@ public KeyValue transform(byte[] key, PreProcessedSpan prePro try { Span value = preProcessedSpan.getSpan(); String tenantId = preProcessedSpan.getTenantId(); - if (value.getLogsCount() == 0) { + if (value.getLogsCount() == 0 || tenantIdsToExclude.contains(tenantId)) { return null; } diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java index ae7321798..182ead986 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java +++ b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java @@ -2,55 +2,84 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import com.typesafe.config.ConfigFactory; import io.jaegertracing.api_v2.JaegerSpanInternalModel; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; import org.hypertrace.core.datamodel.LogEvents; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class JaegerSpanToLogRecordsTransformerTest { @Test void testBuildLogEventRecords() { - Span span = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("1".getBytes())) - .setTraceId(ByteString.copyFrom("trace-1".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr("SERVICE_NAME") - .build()) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(5).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("e1") - .setVStr("some event detail") - .build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("e2") - .setVStr("some event detail") - .build())) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("z2") - .setVStr("some event detail") - .build())) - .build(); - LogEvents logEvents = - new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(span, "tenant"); + new JaegerSpanToLogRecordsTransformer().buildLogEventRecords(getTestSpan(), "tenant"); Assertions.assertEquals(2, logEvents.getLogEvents().size()); Assertions.assertEquals( 2, logEvents.getLogEvents().get(0).getAttributes().getAttributeMap().size()); Assertions.assertEquals( 1, logEvents.getLogEvents().get(1).getAttributes().getAttributeMap().size()); } + + @Test + void testDropLogEventRecords() { + Map configs = new HashMap<>(); + configs.putAll( + Map.of( + "processor", + Map.of("tenantIdTagKey", "tenant-key", "excludeLogsTenantIds", List.of("tenant-1")))); + + ProcessorContext processorContext = Mockito.mock(ProcessorContext.class); + Mockito.when(processorContext.appConfigs()) + .thenReturn(Map.of("span-normalizer-job-config", ConfigFactory.parseMap(configs))); + JaegerSpanToLogRecordsTransformer jaegerSpanToLogRecordsTransformer = + new JaegerSpanToLogRecordsTransformer(); + jaegerSpanToLogRecordsTransformer.init(processorContext); + KeyValue keyValue = + jaegerSpanToLogRecordsTransformer.transform( + null, new PreProcessedSpan("tenant-1", getTestSpan())); + Assertions.assertNull(keyValue); + } + + private Span getTestSpan() { + return Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr("SERVICE_NAME") + .setKey("") + .build()) + .addLogs( + Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(5).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("e1") + .setVStr("some event detail") + .build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("e2") + .setVStr("some event detail") + .build())) + .addLogs( + Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("z2") + .setVStr("some event detail") + .build())) + .build(); + } }