-
Notifications
You must be signed in to change notification settings - Fork 785
/
otel.go
151 lines (135 loc) · 4.63 KB
/
otel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package tracing
import (
"strings"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"github.com/benthosdev/benthos/v4/internal/message"
)
const (
name = "benthos"
)
// GetSpan returns a span attached to a message part. Returns nil if the part
// doesn't have a span attached.
func GetSpan(p *message.Part) *Span {
ctx := message.GetContext(p)
t := trace.SpanFromContext(ctx)
return otelSpan(ctx, t)
}
// GetActiveSpan returns a span attached to a message part. Returns nil if the
// part doesn't have a span attached or it is inactive.
func GetActiveSpan(p *message.Part) *Span {
ctx := message.GetContext(p)
t := trace.SpanFromContext(ctx)
if !t.IsRecording() {
return nil
}
return otelSpan(ctx, t)
}
// GetTraceID returns the traceID from a span attached to a message part. Returns a zeroed traceID if the part
// doesn't have a span attached.
func GetTraceID(p *message.Part) string {
ctx := message.GetContext(p)
span := trace.SpanFromContext(ctx)
return span.SpanContext().TraceID().String()
}
// WithChildSpan takes a message, extracts a span, creates a new child span,
// and returns a new message with that span embedded. The original message is
// unchanged.
func WithChildSpan(prov trace.TracerProvider, operationName string, part *message.Part) (*message.Part, *Span) {
span := GetActiveSpan(part)
if span == nil {
ctx, t := prov.Tracer(name).Start(part.GetContext(), operationName)
span = otelSpan(ctx, t)
part = part.WithContext(ctx)
} else {
ctx, t := prov.Tracer(name).Start(span.ctx, operationName)
span = otelSpan(ctx, t)
part = part.WithContext(ctx)
}
return part, span
}
// WithChildSpans takes a message, extracts spans per message part, creates new
// child spans, and returns a new message with those spans embedded. The
// original message is unchanged.
func WithChildSpans(prov trace.TracerProvider, operationName string, batch message.Batch) (message.Batch, []*Span) {
spans := make([]*Span, 0, len(batch))
newParts := make(message.Batch, len(batch))
for i, part := range batch {
if part == nil {
continue
}
var otSpan *Span
newParts[i], otSpan = WithChildSpan(prov, operationName, part)
spans = append(spans, otSpan)
}
return newParts, spans
}
// WithSiblingSpans takes a message, extracts spans per message part, creates
// new sibling spans, and returns a new message with those spans embedded. The
// original message is unchanged.
func WithSiblingSpans(prov trace.TracerProvider, operationName string, batch message.Batch) (message.Batch, []*Span) {
spans := make([]*Span, 0, len(batch))
newParts := make([]*message.Part, batch.Len())
for i, part := range batch {
if part == nil {
continue
}
otSpan := GetActiveSpan(part)
if otSpan == nil {
ctx, t := prov.Tracer(name).Start(part.GetContext(), operationName)
otSpan = otelSpan(ctx, t)
} else {
ctx, t := prov.Tracer(name).Start(
part.GetContext(), operationName,
trace.WithLinks(trace.LinkFromContext(otSpan.ctx)),
)
otSpan = otelSpan(ctx, t)
}
newParts[i] = message.WithContext(otSpan.ctx, part)
spans = append(spans, otSpan)
}
return newParts, spans
}
//------------------------------------------------------------------------------
// InitSpans sets up OpenTracing spans on each message part if one does not
// already exist.
func InitSpans(prov trace.TracerProvider, operationName string, batch message.Batch) {
for i, p := range batch {
batch[i] = InitSpan(prov, operationName, p)
}
}
// InitSpan sets up an OpenTracing span on a message part if one does not
// already exist.
func InitSpan(prov trace.TracerProvider, operationName string, part *message.Part) *message.Part {
if GetActiveSpan(part) != nil {
return part
}
ctx, _ := prov.Tracer(name).Start(part.GetContext(), operationName)
return message.WithContext(ctx, part)
}
// InitSpansFromParentTextMap obtains a span parent reference from a text map
// and creates child spans for each message.
func InitSpansFromParentTextMap(prov trace.TracerProvider, operationName string, textMapGeneric map[string]any, batch message.Batch) error {
c := propagation.MapCarrier{}
for k, v := range textMapGeneric {
if vStr, ok := v.(string); ok {
c[strings.ToLower(k)] = vStr
}
}
textProp := otel.GetTextMapPropagator()
for i, p := range batch {
ctx := textProp.Extract(p.GetContext(), c)
pCtx, _ := prov.Tracer(name).Start(ctx, operationName)
batch[i] = message.WithContext(pCtx, p)
}
return nil
}
// FinishSpans calls Finish on all message parts containing a span.
func FinishSpans(batch message.Batch) {
for _, p := range batch {
if span := GetActiveSpan(p); span != nil {
span.unwrap().End()
}
}
}