forked from uber/ringpop-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender_buffered.go
158 lines (140 loc) · 3.57 KB
/
sender_buffered.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package statsd
import (
"bytes"
"fmt"
"sync"
"time"
)
// BufferedSender provides a buffered statsd udp, sending multiple
// metrics, where possible.
type BufferedSender struct {
flushBytes int
flushInterval time.Duration
sender Sender
buffer *bytes.Buffer
reqs chan []byte
shutdown chan chan error
running bool
mx sync.RWMutex
}
// Send bytes.
func (s *BufferedSender) Send(data []byte) (int, error) {
s.mx.RLock()
defer s.mx.RUnlock()
if !s.running {
return 0, fmt.Errorf("BufferedSender is not running")
}
// copy bytes, because the caller may mutate the slice (and the underlying
// array) after we return, since we may not end up sending right away.
c := make([]byte, len(data))
dlen := copy(c, data)
s.reqs <- c
return dlen, nil
}
// Close Buffered Sender
func (s *BufferedSender) Close() error {
// only need really read lock to see if we are currently
// running or not
s.mx.RLock()
if !s.running {
s.mx.RUnlock()
return nil
}
s.mx.RUnlock()
// since we are running, write lock during cleanup
s.mx.Lock()
defer s.mx.Unlock()
errChan := make(chan error)
s.running = false
s.shutdown <- errChan
return <-errChan
}
// Start Buffered Sender
// Begins ticker and read loop
func (s *BufferedSender) Start() {
// read lock to see if we are running
s.mx.RLock()
if s.running {
s.mx.RUnlock()
return
}
s.mx.RUnlock()
// write lock to start running
s.mx.Lock()
defer s.mx.Unlock()
s.running = true
s.reqs = make(chan []byte, 8)
go s.run()
}
func (s *BufferedSender) run() {
ticker := time.NewTicker(s.flushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if s.buffer.Len() > 0 {
s.flush()
}
case req := <-s.reqs:
// StatsD supports receiving multiple metrics in a single packet by
// separating them with a newline.
if s.buffer.Len()+len(req)+1 > s.flushBytes {
s.flush()
}
s.buffer.Write(req)
s.buffer.WriteByte('\n')
// if we happen to fill up the buffer, just flush right away
// instead of waiting for next input.
if s.buffer.Len() >= s.flushBytes {
s.flush()
}
case errChan := <-s.shutdown:
close(s.reqs)
for req := range s.reqs {
if s.buffer.Len()+len(req)+1 > s.flushBytes {
s.flush()
}
s.buffer.Write(req)
s.buffer.WriteByte('\n')
}
if s.buffer.Len() > 0 {
s.flush()
}
errChan <- s.sender.Close()
return
}
}
}
// flush the buffer/send to remove endpoint.
func (s *BufferedSender) flush() (int, error) {
n, err := s.sender.Send(s.buffer.Bytes())
s.buffer.Reset() // clear the buffer
return n, err
}
// Returns a new BufferedSender
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
//
// flushInterval is a time.Duration, and specifies the maximum interval for
// packet sending. Note that if you send lots of metrics, you will send more
// often. This is just a maximal threshold.
//
// flushBytes specifies the maximum udp packet size you wish to send. If adding
// a metric would result in a larger packet than flushBytes, the packet will
// first be send, then the new data will be added to the next packet.
func NewBufferedSender(addr string, flushInterval time.Duration, flushBytes int) (Sender, error) {
simpleSender, err := NewSimpleSender(addr)
if err != nil {
return nil, err
}
sender := &BufferedSender{
flushBytes: flushBytes,
flushInterval: flushInterval,
sender: simpleSender,
buffer: bytes.NewBuffer(make([]byte, 0, flushBytes)),
shutdown: make(chan chan error),
}
sender.Start()
return sender, nil
}