This repository has been archived by the owner on Mar 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 115
/
emit.go
132 lines (105 loc) · 2.54 KB
/
emit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package metric
import (
"fmt"
"strings"
"time"
flags "github.com/jessevdk/go-flags"
"code.cloudfoundry.org/lager"
)
// Event is a single data point that is queued by emit and later handed to the
// configured Emitter.
type Event struct {
// Name is the event's name.
Name string
// Value is the payload carried by the event; its concrete type is chosen by
// the producer and is not inspected in this file.
Value interface{}
// State is one of the EventState constants (ok, warning, critical).
State EventState
// Attributes are per-event key/value pairs. emit replaces this map with a
// fresh one containing the package-wide attributes given to Initialize,
// overlaid with these per-event entries (per-event keys win).
Attributes map[string]string
// Host is overwritten by emit with the host given to Initialize.
Host string
// Time is overwritten by emit with time.Now() at the moment of queuing.
Time time.Time
}
// EventState is the status label attached to an Event.
type EventState string

// The EventState values defined by this package.
const (
	EventStateOK       EventState = "ok"
	EventStateWarning  EventState = "warning"
	EventStateCritical EventState = "critical"
)
// Emitter is the sink that queued events are delivered to. emitLoop calls
// Emit once per queued event, passing an "emit" session of the logger that
// was supplied when the event was queued.
//
//go:generate counterfeiter . Emitter
type Emitter interface {
Emit(lager.Logger, Event)
}
// EmitterFactory describes one kind of emitter that can be registered with
// RegisterEmitter. WireEmitters passes the factory value itself to go-flags
// as the data of an option group, and Initialize uses it to construct the
// active Emitter.
//
//go:generate counterfeiter . EmitterFactory
type EmitterFactory interface {
// Description returns the label used in the option group title
// ("Metric Emitter (<description>)") and in the error reported when more
// than one factory is configured.
Description() string
// IsConfigured reports whether this factory should be used. Initialize
// fails if more than one registered factory returns true.
IsConfigured() bool
// NewEmitter constructs the Emitter. Initialize returns a non-nil error
// from this call to its caller unchanged.
NewEmitter() (Emitter, error)
}
// emitterFactories holds every factory passed to RegisterEmitter, in
// registration order. It is read by WireEmitters and Initialize.
var emitterFactories []EmitterFactory
// RegisterEmitter appends factory to the package-level list of known emitter
// factories. No locking is performed here.
// NOTE(review): presumably called from init functions of emitter
// implementations before flags are parsed — confirm against callers.
func RegisterEmitter(factory EmitterFactory) {
emitterFactories = append(emitterFactories, factory)
}
// WireEmitters adds one option sub-group to group for every registered
// emitter factory. Each sub-group is titled "Metric Emitter (<description>)"
// and uses the factory value itself as its data.
//
// It panics if go-flags rejects a sub-group, since the function has no error
// return to report the failure through.
func WireEmitters(group *flags.Group) {
	for i := range emitterFactories {
		factory := emitterFactories[i]
		title := fmt.Sprintf("Metric Emitter (%s)", factory.Description())

		if _, err := group.AddGroup(title, "", factory); err != nil {
			panic(err)
		}
	}
}
// Package-level emission state, written by Initialize and read by emit and
// emitLoop.
var (
	// emitter is the active sink; while it is nil, emit drops events.
	emitter Emitter
	// eventHost is stamped onto every event's Host field by emit.
	eventHost string
	// eventAttributes are merged into every event's Attributes by emit.
	eventAttributes map[string]string
)

// eventEmission pairs a queued event with the logger supplied by the caller
// that queued it.
type eventEmission struct {
	event  Event
	logger lager.Logger
}

// emissions is the buffered queue between emit (producer) and emitLoop
// (consumer). It holds up to 1000 pending events; emit logs "queue-full" and
// drops the event when the buffer has no room.
var emissions = make(chan eventEmission, 1000)
// Initialize selects the configured emitter factory, builds its Emitter and
// starts the background loop that delivers queued events to it.
//
// host is stamped onto every emitted event, and attributes are merged into
// every event's Attributes. The logger parameter is accepted for interface
// compatibility and is not used here.
//
// It returns an error if more than one registered factory reports itself as
// configured, or if the selected factory fails to construct its Emitter. If
// no emitter is available, it returns nil without starting the loop, and emit
// remains a no-op.
func Initialize(logger lager.Logger, host string, attributes map[string]string) error {
	var emitterDescriptions []string
	for _, factory := range emitterFactories {
		if factory.IsConfigured() {
			emitterDescriptions = append(emitterDescriptions, factory.Description())
		}
	}

	if len(emitterDescriptions) > 1 {
		return fmt.Errorf("Multiple emitters configured: %s", strings.Join(emitterDescriptions, ", "))
	}

	for _, factory := range emitterFactories {
		if !factory.IsConfigured() {
			continue
		}

		// Build into a local first so that a failed construction never
		// overwrites the package-level emitter with an unusable value.
		configured, err := factory.NewEmitter()
		if err != nil {
			return err
		}

		emitter = configured
	}

	if emitter == nil {
		return nil
	}

	eventHost = host
	eventAttributes = attributes

	go emitLoop()

	return nil
}
// emit stamps event with the configured host and the current time, merges the
// package-wide attributes with the event's own (the event's keys win), and
// queues it for delivery by emitLoop.
//
// It does nothing when no emitter has been initialized. It never blocks: if
// the queue is full, the event is dropped and "queue-full" is logged.
func emit(logger lager.Logger, event Event) {
	if emitter == nil {
		return
	}

	event.Host = eventHost
	event.Time = time.Now()

	// Always build a fresh map so neither the caller's map nor the shared
	// package-wide map is mutated. Ranging over a nil map is a no-op.
	merged := map[string]string{}
	for key, value := range eventAttributes {
		merged[key] = value
	}
	for key, value := range event.Attributes {
		merged[key] = value
	}
	event.Attributes = merged

	queued := eventEmission{logger: logger, event: event}

	select {
	case emissions <- queued:
	default:
		logger.Error("queue-full", nil)
	}
}
// emitLoop drains the emissions queue, handing each event to the configured
// emitter together with an "emit" session of the logger it was queued with.
// It returns only when the emissions channel is closed.
func emitLoop() {
	for {
		next, open := <-emissions
		if !open {
			return
		}

		emitter.Emit(next.logger.Session("emit"), next.event)
	}
}