-
Notifications
You must be signed in to change notification settings - Fork 16
/
default_impl.go
133 lines (110 loc) · 3.14 KB
/
default_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package messaging
import (
"github.com/glide-im/glide/pkg/gate"
"github.com/glide-im/glide/pkg/logger"
"github.com/glide-im/glide/pkg/messages"
"github.com/glide-im/glide/pkg/subscription"
"github.com/panjf2000/ants/v2"
)
// Options configures a MessageInterfaceImpl created by NewDefaultImpl.
type Options struct {
// NotifyServerError is the initial value of the "notify client on server error"
// switch; when it is true, OnHandleMessageError enqueues an error notification
// for the client.
NotifyServerError bool
// MaxMessageConcurrency is passed to ants.NewPool as the size of the goroutine
// pool that message handling is submitted to.
MaxMessageConcurrency int
}
// onMessageHandlerPanic is installed as the panic handler of the message
// goroutine pool. It receives the value recovered from a panicking task and
// writes it to the error log.
func onMessageHandlerPanic(i interface{}) {
	const panicLogFormat = "MessageInterfaceImpl panic: %v"
	logger.E(panicLogFormat, i)
}
// MessageInterfaceImpl is the default implementation of the messaging interface.
// It submits incoming messages to a goroutine pool, where they are offered to a
// chain of MessageHandlers, and it holds the gateway and subscription services.
type MessageInterfaceImpl struct {
// execPool is the goroutine pool that message handling is submitted to.
// NewDefaultImpl sizes it from Options.MaxMessageConcurrency.
// Original sizing note (may be stale): a pool with a capacity of 100; assuming
// each message takes 10ms to process, one goroutine can handle 100 messages per second.
execPool *ants.Pool
// hc is the head of the message handler chain.
hc *handlerChain
// subscription is the subscription service, stored by SetSubscription and
// returned by GetGroupInterface.
subscription subscription.Interface
// gate is the gateway used to enqueue messages for clients, stored by SetGate
// and returned by GetClientInterface.
gate gate.Gateway
// notifyOnSrvErr controls whether the client is notified on a server error.
notifyOnSrvErr bool
}
// NewDefaultImpl creates a MessageInterfaceImpl with an empty handler chain and
// a goroutine pool for message handling.
//
// options supplies the pool size (MaxMessageConcurrency) and the initial
// "notify client on server error" switch (NotifyServerError). A nil options is
// treated as the zero Options instead of causing a nil-pointer panic.
//
// The gateway and the subscription service are not set here; callers provide
// them afterwards with SetGate and SetSubscription.
//
// It returns the error reported by ants.NewPool when the pool cannot be created.
func NewDefaultImpl(options *Options) (*MessageInterfaceImpl, error) {
	if options == nil {
		options = &Options{}
	}

	// The pool is created in non-blocking mode, without pre-allocation, and with
	// onMessageHandlerPanic as the handler for panics raised inside tasks.
	// NOTE(review): per the ants documentation a non-blocking pool makes Submit
	// fail instead of waiting when no worker is free, and the treatment of a
	// non-positive size depends on the ants version in use; confirm both.
	pool, err := ants.NewPool(
		options.MaxMessageConcurrency,
		ants.WithNonblocking(true),
		ants.WithPanicHandler(onMessageHandlerPanic),
		ants.WithPreAlloc(false),
	)
	if err != nil {
		return nil, err
	}

	// Build the value only once the pool exists, so no half-initialised
	// instance is ever constructed on the error path.
	return &MessageInterfaceImpl{
		execPool:       pool,
		hc:             &handlerChain{},
		notifyOnSrvErr: options.NotifyServerError,
	}, nil
}
// Handle accepts a message received from the client described by cInfo and
// schedules it for processing on the goroutine pool.
//
// For actions that are not internal, msg.From is overwritten with the UID of
// the sending connection before the message is scheduled. The scheduled task
// offers the message to the handler chain; when no handler accepts it, a
// warning is logged and, for non-internal actions, an
// ActionNotifyUnknownAction reply carrying the original sequence number and
// action is enqueued for the client.
//
// If the pool refuses the task, the error is passed to OnHandleMessageError
// and then returned; otherwise Handle returns nil without waiting for the task
// to run.
func (d *MessageInterfaceImpl) Handle(cInfo *gate.Info, msg *messages.GlideMessage) error {
	if internal := msg.GetAction().IsInternal(); !internal {
		msg.From = cInfo.ID.UID()
	}
	logger.D("handle message: %s", msg)

	process := func() {
		if d.hc.handle(d, cInfo, msg) {
			return
		}
		if !msg.GetAction().IsInternal() {
			reply := messages.NewMessage(msg.GetSeq(), messages.ActionNotifyUnknownAction, msg.GetAction())
			// Best effort: the result of enqueueing the reply is ignored.
			_ = d.gate.EnqueueMessage(cInfo.ID, reply)
		}
		logger.W("action is not handled: %s", msg.GetAction())
	}

	if err := d.execPool.Submit(process); err != nil {
		d.OnHandleMessageError(cInfo, msg, err)
		return err
	}
	return nil
}
// AddHandler registers i by adding it to the handler chain that Handle offers
// incoming messages to.
func (d *MessageInterfaceImpl) AddHandler(i MessageHandler) {
	d.hc.add(i)
}
// SetGate stores g as the gateway used to enqueue messages for clients.
// A newly constructed MessageInterfaceImpl has no gateway until this is called.
func (d *MessageInterfaceImpl) SetGate(g gate.Gateway) {
	d.gate = g
}
// SetSubscription stores g as the subscription service that
// GetGroupInterface returns.
func (d *MessageInterfaceImpl) SetSubscription(g subscription.Interface) {
	d.subscription = g
}
// SetNotifyErrorOnServer turns client notification on server errors on or off.
// When enable is true, OnHandleMessageError enqueues an error notification for
// the client.
func (d *MessageInterfaceImpl) SetNotifyErrorOnServer(enable bool) {
	d.notifyOnSrvErr = enable
}
// GetClientInterface returns the gateway stored by SetGate. The result is nil
// if no gateway has been set.
func (d *MessageInterfaceImpl) GetClientInterface() gate.Gateway {
	return d.gate
}
// GetGroupInterface returns the subscription service stored by
// SetSubscription. The result is nil if none has been set.
func (d *MessageInterfaceImpl) GetGroupInterface() subscription.Interface {
	return d.subscription
}
// OnHandleMessageError reports a message-handling failure back to the client.
//
// When client notification is enabled (Options.NotifyServerError or
// SetNotifyErrorOnServer), it enqueues an ActionNotifyError message with
// sequence -1 and err's text as its payload, addressed to cInfo.ID.
//
// The call does nothing when notification is disabled, or when there is
// nothing to send or nowhere to send it: err is nil, cInfo is nil, or no
// gateway has been set with SetGate. Without these checks the method would
// panic on the caller's goroutine, for example when Handle fails to schedule a
// message before SetGate has been called.
//
// msg is the message whose handling failed; it is not used by this
// implementation.
func (d *MessageInterfaceImpl) OnHandleMessageError(cInfo *gate.Info, msg *messages.GlideMessage, err error) {
	if !d.notifyOnSrvErr {
		return
	}
	if d.gate == nil || cInfo == nil || err == nil {
		return
	}

	notice := messages.NewMessage(-1, messages.ActionNotifyError, err.Error())
	// Best effort: a failure to enqueue the notification is deliberately
	// ignored, since this is already the error-reporting path.
	_ = d.gate.EnqueueMessage(cInfo.ID, notice)
}
// handlerChain is a chain of MessageHandlers, stored as a singly linked list
// with one node per handler.
type handlerChain struct {
// h is the handler held by this node. It is nil on the empty head node that
// NewDefaultImpl creates.
h MessageHandler
// next is the following node in the chain, or nil at the end of the chain.
next *handlerChain
}
// add appends a new node holding i after the last node of the chain that
// starts at hc. Existing nodes, including hc itself, are left unchanged apart
// from the last node's next pointer.
func (hc *handlerChain) add(i MessageHandler) {
	tail := hc
	for tail.next != nil {
		tail = tail.next
	}
	tail.next = &handlerChain{h: i}
}
// handle offers message to each handler in the chain, starting at hc and
// following next pointers, until one of them reports that it handled the
// message.
//
// h2 is the MessageInterfaceImpl passed through to every handler, cliInfo
// describes the sending client, and message is the message to dispatch.
// Nodes without a handler (such as the empty head node) are skipped.
//
// It returns true as soon as a handler's Handle returns true, and false when
// the end of the chain is reached, or when hc is nil, without any handler
// accepting the message.
//
// The receiver is a pointer, matching add, so the node is not copied on every
// hop of the walk.
func (hc *handlerChain) handle(h2 *MessageInterfaceImpl, cliInfo *gate.Info, message *messages.GlideMessage) bool {
	for node := hc; node != nil; node = node.next {
		if node.h != nil && node.h.Handle(h2, cliInfo, message) {
			return true
		}
	}
	return false
}