forked from cilium/cilium
/
eventqueue.go
312 lines (266 loc) · 10.5 KB
/
eventqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// Copyright 2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventqueue
import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/spanstat"
"github.com/sirupsen/logrus"
)
// log is the package-level logger. It is derived from the default logger with
// the logfields.LogSubsys field set to "eventqueue".
var log = logging.DefaultLogger.WithField(logfields.LogSubsys, "eventqueue")
// EventQueue is a structure which is utilized to handle Events in a first-in,
// first-out order. An EventQueue may be stopped, in which case all events which
// are queued up, but have not been processed yet, will be cancelled (i.e., not
// run). If any event is attempted to be scheduled onto an EventQueue after its
// drain channel has been closed, Enqueue cancels it immediately. For any event
// to be processed by the EventQueue, its Metadata must implement the
// `EventHandler` interface. This allows for different types of events to be
// processed by anything which chooses to utilize an `EventQueue`.
type EventQueue struct {
	// events carries the Events pushed by Enqueue to the consumer loop started
	// by Run. This should always be a buffered channel.
	events chan *Event

	// close is closed by Stop; WaitToBeDrained blocks until that happens.
	close chan struct{}

	// drain is closed when the EventQueue is stopped. Any Event which is
	// Enqueued after this channel is closed will be cancelled / not processed
	// by the queue. If an Event has been Enqueued, but has not been processed
	// before this channel is closed, it will be cancelled and not processed
	// as well.
	drain chan struct{}

	// eventQueueOnce is used to ensure that the EventQueue business logic
	// (the consumer loop in Run) can only be run once.
	eventQueueOnce sync.Once

	// closeOnce is used to ensure that the EventQueue can only be stopped once.
	closeOnce sync.Once

	// name is used to differentiate this EventQueue from other EventQueues that
	// are also running in logs.
	name string

	// eventsMu guards closing of the events channel: Enqueue holds the read
	// lock while it sends on events, and Stop takes the write lock before
	// closing events, so the channel is not closed in the middle of a send.
	eventsMu lock.RWMutex
}
// NewEventQueue returns an unnamed EventQueue whose events channel has room
// for a single buffered event. It is shorthand for
// NewEventQueueBuffered("", 1).
func NewEventQueue() *EventQueue {
	const (
		unnamed        = ""
		singleCapacity = 1
	)
	return NewEventQueueBuffered(unnamed, singleCapacity)
}
// getLogger returns the package logger with this queue's name attached under
// the "name" field, so log lines of different EventQueues can be told apart.
func (q *EventQueue) getLogger() *logrus.Entry {
	fields := logrus.Fields{"name": q.name}
	return log.WithFields(fields)
}
// NewEventQueueBuffered returns an EventQueue called name whose events channel
// buffers up to numBufferedEvents Events, with its close and drain channels
// initialized as well. The creation is logged at debug level.
func NewEventQueueBuffered(name string, numBufferedEvents int) *EventQueue {
	creationLog := log.WithFields(logrus.Fields{
		"name":              name,
		"numBufferedEvents": numBufferedEvents,
	})
	creationLog.Debug("creating new EventQueue")

	q := new(EventQueue)
	q.name = name
	// Up to numBufferedEvents can be Enqueued until Enqueueing blocks.
	q.events = make(chan *Event, numBufferedEvents)
	q.close = make(chan struct{})
	q.drain = make(chan struct{})
	return q
}
// Event is an event that can be enqueued onto an EventQueue.
type Event struct {
	// Metadata is the information about the event which is sent
	// by its queuer. Metadata must implement the EventHandler interface in
	// order for the Event to be successfully processed by the EventQueue; the
	// queue processes the Event by calling Metadata.Handle with eventResults.
	Metadata EventHandler

	// eventResults is a channel on which the results of the event are sent.
	// It is created by NewEvent with a buffer of one and is the channel that
	// Enqueue hands back to the queuer. The EventQueue closes it once the
	// event has been handled, or when the event is cancelled.
	eventResults chan interface{}

	// cancelled is closed to signal that the given Event was not run. This can
	// happen if the EventQueue processing this Event was stopped before the
	// Event was Enqueued onto the EventQueue, or if the Event was Enqueued
	// onto an EventQueue, and the EventQueue on which the Event was scheduled
	// was stopped before the Event was processed.
	cancelled chan struct{}

	// stats is a field which contains information about when this event is
	// enqueued, dequeued, etc.
	stats eventStatistics

	// enqueued counts how many times Enqueue has been attempted with this
	// event. Enqueue increments it atomically and rejects every attempt after
	// the first, so an Event can only be enqueued once.
	enqueued int32
}
// eventStatistics groups the timing spans recorded for a single Event. They
// are reported by printStats when debug logging is enabled.
type eventStatistics struct {
	// waitEnqueue shows how long Enqueue took to place the event onto the
	// queue's events channel, i.e. how long the send blocked because the queue
	// was at capacity.
	waitEnqueue spanstat.SpanStat

	// durationStat shows how long the actual processing of the event took. This
	// is the time for how long Handle() takes for the event.
	durationStat spanstat.SpanStat

	// waitConsumeOffQueue shows how long it took for the event to be consumed
	// off of the queue. It is started by Enqueue and ended by Run when the
	// event is received from the events channel (with End(true) if the event
	// is processed, End(false) if it is cancelled).
	waitConsumeOffQueue spanstat.SpanStat
}
// NewEvent returns an Event wrapping meta, with its results and cancellation
// channels allocated. The statistics and the enqueued counter are left at
// their zero values.
func NewEvent(meta EventHandler) *Event {
	ev := new(Event)
	ev.Metadata = meta
	// The results channel has a buffer of one element.
	ev.eventResults = make(chan interface{}, 1)
	ev.cancelled = make(chan struct{})
	return ev
}
// WasCancelled reports whether the Event's cancelled channel has been closed.
// That happens when the Event is handed to an EventQueue that has already
// been stopped, or when the queue is stopped before an already enqueued Event
// gets processed. The check never blocks.
func (ev *Event) WasCancelled() bool {
	select {
	case <-ev.cancelled:
		// A receive only succeeds here once the channel has been closed.
		return true
	default:
	}
	return false
}
// Enqueue pushes the given event onto the EventQueue. If the queue has been
// stopped, the Event will not be enqueued, and its cancelled and results
// channels will be closed, indicating that the Event was not run. This
// function may block if the queue is at its capacity for events. If a single
// Event has Enqueue called on it multiple times asynchronously, there is no
// guarantee as to which one will return the channel which passes results back
// to the caller. It is up to the caller to check whether the returned channel
// is nil, as waiting to receive on such a channel will block forever.
//
// Returns the channel on which the results of the event are delivered (it is
// closed without a value if the event is cancelled). Returns an error, and a
// nil channel, if the Event has been previously enqueued, if the Event is nil,
// or if the queue itself is not initialized properly.
func (q *EventQueue) Enqueue(ev *Event) (<-chan interface{}, error) {
	if q.notSafeToAccess() || ev == nil {
		return nil, fmt.Errorf("unable to Enqueue event")
	}

	// Multiple Enqueues can occur at the same time. Ensure that events channel
	// is not closed while we are enqueueing events.
	q.eventsMu.RLock()
	defer q.eventsMu.RUnlock()

	// Events can only be enqueued once.
	if atomic.AddInt32(&ev.enqueued, 1) > 1 {
		return nil, fmt.Errorf("unable to Enqueue event; event has already had Enqueue called on it")
	}

	select {
	// The event should be drained from the queue (e.g., it should not be
	// processed).
	case <-q.drain:
		// Closed eventResults channel signifies cancellation.
		close(ev.cancelled)
		close(ev.eventResults)
		return ev.eventResults, nil
	default:
		// The events channel may be closed even if an event has been pushed
		// onto the events channel, as events are consumed off of the events
		// channel asynchronously! If the EventQueue is closed before this
		// event is processed, then it will be cancelled.
		ev.stats.waitEnqueue.Start()
		// The consume-off-queue span must be started before the event is
		// handed over: as soon as the send below completes, the consumer may
		// dequeue the event and end this span, so starting it afterwards
		// could run after (or concurrently with) the matching End.
		ev.stats.waitConsumeOffQueue.Start()
		q.events <- ev
		// NOTE(review): this End still runs after the hand-off, so the
		// consumer may read waitEnqueue while it is being updated here;
		// confirm that SpanStat tolerates concurrent use.
		ev.stats.waitEnqueue.End(true)
		return ev.eventResults, nil
	}
}
// printStats logs the timing statistics gathered for the event (handling
// duration, enqueue wait time and consume-off-queue wait time) together with
// the type of its Metadata, using the logger of q. Nothing is logged unless
// option.Config.Debug is set.
func (ev *Event) printStats(q *EventQueue) {
	if !option.Config.Debug {
		return
	}

	stats := logrus.Fields{
		"eventType":                    reflect.TypeOf(ev.Metadata).String(),
		"eventHandlingDuration":        ev.stats.durationStat.Total(),
		"eventEnqueueWaitTime":         ev.stats.waitEnqueue.Total(),
		"eventConsumeOffQueueWaitTime": ev.stats.waitConsumeOffQueue.Total(),
	}
	q.getLogger().WithFields(stats).Debug("EventQueue event processing statistics")
}
// Run starts consuming events that have been queued for this EventQueue.
// Events are taken off the queue and processed one at a time, in the order
// they were enqueued, by a single background goroutine; Run itself returns
// immediately. All business logic for handling queued events is contained
// within this function. The Metadata of the events in the queue must
// implement the EventHandler interface. If the event queue is stopped, then
// all events which were queued up, but not processed, are cancelled; any
// event which is currently being processed will not be cancelled.
//
// Calling Run more than once, or on a nil / uninitialized queue, is a no-op.
func (q *EventQueue) Run() {
	if q.notSafeToAccess() {
		return
	}

	// The consumer goroutine is spawned from inside the Once rather than
	// wrapping the Once call in a goroutine: sync.Once.Do does not return to
	// any caller until the first invocation has finished, so running the
	// consumer loop directly inside Do would leave one parked goroutine behind
	// for every additional call to Run until the queue is stopped.
	q.eventQueueOnce.Do(func() {
		go func() {
			// The loop ends once Stop has closed the events channel and all
			// buffered events have been received.
			for ev := range q.events {
				select {
				case <-q.drain:
					// The queue was stopped before this event got its turn:
					// cancel it instead of handling it.
					ev.stats.waitConsumeOffQueue.End(false)
					close(ev.cancelled)
					close(ev.eventResults)
					ev.printStats(q)
				default:
					ev.stats.waitConsumeOffQueue.End(true)
					ev.stats.durationStat.Start()
					ev.Metadata.Handle(ev.eventResults)
					// Always indicate success for now.
					ev.stats.durationStat.End(true)
					ev.printStats(q)
					// Ensures that no more results can be sent as the event
					// has already been processed.
					close(ev.eventResults)
				}
			}
		}()
	})
}
// notSafeToAccess reports whether the queue cannot be used: it is true when
// the receiver is nil or when any of its close, drain or events channels has
// not been initialized.
func (q *EventQueue) notSafeToAccess() bool {
	switch {
	case q == nil:
		return true
	case q.close == nil, q.drain == nil, q.events == nil:
		return true
	default:
		return false
	}
}
// Stop prevents any further events from being processed by the EventQueue. An
// event which is being processed at the time of the call continues to run.
// Events still waiting on the queue, and events enqueued from now on, are not
// processed; they are cancelled. Stopping a nil or uninitialized queue, or a
// queue that has already been stopped, is a no-op.
func (q *EventQueue) Stop() {
	if q.notSafeToAccess() {
		return
	}

	shutdown := func() {
		q.getLogger().Debug("stopping EventQueue")

		// From here on Enqueue cancels every event it is handed, and the
		// consumer cancels every event it takes off the queue.
		close(q.drain)

		// Unblock callers of WaitToBeDrained. Note that this happens before
		// the events channel has been closed or emptied.
		close(q.close)

		// Taking the write lock waits for in-flight Enqueue calls (which hold
		// the read lock while sending) before the events channel is closed.
		q.eventsMu.Lock()
		close(q.events)
		q.eventsMu.Unlock()
	}
	q.closeOnce.Do(shutdown)
}
// WaitToBeDrained blocks until the EventQueue has been stopped, that is until
// Stop has closed the queue's close channel. It does not return a value.
//
// If the queue is nil or was not initialized properly, it returns immediately:
// Stop is a no-op on such a queue, so nothing could ever unblock the wait (a
// receive from a nil channel blocks forever).
//
// NOTE(review): Stop closes the close channel before the events channel is
// closed, so events that were still queued may not have been cancelled yet
// when this function returns; confirm whether callers rely on that.
func (q *EventQueue) WaitToBeDrained() {
	if q.notSafeToAccess() {
		return
	}
	<-q.close
}
// EventHandler is the contract an event's Metadata has to fulfil so that an
// EventQueue can process events of arbitrary types in a generic way.
type EventHandler interface {
	// Handle processes the event. The EventQueue calls it with the event's
	// results channel and closes that channel after Handle returns, so
	// results must be sent before returning.
	Handle(results chan interface{})
}