/
proc.go
84 lines (61 loc) · 1.75 KB
/
proc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package relay
import (
"github.com/davyxu/cellnet"
)
// ResoleveInboundEvent handles an inbound relay message.
//
// If the event's message is not a *RelayACK, the input event is returned
// unchanged with handled == false. If decoding the relay message fails, the
// input event is returned together with the decode error and handled == false.
// Otherwise the decoded payload and pass-through data are wrapped in a
// *RecvMsgEvent, which is returned with handled == true; when a broadcast
// callback (bcFunc) is registered it is additionally scheduled through
// cellnet.SessionQueuedCall with that event.
//
// The spelling of the function name is kept as-is for existing callers.
func ResoleveInboundEvent(inputEvent cellnet.Event) (ouputEvent cellnet.Event, handled bool, err error) {
	ack, isRelay := inputEvent.Message().(*RelayACK)
	if !isRelay {
		// Not a relay message: hand the event back untouched.
		return inputEvent, false, nil
	}

	var msg, extra interface{}
	if msg, extra, err = ack.Decode(); err != nil {
		return inputEvent, false, err
	}

	if log.IsDebugEnabled() {
		// Gather the log fields one by one, in the order they are printed.
		peerName := inputEvent.Session().Peer().(cellnet.PeerProperty).Name()
		sesID := inputEvent.Session().ID()
		size := cellnet.MessageSize(ack)
		msgName := cellnet.MessageToName(msg)
		msgText := cellnet.MessageToString(msg)

		log.Debugf("#relay.recv(%s)@%d len: %d %s passThrough: '%+v' | %s",
			peerName, sesID, size, msgName, extra, msgText)
	}

	recvEvent := &RecvMsgEvent{
		Ses:         inputEvent.Session(),
		Msg:         msg,
		PassThrough: extra,
	}

	if bcFunc != nil {
		// Run the broadcast callback via the session's queued call, so it is
		// invoked on the thread associated with that session (per the
		// original author's note).
		cellnet.SessionQueuedCall(inputEvent.Session(), func() {
			bcFunc(recvEvent)
		})
	}

	return recvEvent, true, nil
}
// ResolveOutboundEvent writes the debug log line for an outbound relay
// message.
//
// It only acts when the event's message is a *RelayACK and debug logging is
// enabled: the message is decoded and a "#relay.send" line is logged, after
// which it returns handled == true. A decode failure is returned as err with
// handled == false. In every other case (different message type, or debug
// logging disabled) it returns false, nil without touching the message.
//
// NOTE(review): handled therefore depends on the current log level, not only
// on the message type — confirm callers expect this.
func ResolveOutboundEvent(inputEvent cellnet.Event) (handled bool, err error) {
	ack, isRelay := inputEvent.Message().(*RelayACK)
	if !isRelay || !log.IsDebugEnabled() {
		return false, nil
	}

	var msg, extra interface{}
	if msg, extra, err = ack.Decode(); err != nil {
		return false, err
	}

	// Gather the log fields one by one, in the order they are printed.
	peerName := inputEvent.Session().Peer().(cellnet.PeerProperty).Name()
	sesID := inputEvent.Session().ID()
	size := cellnet.MessageSize(ack)
	msgName := cellnet.MessageToName(msg)
	msgText := cellnet.MessageToString(msg)

	log.Debugf("#relay.send(%s)@%d len: %d %s passThrough: '%+v' | %s",
		peerName, sesID, size, msgName, extra, msgText)

	return true, nil
}