/
p2pChat.go
76 lines (69 loc) · 2.66 KB
/
p2pChat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package dht
import (
"errors"
"github.com/curltech/go-colla-core/logger"
"github.com/curltech/go-colla-node/consensus/std"
"github.com/curltech/go-colla-node/libp2p/global"
"github.com/curltech/go-colla-node/libp2p/ns"
handler1 "github.com/curltech/go-colla-node/libp2p/pipe/handler"
"github.com/curltech/go-colla-node/p2p/chain/action"
"github.com/curltech/go-colla-node/p2p/chain/handler"
"github.com/curltech/go-colla-node/p2p/chain/handler/sender"
"github.com/curltech/go-colla-node/p2p/dht/entity"
"github.com/curltech/go-colla-node/p2p/dht/service"
"github.com/curltech/go-colla-node/p2p/msg"
"github.com/curltech/go-colla-node/p2p/msgtype"
)
type p2pChatAction struct {
action.BaseAction
}
var P2pChatAction p2pChatAction
/**
接收消息进行处理,返回为空则没有返回消息,否则,有返回消息
*/
func (this *p2pChatAction) Receive(chainMessage *msg.ChainMessage) (*msg.ChainMessage, error) {
logger.Sugar.Infof("Receive %v message", this.MsgType)
var response *msg.ChainMessage = nil
targetPeerId := handler1.GetPeerId(chainMessage.TargetPeerId)
key := ns.GetPeerClientKey(targetPeerId)
peerClients, err := service.GetPeerClientService().GetLocals(key, "")
if err != nil || len(peerClients) == 0 {
peerClients, err = service.GetPeerClientService().GetValues(targetPeerId, "")
}
if err != nil {
response = handler.Error(chainMessage.MessageType, err)
return response, nil
}
if len(peerClients) == 0 {
response = handler.Error(chainMessage.MessageType, errors.New("NUllPeerClients"))
return response, nil
}
sent := false
for _, peerClient := range peerClients {
if peerClient.ActiveStatus == entity.ActiveStatus_Up {
// 如果PeerClient的连接节点是自己,下一步就是最终目标,将目标会话放入消息中
sent = true
if global.IsMyself(peerClient.ConnectPeerId) {
chainMessage.TargetConnectSessionId = peerClient.ConnectSessionId
chainMessage.TargetConnectPeerId = peerClient.ConnectPeerId
chainMessage.ConnectPeerId = chainMessage.TargetPeerId
} else { // 否则下一步就是连接节点
chainMessage.TargetConnectSessionId = peerClient.ConnectSessionId
chainMessage.TargetConnectPeerId = peerClient.ConnectPeerId
chainMessage.ConnectPeerId = peerClient.ConnectPeerId
}
go sender.SendCM(chainMessage)
}
}
if sent == false {
handler.Decrypt(chainMessage)
response, _ = std.GetStdConsensus().ReceiveConsensus(chainMessage)
return response, nil
}
return nil, nil
}
func init() {
P2pChatAction = p2pChatAction{}
P2pChatAction.MsgType = msgtype.P2PCHAT
handler.RegistChainMessageHandler(msgtype.P2PCHAT, P2pChatAction.Send, P2pChatAction.Receive, P2pChatAction.Response)
}