-
Notifications
You must be signed in to change notification settings - Fork 137
/
topic.go
100 lines (92 loc) · 1.75 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package realtime
// filter records which events of a topic one subscriber wants to receive.
type filter struct {
	whole bool // true if the events for the whole doctype should be sent
	// ids holds the document IDs watched individually; publish compares
	// them with the event's document ID when whole is false.
	ids []string
}
// toWatch is the request carried by the subscribe and unsubscribe channels
// of a topic: it names a subscriber and what that subscriber targets.
type toWatch struct {
	sub *Subscriber // subscriber the request applies to
	id string // empty string means the whole doctype
}
// topic fans events out to its subscribers. The loop method receives from
// the broadcast, subscribe and unsubscribe channels and calls publish,
// doSubscribe and doUnsubscribe, which read or modify subs.
type topic struct {
	broadcast chan *Event // input: events received by loop and handed to publish
	subs map[*Subscriber]filter // output: per-subscriber filter applied by publish
	// subscribe carries requests that loop passes to doSubscribe.
	subscribe chan *toWatch
	// unsubscribe carries requests that loop passes to doUnsubscribe.
	unsubscribe chan *toWatch
	// running is written by loop after each unsubscribe request: it is
	// closed when no subscriber is left, otherwise true is sent on it.
	running chan bool
}
// newTopic allocates a topic with an empty subscriber set and starts its
// loop goroutine before returning it. The broadcast channel is buffered
// (10 events); the subscribe, unsubscribe and running channels are
// unbuffered.
func newTopic() *topic {
	t := new(topic)
	t.broadcast = make(chan *Event, 10)
	t.subs = make(map[*Subscriber]filter)
	t.subscribe = make(chan *toWatch)
	t.unsubscribe = make(chan *toWatch)
	t.running = make(chan bool)

	go t.loop()
	return t
}
// loop serves the topic until its last subscriber is gone. Each iteration
// handles exactly one of: an event to publish, a subscribe request, or an
// unsubscribe request.
//
// After every unsubscribe request the outcome is reported on t.running:
// true is sent while subscribers remain; once the subscriber set is empty
// the channel is closed instead and loop returns.
func (t *topic) loop() {
	for {
		select {
		case ev := <-t.broadcast:
			t.publish(ev)

		case req := <-t.subscribe:
			t.doSubscribe(req)

		case req := <-t.unsubscribe:
			t.doUnsubscribe(req)
			if len(t.subs) > 0 {
				// Still in use: tell the caller the topic keeps running.
				t.running <- true
				continue
			}
			// Nobody left: closing running signals that the topic stopped.
			close(t.running)
			return
		}
	}
}
// publish delivers e to every subscriber whose filter accepts it: either
// the filter covers the whole doctype, or one of its ids equals the ID of
// the event's document.
//
// The send on the subscriber's Channel is raced against a receive on the
// subscriber's running channel, so publish moves on to the next subscriber
// as soon as either of the two is possible.
func (t *topic) publish(e *Event) {
	for sub, flt := range t.subs {
		wanted := flt.whole
		if !wanted {
			for i := range flt.ids {
				if flt.ids[i] == e.Doc.ID() {
					wanted = true
					break
				}
			}
		}
		if !wanted {
			continue
		}

		select {
		case sub.Channel <- e:
		case <-sub.running:
			// the subscriber has been closed
		}
	}
}
// doSubscribe records the request w in the filter of w.sub, creating the
// filter entry if the subscriber has none yet. An empty w.id marks the
// whole doctype as watched; otherwise w.id is added to the watched ids.
//
// An id that is already watched is not added a second time, so repeated
// requests for the same document do not make the id list grow.
func (t *topic) doSubscribe(w *toWatch) {
	f := t.subs[w.sub] // zero-value filter when the subscriber is not known yet
	if w.id == "" {
		f.whole = true
	} else {
		for _, id := range f.ids {
			if id == w.id {
				// Already watched: the stored filter is up to date.
				return
			}
		}
		f.ids = append(f.ids, w.id)
	}
	t.subs[w.sub] = f
}
// doUnsubscribe applies the unsubscribe request w to the subscriber set.
//
// With an empty w.id the subscriber is removed from the topic altogether.
// Otherwise every occurrence of w.id is dropped from the subscriber's id
// list; if that leaves no id and the subscriber does not watch the whole
// doctype, the subscriber is removed as well. A request for a subscriber
// that is not registered is ignored.
func (t *topic) doUnsubscribe(w *toWatch) {
	if w.id == "" {
		delete(t.subs, w.sub)
		return
	}

	current, found := t.subs[w.sub]
	if !found {
		return
	}

	// Filter in place, reusing the backing array of the existing list.
	kept := current.ids[:0]
	for i := range current.ids {
		if candidate := current.ids[i]; candidate != w.id {
			kept = append(kept, candidate)
		}
	}

	if current.whole || len(kept) > 0 {
		current.ids = kept
		t.subs[w.sub] = current
		return
	}
	delete(t.subs, w.sub)
}