-
-
Notifications
You must be signed in to change notification settings - Fork 55
/
handler.go
351 lines (301 loc) · 8.61 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
// Package handler handles incoming Gateway events. It reflects the function's
// first argument and caches that for use in each event.
//
// # Performance
//
// Each call to the event would take 167 ns/op for roughly each handler. Scaling
// that up to 100 handlers is roughly the same as multiplying 167 ns by 100,
// which gives 16700 ns or 0.0167 ms.
//
// BenchmarkReflect-8 7260909 167 ns/op
//
// # Usage
//
// Handler's usage is mostly similar to Discordgo, in that AddHandler expects a
// function with only one argument or an event channel. For more information,
// refer to AddHandler.
package handler
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
)
// Handler is a container for command handlers. A zero-value instance is a valid
// instance.
type Handler struct {
mutex sync.RWMutex
events map[reflect.Type]slab // nil type for interfaces
}
func New() *Handler {
return &Handler{}
}
// Call calls all handlers with the given event. This is an internal method; use
// with care.
func (h *Handler) Call(ev interface{}) {
v := reflect.ValueOf(ev)
t := reflect.TypeOf(ev)
all := h.AllCallersForType(t)
all(func(caller Caller) bool {
caller.Call(v)
return true
})
}
// AllCallersForType returns all callers for the given event type. This is an
// internal method that is rarely useful for external use and should be used
// with care.
func (h *Handler) AllCallersForType(t reflect.Type) func(yield func(Caller) bool) {
return func(yield func(Caller) bool) {
h.mutex.RLock()
defer h.mutex.RUnlock()
typedHandlers := h.events[t].Entries
anyHandlers := h.events[nil].Entries
if len(typedHandlers) == 0 && len(anyHandlers) == 0 {
return
}
for _, entry := range typedHandlers {
if entry.isInvalid() {
continue
}
if !yield(entry) {
return
}
}
for _, entry := range anyHandlers {
if entry.isInvalid() || entry.not(t) {
continue
}
if !yield(entry) {
return
}
}
}
}
// WaitFor blocks until there's an event. It's advised to use ChanFor instead,
// as WaitFor may skip some events if it's not ran fast enough after the event
// arrived.
func (h *Handler) WaitFor(ctx context.Context, fn func(interface{}) bool) interface{} {
var result = make(chan interface{})
cancel := h.AddHandler(func(v interface{}) {
if fn(v) {
result <- v
}
})
defer cancel()
select {
case r := <-result:
return r
case <-ctx.Done():
return nil
}
}
// ChanFor returns a channel that would receive all incoming events that match
// the callback given. The cancel() function removes the handler and drops all
// hanging goroutines.
//
// This method is more intended to be used as a filter. For a persistent event
// channel, consider adding it directly as a handler with AddHandler.
func (h *Handler) ChanFor(fn func(interface{}) bool) (out <-chan interface{}, cancel func()) {
result := make(chan interface{})
closer := make(chan struct{})
removeHandler := h.AddHandler(func(v interface{}) {
if fn(v) {
select {
case result <- v:
case <-closer:
}
}
})
// Only allow cancel to be called once.
var once sync.Once
cancel = func() {
once.Do(func() {
removeHandler()
close(closer)
})
}
out = result
return
}
// AddHandler adds the handler, returning a function that would remove this
// handler when called. A handler type is either a single-argument no-return
// function or a channel.
//
// # Function
//
// A handler can be a function with a single argument that is the expected event
// type. It must not have any returns or any other number of arguments.
//
// // An example of a valid function handler.
// h.AddHandler(func(*gateway.MessageCreateEvent) {})
//
// # Channel
//
// A handler can also be a channel. The underlying type that the channel wraps
// around will be the event type. As such, the type rules are the same as
// function handlers.
//
// Keep in mind that the user must NOT close the channel. In fact, the channel
// should not be closed at all. The caller function WILL PANIC if the channel is
// closed!
//
// When the rm callback that is returned is called, it will also guarantee that
// all blocking sends will be cancelled. This helps prevent dangling goroutines.
//
// // An example of a valid channel handler.
// ch := make(chan *gateway.MessageCreateEvent)
// h.AddHandler(ch)
func (h *Handler) AddHandler(handler interface{}) (rm func()) {
rm, err := h.addHandler(handler, false)
if err != nil {
panic(err)
}
return rm
}
// AddSyncHandler is a synchronous variant of AddHandler. Handlers added using
// this method will block the Call method, which is helpful if the user needs to
// rely on the order of events arriving. Handlers added using this method should
// not block for very long, as it may clog up other handlers.
func (h *Handler) AddSyncHandler(handler interface{}) (rm func()) {
rm, err := h.addHandler(handler, true)
if err != nil {
panic(err)
}
return rm
}
// AddHandlerCheck adds the handler, but safe-guards reflect panics with a
// recoverer, returning the error. Refer to AddHandler for more information.
func (h *Handler) AddHandlerCheck(handler interface{}) (rm func(), err error) {
// Reflect would actually panic if anything goes wrong, so this is just in
// case.
defer func() {
if rec := recover(); rec != nil {
if recErr, ok := rec.(error); ok {
err = recErr
} else {
err = fmt.Errorf("%v", rec)
}
}
}()
return h.addHandler(handler, false)
}
// AddSyncHandlerCheck is the safe-guarded version of AddSyncHandler. It is
// similar to AddHandlerCheck.
func (h *Handler) AddSyncHandlerCheck(handler interface{}) (rm func(), err error) {
// Reflect would actually panic if anything goes wrong, so this is just in
// case.
defer func() {
if rec := recover(); rec != nil {
if recErr, ok := rec.(error); ok {
err = recErr
} else {
err = fmt.Errorf("%v", rec)
}
}
}()
return h.addHandler(handler, true)
}
func (h *Handler) addHandler(fn interface{}, sync bool) (rm func(), err error) {
// Reflect the handler
r, err := newHandler(fn, sync)
if err != nil {
return nil, fmt.Errorf("handler reflect failed: %w", err)
}
var id int
var t reflect.Type
if !r.isIface {
t = r.event
}
h.mutex.Lock()
if h.events == nil {
h.events = make(map[reflect.Type]slab, 10)
}
slab := h.events[t]
id = slab.Put(r)
h.events[t] = slab
h.mutex.Unlock()
return func() {
h.mutex.Lock()
slab := h.events[t]
popped := slab.Pop(id)
h.mutex.Unlock()
popped.cleanup()
}, nil
}
// Caller is an interface that can be used to call a handler.
// It directly accepts a reflect.Value, which is the event.
type Caller interface{ Call(ev reflect.Value) }
type handler struct {
event reflect.Type // underlying type; arg0 or chan underlying type
callback reflect.Value
chanclose reflect.Value // IsValid() if chan
isIface bool
isSync bool
isOnce bool
}
var _ Caller = (*handler)(nil)
// newHandler reflects either a channel or a function into a handler. A function
// must only have a single argument being the event and no return, and a channel
// must have the event type as the underlying type.
func newHandler(unknown interface{}, sync bool) (handler, error) {
fnV := reflect.ValueOf(unknown)
fnT := fnV.Type()
// underlying event type
handler := handler{
callback: fnV,
isSync: sync,
}
switch fnT.Kind() {
case reflect.Func:
if fnT.NumIn() != 1 {
return handler, errors.New("function can only accept 1 event as argument")
}
if fnT.NumOut() > 0 {
return handler, errors.New("function can't accept returns")
}
handler.event = fnT.In(0)
case reflect.Chan:
handler.event = fnT.Elem()
handler.chanclose = reflect.ValueOf(make(chan struct{}))
default:
return handler, errors.New("given interface is not a function or channel")
}
var kind = handler.event.Kind()
// Accept either pointer type or interface{} type
if kind != reflect.Ptr && kind != reflect.Interface {
return handler, errors.New("first argument is not pointer")
}
handler.isIface = kind == reflect.Interface
return handler, nil
}
func (h handler) not(event reflect.Type) bool {
if h.isIface {
return !event.Implements(h.event)
}
return h.event != event
}
func (h handler) Call(event reflect.Value) {
if h.isSync {
h.call(event)
} else {
go h.call(event)
}
}
func (h handler) call(event reflect.Value) {
if h.chanclose.IsValid() {
reflect.Select([]reflect.SelectCase{
{Dir: reflect.SelectSend, Chan: h.callback, Send: event},
{Dir: reflect.SelectRecv, Chan: h.chanclose},
})
} else {
h.callback.Call([]reflect.Value{event})
}
}
func (h handler) cleanup() {
if h.chanclose.IsValid() {
// Closing this channel will force all ongoing selects to return
// immediately.
h.chanclose.Close()
}
}