/
provider.go
145 lines (112 loc) · 3.82 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package xtel
import (
"context"
"net/http"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
)
// Provider helps manage the lifecycle and configuration of different Exporter(s)
// and other service level configurations for tracing and metrics.
//
// A Provider is created by NewProvider, which applies the supplied
// ProviderOption values and builds the trace and metric providers. Start
// installs them as the OpenTelemetry globals and Stop shuts them down.
type Provider struct {
// tp is the tracer provider built by initializeTraceProvider.
tp *trace.TracerProvider
// mp is the meter provider built by initializeMetricProvider.
mp *metric.MeterProvider
// spanExporters are each wrapped in a batch span processor when tp is built.
spanExporters []trace.SpanExporter
// metricReaders are each registered with mp when it is built.
metricReaders []metric.Reader
// roundTripper replaces http.DefaultTransport in Start when it is non-nil.
// NewProvider defaults it to an otelhttp transport over http.DefaultTransport.
roundTripper http.RoundTripper
// samplingRatio is passed to trace.TraceIDRatioBased when tp is built.
// NewProvider defaults it to 1.
samplingRatio float64
// initErrors is checked by NewProvider once every option has been applied.
// NOTE(review): presumably options record their failures here — confirm
// against the ProviderOption implementations.
initErrors *multierror.Error
}
// NewProvider builds a Provider for serviceName, customised by opts.
//
// The Provider starts out with an otelhttp transport wrapping
// http.DefaultTransport and a sampling ratio of 1. Every option is then
// applied in the order given. If initErrors holds an error once all options
// have run, that error is returned and no Provider is created. Otherwise the
// trace and metric providers are initialised and the Provider is returned.
func NewProvider(serviceName string, opts ...ProviderOption) (*Provider, error) {
	provider := &Provider{
		samplingRatio: 1,
		roundTripper:  otelhttp.NewTransport(http.DefaultTransport),
	}

	for i := range opts {
		opts[i].apply(provider)
	}

	initErr := provider.initErrors.ErrorOrNil()
	if initErr != nil {
		return nil, initErr
	}

	provider.initializeTraceProvider(serviceName)
	provider.initializeMetricProvider(serviceName)

	return provider, nil
}
// TracerProvider returns the OpenTelemetry TracerProvider held by this
// Provider, from which instrumentation obtains its tracers.
func (p *Provider) TracerProvider() *TracerProvider {
	return p.tp
}
// MeterProvider returns the OpenTelemetry MeterProvider held by this
// Provider, from which instrumentation obtains its meters.
func (p *Provider) MeterProvider() *MeterProvider {
	return p.mp
}
// Start registers this Provider's tracer and meter providers as the global
// OpenTelemetry providers and installs a composite text map propagator built
// from propagation.TraceContext. When a round tripper is configured it also
// replaces http.DefaultTransport with it. Start always returns nil.
func (p *Provider) Start() error {
	otel.SetTracerProvider(p.tp)
	otel.SetMeterProvider(p.mp)

	propagator := propagation.NewCompositeTextMapPropagator(&propagation.TraceContext{})
	otel.SetTextMapPropagator(propagator)

	if rt := p.roundTripper; rt != nil {
		http.DefaultTransport = rt
	}

	return nil
}
// Stop shuts down the tracer and meter providers concurrently under a shared
// 30 second timeout. It waits for both shutdowns to finish and returns their
// failures combined into a single error, or nil when neither failed.
func (p *Provider) Stop() error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	shutdowns := []func(context.Context) error{
		p.tp.Shutdown,
		p.mp.Shutdown,
	}

	// One buffer slot per shutdown, so a failing goroutine never blocks on send.
	failures := make(chan error, len(shutdowns))

	var wg sync.WaitGroup
	wg.Add(len(shutdowns))
	for _, shutdown := range shutdowns {
		go func(shutdown func(context.Context) error) {
			defer wg.Done()
			if err := shutdown(ctx); err != nil {
				failures <- err
			}
		}(shutdown)
	}

	// All senders are done after Wait, so closing lets the range below terminate.
	wg.Wait()
	close(failures)

	var combined *multierror.Error
	for err := range failures {
		combined = multierror.Append(combined, err)
	}

	return combined.ErrorOrNil()
}
// Run starts the Provider, blocks until ctx has been cancelled and then calls
// Stop. This makes Provider compatible with the
// https://pkg.go.dev/github.com/gojekfarm/xrun package.
//
// If Start fails, its error is returned immediately without waiting on ctx.
// Otherwise Run returns whatever Stop returns.
func (p *Provider) Run(ctx context.Context) error {
	// Propagate a start failure instead of discarding it, so the caller does
	// not end up waiting on a Provider that never started.
	if err := p.Start(); err != nil {
		return err
	}

	<-ctx.Done()

	return p.Stop()
}
// initializeTraceProvider builds the tracer provider and stores it in p.tp.
// The provider gets a schemaless resource carrying the service name, a
// parent-based sampler wrapping a trace-ID-ratio sampler at p.samplingRatio,
// and one batch span processor per configured span exporter.
func (p *Provider) initializeTraceProvider(serviceName string) {
	svcResource := resource.NewSchemaless(semconv.ServiceNameKey.String(serviceName))
	sampler := trace.ParentBased(trace.TraceIDRatioBased(p.samplingRatio))

	// Two fixed options plus one span processor per exporter.
	options := make([]trace.TracerProviderOption, 0, 2+len(p.spanExporters))
	options = append(options, trace.WithResource(svcResource), trace.WithSampler(sampler))

	for _, exporter := range p.spanExporters {
		processor := trace.NewBatchSpanProcessor(exporter)
		options = append(options, trace.WithSpanProcessor(processor))
	}

	p.tp = trace.NewTracerProvider(options...)
}
// initializeMetricProvider builds the meter provider and stores it in p.mp.
// The provider gets a schemaless resource carrying the service name and has
// every configured metric reader registered with it.
func (p *Provider) initializeMetricProvider(serviceName string) {
	svcResource := resource.NewSchemaless(semconv.ServiceNameKey.String(serviceName))

	// One fixed option plus one reader option per configured reader.
	options := make([]metric.Option, 0, 1+len(p.metricReaders))
	options = append(options, metric.WithResource(svcResource))

	for _, reader := range p.metricReaders {
		options = append(options, metric.WithReader(reader))
	}

	p.mp = metric.NewMeterProvider(options...)
}