-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
eventer.go
156 lines (134 loc) · 4.03 KB
/
eventer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package eventer
import (
"context"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/plugins/drivers"
)
var (
// DefaultSendEventTimeout is the timeout used when publishing events to consumers
DefaultSendEventTimeout = 2 * time.Second
// ConsumerGCInterval is the interval at which garbage collection of consumers
// occures
ConsumerGCInterval = time.Minute
)
// Eventer is a utility to control broadcast of TaskEvents to multiple consumers.
// It also implements the TaskEvents func in the DriverPlugin interface so that
// it can be embedded in a implementing driver struct.
type Eventer struct {
// events is a channel were events to be broadcasted are sent
// This channel is never closed, because it's lifetime is tied to the
// life of the driver and closing creates some subtile race conditions
// between closing it and emitting events.
events chan *drivers.TaskEvent
// consumers is a slice of eventConsumers to broadcast events to.
// access is gaurded by consumersLock RWMutex
consumers []*eventConsumer
consumersLock sync.RWMutex
// ctx to allow control of event loop shutdown
ctx context.Context
logger hclog.Logger
}
type eventConsumer struct {
timeout time.Duration
ctx context.Context
ch chan *drivers.TaskEvent
logger hclog.Logger
}
// NewEventer returns an Eventer with a running event loop that can be stopped
// by closing the given stop channel
func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
e := &Eventer{
events: make(chan *drivers.TaskEvent),
ctx: ctx,
logger: logger,
}
go e.eventLoop()
return e
}
// eventLoop is the main logic which pulls events from the channel and broadcasts
// them to all consumers
func (e *Eventer) eventLoop() {
for {
select {
case <-e.ctx.Done():
e.logger.Trace("task event loop shutdown")
return
case event := <-e.events:
e.iterateConsumers(event)
case <-time.After(ConsumerGCInterval):
e.gcConsumers()
}
}
}
// iterateConsumers will iterate through all consumers and broadcast the event,
// cleaning up any consumers that have closed their context
func (e *Eventer) iterateConsumers(event *drivers.TaskEvent) {
e.consumersLock.Lock()
filtered := e.consumers[:0]
for _, consumer := range e.consumers {
// prioritize checking if context is cancelled prior
// to attempting to forwarding events
// golang select evaluations aren't predictable
if consumer.ctx.Err() != nil {
close(consumer.ch)
continue
}
select {
case <-time.After(consumer.timeout):
filtered = append(filtered, consumer)
e.logger.Warn("timeout sending event", "task_id", event.TaskID, "message", event.Message)
case <-consumer.ctx.Done():
// consumer context finished, filtering it out of loop
close(consumer.ch)
case consumer.ch <- event:
filtered = append(filtered, consumer)
}
}
e.consumers = filtered
e.consumersLock.Unlock()
}
func (e *Eventer) gcConsumers() {
e.consumersLock.Lock()
filtered := e.consumers[:0]
for _, consumer := range e.consumers {
select {
case <-consumer.ctx.Done():
// consumer context finished, filtering it out of loop
default:
filtered = append(filtered, consumer)
}
}
e.consumers = filtered
e.consumersLock.Unlock()
}
func (e *Eventer) newConsumer(ctx context.Context) *eventConsumer {
e.consumersLock.Lock()
defer e.consumersLock.Unlock()
consumer := &eventConsumer{
ch: make(chan *drivers.TaskEvent),
ctx: ctx,
timeout: DefaultSendEventTimeout,
logger: e.logger,
}
e.consumers = append(e.consumers, consumer)
return consumer
}
// TaskEvents is an implementation of the DriverPlugin.TaskEvents function
func (e *Eventer) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
consumer := e.newConsumer(ctx)
return consumer.ch, nil
}
// EmitEvent can be used to broadcast a new event
func (e *Eventer) EmitEvent(event *drivers.TaskEvent) error {
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.events <- event:
if e.logger.IsTrace() {
e.logger.Trace("emitting event", "event", event)
}
}
return nil
}