-
Notifications
You must be signed in to change notification settings - Fork 133
/
sender.go
118 lines (106 loc) · 3.34 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package statsd
import (
"sync/atomic"
"time"
)
// A statsdWriter offers a standard interface regardless of the underlying
// protocol. For now UDS and UDP writers are available.
// Attention: the underlying buffer of `data` is reused after a `statsdWriter.Write` call.
// `statsdWriter.Write` must be synchronous.
type statsdWriter interface {
// Write sends data over the underlying transport. Because the caller reuses
// the backing array of data once Write returns, implementations must not keep
// a reference to it. The sender counts a non-nil err as a dropped payload.
Write(data []byte) (n int, err error)
// SetWriteTimeout is not called from this file; presumably it bounds how long
// a single Write may block - confirm against the implementations.
SetWriteTimeout(time.Duration) error
// Close releases the underlying transport. The sender calls it when it is
// closed itself and returns the resulting error to its caller.
Close() error
}
// SenderMetrics contains metrics about the health of the sender.
//
// The sender updates every counter with atomic operations, and flushMetrics
// resets them all to zero when it reports them, so each value covers the
// period since the previous flushMetrics call.
type SenderMetrics struct {
// TotalSentBytes is the number of payload bytes for which the transport
// Write returned no error.
TotalSentBytes uint64
// TotalSentPayloads is the number of payloads for which the transport Write
// returned no error.
TotalSentPayloads uint64
// TotalDroppedPayloads is the number of payloads dropped for any reason
// (queue full or transport write error).
TotalDroppedPayloads uint64
// TotalDroppedBytes is the number of payload bytes dropped for any reason
// (queue full or transport write error).
TotalDroppedBytes uint64
// TotalDroppedPayloadsQueueFull is the number of payloads dropped because
// the sender queue had no free slot when send was called.
TotalDroppedPayloadsQueueFull uint64
// TotalDroppedBytesQueueFull is the number of payload bytes dropped because
// the sender queue had no free slot when send was called.
TotalDroppedBytesQueueFull uint64
// TotalDroppedPayloadsWriter is the number of payloads dropped because the
// transport Write returned an error.
TotalDroppedPayloadsWriter uint64
// TotalDroppedBytesWriter is the number of payload bytes dropped because the
// transport Write returned an error.
TotalDroppedBytesWriter uint64
}
// sender queues buffers and writes them to a statsdWriter from a background
// goroutine (sendLoop), keeping counters about what was sent and dropped.
type sender struct {
// transport is where queued buffers are written; it is closed by close.
transport statsdWriter
// pool receives every buffer back once it has been written or dropped.
pool *bufferPool
// queue holds the buffers waiting to be written. send enqueues without
// blocking; sendLoop and flush dequeue.
queue chan *statsdBuffer
// metrics holds the health counters; they are only accessed through
// sync/atomic operations.
metrics SenderMetrics
// stop is used by close to ask sendLoop to exit; sendLoop closes the channel
// when it returns so that close can wait for it.
stop chan struct{}
}
// newSender builds a sender that writes to transport. Up to queueSize buffers
// can wait in its queue, and pool is where buffers are handed back once they
// have been written or dropped. The background goroutine running sendLoop is
// started before the sender is returned.
func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
	s := new(sender)
	s.transport = transport
	s.pool = pool
	s.queue = make(chan *statsdBuffer, queueSize)
	s.stop = make(chan struct{})

	go s.sendLoop()
	return s
}
// send hands buffer over to the background loop without ever blocking. If the
// queue has no free slot the buffer is dropped instead: the global and
// queue-full drop counters are increased and the buffer goes back to the pool.
func (s *sender) send(buffer *statsdBuffer) {
	select {
	case s.queue <- buffer:
		return
	default:
	}

	// Queue is full: account for the loss and recycle the buffer.
	dropped := uint64(len(buffer.bytes()))
	atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
	atomic.AddUint64(&s.metrics.TotalDroppedBytes, dropped)
	atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1)
	atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, dropped)
	s.pool.returnBuffer(buffer)
}
// write pushes the content of buffer to the transport and records the result:
// the sent counters grow on success, the global and writer drop counters grow
// when the transport reports an error. In both cases the buffer is returned to
// the pool afterwards.
func (s *sender) write(buffer *statsdBuffer) {
	payload := buffer.bytes()
	size := uint64(len(payload))

	if _, err := s.transport.Write(payload); err == nil {
		atomic.AddUint64(&s.metrics.TotalSentPayloads, 1)
		atomic.AddUint64(&s.metrics.TotalSentBytes, size)
	} else {
		atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
		atomic.AddUint64(&s.metrics.TotalDroppedBytes, size)
		atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1)
		atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, size)
	}

	s.pool.returnBuffer(buffer)
}
// flushMetrics returns the current value of every sender counter and resets
// each of them to zero. Each counter is swapped atomically on its own; the
// returned struct is not a single atomic snapshot of all counters.
func (s *sender) flushMetrics() SenderMetrics {
	// take reads a counter and zeroes it in one atomic step.
	take := func(counter *uint64) uint64 {
		return atomic.SwapUint64(counter, 0)
	}

	live := &s.metrics
	var out SenderMetrics
	out.TotalSentBytes = take(&live.TotalSentBytes)
	out.TotalSentPayloads = take(&live.TotalSentPayloads)
	out.TotalDroppedPayloads = take(&live.TotalDroppedPayloads)
	out.TotalDroppedBytes = take(&live.TotalDroppedBytes)
	out.TotalDroppedPayloadsQueueFull = take(&live.TotalDroppedPayloadsQueueFull)
	out.TotalDroppedBytesQueueFull = take(&live.TotalDroppedBytesQueueFull)
	out.TotalDroppedPayloadsWriter = take(&live.TotalDroppedPayloadsWriter)
	out.TotalDroppedBytesWriter = take(&live.TotalDroppedBytesWriter)
	return out
}
// sendLoop is the body of the sender's background goroutine. It writes every
// buffer that shows up on the queue until a value is received on stop, and
// closes stop when it returns so that a waiter can tell the loop has exited.
func (s *sender) sendLoop() {
	defer close(s.stop)

	for {
		select {
		case <-s.stop:
			return
		case pending := <-s.queue:
			s.write(pending)
		}
	}
}
// flush writes, from the calling goroutine, every buffer currently waiting in
// the queue and returns as soon as a receive from the queue would block.
func (s *sender) flush() {
	for drained := false; !drained; {
		select {
		case pending := <-s.queue:
			s.write(pending)
		default:
			drained = true
		}
	}
}
// close shuts the sender down and returns the error reported by the
// transport's Close.
//
// The shutdown happens in three ordered steps:
//  1. the background sendLoop is asked to exit and close waits until it has
//     done so;
//  2. whatever is still queued is written from the calling goroutine;
//  3. the transport is closed.
//
// Stopping the loop first means that nothing else is writing to the transport
// while the remaining buffers are flushed, and that no write can be attempted
// by the loop after the transport has been closed.
//
// close must be called at most once: sendLoop closes s.stop when it exits, so
// a second call would send on a closed channel and panic.
func (s *sender) close() error {
	// Signal sendLoop, then block until it closes s.stop on its way out.
	s.stop <- struct{}{}
	<-s.stop

	s.flush()
	return s.transport.Close()
}