-
Notifications
You must be signed in to change notification settings - Fork 5
/
fake_metron.go
122 lines (100 loc) · 2.53 KB
/
fake_metron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package metrics
import (
"fmt"
"net"
"sync"
"code.cloudfoundry.org/cf-networking-helpers/testsupport/ports"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gogo/protobuf/proto"
)
type Event struct {
EventType string
Name string
Origin string
Value float64
}
type FakeMetron interface {
AllEvents() []Event
Address() string
Close() error
Port() int
}
type fakeMetron struct {
lock *sync.Mutex
receivedEvents []Event
listener net.PacketConn
port int
}
func NewFakeMetron(params ...int) *fakeMetron {
portOffset := 0
if len(params) > 0 {
portOffset = params[0]
}
port := ports.PickAPort() + portOffset
addr := fmt.Sprintf("127.0.0.1:%d", port)
listener, err := net.ListenPacket("udp4", addr)
if err != nil {
panic(err)
}
metron := &fakeMetron{
lock: &sync.Mutex{},
listener: listener,
port: port,
}
go metron.listenForEvents()
return metron
}
func (f *fakeMetron) Address() string {
return f.listener.LocalAddr().String()
}
func (f *fakeMetron) Port() int {
return f.port
}
func (f *fakeMetron) Close() error {
return f.listener.Close()
}
func (f *fakeMetron) AllEvents() []Event {
f.lock.Lock()
defer f.lock.Unlock()
ret := make([]Event, len(f.receivedEvents))
copy(ret, f.receivedEvents)
return ret
}
// modified from https://github.com/cloudfoundry/dropsonde/blob/9b2cd8f8f9e99dca1f764ca4511d6011b4f44d0c/integration_test/dropsonde_end_to_end_test.go
func (f *fakeMetron) listenForEvents() {
for {
buffer := make([]byte, 1024)
n, _, err := f.listener.ReadFrom(buffer)
if err != nil {
return
}
if n == 0 {
panic("Received empty packet")
}
envelope := new(events.Envelope)
err = proto.Unmarshal(buffer[0:n], envelope)
if err != nil {
panic(err)
}
var eventId = envelope.GetEventType().String()
newEvent := Event{EventType: eventId}
switch envelope.GetEventType() {
case events.Envelope_HttpStartStop:
newEvent.Name = envelope.GetHttpStartStop().GetPeerType().String()
case events.Envelope_ValueMetric:
valMetric := envelope.GetValueMetric()
newEvent.Name = valMetric.GetName()
newEvent.Value = valMetric.GetValue()
case events.Envelope_CounterEvent:
countMetric := envelope.GetCounterEvent()
newEvent.Name = countMetric.GetName()
newEvent.Value = float64(countMetric.GetDelta())
default:
panic("Unexpected message type: " + envelope.GetEventType().String())
}
newEvent.Origin = envelope.GetOrigin()
f.lock.Lock()
f.receivedEvents = append(f.receivedEvents, newEvent)
f.lock.Unlock()
}
}