forked from lfq7413/tomato
/
event_emitter.go
151 lines (130 loc) · 3.42 KB
/
event_emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package pubsub
import (
"sync"
"unsafe"
)
// EventEmitter is an event emitter: it keeps, per message type, the set of
// listeners that Emit should invoke.
type EventEmitter struct {
// mutex is locked by Emit, AddListener, RemoveListener and RemoveAllListeners
// while they access events.
mutex sync.Mutex
// events maps a message type to its listeners. The inner map is keyed by an
// int that AddListener derives from the listener's function value.
events map[string]map[int]HandlerType
}
// NewEventEmitter returns a new EventEmitter whose listener table is
// allocated and empty.
func NewEventEmitter() *EventEmitter {
	emitter := new(EventEmitter)
	emitter.events = make(map[string]map[int]HandlerType)
	return emitter
}
// Init resets the emitter to an empty listener table, discarding every
// listener registered so far.
//
// The table is replaced while holding the emitter's mutex, like every other
// method that modifies it, so Init can be called while other goroutines are
// emitting or registering listeners.
func (e *EventEmitter) Init() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	e.events = map[string]map[int]HandlerType{}
}
// Emit sends an event message to every listener subscribed to messageType.
//
// Each listener is started in its own goroutine with args, so Emit does not
// wait for listeners to finish. It returns true when messageType has a
// (non-nil) listener set and false otherwise. A nil listener table is
// replaced by an empty one as a side effect.
func (e *EventEmitter) Emit(messageType string, args ...string) bool {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.events == nil {
		e.events = make(map[string]map[int]HandlerType)
	}

	listeners, found := e.events[messageType]
	if !found || listeners == nil {
		return false
	}

	// TODO: improve the multi-threaded dispatch logic.
	for key := range listeners {
		go listeners[key](args...)
	}
	return true
}
// AddListener registers listener as a subscriber of messageType and returns
// the emitter so calls can be chained.
//
// Listeners are stored under a key derived from the function value itself,
// so registering the same function value twice for one message type keeps a
// single entry.
//
// A nil listener is ignored: storing it would make a later Emit start a
// goroutine on a nil function value, which terminates the program.
func (e *EventEmitter) AddListener(messageType string, listener HandlerType) *EventEmitter {
	if listener == nil {
		return e
	}

	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.events == nil {
		e.events = map[string]map[int]HandlerType{}
	}

	// A func value is a single pointer-sized word; reading that word yields a
	// key that is identical for copies of the same function value.
	addr := *(*int)(unsafe.Pointer(&listener))

	listeners := e.events[messageType]
	if listeners == nil {
		// First listener for this message type (or a nil set was stored):
		// create the set before writing to it.
		listeners = map[int]HandlerType{}
		e.events[messageType] = listeners
	}
	listeners[addr] = listener
	return e
}
// On registers listener as a subscriber of messageType. It is an alias for
// AddListener and returns whatever AddListener returns.
func (e *EventEmitter) On(messageType string, listener HandlerType) *EventEmitter {
	emitter := e.AddListener(messageType, listener)
	return emitter
}
// Once registers listener so that it runs at most once for messageType, and
// returns the emitter so calls can be chained.
//
// The listener is wrapped: when the wrapper runs it first removes itself
// from the emitter and then invokes listener, unless listener has already
// been invoked. A nil listener is ignored, because the wrapper would
// otherwise call a nil function when the event fires.
func (e *EventEmitter) Once(messageType string, listener HandlerType) *EventEmitter {
	if listener == nil {
		return e
	}

	// Emit starts every listener in its own goroutine, so two emits issued
	// before the wrapper has removed itself run the wrapper concurrently.
	// The fired flag is therefore guarded by its own mutex.
	var (
		firedMu sync.Mutex
		fired   bool
	)

	var wrapListener HandlerType
	wrapListener = func(args ...string) {
		e.RemoveListener(messageType, wrapListener)

		firedMu.Lock()
		alreadyFired := fired
		fired = true
		firedMu.Unlock()

		// The lock is released before calling listener so a slow listener
		// never blocks a concurrent, redundant wrapper invocation.
		if !alreadyFired {
			listener(args...)
		}
	}
	e.On(messageType, wrapListener)
	return e
}
// RemoveListener unsubscribes listener from messageType and returns the
// emitter so calls can be chained.
//
// The listener is looked up by the same key AddListener stores it under (a
// value derived from the function value). When the last listener of a
// message type is removed, the message type's entry is dropped as well.
// Unknown message types and unregistered listeners are ignored.
func (e *EventEmitter) RemoveListener(messageType string, listener HandlerType) *EventEmitter {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.events == nil {
		return e
	}

	listeners, found := e.events[messageType]
	if !found {
		return e
	}

	key := *(*int)(unsafe.Pointer(&listener))
	if _, registered := listeners[key]; !registered {
		return e
	}

	delete(listeners, key)
	if len(listeners) == 0 {
		// That was the last listener: drop the message type entirely.
		delete(e.events, messageType)
	}
	return e
}
// RemoveAllListeners removes every listener of messageType and returns the
// emitter so calls can be chained. When messageType is the empty string, the
// listeners of all message types are removed.
//
// Each listener set is emptied in place before it is dropped, so a reference
// to that set obtained earlier also ends up empty.
//
// The remove-everything case is handled inline rather than by calling
// RemoveAllListeners per message type: sync.Mutex is not reentrant, so a
// recursive call made while holding the mutex would block forever.
func (e *EventEmitter) RemoveAllListeners(messageType string) *EventEmitter {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.events == nil {
		return e
	}

	// drain empties one listener set in place; ranging over a nil set is a no-op.
	drain := func(listeners map[int]HandlerType) {
		for addr := range listeners {
			delete(listeners, addr)
		}
	}

	if messageType == "" {
		for _, listeners := range e.events {
			drain(listeners)
		}
		e.events = map[string]map[int]HandlerType{}
		return e
	}

	drain(e.events[messageType])
	delete(e.events, messageType)
	return e
}
// Listeners returns the listeners currently registered for messageType,
// keyed the same way as the emitter stores them.
//
// The result is a snapshot taken while holding the emitter's mutex: it is
// never nil, it is empty when nothing is registered, and the caller may
// iterate or modify it without affecting the emitter or racing with
// concurrent AddListener/RemoveListener calls.
func (e *EventEmitter) Listeners(messageType string) map[int]HandlerType {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	// Indexing a nil table yields a nil set, and ranging over a nil set is a
	// no-op, so the zero-value emitter needs no special case.
	registered := e.events[messageType]
	snapshot := make(map[int]HandlerType, len(registered))
	for key, listener := range registered {
		snapshot[key] = listener
	}
	return snapshot
}
// ListenerCount returns the number of listeners currently registered for
// messageType, or 0 when there are none.
//
// The count is read while holding the emitter's mutex so it does not race
// with concurrent AddListener/RemoveListener calls.
func (e *EventEmitter) ListenerCount(messageType string) int {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	// Indexing a nil table yields a nil set, whose length is 0.
	return len(e.events[messageType])
}