-
Notifications
You must be signed in to change notification settings - Fork 255
/
discovery.go
216 lines (180 loc) · 5.71 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package net
import (
"context"
"errors"
"fmt"
"time"
"github.com/33cn/chain33/types"
protocol "github.com/libp2p/go-libp2p-core/protocol"
p2pty "github.com/33cn/chain33/system/p2p/dht/types"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
kbt "github.com/libp2p/go-libp2p-kbucket"
"github.com/33cn/chain33/common/log/log15"
coredis "github.com/libp2p/go-libp2p-core/discovery"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
)
var (
log = log15.New("module", "p2p.dht")
)
const (
// Deprecated 老版本的协议,仅做兼容,TODO 后面升级后移除
classicDhtProtoID = "/ipfs/kad/%s/1.0.0/%d"
dhtProtoID = "/%s-%d/kad/1.0.0" //title-channel/kad/1.0.0
)
// Discovery dht discovery
type Discovery struct {
kademliaDHT *dht.IpfsDHT
RoutingDiscovery *discovery.RoutingDiscovery
mdnsService *mdns
ctx context.Context
subCfg *p2pty.P2PSubConfig
bootstrapnodes []peer.AddrInfo
host host.Host
}
// InitDhtDiscovery init dht discovery
func InitDhtDiscovery(ctx context.Context, host host.Host, peersInfo []peer.AddrInfo, chainCfg *types.Chain33Config, subCfg *p2pty.P2PSubConfig) *Discovery {
// Make the DHT,不同的ID进入不同的网络。
//如果不修改DHTProto 则有可能会连入IPFS网络,dhtproto=/ipfs/kad/1.0.0
d := new(Discovery)
opt := opts.Protocols(protocol.ID(fmt.Sprintf(dhtProtoID, chainCfg.GetTitle(), subCfg.Channel)),
protocol.ID(fmt.Sprintf(classicDhtProtoID, chainCfg.GetTitle(), subCfg.Channel)))
kademliaDHT, err := dht.New(ctx, host, opt)
if err != nil {
panic(err)
}
d.kademliaDHT = kademliaDHT
d.ctx = ctx
d.bootstrapnodes = peersInfo
d.subCfg = subCfg
d.host = host
return d
}
//Start the dht
func (d *Discovery) Start() {
//连接内置种子,以及addrbook存储的节点
initInnerPeers(d.host, d.bootstrapnodes, d.subCfg)
// Bootstrap the DHT. In the default configuration, this spawns a Background
// thread that will refresh the peer table every five minutes.
if err := d.kademliaDHT.Bootstrap(d.ctx); err != nil {
//panic(err)
log.Error("Bootstrap", "err", err.Error())
}
d.RoutingDiscovery = discovery.NewRoutingDiscovery(d.kademliaDHT)
}
//Close close the dht
func (d *Discovery) Close() error {
if d.kademliaDHT != nil {
return d.kademliaDHT.Close()
}
return nil
}
// FindPeers find peers
func (d *Discovery) FindPeers(RendezvousString string, gossip bool) ([]peer.AddrInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if gossip {
discovery.Advertise(ctx, d.RoutingDiscovery, RendezvousString)
}
addrinfos, err := discovery.FindPeers(ctx, d.RoutingDiscovery, RendezvousString, coredis.Limit(100))
//peerChan, err := d.routingDiscovery.FindPeers(context.Background(), RendezvousString)
if err != nil {
//panic(err)
log.Error("FindPeers", "err", err.Error())
return nil, err
}
return addrinfos, nil
}
// FindLANPeers 查找局域网内的其他节点
func (d *Discovery) FindLANPeers(host host.Host, serviceTag string) (<-chan peer.AddrInfo, error) {
mdns, err := initMDNS(d.ctx, host, serviceTag)
if err != nil {
return nil, err
}
d.mdnsService = mdns
return d.mdnsService.PeerChan(), nil
}
// CloseFindLANPeers close peers
func (d *Discovery) CloseFindLANPeers() {
if d.mdnsService != nil {
d.mdnsService.Service.Close()
}
}
// ListPeers routingTable 路由表的节点信息
func (d *Discovery) ListPeers() []peer.ID {
if d.kademliaDHT == nil {
return nil
}
return d.kademliaDHT.RoutingTable().ListPeers()
}
// RoutingTableSize routingTable size
func (d *Discovery) RoutingTableSize() int {
if d.kademliaDHT == nil {
return 0
}
return d.kademliaDHT.RoutingTable().Size()
}
// FindSpecialPeer 根据指定的peerID ,查找指定的peer,
func (d *Discovery) FindSpecialPeer(pid peer.ID) (*peer.AddrInfo, error) {
if d.kademliaDHT == nil {
return nil, errors.New("empty ptr")
}
ctx := context.Background()
pctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
peerInfo, err := d.kademliaDHT.FindPeer(pctx, pid)
if err != nil {
return nil, err
}
return &peerInfo, nil
}
// FindLocalPeer 根据pid 查找当前DHT内部的peer信息
func (d *Discovery) FindLocalPeer(pid peer.ID) peer.AddrInfo {
if d.kademliaDHT == nil {
return peer.AddrInfo{}
}
return d.kademliaDHT.FindLocal(pid)
}
// FindLocalPeers find local peers
func (d *Discovery) FindLocalPeers(pids []peer.ID) []peer.AddrInfo {
var addrinfos []peer.AddrInfo
for _, pid := range pids {
addrinfos = append(addrinfos, d.FindLocalPeer(pid))
}
return addrinfos
}
// FindPeersConnectedToPeer 获取连接指定的peerId的peers信息,查找连接PID=A的所有节点
func (d *Discovery) FindPeersConnectedToPeer(pid peer.ID) (<-chan *peer.AddrInfo, error) {
if d.kademliaDHT == nil {
return nil, errors.New("empty ptr")
}
ctx := context.Background()
pctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
return d.kademliaDHT.FindPeersConnectedToPeer(pctx, pid)
}
// Update update peer
func (d *Discovery) Update(pid peer.ID) error {
_, err := d.kademliaDHT.RoutingTable().Update(pid)
return err
}
// FindNearestPeers find nearest peers
func (d *Discovery) FindNearestPeers(pid peer.ID, count int) []peer.ID {
if d.kademliaDHT == nil {
return nil
}
return d.kademliaDHT.RoutingTable().NearestPeers(kbt.ConvertPeerID(pid), count)
}
// Remove remove peer
func (d *Discovery) Remove(pid peer.ID) {
if d.kademliaDHT == nil {
return
}
d.kademliaDHT.RoutingTable().Remove(pid)
}
// RoutingTable get routing table
func (d *Discovery) RoutingTable() *kbt.RoutingTable {
return d.kademliaDHT.RoutingTable()
}