forked from QubitProducts/bamboo
-
Notifications
You must be signed in to change notification settings - Fork 5
/
event_bus.go
79 lines (66 loc) · 1.42 KB
/
event_bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package event_bus
import (
"log"
"reflect"
"sync"
)
type EventBus struct {
handlers map[reflect.Type][]reflect.Value
lock sync.RWMutex
}
/**
* Construct a new EventBus
*/
func New() *EventBus {
return &EventBus{
make(map[reflect.Type][]reflect.Value),
sync.RWMutex{},
}
}
/**
* Register an event handler
*/
func (ebus *EventBus) Register(fn interface{}, forTypes ...interface{}) {
v := reflect.ValueOf(fn)
def := v.Type()
if def.NumIn() != 1 {
log.Panicf("EventBus Handler must have a single argument")
}
argument := def.In(0)
for _, typ := range forTypes {
t := reflect.TypeOf(typ)
if !t.ConvertibleTo(argument) {
log.Fatalf("EventBus Handler argument %v is not compatible with type %v", argument, t)
}
ebus.addHandler(t, v)
}
if len(forTypes) == 0 {
ebus.addHandler(argument, v)
}
}
/**
* Publish an event to the EventBus
*/
func (ebus *EventBus) Publish(event interface{}) error {
ebus.lock.RLock()
defer ebus.lock.RUnlock()
t := reflect.TypeOf(event)
handlers, ok := ebus.handlers[t]
if !ok {
return nil
}
args := [...]reflect.Value{reflect.ValueOf(event)}
for _, fn := range handlers {
fn.Call(args[:])
}
return nil
}
func (ebus *EventBus) addHandler(fnType reflect.Type, fn reflect.Value) {
ebus.lock.Lock()
defer ebus.lock.Unlock()
handlers, ok := ebus.handlers[fnType]
if !ok {
handlers = make([]reflect.Value, 0)
}
ebus.handlers[fnType] = append(handlers, fn)
}