/
clientSessionMonitor.go
executable file
·142 lines (107 loc) · 3.2 KB
/
clientSessionMonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package fixacceptor
import (
"time"
"github.com/quickfixgo/quickfix"
)
/*
心跳及session监控
*/
func heartbeatWatcher(chExit <-chan int, cliSessnCtrl *clientSessionCtrlSt, acceptor *Acceptor) {
const ftag = "heartbeatWatcher()"
timeout := time.Duration(1) * time.Second
var (
heartbeatToSend []hbtUnitSt
cliToClose []cliCloseUnitSt
)
for {
select {
case <-chExit:
return
case <-time.After(timeout):
}
heartbeatToSend, cliToClose = cliSessnCtrl.checkClientHeartbeat()
// fmt.Printf("%s hbToSend=%v, cliToClose=%v",ftag, heartbeatToSend, cliToClose)
go sendHeartBeats(heartbeatToSend, acceptor)
go closeClients(cliToClose, acceptor)
}
}
// send heartbeats to clients
func sendHeartBeats(clients []hbtUnitSt, acceptor *Acceptor) {
const ftag = "sendHeartBeats()"
if 0 == len(clients) {
return
}
// incase of closed
select {
case <-acceptor.chExit:
return
default:
}
var (
cli hbtUnitSt
hbtMsg *quickfix.Message
err error
)
for _, cli = range clients {
// build heartbeat msg
hbtMsg, _ = buildHbtMsg(cli.beginstring, cli.senderCompID, cli.targetCompID, "")
//send to client
err = acceptor.SendToClient(cli.cid, hbtMsg)
if nil != err {
acceptor.appHdl.OnError("send heartbeat", err)
}
}
}
// close clients
func closeClients(clients []cliCloseUnitSt, acceptor *Acceptor) {
const ftag = "closeClients"
if 0 == len(clients) {
return
}
// in case of closed
select {
case <-acceptor.chExit:
return
default:
}
var (
err error
)
for _, cli := range clients {
switch cli.closeType {
case closeTypeHbtTimeOut:
// 心跳超时
// logout
logout1, _ := buildLogoutMsg(cli.beginstring, cli.senderCompID, cli.targetCompID, "hbt time out")
err = acceptor.SendToClient(cli.cid, logout1)
if nil != err {
acceptor.appHdl.OnError("send logout", err)
}
// logout in system
if nil != acceptor.cliSessnCtrl {
acceptor.cliSessnCtrl.logout(cli.cid)
}
// 之后会由session扫描自动登出
case closeTypeNotLogon:
// 建立TCP 连接之后,超过5 秒未完成登录;
// 在登录失败之后,未在5 秒内关闭连接;
// logout
logout2, _ := buildLogoutMsg(cli.beginstring, cli.senderCompID, cli.targetCompID, "not log on")
err = acceptor.SendToClient(cli.cid, logout2)
if nil != err {
acceptor.appHdl.OnError("send logout", err)
}
// logout in system
if nil != acceptor.cliSessnCtrl {
acceptor.cliSessnCtrl.logout(cli.cid)
}
// 之后会由session扫描自动登出
case closeTypeLogout:
// 已登出
acceptor.closeCli(cli.cid, "logout")
default:
acceptor.closeCli(cli.cid, "unknown")
}
}
return
}