/
stream.go
67 lines (56 loc) · 1.37 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package collectors
import (
"reflect"
"runtime"
"bosun.org/collect"
"bosun.org/metadata"
"bosun.org/opentsdb"
"bosun.org/util"
)
/* StreamCollector is useful for collectors that do not produce metrics at a
preset interval. Instead it consumes directly from a channel provided by
the collector and forwards it internally. */
type StreamCollector struct {
// F is called once at the start of Run and returns the channel from which
// batches of data points are received.
F func() <-chan *opentsdb.MultiDataPoint
// name, when non-empty, is what Name returns; otherwise Name falls back to
// the runtime function name of F.
name string
// init is an optional hook invoked by Init when it is non-nil.
init func()
// TagOverride is embedded; Run calls ApplyTagOverrides on each data point's
// tags (presumably the method promoted from this type — confirm against its
// declaration).
TagOverride
}
// Init invokes the collector's optional init hook. It does nothing when no
// hook has been set.
func (s *StreamCollector) Init() {
	if s.init == nil {
		return
	}
	s.init()
}
// Run obtains the input channel from s.F and forwards every data point of each
// received batch to dpchan. It returns when quit fires or when the input
// channel is closed.
//
// Unless collect.DisableDefaultCollectors is set, Add is called on each batch
// with a "scollector.collector.count" counter holding the number of points
// forwarded so far (presumably Add appends it to the batch — confirm against
// its definition). Points lacking a "host" tag are given the name reported by
// util.GetHostManager().GetHostName(), and ApplyTagOverrides is applied to the
// tags of every point before it is sent.
func (s *StreamCollector) Run(dpchan chan<- *opentsdb.DataPoint, quit <-chan struct{}) {
	inputChan := s.F()
	count := 0
	for {
		select {
		case md, ok := <-inputChan:
			if !ok {
				// The producer closed its channel, so nothing more will
				// arrive. Without this check every receive would yield nil
				// and the dereference below would panic.
				return
			}
			if md == nil {
				// A nil batch carries no points and cannot be dereferenced.
				continue
			}
			if !collect.DisableDefaultCollectors {
				tags := opentsdb.TagSet{"collector": s.Name(), "os": runtime.GOOS}
				Add(md, "scollector.collector.count", count, tags, metadata.Counter, metadata.Count, "Counter of metrics passed through.")
			}
			for _, dp := range *md {
				if _, found := dp.Tags["host"]; !found {
					dp.Tags["host"] = util.GetHostManager().GetHostName()
				}
				s.ApplyTagOverrides(dp.Tags)
				// Keep honouring quit while the consumer is not reading, so
				// shutdown cannot leave this goroutine blocked on the send.
				select {
				case dpchan <- dp:
					count++
				case <-quit:
					return
				}
			}
		case <-quit:
			return
		}
	}
}
// Enabled always returns true for a StreamCollector.
func (s *StreamCollector) Enabled() bool {
return true
}
// Name returns the explicitly configured name when one is set. Otherwise it
// returns the runtime function name of F, looked up from F's code pointer.
func (s *StreamCollector) Name() string {
	if s.name == "" {
		pc := reflect.ValueOf(s.F).Pointer()
		return runtime.FuncForPC(pc).Name()
	}
	return s.name
}