-
Notifications
You must be signed in to change notification settings - Fork 559
/
exporters.go
141 lines (122 loc) · 3.31 KB
/
exporters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package telemetry
import (
"context"
"github.com/moby/buildkit/identity"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
"github.com/dagger/dagger/engine/slog"
"github.com/dagger/dagger/telemetry/sdklog"
)
type MultiSpanExporter []sdktrace.SpanExporter
var _ sdktrace.SpanExporter = MultiSpanExporter{}
func (m MultiSpanExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
eg := new(errgroup.Group)
for _, e := range m {
e := e
eg.Go(func() error {
return e.ExportSpans(ctx, spans)
})
}
return eg.Wait()
}
func (m MultiSpanExporter) Shutdown(ctx context.Context) error {
eg := new(errgroup.Group)
for _, e := range m {
e := e
eg.Go(func() error {
return e.Shutdown(ctx)
})
}
return eg.Wait()
}
type SpanForwarder struct {
Processors []sdktrace.SpanProcessor
}
var _ sdktrace.SpanExporter = SpanForwarder{}
type discardWritesSpan struct {
noop.Span
sdktrace.ReadOnlySpan
}
func (s discardWritesSpan) SpanContext() trace.SpanContext {
return s.ReadOnlySpan.SpanContext()
}
func (m SpanForwarder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
eg := new(errgroup.Group)
for _, p := range m.Processors {
p := p
eg.Go(func() error {
for _, span := range spans {
if span.EndTime().Before(span.StartTime()) {
p.OnStart(ctx, discardWritesSpan{noop.Span{}, span})
} else {
p.OnEnd(span)
}
}
return nil
})
}
return eg.Wait()
}
func (m SpanForwarder) Shutdown(ctx context.Context) error {
eg := new(errgroup.Group)
for _, p := range m.Processors {
p := p
eg.Go(func() error {
return p.Shutdown(ctx)
})
}
return eg.Wait()
}
// FilterLiveSpansExporter is a SpanExporter that filters out spans that are
// currently running, as indicated by an end time older than its start time
// (typically year 1753).
type FilterLiveSpansExporter struct {
sdktrace.SpanExporter
}
// ExportSpans passes each span to the span processor's OnEnd hook so that it
// can be batched and emitted more efficiently.
func (exp FilterLiveSpansExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
batch := identity.NewID()
filtered := make([]sdktrace.ReadOnlySpan, 0, len(spans))
for _, span := range spans {
if span.StartTime().After(span.EndTime()) {
slog.ExtraDebug("skipping unfinished span", "batch", batch, "span", span.Name(), "id", span.SpanContext().SpanID())
} else {
slog.ExtraDebug("keeping finished span", "batch", batch, "span", span.Name(), "id", span.SpanContext().SpanID())
filtered = append(filtered, span)
}
}
if len(filtered) == 0 {
return nil
}
return exp.SpanExporter.ExportSpans(ctx, filtered)
}
type LogForwarder struct {
Processors []sdklog.LogProcessor
}
var _ sdklog.LogExporter = LogForwarder{}
func (m LogForwarder) ExportLogs(ctx context.Context, logs []*sdklog.LogData) error {
eg := new(errgroup.Group)
for _, e := range m.Processors {
e := e
eg.Go(func() error {
for _, log := range logs {
e.OnEmit(ctx, log)
}
return nil
})
}
return eg.Wait()
}
func (m LogForwarder) Shutdown(ctx context.Context) error {
eg := new(errgroup.Group)
for _, e := range m.Processors {
e := e
eg.Go(func() error {
return e.Shutdown(ctx)
})
}
return eg.Wait()
}