Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private Pair<List<Event>, List<Event>> getEventsInApiBoundary(
}

public ApiTraceGraph build() {
StructuredTraceGraph graph = StructuredTraceGraph.createGraph(trace);
StructuredTraceGraph graph = StructuredTraceGraphBuilder.buildGraph(trace);

List<ApiNode<Event>> apiNodes = buildApiNodes(graph);
this.setNodeList(apiNodes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.hypertrace.traceenricher.trace.util;

import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StructuredTraceGraphBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add unit test for this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a utility wrapper method with a set of ordered condition. I think, you are referring here to test around the structure trace graph itself. Can handle it as part of #105. Can you add more detail to that ticket too?

private static final Logger LOG = LoggerFactory.getLogger(StructuredTraceGraphBuilder.class);

private static ThreadLocal<StructuredTraceGraph> cachedGraph = new ThreadLocal<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want it specific to a Thread? Can we use Supplier instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, StructureTraceGraph's state is changing while processing a trace, and we want to re-construct whenever the state affecting its changes. So, we choose thread-local to share the state in a given kstream thread.
@s what do you mean by supplier here? Are you referring to supplying StructureTraceGraph at the start of processing of topology?
cc: @laxmanchekka

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kotharironak We use Suppliers in a similar way to cache the results https://mkyong.com/java8/java-8-supplier-examples/

Though I am not sure, if you can change the entry in supplier based on a certain condition, if the graph structure can potentially change

Copy link
Contributor

@findingrish findingrish Jan 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that it is a quick optimisation. My question is eventually should we instead of caching the StructuredTraceGraph at thread level, pass around the instance of this graph between enrichers? Anyways the graph is going to be changed for each Trace. We can just create this graph once for every trace at the entry point and other enrichers can use this graph. Any enrichment task can then update the StructuredTraceGraph if there is a need.
That way we also get rid of the need to compare the graphs and building graph multiple times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it is another way around, the trace is changing during the enrichment process while processing by going over every span and so we have to re-construct StructureTraceGraph every time if the trace state has been modified. Yes, we have to rethink the current processing, and if it's the right data structure, or can we do it differently.

private static ThreadLocal<StructuredTrace> cachedTrace = new ThreadLocal<>();

public static StructuredTraceGraph buildGraph(StructuredTrace trace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kotharironak Basic question, Why buildGraph is called for every span. Can we call only once for Trace instead of every span ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavan-traceable Where does the buildGraph get called for every span?

Copy link
Contributor

@pavan-traceable pavan-traceable Jan 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pavan-traceable buildGraph is called at 3 places,
ApiBoundaryTypeAttributeEnricher
DefaultServiceEntityEnricher
BackendEntityEnricher
Also we work with complete trace in this service, so buildGraph is invoked at Trace level

// trace doesn't exist
if (cachedTrace.get() == null) {
LOG.info("Building structured trace graph. Reason: no cached trace");
StructuredTraceGraph graph = StructuredTraceGraph.createGraph(trace);
cachedTrace.set(StructuredTrace.newBuilder(trace).build());
cachedGraph.set(graph);
return graph;
}

// is processed and cached are same trace?
if (!cachedTrace.get().getCustomerId().equals(trace.getCustomerId()) ||
!cachedTrace.get().getTraceId().equals(trace.getTraceId())) {
LOG.info("Building structured trace graph. Reason: cached trace and current trace doesn't not match");
StructuredTraceGraph graph = StructuredTraceGraph.createGraph(trace);
cachedTrace.set(StructuredTrace.newBuilder(trace).build());
cachedGraph.set(graph);
return graph;
}

// trace internally changed
if(cachedTrace.get().getEntityList().size() != trace.getEntityList().size() ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this scenario will occur ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pull this out in a separate boolean function? say hasTraceChanged?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the set of attributes and enriched attributes? If the attributes and enriched attributes change, the graph structure will stay the same, but will we get the latest set of attributes from the graph nodes?

Basically, Does it create a copy of the Event, or just keeps the reference to it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is keeping reference. ApiTraceGraph is using getRootEvents, but it is not in the enrichment process. @skjindal93 Can you also re-check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For hasTraceChanged currently, remaining condition of functions - isTraceSame and doeTraceExists works together for this function. So, if we move out, we have to move those conditions too. So, thinking of keeping this as-is for now.

cachedTrace.get().getEventList().size() != trace.getEventList().size() ||
cachedTrace.get().getEntityEdgeList().size() != trace.getEntityEdgeList().size() ||
cachedTrace.get().getEntityEventEdgeList().size() != trace.getEntityEventEdgeList().size() ||
cachedTrace.get().getEventEdgeList().size() != trace.getEventEdgeList().size()) {
LOG.info("Building structured trace graph. Reason: cached trace and current trace have different size");
StructuredTraceGraph graph = StructuredTraceGraph.createGraph(trace);
cachedTrace.set(StructuredTrace.newBuilder(trace).build());
cachedGraph.set(graph);
return graph;
}

return cachedGraph.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
import org.hypertrace.entity.data.service.client.EntityDataServiceClientProvider;
import org.hypertrace.traceenricher.trace.util.StructuredTraceGraphBuilder;

public abstract class AbstractTraceEnricher implements Enricher {

Expand Down Expand Up @@ -52,7 +53,7 @@ public void enrichTrace(StructuredTrace trace) {
* Wrapper to the structure graph factory for testing
*/
public StructuredTraceGraph buildGraph(StructuredTrace trace) {
return StructuredTraceGraph.createGraph(trace);
return StructuredTraceGraphBuilder.buildGraph(trace);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.hypertrace.core.datamodel.shared.SpanAttributeUtils;
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator;
import org.hypertrace.core.span.constants.RawSpanConstants;
import org.hypertrace.core.span.constants.v1.JaegerAttribute;
import org.hypertrace.entity.constants.v1.ServiceAttribute;
import org.hypertrace.entity.data.service.client.EdsClient;
import org.hypertrace.entity.data.service.client.EntityDataServiceClientProvider;
Expand Down Expand Up @@ -69,7 +67,7 @@ public void enrichEvent(StructuredTrace trace, Event event) {
// 2. Enrich the exit span with the parent span's service entity.
// This will enable creating an edge between the exit span and the backend

StructuredTraceGraph graph = StructuredTraceGraph.createGraph(trace);
StructuredTraceGraph graph = buildGraph(trace);
if (EnrichedSpanUtils.isExitSpan(event) &&
SpanAttributeUtils.isLeafSpan(graph, event)) {
String parentSvcName = findServiceNameOfFirstAncestorThatIsNotAnExitSpanAndBelongsToADifferentService(event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.hypertrace.traceenricher.trace.enricher.StructuredTraceEnricherConstants.STRUCTURED_TRACES_ENRICHMENT_JOB_CONFIG_KEY;

import com.typesafe.config.Config;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -15,10 +17,13 @@
import org.hypertrace.entity.data.service.client.DefaultEdsClientProvider;
import org.hypertrace.traceenricher.enrichment.EnrichmentProcessor;
import org.hypertrace.traceenricher.enrichment.EnrichmentRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StructuredTraceEnrichProcessor implements
Transformer<String, StructuredTrace, KeyValue<String, StructuredTrace>> {

private static final Logger logger = LoggerFactory.getLogger(StructuredTraceEnrichProcessor.class);
private static EnrichmentProcessor processor = null;

@Override
Expand All @@ -38,7 +43,11 @@ public void init(ProcessorContext context) {

@Override
public KeyValue<String, StructuredTrace> transform(String key, StructuredTrace value) {
Instant start = Instant.now();
processor.process(value);
Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();
logger.debug("Time taken by enrichment process for key:{} is :{}", key, timeElapsed);
return new KeyValue<>(null, value);
}

Expand Down