forked from DataDog/opencensus-go-exporter-datadog
/
trace.go
160 lines (137 loc) · 3.62 KB
/
trace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadog.com/).
// Copyright 2018 Datadog, Inc.
package datadog
import (
"bytes"
"io"
"sync"
"time"
"go.opencensus.io/trace"
)
const (
// payloadLimit specifies the maximum payload size, in bytes, that the
// Datadog agent will accept. Request bodies larger than this will be
// rejected. It is a typed int constant so it can be used directly in
// integer size arithmetic (for example, to derive flushThreshold).
payloadLimit = int(1e7) // 10MB
// defaultService specifies the default service name that will be used
// with the registered traces. newTraceExporter falls back to it when
// Options.Service is empty. Users should normally specify a different
// service name.
defaultService = "opencensus-app"
)
// Exporter tunables. These are variables rather than constants so that
// tests can override them.
var (
// inChannelSize specifies the capacity of the buffered channel that
// carries converted spans from exportSpan to the loop goroutine, which
// adds them to the payload. When the channel is full, exportSpan drops
// the span and logs an overflow error instead of blocking.
inChannelSize = int(5e5) // 500K (approx 61MB memory if full)
// flushThreshold specifies the payload's size threshold in bytes. If it
// is exceeded after a span has been added, a flush will be triggered.
// It is set to half of payloadLimit.
flushThreshold = payloadLimit / 2
// flushInterval specifies the interval at which the payload will
// automatically be flushed by the loop goroutine's ticker.
flushInterval = 2 * time.Second
)
// traceExporter receives converted spans on a buffered channel, accumulates
// them into a payload from a single background goroutine (see loop), and
// uploads the payload through uploadFn whenever a flush is triggered.
type traceExporter struct {
// opts holds the options the exporter was created with; newTraceExporter
// fills in Service with defaultService when it is empty.
opts Options
// payload accumulates the spans received since the last flush.
payload *payload
// errors receives overflow, encoding and transport errors via its log
// method and is flushed when the loop goroutine shuts down.
errors *errorAmortizer
// sampler applies a priority to spans that do not already carry the
// keySamplingPriority metric, and is handed the body of each successful
// upload response.
sampler *prioritySampler
// uploadFn specifies the function used for uploading.
// Defaults to (*transport).upload; replaced in tests.
uploadFn func(pkg *bytes.Buffer, count int) (io.ReadCloser, error)
wg sync.WaitGroup // counts active uploads
// in carries converted spans from exportSpan to the loop goroutine.
in chan *ddSpan
// exit is used by stop to signal the loop goroutine, and is closed by the
// loop goroutine when it has finished shutting down.
exit chan struct{}
}
// newTraceExporter builds a traceExporter from the given options and starts
// its background loop goroutine before returning it. An empty o.Service is
// replaced with defaultService. The returned exporter uploads through the
// upload method of a transport created for o.TraceAddr.
func newTraceExporter(o Options) *traceExporter {
	if o.Service == "" {
		o.Service = defaultService
	}

	// Fields are populated one at a time, creating the sampler first and the
	// remaining collaborators in declaration order.
	exp := new(traceExporter)
	exp.sampler = newPrioritySampler()
	exp.opts = o
	exp.payload = newPayload()
	exp.errors = newErrorAmortizer(defaultErrorFreq, o.OnError)
	exp.uploadFn = newTransport(o.TraceAddr).upload
	exp.in = make(chan *ddSpan, inChannelSize)
	exp.exit = make(chan struct{})

	// The loop goroutine runs until stop is called.
	go exp.loop()
	return exp
}
// exportSpan converts s and hands the result to the loop goroutine through
// the input channel. It never blocks: if the channel is full the span is
// dropped and an overflow error is logged instead.
func (e *traceExporter) exportSpan(s *trace.SpanData) {
	converted := e.convertSpan(s)
	select {
	case e.in <- converted:
		// queued for the loop goroutine
	default:
		// channel is full; drop the span and record the overflow
		e.errors.log(errorTypeOverflow, nil)
	}
}
// loop is the exporter's background goroutine. It receives spans from the
// input channel, flushes the payload on every tick of flushInterval, and
// shuts down when a value arrives on the exit channel. On shutdown it
// drains whatever is still queued on the input channel, flushes one last
// time, waits for in-flight uploads and reports any errors. The exit
// channel is closed when loop returns, which releases the caller blocked
// in stop.
func (e *traceExporter) loop() {
	defer close(e.exit)

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	// Main phase: serve spans and ticks until asked to exit.
	for running := true; running; {
		select {
		case s := <-e.in:
			e.receiveSpan(s)
		case <-ticker.C:
			e.flush()
		case <-e.exit:
			running = false
		}
	}

	// Shutdown phase: pick up spans that were already queued but not yet
	// processed, without blocking once the channel is empty.
drain:
	for {
		select {
		case s := <-e.in:
			e.receiveSpan(s)
		default:
			break drain
		}
	}

	e.flush()
	e.wg.Wait() // wait for uploads to finish
	e.errors.flush()
}
// receiveSpan adds span to the current payload. Spans that do not already
// carry the keySamplingPriority metric are first passed to the sampler.
// A failure to add the span is logged as an encoding error. Afterwards, if
// the payload has grown beyond flushThreshold, it is flushed immediately.
func (e *traceExporter) receiveSpan(span *ddSpan) {
	_, hasPriority := span.Metrics[keySamplingPriority]
	if !hasPriority {
		e.sampler.applyPriority(span)
	}

	err := e.payload.add(span)
	if err != nil {
		e.errors.log(errorTypeEncoding, err)
	}

	// The size check runs even when add failed, as the payload may already
	// be over the threshold.
	if flushThreshold < e.payload.size() {
		e.flush()
	}
}
// flush uploads the current payload in a separate goroutine and then resets
// the payload. It is a no-op when the payload holds no traces. Each upload
// is tracked by e.wg so that loop can wait for it on shutdown. A failed
// upload is logged as a transport error; on success the response body is
// handed to the sampler's readRatesJSON.
func (e *traceExporter) flush() {
	count := len(e.payload.traces)
	if count == 0 {
		return
	}

	pkg := e.payload.buffer()
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()

		resp, err := e.uploadFn(pkg, count)
		if err != nil {
			e.errors.log(errorTypeTransport, err)
			return
		}
		e.sampler.readRatesJSON(resp) // do we care about errors?
	}()

	e.payload.reset()
}
// stop asks the loop goroutine to shut down and blocks until it has done
// so. The send wakes loop out of its select; the receive returns once loop
// has finished and closed the exit channel.
//
// stop must be called at most once: after loop has returned the exit
// channel is closed, and a further send on it would panic.
func (e *traceExporter) stop() {
	var signal struct{}
	e.exit <- signal // request shutdown
	<-e.exit         // wait for loop to close the channel
}