-
Notifications
You must be signed in to change notification settings - Fork 148
/
event_unmarshaller.go
126 lines (107 loc) · 3.58 KB
/
event_unmarshaller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package eventunmarshaller
import (
"errors"
"log"
"unicode"
"unicode/utf8"
"metron/writers"
"fmt"
"github.com/cloudfoundry/dropsonde/metricbatcher"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gogo/protobuf/proto"
)
var (
invalidEnvelope = errors.New("Invalid Envelope")
metricNames map[events.Envelope_EventType]string
)
func init() {
metricNames = make(map[events.Envelope_EventType]string)
for eventType, eventName := range events.Envelope_EventType_name {
r, n := utf8.DecodeRuneInString(eventName)
modifiedName := string(unicode.ToLower(r)) + eventName[n:]
metricName := "dropsondeUnmarshaller." + modifiedName + "Received"
metricNames[events.Envelope_EventType(eventType)] = metricName
}
}
//go:generate hel --type EventBatcher --output mock_event_batcher_test.go
type EventBatcher interface {
BatchCounter(name string) (chainer metricbatcher.BatchCounterChainer)
BatchIncrementCounter(name string)
}
// An EventUnmarshaller is an self-instrumenting tool for converting Protocol
// Buffer-encoded dropsonde messages to Envelope instances.
type EventUnmarshaller struct {
outputWriter writers.EnvelopeWriter
batcher EventBatcher
}
func New(outputWriter writers.EnvelopeWriter, batcher EventBatcher) *EventUnmarshaller {
return &EventUnmarshaller{
outputWriter: outputWriter,
batcher: batcher,
}
}
func (u *EventUnmarshaller) Write(message []byte) {
envelope, err := u.UnmarshallMessage(message)
if err != nil {
log.Printf("Error unmarshalling: %s", err)
return
}
u.outputWriter.Write(envelope)
}
func (u *EventUnmarshaller) UnmarshallMessage(message []byte) (*events.Envelope, error) {
envelope := &events.Envelope{}
err := proto.Unmarshal(message, envelope)
if err != nil {
log.Printf("eventUnmarshaller: unmarshal error %v", err)
u.batcher.BatchIncrementCounter("dropsondeUnmarshaller.unmarshalErrors")
return nil, err
}
if !valid(envelope) {
log.Printf("eventUnmarshaller: validation failed for message %v", envelope.GetEventType())
u.batcher.BatchIncrementCounter("dropsondeUnmarshaller.unmarshalErrors")
return nil, invalidEnvelope
}
if err := u.incrementReceiveCount(envelope.GetEventType()); err != nil {
log.Printf("Error incrementing receive count: %s", err)
return nil, err
}
return envelope, nil
}
func (u *EventUnmarshaller) incrementReceiveCount(eventType events.Envelope_EventType) error {
var err error
switch eventType {
case events.Envelope_LogMessage:
// LogMessage is a special case. `logMessageReceived` used to be broken out by app ID, and
// `logMessageTotal` was the sum of all of those.
u.batcher.BatchIncrementCounter("dropsondeUnmarshaller.logMessageTotal")
default:
metricName := metricNames[eventType]
if metricName == "" {
metricName = "dropsondeUnmarshaller.unknownEventTypeReceived"
err = fmt.Errorf("eventUnmarshaller: received unknown event type %#v", eventType)
}
u.batcher.BatchIncrementCounter(metricName)
}
u.batcher.BatchCounter("dropsondeUnmarshaller.receivedEnvelopes").
SetTag("protocol", "udp").
SetTag("event_type", eventType.String()).
Increment()
return err
}
func valid(env *events.Envelope) bool {
switch env.GetEventType() {
case events.Envelope_HttpStartStop:
return env.GetHttpStartStop() != nil
case events.Envelope_LogMessage:
return env.GetLogMessage() != nil
case events.Envelope_ValueMetric:
return env.GetValueMetric() != nil
case events.Envelope_CounterEvent:
return env.GetCounterEvent() != nil
case events.Envelope_Error:
return env.GetError() != nil
case events.Envelope_ContainerMetric:
return env.GetContainerMetric() != nil
}
return true
}