-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
97 lines (87 loc) · 1.65 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package goutils
import (
"sync"
)
/**
@author : Jerbe - The porter from Earth
@time : 2023/10/5 18:51
@describe :
*/
// PubSub 发布/订阅器
type PubSub struct {
subscribers map[*SubSignal]struct{}
rwMute sync.RWMutex
closed bool
}
// SubSignal 订阅信号
type SubSignal struct {
ch chan interface{}
C <-chan interface{}
close chan struct{}
signal *PubSub
}
// Close 订阅者信号
func (sb *SubSignal) Close() error {
sb.signal.Unsubscribe(sb)
return nil
}
// Publish 推送
func (s *PubSub) Publish(i interface{}) {
if s.closed {
panic("signal is closed")
}
s.rwMute.RLock()
defer s.rwMute.RUnlock()
for sb := range s.subscribers {
go func(in *SubSignal) {
select {
case <-in.close:
close(in.ch)
return
case in.ch <- i:
}
}(sb)
}
}
// Subscribe 订阅
func (s *PubSub) Subscribe() *SubSignal {
if s.closed {
panic("signal is closed")
}
ch := make(chan interface{})
sb := &SubSignal{
ch: ch,
C: ch,
close: make(chan struct{}),
signal: s,
}
s.rwMute.Lock()
defer s.rwMute.Unlock()
s.subscribers[sb] = struct{}{}
return sb
}
// Unsubscribe 取消订阅
func (s *PubSub) Unsubscribe(sb *SubSignal) {
s.rwMute.Lock()
defer s.rwMute.Unlock()
if _, ok := s.subscribers[sb]; ok {
delete(s.subscribers, sb)
close(sb.close)
}
}
// Close 关闭该发布/订阅器
func (s *PubSub) Close() {
if s.closed {
return
}
s.rwMute.Lock()
defer s.rwMute.Unlock()
s.closed = true
for sb := range s.subscribers {
delete(s.subscribers, sb)
}
}
// NewPubSub 新生成一个发布/订阅器
func NewPubSub() *PubSub {
return &PubSub{subscribers: make(map[*SubSignal]struct{})}
}