-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
140 lines (116 loc) · 2.23 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"io"
"sync"
"time"
)
// EventData is the payload carried by an Event. It is an alias for any, so a
// value of any type can be published.
type EventData = any
// Event is the message handed to each Subscriber's Send method.
type Event struct {
// Data is the payload supplied by the publisher.
Data EventData
// Sender identifies the origin of the event. Publisher.Send fills it with
// the publisher's creation timestamp (Unix nanoseconds).
Sender int64
}
// Subscriber is a destination for events published on a Topic.
//
// Topic.Send calls Send once per registered subscriber and removes any
// subscriber whose Send returns a non-nil error. Subscriber values are used as
// keys of Topic.Subscribers, so their dynamic type must be comparable.
type Subscriber interface {
// Send delivers a single event; a non-nil error reports a failed delivery.
Send(data Event) error
}
// SubscriberCh forwards events into a channel. Its pointer type implements
// Subscriber through the Send method.
type SubscriberCh struct {
// Ch is the send-only channel that receives delivered events.
Ch chan<- Event
}
// Send attempts a non-blocking delivery of e on s.Ch.
//
// It returns nil when the channel accepted the event, and io.ErrClosedPipe
// when the send could not proceed immediately.
func (s *SubscriberCh) Send(e Event) error {
	select {
	case s.Ch <- e:
		// Delivered; fall through to the success return below.
	default:
		// The channel was not ready to take the event right now.
		return io.ErrClosedPipe
	}
	return nil
}
// Publisher sends events to a single Topic. Topic.NewPublisher creates one.
type Publisher struct {
// topic is the destination topic; Close sets it to nil.
topic *Topic
// created is the creation time in Unix nanoseconds. It is used as
// Event.Sender and, when the topic is not in multi-publisher mode, to
// decide whether this publisher is older than the active one.
created int64
}
// Send publishes data on the publisher's topic. The resulting Event carries
// the publisher's creation timestamp as Sender.
//
// Calling Send after Close is a no-op: Close clears the topic reference, and
// forwarding to a nil topic would panic with a nil pointer dereference.
func (p *Publisher) Send(data EventData) {
	topic := p.topic
	if topic == nil {
		// Publisher has been closed; there is nowhere to deliver to.
		return
	}
	topic.Send(p, Event{Sender: p.created, Data: data})
}
// Close detaches the publisher from its topic. If this publisher is the
// topic's ActivePublisher, that slot is cleared under the topic's write lock.
// Closing a publisher that has no topic does nothing.
func (p *Publisher) Close() {
	attached := p.topic
	if attached == nil {
		// Nothing to detach from.
		return
	}
	p.topic = nil

	attached.Locker.Lock()
	defer attached.Locker.Unlock()

	if attached.ActivePublisher != p {
		// Some other publisher (or none) is active; leave it alone.
		return
	}
	attached.ActivePublisher = nil
}
// Topic fans events out to a set of subscribers.
type Topic struct {
// Subscribers is the set of registered subscribers.
Subscribers map[Subscriber]struct{}
// Locker is the lock the Topic and Publisher methods take around accesses
// to Subscribers and ActivePublisher.
Locker sync.RWMutex
// ActivePublisher is the publisher currently accepted when MultiPublisher
// is false. Topic.Send updates it and Publisher.Close clears it.
ActivePublisher *Publisher
// MultiPublisher, when true, lets every publisher send without the
// active-publisher check. NewTopic sets it to true.
MultiPublisher bool
}
// NewTopic returns a Topic with an empty subscriber set and multi-publisher
// mode enabled.
func NewTopic() *Topic {
	t := new(Topic)
	t.MultiPublisher = true
	t.Subscribers = make(map[Subscriber]struct{})
	return t
}
// Subscribe registers s with the topic. Registering a subscriber that is
// already present leaves the set unchanged.
func (t *Topic) Subscribe(s Subscriber) {
	t.Locker.Lock()
	defer t.Locker.Unlock()

	// The set stores empty values; only key membership matters.
	var member struct{}
	t.Subscribers[s] = member
}
// Unsubscribe removes s from the topic. Removing a subscriber that is not
// registered is a no-op.
func (t *Topic) Unsubscribe(s Subscriber) {
	t.Locker.Lock()
	defer t.Locker.Unlock()

	registered := t.Subscribers
	delete(registered, s)
}
// Send delivers event to every subscriber of the topic on behalf of p.
//
// When the topic is not in multi-publisher mode, only one publisher is active
// at a time: an event from a publisher created before the active one is
// dropped, and an event from any other publisher makes that publisher the
// active one before the event is delivered.
//
// Subscribers whose Send returns a non-nil error are removed from the topic.
//
// The exclusive lock is held for the whole call because Send may assign
// ActivePublisher and delete from Subscribers; performing those writes under a
// read lock would let concurrent Send calls race on both. As a consequence, a
// Subscriber's Send must not call back into methods of this topic that take
// the lock.
func (t *Topic) Send(p *Publisher, event Event) {
	t.Locker.Lock()
	defer t.Locker.Unlock()

	if !t.MultiPublisher && t.ActivePublisher != p {
		if t.ActivePublisher != nil && t.ActivePublisher.created > p.created {
			// Ignore old publisher.
			return
		}
		// TODO: Switch publisher event
		t.ActivePublisher = p
	}

	// Deleting the current entry while ranging over a map is safe in Go, so
	// failed subscribers are dropped in the same pass.
	for s := range t.Subscribers {
		if s.Send(event) != nil {
			delete(t.Subscribers, s)
		}
	}
}
// NewPublisher returns a Publisher bound to t, stamped with the current time
// in Unix nanoseconds as its creation timestamp.
func (t *Topic) NewPublisher() *Publisher {
	now := time.Now()
	p := &Publisher{topic: t}
	p.created = now.UnixNano()
	return p
}
// PubSubServer is a registry of topics keyed by name.
type PubSubServer struct {
// Topics maps topic names to their Topic.
Topics map[string]*Topic
// Locker is taken by GetTopic around accesses to Topics.
Locker sync.RWMutex
// AutoCreate is not read by any code visible here; GetTopic takes its own
// autoCreate argument instead.
// NOTE(review): presumably callers pass this value to GetTopic — confirm.
AutoCreate bool
}
// NewPubSubServer returns a PubSubServer with an empty topic registry. All
// other fields are left at their zero values.
func NewPubSubServer() *PubSubServer {
	srv := new(PubSubServer)
	srv.Topics = make(map[string]*Topic)
	return srv
}
// GetTopic looks up the topic registered under the given name.
//
// If a non-nil topic is registered it is returned. Otherwise, when autoCreate
// is true a new topic is created, stored under the name and returned; when
// autoCreate is false the result is nil. The lookup and the optional insert
// happen under the server's exclusive lock.
func (s *PubSubServer) GetTopic(topic string, autoCreate bool) *Topic {
	s.Locker.Lock()
	defer s.Locker.Unlock()

	existing := s.Topics[topic]
	switch {
	case existing != nil:
		return existing
	case !autoCreate:
		return nil
	}

	created := NewTopic()
	s.Topics[topic] = created
	return created
}