Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust the span match rule of the attached event #10069

Merged
merged 4 commits into from Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -276,29 +277,46 @@ private void appendAttachedEventsToSpan(List<Span> spans, List<SpanAttachedEvent
return;
}

// sort by start time
events.sort((e1, e2) -> {
final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
if (second == 0) {
return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
}
return second;
});

final HashMap<String, Span> spanMatcher = new HashMap<>();
for (SpanAttachedEventRecord record : events) {
if (!StringUtils.isNumeric(record.getTraceSpanId())) {
continue;
}
SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String spanMatcherKey = record.getTraceSegmentId() + "_" + record.getTraceSpanId();
Span span = spanMatcher.get(spanMatcherKey);
if (span == null) {
// find the matches span
final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
span = spans.stream().filter(s -> Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
Objects.equals(s.getSpanId(), eventSpanId)).findFirst().orElse(null);
mrproliu marked this conversation as resolved.
Show resolved Hide resolved
if (span == null) {
continue;
}

// find matches span
final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
Span matchesSpan = spans.stream().filter(s -> Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
Objects.equals(s.getSpanId(), eventSpanId)).findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}
// if the event is server side, then needs to change to the upstream span
final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");

// find the first entry span of upstream if the event from the upstream
SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String bindToTheUpstreamEntrySpan = getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
final String parentSpanId = matchesSpan.getSegmentSpanId();
matchesSpan = spans.stream().filter(s -> s.getSegmentParentSpanId().equals(parentSpanId)
&& Objects.equals(s.getType(), SpanType.Entry.name())).findFirst().orElse(matchesSpan);
if (("request".equals(type) && "inbound".equals(direction)) || ("response".equals(type) && "outbound".equals(direction))) {
final String parentSpanId = span.getSegmentSpanId();
span = spans.stream().filter(s -> s.getSegmentParentSpanId().equals(parentSpanId)
&& Objects.equals(s.getType(), SpanType.Entry.name())).findFirst().orElse(span);
}

spanMatcher.put(spanMatcherKey, span);
}

matchesSpan.getAttachedEvents().add(parseEvent(event));
span.getAttachedEvents().add(parseEvent(event));
}
}

Expand Down
Expand Up @@ -311,34 +311,54 @@ private void appendEvents(List<Span> spans, List<SpanAttachedEventRecord> events

final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0, spans.size()).mapToObj(i -> Tuple.of(i, spans.get(i))).collect(Collectors.toList());

final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream().collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));
final Map<Integer, Span.Builder> spanCache = new HashMap<>();
for (Map.Entry<String, List<SpanAttachedEventRecord>> entry : namedEvents.entrySet()) {
for (int i = 1; i <= entry.getValue().size(); i++) {
final SpanAttachedEventRecord record = entry.getValue().get(i - 1);
String eventName = record.getEvent() + (entry.getValue().size() == 1 ? "" : "-" + i);
Tuple2<Integer, Span> matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), record.getTraceSpanId())).
findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}
// sort by start time
events.stream().sorted((e1, e2) -> {
mrproliu marked this conversation as resolved.
Show resolved Hide resolved
final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
if (second == 0) {
return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
}
return second;
});

final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream()
.collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));

final Map<String, Tuple2<Span.Builder, Integer>> spanCache = new HashMap<>();
for (Map.Entry<String, List<SpanAttachedEventRecord>> namedEntry : namedEvents.entrySet()) {
for (int i = 1; i <= namedEntry.getValue().size(); i++) {
final SpanAttachedEventRecord record = namedEntry.getValue().get(i - 1);
String eventName = record.getEvent() + (namedEntry.getValue().size() == 1 ? "" : "-" + i);
final SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String bindToTheUpstreamEntrySpan = getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
final String parentSpanId = matchesSpan._2.id();
matchesSpan = spanWithIndex.stream().filter(s -> s._2.parentId().equals(parentSpanId)
&& Objects.equals(s._2.kind(), Span.Kind.SERVER)).findFirst().orElse(matchesSpan);

// find matched span
Tuple2<Span.Builder, Integer> spanBuilder = spanCache.get(record.getTraceSpanId());
if (spanBuilder == null) {
Tuple2<Integer, Span> matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), record.getTraceSpanId())).
findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}

// if the event is server side, then needs to change to the upstream span
final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
if (("request".equals(type) && "inbound".equals(direction)) || ("response".equals(type) && "outbound".equals(direction))) {
final String parentSpanId = matchesSpan._2.id();
matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.parentId(), parentSpanId)
&& Objects.equals(s._2.kind(), Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
}

spanBuilder = Tuple.of(matchesSpan._2.toBuilder(), matchesSpan._1);
spanCache.put(record.getTraceSpanId(), spanBuilder);
}

final Span.Builder builder = spanCache.computeIfAbsent(matchesSpan._1, idx -> spans.get(idx).toBuilder());
appendEvent(builder, eventName, event);
appendEvent(spanBuilder._1, eventName, event);
}
}

// re-build modified spans
for (Map.Entry<Integer, Span.Builder> entry : spanCache.entrySet()) {
spans.set(entry.getKey(), entry.getValue().build());
for (Map.Entry<String, Tuple2<Span.Builder, Integer>> entry : spanCache.entrySet()) {
spans.set(entry.getValue()._2, entry.getValue()._1.build());
}
}

Expand Down