-
Notifications
You must be signed in to change notification settings - Fork 19
/
metron.go
94 lines (79 loc) · 2.03 KB
/
metron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package testhelpers
import (
"fmt"
"net"
"sync"
"github.com/cloudfoundry/dropsonde/dropsonde_unmarshaller"
"github.com/cloudfoundry/sonde-go/events"
)
type FakeMetron struct {
port uint16
connection net.PacketConn
dropsondeUnmarshaller *dropsonde_unmarshaller.DropsondeUnmarshaller
valueMetrics map[string][]events.ValueMetric
stopped bool
mtx sync.RWMutex
}
func NewFakeMetron(port uint16) *FakeMetron {
return &FakeMetron{
port: port,
dropsondeUnmarshaller: dropsonde_unmarshaller.NewDropsondeUnmarshaller(nil),
mtx: sync.RWMutex{},
valueMetrics: make(map[string][]events.ValueMetric),
}
}
func (m *FakeMetron) Listen() error {
addr := fmt.Sprintf("localhost:%d", m.port)
connection, err := net.ListenPacket("udp4", addr)
if err != nil {
return err
}
m.connection = connection
return nil
}
func (m *FakeMetron) Run() error {
readBuffer := make([]byte, 65535) //buffer with size = max theoretical UDP size
for {
readCount, _, err := m.connection.ReadFrom(readBuffer)
if err != nil && m.isStopped() {
return nil
}
if err != nil {
return err
}
readData := make([]byte, readCount) //pass on buffer in size only of read data
copy(readData, readBuffer[:readCount])
// unmarshal
envelope, err := m.dropsondeUnmarshaller.UnmarshallMessage(readData)
if err != nil {
return err
}
if *envelope.EventType == events.Envelope_ValueMetric {
m.mtx.Lock()
metric := *envelope.ValueMetric
key := *metric.Name
m.valueMetrics[key] = append(m.valueMetrics[key], metric)
m.mtx.Unlock()
}
}
}
func (m *FakeMetron) isStopped() bool {
m.mtx.RLock()
defer m.mtx.RUnlock()
return m.stopped
}
func (m *FakeMetron) Stop() error {
m.mtx.RLock()
defer m.mtx.RUnlock()
m.stopped = true
return m.connection.Close()
}
func (m *FakeMetron) ValueMetricsFor(key string) []events.ValueMetric {
m.mtx.RLock()
defer m.mtx.RUnlock()
metrics, ok := m.valueMetrics[key]
if !ok {
return []events.ValueMetric{}
}
return metrics
}