/
p2p.go
82 lines (73 loc) · 2.07 KB
/
p2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package server
import (
"encoding/json"
service2 "github.com/flowshield/flowshield/fullnode/app/v1/access/service"
"github.com/flowshield/flowshield/fullnode/app/v1/node/service"
"github.com/flowshield/flowshield/fullnode/pkg/confer"
"github.com/flowshield/flowshield/fullnode/pkg/logger"
"github.com/flowshield/flowshield/fullnode/pkg/p2p"
"github.com/flowshield/flowshield/fullnode/pkg/schema"
"github.com/tidwall/gjson"
)
// runP2P initialises the p2p layer from cfg and, on success, launches the
// background event handler on the pubsub returned by p2p.GetPubSub.
//
// The error from p2p.InitP2P is returned unchanged; in that case no
// goroutine is started.
func runP2P(cfg *confer.P2P) error {
	err := p2p.InitP2P(cfg)
	if err != nil {
		return err
	}

	// The handler loops forever, so it gets its own goroutine.
	pubsub := p2p.GetPubSub()
	go startEventHandler(pubsub)
	return nil
}
// startEventHandler drains the pubsub channels in an endless loop: each
// message received on ps.Inbound is dispatched to HandleMessage, and each
// entry received on ps.Logs is written to the error log.
//
// The loop has no exit path, so callers are expected to run it on a
// dedicated goroutine. Periodic publishing of server info to ps.Outbound
// (on a 10s ticker) is not enabled here.
func startEventHandler(ps *p2p.PubSub) {
	for {
		select {
		case entry := <-ps.Logs:
			logger.Errorf(nil, "p2p receive error: %s", entry.String())
		case inbound := <-ps.Inbound:
			HandleMessage(inbound.Message)
		}
	}
}
// HandleMessage routes a raw p2p JSON message according to its top-level
// "type" field:
//
//   - "node": the "data" field is decoded with generateNode and handed to
//     service.AddNode (node communication);
//   - "order": the "data" field is decoded with generateClient and handed to
//     service2.AcceptClientOrder (order communication).
//
// Messages of any other type are ignored. When the "data" field is missing
// or cannot be decoded, the decoder yields nil; such messages are logged and
// dropped instead of forwarding a nil pointer to the service layer
// (messages presumably originate from remote peers and cannot be trusted to
// be well-formed).
func HandleMessage(message string) {
	// Decide whether this is node communication or order communication.
	kind := gjson.Get(message, "type").String()
	switch kind {
	case "node":
		node := generateNode(gjson.Get(message, "data").String())
		if node == nil {
			logger.Errorf(nil, "p2p drop %s message: missing or undecodable data", kind)
			return
		}
		service.AddNode(nil, node)
	case "order":
		client := generateClient(gjson.Get(message, "data").String())
		if client == nil {
			logger.Errorf(nil, "p2p drop %s message: missing or undecodable data", kind)
			return
		}
		service2.AcceptClientOrder(nil, client)
	default:
		// Unknown message types are intentionally ignored.
	}
}
// generateNode decodes the JSON text in node into a schema.ServerInfo.
//
// A decoding failure is logged rather than silently discarded; the function
// still returns whatever json.Unmarshal produced, so the result may be nil
// (for example when node is empty, is not valid JSON, or is the JSON literal
// null). Callers must be prepared for a nil result.
func generateNode(node string) (server *schema.ServerInfo) {
	if err := json.Unmarshal([]byte(node), &server); err != nil {
		logger.Errorf(nil, "p2p decode node payload error: %v", err)
	}
	return server
}
// generateClient decodes the JSON text in client into a schema.ClientP2P.
//
// A decoding failure is logged rather than silently discarded; the function
// still returns whatever json.Unmarshal produced, so the result may be nil
// (for example when client is empty, is not valid JSON, or is the JSON
// literal null). Callers must be prepared for a nil result.
func generateClient(client string) (info *schema.ClientP2P) {
	if err := json.Unmarshal([]byte(client), &info); err != nil {
		logger.Errorf(nil, "p2p decode order payload error: %v", err)
	}
	return info
}
//func NewServerInfo(p *p2p.P2P) (server *schema.ServerInfo) {
// server = &schema.ServerInfo{
// PeerId: confer.GlobalConfig().P2P.Account,
// Type: schema.FullNode,
// }
// trace, err := util.GetCftrace()
// if err != nil {
// logger.Warnf(nil, "Request Cfssl CDN Trace Error:%s", err)
// } else {
// server.MetaData = schema.MetaData{
// Ip: trace.Ip,
// Loc: trace.Loc,
// Colo: trace.Colo,
// }
// }
// return
//}