Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package org.hypertrace.traceenricher.enrichment;

import static org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry.registerCounter;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.datamodel.Edge;
import org.hypertrace.core.datamodel.Entity;
import org.hypertrace.core.datamodel.Event;
Expand All @@ -22,15 +32,34 @@ public class EnrichmentProcessor {
private static final String ENRICHMENT_ARRIVAL_TIME = "enrichment.arrival.time";
private static final Timer enrichmentArrivalTimer =
PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>());
private final List<Enricher> enrichers = new ArrayList<>();

// Must use linked hashmap
private final Map<String, Enricher> enrichers = new LinkedHashMap<>();

private static final String ENRICHED_TRACES_COUNTER = "hypertrace.enriched.traces";
Copy link
Contributor

@avinashkolluru avinashkolluru Oct 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can get rid of the hypertrace. prefix in both the metric names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the metric convention we are using across hypertrace projects.
Also, this may cause compatibility issue if we change the name and existing dashboards may be broken.

@kotharironak @rish691: please confirm if changing and simplifying this metric name is okay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is exposed in ready to use dashboard - https://github.com/hypertrace/hypertrace/tree/main/kubernetes/monitoring/dashboards. So would suggest keeping it as is now. And, we can clean up if we decided to go with metrics.reporter based prefix only in application.conf

private static final ConcurrentMap<String, Counter> traceCounters = new ConcurrentHashMap<>();

private static final String ENRICHED_TRACES_TIMER = "hypertrace.trace.enrichment.latency";
private static final ConcurrentMap<String, Timer> traceTimers = new ConcurrentHashMap<>();

private static final String TRACE_ENRICHMENT_ERRORS_COUNTER =
"hypertrace.trace.enrichment.errors";
private static final ConcurrentMap<String, Counter> traceErrorsCounters =
new ConcurrentHashMap<>();

public EnrichmentProcessor(List<EnricherInfo> enricherInfoList, ClientRegistry clientRegistry) {
for (EnricherInfo enricherInfo : enricherInfoList) {
try {
if (enrichers.containsKey(enricherInfo.getName())) {
LOG.error("Duplicate enricher found. enricher name: {}", enricherInfo.getName());
throw new RuntimeException(
"Configuration error. Duplicate enricher found. enricher name: {}"
+ enricherInfo.getName());
}
Enricher enricher = enricherInfo.getClazz().getDeclaredConstructor().newInstance();
enricher.init(enricherInfo.getEnricherConfig(), clientRegistry);
LOG.info("Initialized the enricher: {}", enricherInfo.getClazz().getCanonicalName());
enrichers.add(enricher);
enrichers.put(enricherInfo.getName(), enricher);
} catch (Exception e) {
LOG.error("Exception initializing enricher:{}", enricherInfo, e);
}
Expand All @@ -42,13 +71,31 @@ public void process(StructuredTrace trace) {
DataflowMetricUtils.reportArrivalLagAndInsertTimestamp(
trace, enrichmentArrivalTimer, ENRICHMENT_ARRIVAL_TIME);
AvroToJsonLogger.log(LOG, "Structured Trace before all the enrichment is: {}", trace);
for (Enricher enricher : enrichers) {
for (Entry<String, Enricher> entry : enrichers.entrySet()) {
String metricKey = String.format("%s/%s", trace.getCustomerId(), entry.getKey());
Map<String, String> metricTags =
Map.of("tenantId", trace.getCustomerId(), "enricher", entry.getKey());
try {
applyEnricher(enricher, trace);
Instant start = Instant.now();
applyEnricher(entry.getValue(), trace);
long timeElapsed = Duration.between(start, Instant.now()).toMillis();

traceCounters
.computeIfAbsent(metricKey, k -> registerCounter(ENRICHED_TRACES_COUNTER, metricTags))
.increment();
traceTimers
.computeIfAbsent(
metricKey,
k -> PlatformMetricsRegistry.registerTimer(ENRICHED_TRACES_TIMER, metricTags))
.record(timeElapsed, TimeUnit.MILLISECONDS);
} catch (Exception e) {
traceErrorsCounters
.computeIfAbsent(
metricKey, k -> registerCounter(TRACE_ENRICHMENT_ERRORS_COUNTER, metricTags))
.increment();
LOG.error(
"Could not apply the enricher: {} to the trace with traceId: {}",
enricher.getClass().getCanonicalName(),
entry.getKey(),
HexUtils.getHex(trace.getTraceId()),
e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,23 @@
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY;

import com.typesafe.config.Config;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.traceenricher.enrichment.EnrichmentProcessor;
import org.hypertrace.traceenricher.enrichment.EnrichmentRegistry;
import org.hypertrace.traceenricher.enrichment.clients.DefaultClientRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StructuredTraceEnrichProcessor
implements Transformer<String, StructuredTrace, KeyValue<String, StructuredTrace>> {

private static final Logger logger =
LoggerFactory.getLogger(StructuredTraceEnrichProcessor.class);
private static EnrichmentProcessor processor = null;
private DefaultClientRegistry clientRegistry;

private static final String ENRICHED_TRACES_COUNTER = "hypertrace.enriched.traces";
private static final ConcurrentMap<String, Counter> tenantToEnrichedTraceCounter =
new ConcurrentHashMap<>();

private static final String ENRICHED_TRACES_TIMER = "hypertrace.trace.enrichment.latency";
private static final ConcurrentMap<String, Timer> tenantToEnrichmentTraceTimer =
new ConcurrentHashMap<>();

@Override
public void init(ProcessorContext context) {
if (processor == null) {
Expand All @@ -62,23 +42,7 @@ public void init(ProcessorContext context) {

@Override
public KeyValue<String, StructuredTrace> transform(String key, StructuredTrace value) {
Instant start = Instant.now();
processor.process(value);
long timeElapsed = Duration.between(start, Instant.now()).toMillis();

tenantToEnrichedTraceCounter
.computeIfAbsent(
value.getCustomerId(),
k ->
PlatformMetricsRegistry.registerCounter(
ENRICHED_TRACES_COUNTER, Map.of("tenantId", k)))
.increment();
tenantToEnrichmentTraceTimer
.computeIfAbsent(
value.getCustomerId(),
k ->
PlatformMetricsRegistry.registerTimer(ENRICHED_TRACES_TIMER, Map.of("tenantId", k)))
.record(timeElapsed, TimeUnit.MILLISECONDS);
return new KeyValue<>(null, value);
}

Expand Down