// json_polling_sink.go (forked from gocraft/health)

package health
import (
"time"
)
// JsonPollingSink is a sink whose Emit* methods queue commands on an
// internal channel and whose GetMetrics method returns a slice of
// IntervalAggregation values obtained over a reply channel. A goroutine
// running startAggregator is launched for each sink by NewJsonPollingSink;
// presumably it is the consumer of every channel below (its body is not in
// this file).
type JsonPollingSink struct {
	// intervalDuration is the interval length given to NewJsonPollingSink.
	intervalDuration time.Duration

	// cmdChan carries one *emitCmd per Emit* call.
	cmdChan chan *emitCmd

	// doneChan receives the value sent by ShutdownServer; doneDoneChan is
	// the channel ShutdownServer then waits on before returning.
	doneChan, doneDoneChan chan int

	// intervalsChanChan carries the per-call reply channel that GetMetrics
	// creates and then reads its result from.
	intervalsChanChan chan chan []*IntervalAggregation
}
// cmdKind tags an emitCmd with the Emit* method that produced it.
type cmdKind int

// The kinds are numbered consecutively from zero in declaration order.
const (
	// cmdKindEvent is the kind set by EmitEvent.
	cmdKindEvent = cmdKind(iota)
	// cmdKindEventErr is the kind set by EmitEventErr.
	cmdKindEventErr
	// cmdKindTiming is the kind set by EmitTiming.
	cmdKindTiming
	// cmdKindGauge is the kind set by EmitGauge.
	cmdKindGauge
	// cmdKindComplete is the kind set by EmitComplete.
	cmdKindComplete
)
// emitCmd is the message the Emit* methods place on JsonPollingSink.cmdChan.
// Each Emit* method fills in Kind plus the fields matching its own
// arguments; every other field keeps its zero value.
type emitCmd struct {
	// Kind records which Emit* method built the command.
	Kind cmdKind

	// Job is the job argument; every Emit* method sets it.
	// Event is the event argument; EmitComplete does not set it.
	Job, Event string

	// Err is the inputErr argument of EmitEventErr.
	Err error

	// Nanos is the nanos argument of EmitTiming or EmitComplete.
	Nanos int64

	// Value is the value argument of EmitGauge.
	Value float64

	// Status is the status argument of EmitComplete.
	Status CompletionStatus
}
// NewJsonPollingSink allocates a JsonPollingSink, creates its channels, and
// launches startAggregator in a new goroutine with intervalDuration, retain,
// and the sink itself before returning the sink.
//
// Only the command channel is buffered; the shutdown and metrics channels
// are unbuffered, so operations on them rendezvous with the other side.
func NewJsonPollingSink(intervalDuration time.Duration, retain time.Duration) *JsonPollingSink {
	// Capacity of the command channel. The figure is an unmeasured guess.
	const cmdQueueCap = 4096

	sink := new(JsonPollingSink)
	sink.intervalDuration = intervalDuration
	sink.cmdChan = make(chan *emitCmd, cmdQueueCap)
	sink.doneChan = make(chan int)
	sink.doneDoneChan = make(chan int)
	sink.intervalsChanChan = make(chan chan []*IntervalAggregation)

	go startAggregator(intervalDuration, retain, sink)

	return sink
}
// ShutdownServer sends a value on doneChan and then blocks until a value
// arrives on doneDoneChan. Both channels are unbuffered, so each step waits
// for its counterpart — presumably the goroutine started by
// NewJsonPollingSink — to be ready.
func (s *JsonPollingSink) ShutdownServer() {
	stop, stopped := s.doneChan, s.doneDoneChan

	stop <- 1
	<-stopped
}
// EmitEvent queues a cmdKindEvent command carrying job and event on cmdChan.
// kvs is accepted but not used. The send blocks while cmdChan's buffer has
// no free slot.
func (s *JsonPollingSink) EmitEvent(job string, event string, kvs map[string]string) {
	cmd := &emitCmd{
		Kind:  cmdKindEvent,
		Job:   job,
		Event: event,
	}
	s.cmdChan <- cmd
}
// EmitEventErr queues a cmdKindEventErr command carrying job, event, and
// inputErr on cmdChan. kvs is accepted but not used. The send blocks while
// cmdChan's buffer has no free slot.
func (s *JsonPollingSink) EmitEventErr(job string, event string, inputErr error, kvs map[string]string) {
	cmd := &emitCmd{
		Kind:  cmdKindEventErr,
		Job:   job,
		Event: event,
		Err:   inputErr,
	}
	s.cmdChan <- cmd
}
// EmitTiming queues a cmdKindTiming command carrying job, event, and nanos
// on cmdChan. kvs is accepted but not used. The send blocks while cmdChan's
// buffer has no free slot.
func (s *JsonPollingSink) EmitTiming(job string, event string, nanos int64, kvs map[string]string) {
	cmd := &emitCmd{
		Kind:  cmdKindTiming,
		Job:   job,
		Event: event,
		Nanos: nanos,
	}
	s.cmdChan <- cmd
}
// EmitGauge queues a cmdKindGauge command carrying job, event, and value on
// cmdChan. kvs is accepted but not used. The send blocks while cmdChan's
// buffer has no free slot.
func (s *JsonPollingSink) EmitGauge(job string, event string, value float64, kvs map[string]string) {
	cmd := &emitCmd{
		Kind:  cmdKindGauge,
		Job:   job,
		Event: event,
		Value: value,
	}
	s.cmdChan <- cmd
}
// EmitComplete queues a cmdKindComplete command carrying job, status, and
// nanos on cmdChan; the command's Event field is left empty. kvs is accepted
// but not used. The send blocks while cmdChan's buffer has no free slot.
func (s *JsonPollingSink) EmitComplete(job string, status CompletionStatus, nanos int64, kvs map[string]string) {
	cmd := &emitCmd{
		Kind:   cmdKindComplete,
		Job:    job,
		Status: status,
		Nanos:  nanos,
	}
	s.cmdChan <- cmd
}
// GetMetrics creates a fresh unbuffered reply channel, hands it over
// intervalsChanChan, and returns the slice that is sent back on it. The
// call blocks until both the hand-off and the reply have taken place.
func (s *JsonPollingSink) GetMetrics() []*IntervalAggregation {
	reply := make(chan []*IntervalAggregation)
	s.intervalsChanChan <- reply

	intervals := <-reply
	return intervals
}