-
Notifications
You must be signed in to change notification settings - Fork 10
/
event.go
339 lines (304 loc) · 8.22 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Copyright (c) 2023 cheng-zhongliang. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package event
import (
"time"
)
const (
// EvRead is readable event.
EvRead = 1 << iota
// EvWrite is writable event.
EvWrite = 1 << iota
// EvTimeout is timeout event.
EvTimeout = 1 << iota
// EvPersist is persistent behavior option.
EvPersist = 1 << iota
// EvET is edge-triggered behavior option.
EvET = 1 << iota
// EvLoopOnce is the flag to control event base loop just once.
EvLoopOnce = 001
// EvLoopNoblock is the flag to control event base loop not block.
EvLoopNoblock = 002
// HPri is the high priority.
HPri eventPriority = 0b00
// MPri is the middle priority.
MPri eventPriority = 0b01
// LPri is the low priority.
LPri eventPriority = 0b10
// evListInserted is the flag to indicate the event is in the event list.
evListInserted = 0x01
// evListActive is the flag to indicate the event is in the active event list.
evListActive = 0x02
// evListTimeout is the flag to indicate the event is in the timeout event heap.
evListTimeout = 0x04
)
// eventPriority is the priority of the event.
type eventPriority uint8
// Event is the event to watch.
type Event struct {
// base is the event base of the event.
base *EventBase
// ele is the element in the total event list.
ele element
// activeEle is the element in the active event list.
activeEle element
// index is the index in the event heap.
index int
// fd is the file descriptor to watch.
fd int
// events is the events to watch. Such as EvRead or EvWrite.
events uint32
// cb is the callback function when the event is triggered.
cb func(fd int, events uint32, arg interface{})
// arg is the argument passed to the callback function.
arg interface{}
// res is the result passed to the callback function.
res uint32
// flags is the status of the event in the event list.
flags int
// timeout is the timeout of the event.
timeout time.Duration
// deadline is the deadline for the event.
deadline time.Time
// priority is the priority of the event.
priority eventPriority
}
// New creates a new event with default priority MPri.
func New(base *EventBase, fd int, events uint32, callback func(fd int, events uint32, arg interface{}), arg interface{}) *Event {
ev := new(Event)
ev.Assign(base, fd, events, callback, arg, MPri)
return ev
}
// Assign assigns the event.
// It is used to reuse the event.
// The event must be detached before it is assigned.
func (ev *Event) Assign(base *EventBase, fd int, events uint32, callback func(fd int, events uint32, arg interface{}), arg interface{}, priority eventPriority) {
ev.base = base
ev.fd = fd
ev.events = events
ev.cb = callback
ev.arg = arg
ev.priority = priority
ev.res = 0
ev.flags = 0
ev.timeout = 0
ev.deadline = time.Time{}
ev.ele = element{}
ev.activeEle = element{}
ev.index = -1
}
// Attach adds the event to the event base.
// Timeout is the timeout of the event. Default is 0, which means no timeout.
// But if EvTimeout is set in the event, the 0 represents expired immediately.
func (ev *Event) Attach(timeout time.Duration) error {
if ev.events&(EvRead|EvWrite|EvTimeout) == 0 {
return ErrEventInvalid
}
if ev.flags&evListInserted != 0 {
return ErrEventExists
}
ev.timeout = timeout
return ev.base.addEvent(ev)
}
// Detach deletes the event from the event base.
// The event will not be triggered after it is detached.
func (ev *Event) Detach() error {
if ev.flags&evListInserted == 0 {
return ErrEventNotExists
}
return ev.base.delEvent(ev)
}
// Base returns the event base of the event.
func (ev *Event) Base() *EventBase {
return ev.base
}
// Fd returns the file descriptor of the event.
func (ev *Event) Fd() int {
return ev.fd
}
// Events returns the events of the event.
func (ev *Event) Events() uint32 {
return ev.events
}
// Timeout returns the timeout of the event.
func (ev *Event) Timeout() time.Duration {
return ev.timeout
}
// Priority returns the priority of the event.
func (ev *Event) Priority() eventPriority {
return ev.priority
}
// SetPriority sets the priority of the event.
func (ev *Event) SetPriority(priority eventPriority) {
ev.priority = priority
}
// EventBase is the base of all events.
type EventBase struct {
// poll is the event poller to watch events.
poll *poll
// evList is the list of all events.
evList *list
// activeEvList is the list of active events.
activeEvLists []*list
// eventHeap is the min heap of timeout events.
evHeap *eventHeap
// nowTimeCache is the cache of now time.
nowTimeCache time.Time
}
// NewBase creates a new event base.
func NewBase() (*EventBase, error) {
bs := new(EventBase)
p, err := openPoll()
if err != nil {
return nil, err
}
bs.poll = p
bs.evList = newList()
bs.activeEvLists = []*list{newList(), newList(), newList()}
bs.evHeap = new(eventHeap)
bs.nowTimeCache = time.Time{}
return bs, nil
}
// Loop loops events.
// Flags is the flags to control the loop behavior.
// Flags can be EvLoopOnce or EvLoopNoblock or both.
// If EvLoopOnce is set, the loop will just loop once.
// If EvLoopNoblock is set, the loop will not block.
func (bs *EventBase) Loop(flags int) error {
bs.clearTimeCache()
for {
err := bs.poll.wait(bs.onActive, bs.waitTime(flags&EvLoopNoblock != 0))
if err != nil {
return err
}
bs.updateTimeCache()
bs.onTimeout()
bs.handleActiveEvents()
if flags&EvLoopOnce != 0 {
return nil
}
}
}
// Dispatch dispatches events.
// It will block until events trigger.
func (bs *EventBase) Dispatch() error {
return bs.Loop(0)
}
// Shutdown breaks event loop and close the poll.
func (bs *EventBase) Shutdown() error {
return bs.poll.close()
}
// Now returns the cache of now time.
func (bs *EventBase) Now() time.Time {
if !bs.nowTimeCache.IsZero() {
return bs.nowTimeCache
}
return time.Now()
}
func (bs *EventBase) addEvent(ev *Event) error {
bs.eventQueueInsert(ev, evListInserted)
if ev.events&EvTimeout != 0 {
ev.deadline = bs.Now().Add(ev.timeout)
bs.eventQueueInsert(ev, evListTimeout)
}
if ev.events&(EvRead|EvWrite) != 0 {
return bs.poll.add(ev)
}
return nil
}
func (bs *EventBase) delEvent(ev *Event) error {
bs.eventQueueRemove(ev, evListTimeout)
bs.eventQueueRemove(ev, evListActive)
bs.eventQueueRemove(ev, evListInserted)
if ev.events&(EvRead|EvWrite) != 0 {
return bs.poll.del(ev)
}
return nil
}
func (bs *EventBase) waitTime(noblock bool) time.Duration {
if noblock {
return 0
}
if !bs.evHeap.empty() {
ev := bs.evHeap.peekEvent()
if d := ev.deadline.Sub(bs.Now()); d > 0 {
return d
}
return 0
}
return -1
}
func (bs *EventBase) onTimeout() {
now := bs.Now()
for !bs.evHeap.empty() {
ev := bs.evHeap.peekEvent()
if ev.deadline.After(now) {
break
}
bs.eventQueueRemove(ev, evListTimeout)
bs.onActive(ev, EvTimeout)
}
}
func (bs *EventBase) onActive(ev *Event, res uint32) {
if ev.flags&evListActive != 0 {
ev.res |= res
return
}
ev.res = res
bs.eventQueueInsert(ev, evListActive)
}
func (bs *EventBase) handleActiveEvents() {
for i := range bs.activeEvLists {
for e := bs.activeEvLists[i].front(); e != nil; {
next := e.nextEle()
ev := e.value.(*Event)
e = next
if ev.events&EvPersist != 0 {
bs.eventQueueRemove(ev, evListActive)
if ev.events&EvTimeout != 0 {
bs.eventQueueRemove(ev, evListTimeout)
ev.deadline = bs.Now().Add(ev.timeout)
bs.eventQueueInsert(ev, evListTimeout)
}
} else {
bs.delEvent(ev)
}
ev.cb(ev.fd, ev.res, ev.arg)
}
}
}
func (bs *EventBase) eventQueueInsert(ev *Event, which int) {
if ev.flags&which != 0 {
return
}
ev.flags |= which
switch which {
case evListInserted:
bs.evList.pushBack(ev, &ev.ele)
case evListActive:
bs.activeEvLists[ev.priority].pushBack(ev, &ev.activeEle)
case evListTimeout:
bs.evHeap.pushEvent(ev)
}
}
func (bs *EventBase) eventQueueRemove(ev *Event, which int) {
if ev.flags&which == 0 {
return
}
ev.flags &^= which
switch which {
case evListInserted:
bs.evList.remove(&ev.ele)
case evListActive:
bs.activeEvLists[ev.priority].remove(&ev.activeEle)
case evListTimeout:
bs.evHeap.removeEvent(ev.index)
}
}
func (bs *EventBase) updateTimeCache() {
bs.nowTimeCache = time.Now()
}
func (bs *EventBase) clearTimeCache() {
bs.nowTimeCache = time.Time{}
}