-
Notifications
You must be signed in to change notification settings - Fork 1
/
interceptor.go
131 lines (116 loc) · 3.49 KB
/
interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package temporal
import (
"context"
"fmt"
apiTrace "go.bryk.io/pkg/otel/api"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/log"
)
// based on the original: go.temporal.io/sdk/contrib/opentelemetry
const (
// default HTTP header key used to transmit span context data.
defaultHeaderKey = "_tracer-data"
)
type monitor struct {
at apiTrace.Tracer
mp propagation.TextMapPropagator
interceptor.BaseTracer // embed base implementation
}
// NewTracingInterceptor creates an interceptor for setting on client
// options that implements OpenTelemetry tracing for workflows.
//
// client.Options{
// Interceptors: []interceptor.ClientInterceptor{
// NewTracingInterceptor(),
// },
// }
func NewTracingInterceptor() interceptor.Interceptor {
tracer := &monitor{
at: apiTrace.GetTracer(),
mp: otel.GetTextMapPropagator(),
}
return interceptor.NewTracingInterceptor(tracer)
}
func (m *monitor) Options() interceptor.TracerOptions {
return interceptor.TracerOptions{
SpanContextKey: spanContextKey{},
HeaderKey: defaultHeaderKey,
DisableSignalTracing: true,
DisableQueryTracing: true,
AllowInvalidParentSpans: false,
}
}
func (m *monitor) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger {
span, ok := ref.(*tracerSpan)
if !ok {
return logger
}
spCtx := span.Unwrap().SpanContext()
logger = log.With(logger,
"TraceID", spCtx.TraceID(),
"SpanID", spCtx.SpanID(),
)
return logger
}
func (m *monitor) UnmarshalSpan(kv map[string]string) (interceptor.TracerSpanRef, error) {
ctx := trace.SpanContextFromContext(m.mp.Extract(context.Background(), textMapCarrier(kv)))
if !ctx.IsValid() {
return nil, fmt.Errorf("failed extracting OpenTelemetry span from map")
}
return &tracerSpanRef{SpanContext: ctx}, nil
}
func (m *monitor) MarshalSpan(span interceptor.TracerSpan) (map[string]string, error) {
tp, ok := span.(*tracerSpan)
if !ok {
return nil, fmt.Errorf("invalid span type")
}
data := textMapCarrier{}
m.mp.Inject(trace.ContextWithSpan(context.Background(), tp.Unwrap()), data)
return map[string]string(data), nil
}
func (m *monitor) SpanFromContext(ctx context.Context) interceptor.TracerSpan {
span := apiTrace.SpanFromContext(ctx)
if !span.Unwrap().SpanContext().IsValid() {
return nil
}
return &tracerSpan{Span: span}
}
func (m *monitor) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context {
tp, ok := span.(*tracerSpan)
if !ok {
return ctx
}
return trace.ContextWithSpan(ctx, tp.Unwrap())
}
func (m *monitor) StartSpan(opts *interceptor.TracerStartSpanOptions) (interceptor.TracerSpan, error) {
// Create context with parent
var parent trace.SpanContext
switch optParent := opts.Parent.(type) {
case nil:
case *tracerSpan:
parent = optParent.Unwrap().SpanContext()
case *tracerSpanRef:
parent = optParent.SpanContext
default:
return nil, fmt.Errorf("unrecognized parent type %T", optParent)
}
ctx := context.Background()
if parent.IsValid() {
ctx = trace.ContextWithSpanContext(ctx, parent)
}
// Create span
spanName := opts.Operation + ":" + opts.Name
span := m.at.Start(ctx, spanName, apiTrace.WithStartOptions(trace.WithTimestamp(opts.Time)))
// Set tags
if len(opts.Tags) > 0 {
tags := map[string]interface{}{}
for k, v := range opts.Tags {
tags[k] = v
}
span.SetAttributes(tags)
}
return &tracerSpan{Span: span}, nil
}