Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust the span match rule of the attached event #10069

Merged
merged 4 commits into from Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/skywalking.yaml
Expand Up @@ -312,7 +312,7 @@ jobs:
((github.event_name == 'schedule' && github.repository == 'apache/skywalking') || needs.changes.outputs.oap == 'true')
name: E2E test
needs: [docker]
runs-on: ubuntu-latest
runs-on: ${{ matrix.test.runs-on || 'ubuntu-latest' }}
timeout-minutes: 60
strategy:
fail-fast: false
Expand Down Expand Up @@ -517,12 +517,15 @@ jobs:
env: OPENSEARCH_VERSION=2.4.0
- name: eBPF Profiling Off CPU
config: test/e2e-v2/cases/profiling/ebpf/offcpu/e2e.yaml
runs-on: ubuntu-20.04
- name: eBPF Profiling Network
config: test/e2e-v2/cases/profiling/ebpf/network/e2e.yaml
env: ISTIO_VERSION=1.13.1
runs-on: ubuntu-20.04
- name: eBPF Profiling Network ES Sharding
config: test/e2e-v2/cases/profiling/ebpf/network/es-sharding/e2e.yaml
env: ISTIO_VERSION=1.13.1
runs-on: ubuntu-20.04

- name: Kafka Basic
config: test/e2e-v2/cases/kafka/simple-so11y/e2e.yaml
Expand Down Expand Up @@ -602,6 +605,7 @@ jobs:
- name: Rover with Istio Process 1.13.1
config: test/e2e-v2/cases/rover/process/istio/e2e.yaml
env: ISTIO_VERSION=1.13.1
runs-on: ubuntu-20.04

- name: Zipkin ES
config: test/e2e-v2/cases/zipkin/es/e2e.yaml
Expand Down Expand Up @@ -668,7 +672,7 @@ jobs:
((github.event_name == 'schedule' && github.repository == 'apache/skywalking') || needs.changes.outputs.oap == 'true')
name: E2E test
needs: [docker]
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
timeout-minutes: 60
strategy:
fail-fast: false
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -276,29 +277,46 @@ private void appendAttachedEventsToSpan(List<Span> spans, List<SpanAttachedEvent
return;
}

// sort by start time
events.sort((e1, e2) -> {
final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
if (second == 0) {
return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
}
return second;
});

final HashMap<String, Span> spanMatcher = new HashMap<>();
for (SpanAttachedEventRecord record : events) {
if (!StringUtils.isNumeric(record.getTraceSpanId())) {
continue;
}
SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String spanMatcherKey = record.getTraceSegmentId() + "_" + record.getTraceSpanId();
Span span = spanMatcher.get(spanMatcherKey);
if (span == null) {
// find the matches span
final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
span = spans.stream().filter(s -> Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
(s.getSpanId() == eventSpanId)).findFirst().orElse(null);
if (span == null) {
continue;
}

// find matches span
final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
Span matchesSpan = spans.stream().filter(s -> Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
Objects.equals(s.getSpanId(), eventSpanId)).findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}
// if the event is server side, then needs to change to the upstream span
final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");

// find the first entry span of upstream if the event from the upstream
SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String bindToTheUpstreamEntrySpan = getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
final String parentSpanId = matchesSpan.getSegmentSpanId();
matchesSpan = spans.stream().filter(s -> s.getSegmentParentSpanId().equals(parentSpanId)
&& Objects.equals(s.getType(), SpanType.Entry.name())).findFirst().orElse(matchesSpan);
if (("request".equals(type) && "inbound".equals(direction)) || ("response".equals(type) && "outbound".equals(direction))) {
final String parentSpanId = span.getSegmentSpanId();
span = spans.stream().filter(s -> s.getSegmentParentSpanId().equals(parentSpanId)
&& Objects.equals(s.getType(), SpanType.Entry.name())).findFirst().orElse(span);
}

spanMatcher.put(spanMatcherKey, span);
}

matchesSpan.getAttachedEvents().add(parseEvent(event));
span.getAttachedEvents().add(parseEvent(event));
}
}

Expand Down
Expand Up @@ -311,34 +311,54 @@ private void appendEvents(List<Span> spans, List<SpanAttachedEventRecord> events

final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0, spans.size()).mapToObj(i -> Tuple.of(i, spans.get(i))).collect(Collectors.toList());

final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream().collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));
final Map<Integer, Span.Builder> spanCache = new HashMap<>();
for (Map.Entry<String, List<SpanAttachedEventRecord>> entry : namedEvents.entrySet()) {
for (int i = 1; i <= entry.getValue().size(); i++) {
final SpanAttachedEventRecord record = entry.getValue().get(i - 1);
String eventName = record.getEvent() + (entry.getValue().size() == 1 ? "" : "-" + i);
Tuple2<Integer, Span> matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), record.getTraceSpanId())).
findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}
// sort by start time
events.sort((e1, e2) -> {
final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
if (second == 0) {
return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
}
return second;
});

final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream()
.collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));

final Map<String, Tuple2<Span.Builder, Integer>> spanCache = new HashMap<>();
for (Map.Entry<String, List<SpanAttachedEventRecord>> namedEntry : namedEvents.entrySet()) {
for (int i = 1; i <= namedEntry.getValue().size(); i++) {
final SpanAttachedEventRecord record = namedEntry.getValue().get(i - 1);
String eventName = record.getEvent() + (namedEntry.getValue().size() == 1 ? "" : "-" + i);
final SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
final String bindToTheUpstreamEntrySpan = getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
final String parentSpanId = matchesSpan._2.id();
matchesSpan = spanWithIndex.stream().filter(s -> s._2.parentId().equals(parentSpanId)
&& Objects.equals(s._2.kind(), Span.Kind.SERVER)).findFirst().orElse(matchesSpan);

// find matched span
Tuple2<Span.Builder, Integer> spanBuilder = spanCache.get(record.getTraceSpanId());
if (spanBuilder == null) {
Tuple2<Integer, Span> matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), record.getTraceSpanId())).
findFirst().orElse(null);
if (matchesSpan == null) {
continue;
}

// if the event is server side, then needs to change to the upstream span
final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
if (("request".equals(type) && "inbound".equals(direction)) || ("response".equals(type) && "outbound".equals(direction))) {
final String parentSpanId = matchesSpan._2.id();
matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.parentId(), parentSpanId)
&& Objects.equals(s._2.kind(), Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
}

spanBuilder = Tuple.of(matchesSpan._2.toBuilder(), matchesSpan._1);
spanCache.put(record.getTraceSpanId(), spanBuilder);
}

final Span.Builder builder = spanCache.computeIfAbsent(matchesSpan._1, idx -> spans.get(idx).toBuilder());
appendEvent(builder, eventName, event);
appendEvent(spanBuilder._1, eventName, event);
}
}

// re-build modified spans
for (Map.Entry<Integer, Span.Builder> entry : spanCache.entrySet()) {
spans.set(entry.getKey(), entry.getValue().build());
for (Map.Entry<String, Tuple2<Span.Builder, Integer>> entry : spanCache.entrySet()) {
spans.set(entry.getValue()._2, entry.getValue()._1.build());
}
}

Expand Down