/
bus.go
142 lines (116 loc) · 3.69 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package bus
import (
"fmt"
"reflect"
"sync"
)
// The type of the function's first and only argument.
// declares the msg to listen for.
type HandlerFunc interface{}
type Msg interface{}
// It is a simple but powerful publish-subscribe event system. It requires object to
// register themselves with the event bus to receive events.
type Bus interface {
Dispatch(msg Msg) error
AddHandler(handler HandlerFunc) error
AddEventListener(handler HandlerFunc)
Publish(msg Msg) error
}
type InProcBus struct {
sync.Mutex
handlers map[string]reflect.Value
listeners map[string][]reflect.Value
}
func New() Bus {
return &InProcBus{
handlers: make(map[string]reflect.Value),
listeners: make(map[string][]reflect.Value),
}
}
// Dispatch sends an msg to registered handler that were declared
// to accept values of a msg
func (b *InProcBus) Dispatch(msg Msg) error {
nameOfMsg := reflect.TypeOf(msg)
handler, ok := b.handlers[nameOfMsg.String()]
if !ok {
return &HandlerNotFoundError{Name: nameOfMsg.Name()}
}
params := make([]reflect.Value, 0, 1)
params = append(params, reflect.ValueOf(msg))
ret := handler.Call(params)
v := ret[0].Interface()
if err, ok := v.(error); ok && err != nil {
return err
}
return nil
}
// Publish sends an msg to all registered listeners that were declared
// to accept values of a msg
func (b *InProcBus) Publish(msg Msg) error {
nameOfMsg := reflect.TypeOf(msg)
listeners := b.listeners[nameOfMsg.String()]
params := make([]reflect.Value, 0, 1)
params = append(params, reflect.ValueOf(msg))
for _, listenerHandler := range listeners {
ret := listenerHandler.Call(params)
v := ret[0].Interface()
if err, ok := v.(error); ok && err != nil {
return err
}
}
return nil
}
// AddHandler registers a handler function that will be called when a matching
// msg is dispatched.
func (b *InProcBus) AddHandler(handler HandlerFunc) error {
b.Mutex.Lock()
defer b.Mutex.Unlock()
handlerType := reflect.TypeOf(handler)
validateHandlerFunc(handlerType)
typeOfMsg := handlerType.In(0)
if _, ok := b.handlers[typeOfMsg.String()]; ok {
return &OverwriteHandlerError{Name: typeOfMsg.Name()}
}
b.handlers[typeOfMsg.String()] = reflect.ValueOf(handler)
return nil
}
// AddListener registers a listener function that will be called when a matching
// msg is dispatched.
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
b.Mutex.Lock()
defer b.Mutex.Unlock()
handlerType := reflect.TypeOf(handler)
validateHandlerFunc(handlerType)
// the first input parameter is the msg
typOfMsg := handlerType.In(0)
_, ok := b.listeners[typOfMsg.String()]
if !ok {
b.listeners[typOfMsg.String()] = make([]reflect.Value, 0)
}
b.listeners[typOfMsg.String()] = append(b.listeners[typOfMsg.String()], reflect.ValueOf(handler))
}
// panic if conditions not met (this is a programming error)
func validateHandlerFunc(handlerType reflect.Type) {
switch {
case handlerType.Kind() != reflect.Func:
panic(BadFuncError("handler func must be a function"))
case handlerType.NumIn() != 1:
panic(BadFuncError("handler func must take exactly one input argument"))
case handlerType.NumOut() != 1:
panic(BadFuncError("handler func must take exactly one output argument"))
}
}
// BadFuncError is raised via panic() when AddEventListener or AddHandler is called with an
// invalid listener function.
type BadFuncError string
func (bhf BadFuncError) Error() string {
return fmt.Sprintf("bad handler func: %s", string(bhf))
}
type HandlerNotFoundError struct {
Name string
}
func (e *HandlerNotFoundError) Error() string { return e.Name + ": not found" }
type OverwriteHandlerError struct {
Name string
}
func (e *OverwriteHandlerError) Error() string { return e.Name + ": handler exists" }