-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
100 lines (86 loc) · 2.09 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package event
import (
"reflect"
"sync"
"time"
"github.com/davidfantasy/embedded-mqtt-broker/logger"
)
// DefaultEventBus is a package-level AsyncEventBus whose handler map is
// already allocated. Publish and Subscribe are declared on *AsyncEventBus,
// so take its address (&DefaultEventBus) where an EventBus value is required.
var DefaultEventBus = AsyncEventBus{
	handlerMap: map[EventType][]eventHandlerWapper{},
}
// Event is the message handed from a publisher to every handler subscribed
// to its EventType.
type Event struct {
	// EventType selects which subscribed handlers receive the event.
	EventType EventType
	// Ts is the event timestamp; NewEvent fills it with the current time in
	// Unix milliseconds.
	Ts int64
	// Data is the event payload. Its concrete type is chosen by the publisher.
	Data any
}
// EventType identifies a kind of event. It is the key under which handlers
// are registered and by which published events are routed.
type EventType int
// EventHandler is a callback that receives one delivered Event.
type EventHandler func(event Event)
// EventBus is the publish/subscribe contract implemented by *AsyncEventBus.
type EventBus interface {
// Publish hands event to the bus for delivery to the handlers subscribed
// to event.EventType.
Publish(event Event)
// Subscribe registers handler to receive events of eventType.
Subscribe(eventType EventType, handler EventHandler)
}
// NewEvent builds an Event of the given type carrying data, stamping Ts with
// the current wall-clock time in Unix milliseconds.
func NewEvent(eventType EventType, data any) Event {
	var e Event
	e.EventType = eventType
	e.Data = data
	e.Ts = time.Now().UnixMilli()
	return e
}
// eventHandlerWapper pairs a subscribed handler with the channel through
// which events are queued for it (see wrapHandler, which creates both the
// channel and the goroutine that drains it).
type eventHandlerWapper struct {
// handler is the user callback invoked for each queued event.
handler EventHandler
// ch is the queue of events waiting to be passed to handler.
ch chan Event
}
// wrapHandler gives handler its own event queue (a channel buffered to 100
// events) and starts a goroutine that feeds queued events to the handler one
// at a time through callHandler.
//
// The goroutine exits only when the channel is closed; nothing in this file
// closes it, so the worker lives as long as the process.
func wrapHandler(handler EventHandler) eventHandlerWapper {
	queue := make(chan Event, 100)
	go func() {
		for {
			event, ok := <-queue
			if !ok {
				return
			}
			callHandler(event, handler)
		}
	}()
	return eventHandlerWapper{handler: handler, ch: queue}
}
// callHandler runs handler on event and shields the caller from panics: a
// panic raised by the handler is recovered and logged together with the
// event instead of propagating.
func callHandler(event Event, handler EventHandler) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logger.ERROR.Printf("事件处理时发生异常:%v,%v\n", event, r)
	}()
	handler(event)
}
// AsyncEventBus delivers published events to handlers asynchronously: each
// subscribed handler is wrapped with its own channel and goroutine (see
// wrapHandler), and Publish only enqueues onto those channels.
type AsyncEventBus struct {
// handlerMap holds the wrapped handlers registered for each event type.
// Access is guarded by the package-level handlerMu.
handlerMap map[EventType][]eventHandlerWapper
}
// handlerMu guards reads and writes of AsyncEventBus.handlerMap. It is a
// package-level mutex, so every AsyncEventBus instance shares this one lock.
var handlerMu sync.Mutex
// Publish offers event to every handler subscribed to event.EventType.
//
// The handler list is read under handlerMu and the sends happen after the
// lock is released. Each send is non-blocking: if a handler's queue is full,
// the event is skipped for that handler and a warning is logged.
func (bus *AsyncEventBus) Publish(event Event) {
	subscribers := func() []eventHandlerWapper {
		handlerMu.Lock()
		defer handlerMu.Unlock()
		return bus.handlerMap[event.EventType]
	}()

	for i := range subscribers {
		select {
		case subscribers[i].ch <- event:
		default:
			logger.WARN.Printf("事件处理器被阻塞了:%v\n", event.EventType)
		}
	}
}
// Subscribe registers handler to receive events of eventType.
//
// A nil handler is rejected with a warning. A handler that is already
// registered for eventType is not added a second time. Otherwise the handler
// is wrapped with its own queue and worker goroutine (see wrapHandler) and
// appended to the list for eventType.
//
// Subscribe is safe to call on a zero-value AsyncEventBus: the handler map is
// allocated on first use.
func (bus *AsyncEventBus) Subscribe(eventType EventType, handler EventHandler) {
	if handler == nil {
		logger.WARN.Println("事件处理函数为nil")
		return
	}

	handlerMu.Lock()
	defer handlerMu.Unlock()

	// A bus that was not built with an initialized map (e.g. var b
	// AsyncEventBus) has a nil handlerMap; writing to a nil map panics, so
	// create it here under the lock.
	if bus.handlerMap == nil {
		bus.handlerMap = make(map[EventType][]eventHandlerWapper)
	}

	wrappers := bus.handlerMap[eventType]

	// Func values cannot be compared with ==, so duplicates are detected by
	// code pointer. NOTE(review): per the reflect documentation this pointer
	// does not necessarily identify a single function uniquely, so distinct
	// closures created from the same function literal may be treated as
	// duplicates.
	hp := reflect.ValueOf(handler).Pointer()
	for _, w := range wrappers {
		if reflect.ValueOf(w.handler).Pointer() == hp {
			return // already subscribed for this event type
		}
	}

	// append handles a nil slice, so no explicit allocation is needed.
	bus.handlerMap[eventType] = append(wrappers, wrapHandler(handler))
}