-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
heartbeat.go
195 lines (156 loc) · 5.15 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package znet
import (
"fmt"
"time"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
)
// HeartbeatChecker periodically inspects a bound connection and, while that
// connection is alive, sends it a heartbeat.
type HeartbeatChecker struct {
	// interval is the period of the ticker that drives each check.
	interval time.Duration
	// quitChan carries the quit signal sent by Stop to the running loop.
	quitChan chan bool
	// makeMsg is the user-definable function that builds the heartbeat
	// payload sent by SendHeartBeatMsg.
	makeMsg ziface.HeartBeatMsgFunc
	// onRemoteNotAlive is the user-definable handler invoked when the bound
	// connection reports that it is not alive.
	onRemoteNotAlive ziface.OnRemoteNotAlive
	// msgID is the message ID used for heartbeat messages.
	msgID uint32
	// router is the user-definable router that handles received heartbeat
	// messages.
	router ziface.IRouter
	// routerSlices holds the handler-function form of the heartbeat routing
	// (the newer router style).
	routerSlices []ziface.RouterHandler
	// conn is the connection this checker is bound to; nil until BindConn.
	conn ziface.IConnection
	// beatFunc is an optional user-defined heartbeat sending function; when
	// set it is used instead of SendHeartBeatMsg.
	beatFunc ziface.HeartBeatFunc
}
// HeatBeatDefaultRouter is the default callback router for heartbeat
// messages received from the remote side. It embeds BaseRouter; its Handle
// method only logs the received heartbeat.
type HeatBeatDefaultRouter struct {
	BaseRouter
}
// Handle logs, at debug level, the remote address, message ID and payload of
// a received heartbeat request. It does not reply to the request.
func (r *HeatBeatDefaultRouter) Handle(req ziface.IRequest) {
	logger := zlog.Ins()
	remote := req.GetConnection().RemoteAddr()
	id := req.GetMsgID()
	payload := string(req.GetData())
	logger.DebugF("Recv Heartbeat from %s, MsgID = %+v, Data = %s", remote, id, payload)
}
// HeatBeatDefaultHandle is the handler-function counterpart of
// HeatBeatDefaultRouter.Handle: it logs, at debug level, the remote address,
// message ID and payload of a received heartbeat request.
func HeatBeatDefaultHandle(req ziface.IRequest) {
	logger := zlog.Ins()
	from := req.GetConnection().RemoteAddr()
	msgID := req.GetMsgID()
	data := string(req.GetData())
	logger.DebugF("Recv Heartbeat from %s, MsgID = %+v, Data = %s", from, msgID, data)
}
// makeDefaultMsg builds the default heartbeat payload: the text
// "heartbeat [<local>-><remote>]" using the connection's local and remote
// addresses, returned as bytes.
func makeDefaultMsg(conn ziface.IConnection) []byte {
	return []byte(fmt.Sprintf("heartbeat [%s->%s]", conn.LocalAddr(), conn.RemoteAddr()))
}
// notAliveDefaultFunc is the default handler for a connection that is no
// longer alive: it logs the remote address and then stops the connection.
func notAliveDefaultFunc(conn ziface.IConnection) {
	logger := zlog.Ins()
	remote := conn.RemoteAddr()
	logger.InfoF("Remote connection %s is not alive, stop it", remote)
	conn.Stop()
}
// NewHeartbeatChecker returns a checker that ticks every interval.
//
// The checker starts out with the default message builder, the default
// not-alive handler, the default heartbeat message ID and the default
// router/handler. No connection is bound and no custom heartbeat sending
// function is set; use BindConn and the Set*/Bind* methods to change that.
func NewHeartbeatChecker(interval time.Duration) ziface.IHeartbeatChecker {
	return &HeartbeatChecker{
		interval: interval,
		quitChan: make(chan bool),

		// Defaults for message generation and dead-connection handling.
		makeMsg:          makeDefaultMsg,
		onRemoteNotAlive: notAliveDefaultFunc,

		// Defaults for routing received heartbeat messages.
		msgID:        ziface.HeartBeatDefaultMsgID,
		router:       &HeatBeatDefaultRouter{},
		routerSlices: []ziface.RouterHandler{HeatBeatDefaultHandle},

		// beatFunc is intentionally left nil so SendHeartBeatMsg is used.
	}
}
// SetOnRemoteNotAlive replaces the handler invoked when the bound connection
// is found not alive. A nil f is ignored and the current handler is kept.
func (h *HeartbeatChecker) SetOnRemoteNotAlive(f ziface.OnRemoteNotAlive) {
	if f == nil {
		return
	}
	h.onRemoteNotAlive = f
}
// SetHeartbeatMsgFunc replaces the function that builds the heartbeat
// payload. A nil f is ignored and the current builder is kept.
func (h *HeartbeatChecker) SetHeartbeatMsgFunc(f ziface.HeartBeatMsgFunc) {
	if f == nil {
		return
	}
	h.makeMsg = f
}
// SetHeartbeatFunc installs a custom heartbeat sending function that check
// will call instead of SendHeartBeatMsg. A nil beatFunc is ignored.
func (h *HeartbeatChecker) SetHeartbeatFunc(beatFunc ziface.HeartBeatFunc) {
	if beatFunc == nil {
		return
	}
	h.beatFunc = beatFunc
}
// BindRouter replaces the heartbeat message ID and router with the given
// ones. The call is a no-op when router is nil or when msgID equals
// ziface.HeartBeatDefaultMsgID.
func (h *HeartbeatChecker) BindRouter(msgID uint32, router ziface.IRouter) {
	if router == nil || msgID == ziface.HeartBeatDefaultMsgID {
		return
	}
	h.msgID, h.router = msgID, router
}
// BindRouterSlices sets the heartbeat message ID and appends handlers to the
// checker's existing handler list (the handlers already present are kept).
// The call is a no-op when no handlers are given or when msgID equals
// ziface.HeartBeatDefaultMsgID.
func (h *HeartbeatChecker) BindRouterSlices(msgID uint32, handlers ...ziface.RouterHandler) {
	if len(handlers) == 0 || msgID == ziface.HeartBeatDefaultMsgID {
		return
	}
	h.msgID = msgID
	h.routerSlices = append(h.routerSlices, handlers...)
}
// start runs the heartbeat loop: on every tick of a ticker with period
// h.interval it calls check, and it returns once a value arrives on quitChan.
// The result of check is not inspected here.
//
// Note: time.NewTicker panics if h.interval is not positive.
func (h *HeartbeatChecker) start() {
	ticker := time.NewTicker(h.interval)
	// Release the ticker on the way out of the loop.
	defer ticker.Stop()

	for {
		select {
		case <-h.quitChan:
			return
		case <-ticker.C:
			h.check()
		}
	}
}
// Start launches the heartbeat loop in a new goroutine and returns
// immediately. The loop runs until a quit signal is sent by Stop.
func (h *HeartbeatChecker) Start() { go h.start() }
// Stop logs the shutdown and signals the heartbeat loop to exit.
//
// The signal is sent on quitChan with a plain (blocking) send, so Stop
// returns only once the loop has received it. Callers should therefore only
// call Stop for a loop that was started and has not already been stopped.
func (h *HeartbeatChecker) Stop() {
	// check tolerates an unbound checker, so the loop may legitimately be
	// running with no connection. Do not dereference a nil conn just to log.
	if h.conn != nil {
		zlog.Ins().InfoF("heartbeat checker stop, connID=%+v", h.conn.GetConnID())
	} else {
		zlog.Ins().InfoF("heartbeat checker stop, no connection bound")
	}
	h.quitChan <- true
}
// SendHeartBeatMsg builds a heartbeat payload with makeMsg and sends it on
// the bound connection under the checker's message ID.
//
// It returns the error from SendMsg, after logging it, or nil on success.
// The checker must already be bound to a connection.
func (h *HeartbeatChecker) SendHeartBeatMsg() error {
	payload := h.makeMsg(h.conn)
	if err := h.conn.SendMsg(h.msgID, payload); err != nil {
		zlog.Ins().ErrorF("send heartbeat msg error: %v, msgId=%+v msg=%+v", err, h.msgID, payload)
		return err
	}
	return nil
}
// check performs one heartbeat round.
//
//   - With no bound connection it does nothing and returns nil.
//   - If the connection is not alive it invokes onRemoteNotAlive and returns nil.
//   - Otherwise it sends a heartbeat, through the custom beatFunc when one is
//     set or through SendHeartBeatMsg when not, and returns that call's error.
func (h *HeartbeatChecker) check() (err error) {
	if h.conn == nil {
		return nil
	}

	if !h.conn.IsAlive() {
		h.onRemoteNotAlive(h.conn)
		return nil
	}

	if h.beatFunc == nil {
		// SendHeartBeatMsg logs its own failures.
		return h.SendHeartBeatMsg()
	}

	// The ticker loop discards check's return value, so log here; otherwise
	// a failing custom heartbeat function would go completely unnoticed.
	if err = h.beatFunc(h.conn); err != nil {
		zlog.Ins().ErrorF("custom heartbeat func error: %v, connID=%+v", err, h.conn.GetConnID())
	}
	return err
}
// BindConn binds the checker to conn and registers the checker on that
// connection through conn.SetHeartBeat. conn must not be nil.
func (h *HeartbeatChecker) BindConn(conn ziface.IConnection) {
	h.conn = conn
	// NOTE(review): h.conn is assigned before h is handed to the connection;
	// keep this order in case SetHeartBeat inspects the checker — confirm.
	conn.SetHeartBeat(h)
}
// Clone returns a new checker carrying the same configuration (interval,
// heartbeat functions, not-alive handler, message ID, router and handlers).
// The clone gets its own quit channel and is not bound to any connection;
// call BindConn on it before use.
func (h *HeartbeatChecker) Clone() ziface.IHeartbeatChecker {
	// Copy the handler list into a fresh slice so that later appends on one
	// checker cannot affect the other.
	handlers := append([]ziface.RouterHandler(nil), h.routerSlices...)

	return &HeartbeatChecker{
		interval:         h.interval,
		quitChan:         make(chan bool),
		makeMsg:          h.makeMsg,
		beatFunc:         h.beatFunc,
		onRemoteNotAlive: h.onRemoteNotAlive,
		msgID:            h.msgID,
		router:           h.router,
		routerSlices:     handlers,
		// conn stays nil: the bound connection must be assigned again.
	}
}
// MsgID returns the message ID currently used for heartbeat messages.
func (h *HeartbeatChecker) MsgID() uint32 { return h.msgID }
// Router returns the router currently configured for heartbeat messages.
func (h *HeartbeatChecker) Router() ziface.IRouter { return h.router }
// RouterSlices returns the handler functions currently configured for
// heartbeat messages. The returned slice is the checker's own slice, not a
// copy.
func (h *HeartbeatChecker) RouterSlices() []ziface.RouterHandler { return h.routerSlices }