/
recver.go
126 lines (118 loc) · 2.71 KB
/
recver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Gateway receiver.
package gateway
import (
"errors"
"fmt"
"reflect"
"time"
"github.com/gfandada/gserver/network"
Services "github.com/gfandada/gserver/services"
)
// gaterecv is the per-session receiver of the gateway: it consumes raw client
// payloads and queued frames for one session and answers through a gatesend.
type gaterecv struct {
// sess is the session this receiver serves; its MQ, Die, Flag and packet
// counters are read and updated by the receiver's methods.
sess *Session
// in delivers raw client->gateway payloads; run returns once it is closed.
in <-chan []byte
// out is used to send results and errors back (gateway->client).
out *gatesend
// config supplies the packet Parser and the per-minute packet limit (Rpm).
config *network.Config
// one_min_timer is not assigned anywhere in the code visible in this file;
// run keeps its own local timer instead.
one_min_timer *time.Timer
// router dispatches a parsed (id, msg) pair and returns an optional reply.
router *router
}
// run is the receiver's main loop. It multiplexes four event sources until
// the session is flagged with SESS_AUTHFAILED or the input channel is closed:
//
//   - gr.in: a raw client payload, handed to clientToGate;
//   - sess.MQ: a queued frame, either forwarded to the client (Data_Message)
//     or treated as a kick (Data_Kick);
//   - a one-minute timer: triggers the per-minute packet-rate check;
//   - sess.Die: the session is being torn down.
//
// On exit it stops the timer, closes sess.Die (unless it was already observed
// closed) and, when a stream is attached, calls CloseSend on it.
func (gr *gaterecv) run() {
	sess := gr.sess
	out := gr.out
	minuteTimer := time.NewTimer(time.Minute)
	// dieClosed is set only when sess.Die has been observed closed, so that the
	// deferred cleanup does not close it a second time (closing a closed
	// channel panics).
	dieClosed := false
	defer func() {
		// Release the timer instead of leaving it pending after the loop ends.
		minuteTimer.Stop()
		if !dieClosed {
			close(sess.Die)
		}
		if sess.Stream != nil {
			// Best-effort shutdown: whatever CloseSend reports is not checked.
			sess.Stream.CloseSend()
		}
	}()
	for {
		select {
		case data, ok := <-gr.in:
			if !ok {
				// Input channel closed: nothing more can arrive from the client.
				return
			}
			gr.clientToGate(data)
		case frame := <-sess.MQ:
			switch frame.Type {
			case network.Data_Message:
				out.send(frame.Message)
			case network.Data_Kick:
				sess.Flag |= SESS_AUTHFAILED
			}
		case <-minuteTimer.C:
			// The timer has fired and its channel is drained, so Reset is safe.
			minuteTimer.Reset(time.Minute)
			gr.one_timer_work()
		case _, ok := <-sess.Die:
			// ok == false means the channel was closed elsewhere; a received
			// value (ok == true) leaves it open and it is still closed on exit.
			dieClosed = !ok
			sess.Flag |= SESS_AUTHFAILED
		}
		// Any handler above may have flagged the session; stop once it has.
		if sess.Flag&SESS_AUTHFAILED != 0 {
			return
		}
	}
}
// clientToGate processes one raw payload received from the client: it updates
// the session's packet statistics, validates the payload with check, routes
// the message and sends back whatever the router returns.
//
// A panic raised while handling the payload is recovered and reported to the
// client: a panic value whose type name is "int" is sent as a logic error,
// anything else is wrapped into an internal error.
func (gr *gaterecv) clientToGate(data []byte) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		if reflect.TypeOf(r).Name() == "int" {
			gr.out.send(Services.NewLogicError(r.(int)))
			return
		}
		gr.out.send(Services.NewInError(fmt.Errorf("%v", r)))
	}()

	// Account for the packet before validating it: check compares the packet's
	// sequence number against the already-incremented PacketCount.
	sess := gr.sess
	sess.PacketCount++
	sess.PacketCountOneMin++
	now := time.Now()
	sess.PacketTime = now
	sess.LastPacketTime = now

	id, msg, err := gr.check(data)
	if err != nil {
		gr.out.send(Services.NewInError(err))
		return
	}
	if result := gr.router.router(id, msg); result != nil {
		gr.out.send(result)
	}
}
// one_timer_work performs the periodic rate check: when the number of packets
// counted since the previous check exceeds config.Rpm the session is flagged
// with SESS_AUTHFAILED. The per-minute counter is reset in either case.
func (gr *gaterecv) one_timer_work() {
	sess := gr.sess
	overLimit := sess.PacketCountOneMin > gr.config.Rpm
	sess.PacketCountOneMin = 0
	if overLimit {
		sess.Flag |= SESS_AUTHFAILED
	}
}
// check parses a raw payload with the configured Parser and verifies that its
// sequence number equals the session's current PacketCount.
//
// It returns the message id and body on success. On a parse failure or a
// sequence mismatch it flags the session with SESS_AUTHFAILED and returns a
// non-nil error together with zero values.
func (gr *gaterecv) check(data []byte) (uint16, []byte, error) {
	seq, id, msg, parseErr := gr.config.Parser.ReadBodyFull(data)
	switch {
	case parseErr != nil:
		gr.sess.Flag |= SESS_AUTHFAILED
		return 0, nil, errors.New("ReadBodyFull is wrong")
	case seq != gr.sess.PacketCount:
		gr.sess.Flag |= SESS_AUTHFAILED
		return 0, nil, errors.New("seq is wrong")
	}
	return id, msg, nil
}
// startRecver builds the gateway receiver for a session and starts its run
// loop in a new goroutine.
//
// Parameters:
//   - sess: the session; its MQ, ConnectTime and LastPacketTime are initialised here
//   - in: channel carrying client->gateway payloads
//   - out: sender used for gateway->client traffic
//   - config: configuration (AsyncMQ sets the capacity of the session's MQ)
//
// It returns the receiver, or nil when sess, out or config is nil.
func startRecver(sess *Session, in <-chan []byte, out *gatesend, config *network.Config) *gaterecv {
	if ready := sess != nil && out != nil && config != nil; !ready {
		return nil
	}

	// Prepare the session before the router and the run loop can observe it.
	sess.MQ = make(chan network.Data_Frame, config.AsyncMQ)
	sess.ConnectTime = time.Now()
	sess.LastPacketTime = time.Now()

	recver := new(gaterecv)
	recver.sess = sess
	recver.in = in
	recver.out = out
	recver.config = config
	recver.router = startRouter(sess, config)

	go recver.run()
	return recver
}