/
sender.go
164 lines (149 loc) · 4.6 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package cwopencensusexporter
import (
"context"
"errors"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
// MetricDatumSender is anything that can send datum somewhere
type MetricDatumSender interface {
// SendMetricDatum should not block. It should queue the datum for sending, or just send it.
// It should not modify the input datum
// but can assume the input datum is immutable. Return an error if unable to send this datum correctly.
SendMetricDatum(md *cloudwatch.MetricDatum) error
}
// CloudWatchClient is anything that can receive CloudWatch metrics as documented by CloudWatch's public API constraints.
type CloudWatchClient interface {
// PutMetricDataWithContext should match the contract of cloudwatch.CloudWatch.PutMetricDataWithContext
PutMetricDataWithContext(aws.Context, *cloudwatch.PutMetricDataInput, ...request.Option) (*cloudwatch.PutMetricDataOutput, error)
}
var _ CloudWatchClient = &cloudwatch.CloudWatch{}
// BatchMetricDatumSender aggregates datum into a channel and sends them to cloudwatch
type BatchMetricDatumSender struct {
// CloudWatchClient is anything that can send datum to cloudwatch. It should probably be cwpagedmetricput.Pager so
// you can take care of batching large requests
CloudWatchClient CloudWatchClient
// BatchDelay is how long to wait between getting one value and waiting for a batch to fill up
BatchDelay time.Duration
// BatchSize is the maximum number of Datum to send to a single call to CloudWatchClient
BatchSize int
// Namespace is the cloudwatch namespace attached to the datum
Namespace string
// OnFailedSend is called on any failure to send datum to CloudWatchClient
OnFailedSend func(datum []*cloudwatch.MetricDatum, err error)
tosend chan *cloudwatch.MetricDatum
onClose chan struct{}
startDone chan struct{}
once sync.Once
}
func (b *BatchMetricDatumSender) init() {
b.once.Do(func() {
b.startDone = make(chan struct{})
b.onClose = make(chan struct{})
b.tosend = make(chan *cloudwatch.MetricDatum, 1024)
})
}
var _ MetricDatumSender = &BatchMetricDatumSender{}
// Run executes the batch datum sender. You should probably execute this inside a goroutine. It blocks until Shutdown
func (b *BatchMetricDatumSender) Run() error {
b.init()
defer close(b.startDone)
for {
var first *cloudwatch.MetricDatum
select {
case <-b.onClose:
return nil
case first = <-b.tosend:
}
finishSending := time.After(b.batchDelay())
toSend := make([]*cloudwatch.MetricDatum, 0, b.batchSize())
toSend = append(toSend, first)
forloop:
for len(toSend) < b.batchSize() {
select {
case next := <-b.tosend:
toSend = append(toSend, next)
case <-finishSending:
break forloop
}
}
_, err := b.CloudWatchClient.PutMetricDataWithContext(context.Background(), &cloudwatch.PutMetricDataInput{
MetricData: toSend,
Namespace: b.namespace(),
})
if err != nil && b.OnFailedSend != nil {
b.OnFailedSend(toSend, err)
}
}
}
// Shutdown stops the sender once it has been started. Blocks until either Run finishes, or ctx dies.
func (b *BatchMetricDatumSender) Shutdown(ctx context.Context) error {
b.init()
close(b.onClose)
select {
case <-ctx.Done():
return ctx.Err()
case <-b.startDone:
}
return b.flush(ctx)
}
// SendMetricDatum queues a datum for sending to cloudwatch
func (b *BatchMetricDatumSender) SendMetricDatum(md *cloudwatch.MetricDatum) error {
select {
case b.tosend <- md:
return nil
default:
return errors.New("tosend channel full")
}
}
func (b *BatchMetricDatumSender) batchSize() int {
if b.BatchSize == 0 {
return 20
}
return b.BatchSize
}
func (b *BatchMetricDatumSender) batchDelay() time.Duration {
if b.BatchDelay == 0 {
return time.Second
}
return b.BatchDelay
}
func (b *BatchMetricDatumSender) namespace() *string {
if b.Namespace == "" {
return aws.String("custom")
}
return &b.Namespace
}
func (b *BatchMetricDatumSender) flush(ctx context.Context) error {
for {
var first *cloudwatch.MetricDatum
select {
case <-ctx.Done():
return ctx.Err()
case first = <-b.tosend:
default:
return nil
}
toSend := make([]*cloudwatch.MetricDatum, 0, b.batchSize())
toSend = append(toSend, first)
forloop:
for len(toSend) < b.batchSize() {
select {
case next := <-b.tosend:
toSend = append(toSend, next)
default:
break forloop
}
}
_, err := b.CloudWatchClient.PutMetricDataWithContext(ctx, &cloudwatch.PutMetricDataInput{
MetricData: toSend,
Namespace: b.namespace(),
})
if err != nil && b.OnFailedSend != nil {
b.OnFailedSend(toSend, err)
}
}
}