/
bus.go
139 lines (113 loc) · 2.4 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package dispatcher
import (
"log"
"github.com/ebittleman/voting/bus"
"github.com/ebittleman/voting/eventmanager"
)
// busDispatcher receives messages from a bus.MessageQueue, runs each one
// through its filters and publishes the event of every message that passes on
// an eventmanager.EventManager.
type busDispatcher struct {
	// mq is where messages are received from and where Ack/Nack are sent.
	mq bus.MessageQueue
	// eventManager is handed to every subscriber and receives published events.
	eventManager eventmanager.EventManager

	// subscribers holds the arguments of the most recent RunAsync call.
	subscribers []eventmanager.Subscriber
	// filters are consulted in slice order for every received message.
	filters []Filter

	// errCh delivers the Receive error that stopped the loop; the loop closes
	// it on exit.
	errCh chan error
	// done carries the stop request sent by Close. closed is closed by the
	// loop when it exits and is used to detect that the dispatcher is finished.
	done, closed chan struct{}
}
// NewBusDispatcher builds a dispatcher that receives from mq, publishes to
// eventManager and applies filters to every message. No goroutine is started
// here. The error, done and closed channels are all created unbuffered.
func NewBusDispatcher(
	mq bus.MessageQueue,
	eventManager eventmanager.EventManager,
	filters ...Filter,
) Runnable {
	return &busDispatcher{
		mq:           mq,
		eventManager: eventManager,
		filters:      filters,
		errCh:        make(chan error),
		done:         make(chan struct{}),
		closed:       make(chan struct{}),
	}
}
// Run starts the dispatcher via RunAsync and blocks until the returned channel
// yields a value. The result is the Receive error that stopped the loop, or
// nil when the channel was closed without an error being delivered.
func (d *busDispatcher) Run(subscribers ...eventmanager.Subscriber) error {
	results := d.RunAsync(subscribers...)
	err := <-results
	return err
}
// RunAsync subscribes the given subscribers to the event manager, starts the
// receive loop in its own goroutine and returns the dispatcher's error
// channel. If the loop has already finished, nothing is subscribed or started
// and the (already closed) error channel is returned as-is.
//
// NOTE(review): there is no guard against calling this twice while the loop is
// still running; a second call would start a second loop on the same channels.
func (d *busDispatcher) RunAsync(subscribers ...eventmanager.Subscriber) chan error {
	// Non-blocking probe of closed: it is only readable once loop has exited.
	select {
	case <-d.closed:
		// Already finished; fall through to return the error channel.
	default:
		d.subscribers = subscribers
		for i := range d.subscribers {
			d.subscribers[i].Subscribe(d.eventManager)
		}

		go d.loop()
	}

	return d.errCh
}
// Close asks the receive loop to stop and waits until it has exited. It
// returns immediately when the loop has already finished, and always returns
// nil.
//
// The stop request is an unbuffered send, so it completes only once the loop
// is receiving on done, which it does at the top of an iteration or while
// trying to deliver a Receive error.
//
// NOTE(review): if the loop was never started, neither channel operation can
// proceed and this call blocks.
func (d *busDispatcher) Close() error {
	select {
	case <-d.closed:
		// The loop is already gone; nothing to signal.
	case d.done <- struct{}{}:
		// Signal accepted; wait for the loop's deferred close of closed.
		<-d.closed
	}

	return nil
}
// loop is the dispatcher's receive loop. Each iteration checks for a stop
// request, receives one message from the queue and hands it to handle. The
// loop ends on a stop request or on the first Receive error. On exit the error
// channel is closed first and the closed channel second.
func (d *busDispatcher) loop() {
	// Deferred calls run last-in-first-out: errCh is closed before closed.
	defer close(d.closed)
	defer close(d.errCh)

	for {
		// Non-blocking check for a pending stop request from Close.
		select {
		case <-d.done:
			return
		default:
		}

		received, recvErr := d.mq.Receive()

		switch {
		case recvErr != nil:
			// Hand the error to whoever reads errCh, unless a stop request
			// arrives first; either way the loop is over.
			select {
			case <-d.done:
			case d.errCh <- recvErr:
			}
			return
		case received == nil:
			// Nothing was received; go around and try again.
			continue
		}

		// Handling failures are logged only; they do not stop the loop.
		if handleErr := d.handle(received); handleErr != nil {
			log.Println("Error: Dispatching msg: ", handleErr)
		}
	}
}
// filter passes msg to each configured filter in order and returns the first
// non-nil error, skipping the remaining filters. It returns nil when every
// filter accepts the message or when no filters are configured.
func (d *busDispatcher) filter(msg bus.Message) error {
	for i := range d.filters {
		if verdict := d.filters[i].Filter(msg); verdict != nil {
			return verdict
		}
	}

	return nil
}
// dispatch publishes the event carried by msg on the dispatcher's event
// manager. It always returns nil; the error result exists so that handle can
// treat it like any other failing step.
func (d *busDispatcher) dispatch(msg bus.Message) error {
// NOTE(review): anything Publish may return is discarded here — confirm
// against the eventmanager.EventManager interface whether that is intended.
d.eventManager.Publish(msg.Event())
return nil
}
// handle processes one received message.
//
// The filters run first and their result decides what happens next:
//   - nil: the message's event is published and the message is acknowledged;
//   - ErrNack: the message is negatively acknowledged and not published;
//   - ErrAck: the message is acknowledged and not published;
//   - any other error: it is returned as-is, with no Ack or Nack sent.
//
// The returned error is that filter error, a dispatch error, or whatever the
// queue's Ack/Nack call returns.
func (d *busDispatcher) handle(msg bus.Message) error {
	switch verdict := d.filter(msg); verdict {
	case nil:
		// Accepted by every filter; continue to dispatch below.
	case ErrNack:
		return d.mq.Nack(msg)
	case ErrAck:
		return d.mq.Ack(msg)
	default:
		return verdict
	}

	if dispatchErr := d.dispatch(msg); dispatchErr != nil {
		return dispatchErr
	}

	return d.mq.Ack(msg)
}