-
Notifications
You must be signed in to change notification settings - Fork 641
/
pubsub_memory.go
95 lines (82 loc) · 2 KB
/
pubsub_memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package pubsub
import (
"context"
"sync"
"github.com/google/uuid"
)
// genericListener is either a Listener or ListenerWithErr
type genericListener struct {
l Listener
le ListenerWithErr
}
func (g genericListener) send(ctx context.Context, message []byte) {
if g.l != nil {
g.l(ctx, message)
}
if g.le != nil {
g.le(ctx, message, nil)
}
}
// MemoryPubsub is an in-memory Pubsub implementation. It's an exported type so that our test code can do type
// checks.
type MemoryPubsub struct {
mut sync.RWMutex
listeners map[string]map[uuid.UUID]genericListener
}
func (m *MemoryPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return m.subscribeGeneric(event, genericListener{l: listener})
}
func (m *MemoryPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return m.subscribeGeneric(event, genericListener{le: listener})
}
func (m *MemoryPubsub) subscribeGeneric(event string, listener genericListener) (cancel func(), err error) {
m.mut.Lock()
defer m.mut.Unlock()
var listeners map[uuid.UUID]genericListener
var ok bool
if listeners, ok = m.listeners[event]; !ok {
listeners = map[uuid.UUID]genericListener{}
m.listeners[event] = listeners
}
var id uuid.UUID
for {
id = uuid.New()
if _, ok = listeners[id]; !ok {
break
}
}
listeners[id] = listener
return func() {
m.mut.Lock()
defer m.mut.Unlock()
listeners := m.listeners[event]
delete(listeners, id)
}, nil
}
func (m *MemoryPubsub) Publish(event string, message []byte) error {
m.mut.RLock()
defer m.mut.RUnlock()
listeners, ok := m.listeners[event]
if !ok {
return nil
}
var wg sync.WaitGroup
for _, listener := range listeners {
wg.Add(1)
listener := listener
go func() {
defer wg.Done()
listener.send(context.Background(), message)
}()
}
wg.Wait()
return nil
}
func (*MemoryPubsub) Close() error {
return nil
}
func NewInMemory() Pubsub {
return &MemoryPubsub{
listeners: make(map[string]map[uuid.UUID]genericListener),
}
}