/
transponder.go
90 lines (77 loc) · 2.08 KB
/
transponder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package v2
import (
"time"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/plumbing/batching"
)
type Nexter interface {
TryNext() (*loggregator_v2.Envelope, bool)
}
type BatchWriter interface {
Write(msgs []*loggregator_v2.Envelope) error
}
type Transponder struct {
nexter Nexter
writer BatchWriter
batchSize int
batchInterval time.Duration
droppedMetric metrics.Counter
egressMetric metrics.Counter
}
type MetricClient interface {
NewCounter(name, helpText string, opts ...metrics.MetricOption) metrics.Counter
}
func NewTransponder(
n Nexter,
w BatchWriter,
batchSize int,
batchInterval time.Duration,
metricClient MetricClient,
) *Transponder {
droppedMetric := metricClient.NewCounter(
"dropped",
"Total number of dropped envelopes.",
metrics.WithMetricLabels(map[string]string{"direction": "egress", "metric_version": "2.0"}),
)
egressMetric := metricClient.NewCounter(
"egress",
"Total number of envelopes successfully egressed.",
metrics.WithMetricLabels(map[string]string{"metric_version": "2.0"}),
)
return &Transponder{
nexter: n,
writer: w,
droppedMetric: droppedMetric,
egressMetric: egressMetric,
batchSize: batchSize,
batchInterval: batchInterval,
}
}
func (t *Transponder) Start() {
b := batching.NewV2EnvelopeBatcher(
t.batchSize,
t.batchInterval,
batching.V2EnvelopeWriterFunc(t.write),
)
for {
envelope, ok := t.nexter.TryNext()
if !ok {
b.Flush()
time.Sleep(100 * time.Millisecond)
continue
}
b.Write(envelope)
}
}
func (t *Transponder) write(batch []*loggregator_v2.Envelope) {
if err := t.writer.Write(batch); err != nil {
// metric-documentation-v2: (loggregator.metron.dropped) Number of messages
// dropped when failing to write to Dopplers v2 API
t.droppedMetric.Add(float64(len(batch)))
return
}
// metric-documentation-v2: (loggregator.metron.egress)
// Number of messages written to Doppler's v2 API
t.egressMetric.Add(float64(len(batch)))
}