-
Notifications
You must be signed in to change notification settings - Fork 10
/
p2p.go
396 lines (335 loc) · 12.7 KB
/
p2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/*
* @Author: gitsrc
* @Date: 2022-05-24 14:01:14
* @LastEditors: gitsrc
* @LastEditTime: 2022-05-25 13:33:06
* @FilePath: /peerchat/src/p2p.go
*/
package p2p
import (
"context"
"crypto/rand"
"crypto/sha256"
"sync"
"sync/atomic"
"time"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-pubsub"
discoveryRouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/mr-tron/base58/base58"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus"
)
// P2P A structure that represents a P2P Host
type P2P struct {
// Represents the host context layer
Ctx context.Context
// Represents the libp2p host
Host host.Host
// Represents the DHT routing table
KadDHT *dht.IpfsDHT
// Represents the peer discovery service
Discovery *discoveryRouting.RoutingDiscovery
// Represents the PubSub Handler
PubSub *pubsub.PubSub
service string
}
/*
A constructor function that generates and returns a P2P object.
Constructs a libp2p host with TLS encrypted secure transportation that works over a TCP
transport connection using a Yamux Stream Multiplexer and uses UPnP for the NAT traversal.
A Kademlia DHT is then bootstrapped on this host using the default peers offered by libp2p
and a Peer Discovery service is created from this Kademlia DHT. The PubSub handler is then
created on the host using the peer discovery service created prior.
*/
func NewP2P(serviceName string) *P2P {
// Setup a background context
ctx := context.Background()
// Setup a P2P Host Node :创建 p2p host
nodehost, kaddht := setupHost(ctx)
// Debug log
logrus.Debugln("Created the P2P Host and the Kademlia DHT.")
// Bootstrap the Kad DHT :根据DHT启动节点
bootstrapDHT(ctx, nodehost, kaddht)
// Debug log
logrus.Debugln("Bootstrapped the Kademlia DHT and Connected to Bootstrap Peers")
// Create a peer discovery service using the Kad DHT : 创建一个节点路由发现方式
routingdiscovery := discoveryRouting.NewRoutingDiscovery(kaddht)
// Debug log
logrus.Debugln("Created the Peer Discovery Service.")
// Create a PubSub handler with the routing discovery:根据节点路由发现机制创建一个PubSub句柄
pubsubhandler := setupPubSub(ctx, nodehost, routingdiscovery)
// Debug log
logrus.Debugln("Created the PubSub Handler.")
// Return the P2P object
return &P2P{
Ctx: ctx,
Host: nodehost,
KadDHT: kaddht,
Discovery: routingdiscovery,
PubSub: pubsubhandler,
service: serviceName,
}
}
// A method of P2P to connect to service peers.
// This method uses the Advertise() functionality of the Peer Discovery Service
// to advertise the service and then disovers all peers advertising the same.
// The peer discovery is handled by a go-routine that will read from a channel
// of peer address information until the peer channel closes
func (p2p *P2P) AdvertiseConnect() {
// Advertise the availabilty of the service on this node
ttl, err := p2p.Discovery.Advertise(p2p.Ctx, p2p.service)
// Debug log
logrus.Debugln("Advertised the PeerChat Service.")
// Sleep to give time for the advertisment to propogate
time.Sleep(time.Second * 5)
// Debug log
logrus.Debugf("Service Time-to-Live is %s", ttl)
// Find all peers advertising the same service
peerchan, err := p2p.Discovery.FindPeers(p2p.Ctx, p2p.service)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("P2P Peer Discovery Failed!")
}
// Trace log
logrus.Traceln("Discovered PeerChat Service Peers.")
// Connect to peers as they are discovered
go handlePeerDiscovery(p2p.Host, peerchan)
// Trace log
logrus.Traceln("Started Peer Connection Handler.")
}
// A method of P2P to connect to service peers.
// This method uses the Provide() functionality of the Kademlia DHT directly to announce
// the ability to provide the service and then disovers all peers that provide the same.
// The peer discovery is handled by a go-routine that will read from a channel
// of peer address information until the peer channel closes
func (p2p *P2P) AnnounceConnect() {
// Generate the Service CID
cidvalue := generateCID(p2p.service)
// Trace log
logrus.Debug("cidvalue ", cidvalue.String())
logrus.Traceln("Generated the Service CID.")
// Announce that this host can provide the service CID
err := p2p.KadDHT.Provide(p2p.Ctx, cidvalue, true)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Announce Service CID!")
}
// Debug log
logrus.Debugln("Announced the PeerChat Service.")
// Sleep to give time for the advertisment to propogate
time.Sleep(time.Second * 5)
// Find the other providers for the service CID
peerchan := p2p.KadDHT.FindProvidersAsync(p2p.Ctx, cidvalue, 0)
// Trace log
logrus.Traceln("Discovered PeerChat Service Peers.")
// Connect to peers as they are discovered
go handlePeerDiscovery(p2p.Host, peerchan)
// Debug log
logrus.Debugln("Started Peer Connection Handler.")
}
// A function that generates the p2p configuration options and creates a
// libp2p host object for the given context. The created host is returned
func setupHost(ctx context.Context) (host.Host, *dht.IpfsDHT) {
// Set up the host identity options
prvkey, pubkey, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Generate P2P Identity Configuration!")
}
_ = pubkey
identity := libp2p.Identity(prvkey)
// Trace log
logrus.Traceln("Generated P2P Identity Configuration.")
// Set up TLS secured TCP transport and options
tlstransport, err := tls.New(prvkey)
security := libp2p.Security(tls.ID, tlstransport)
transport := libp2p.Transport(tcp.NewTCPTransport)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Generate P2P Security and Transport Configurations!")
}
// Trace log
logrus.Traceln("Generated P2P Security and Transport Configurations.")
// Set up host listener address options
muladdr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
listen := libp2p.ListenAddrs(muladdr)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Generate P2P Address Listener Configuration!")
}
// Trace log
logrus.Traceln("Generated P2P Address Listener Configuration.")
// Set up the stream multiplexer and connection manager options
muxer := libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport)
basicConnMgr, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Minute))
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to NewConnManager!")
}
conn := libp2p.ConnectionManager(basicConnMgr)
// Trace log
logrus.Traceln("Generated P2P Stream Multiplexer, Connection Manager Configurations.")
// Setup NAT traversal and relay options
nat := libp2p.NATPortMap()
relay := libp2p.EnableAutoRelay()
// Trace log
logrus.Traceln("Generated P2P NAT Traversal and Relay Configurations.")
// Declare a KadDHT
var kaddht *dht.IpfsDHT
// Setup a routing configuration with the KadDHT
//定义节点路由函数,设置节点发现函数
routingOpt := libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
kaddht = setupKadDHT(ctx, h)
return kaddht, err
})
// Trace log
logrus.Traceln("Generated P2P Routing Configurations.")
opts := libp2p.ChainOptions(identity, listen, security, transport, muxer, conn, nat, routingOpt, relay)
// Construct a new libP2P host with the created options
libhost, err := libp2p.New(opts)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Create the P2P Host!")
}
// Return the created host and the kademlia DHT
return libhost, kaddht
}
// A function that generates a Kademlia DHT object and returns it
func setupKadDHT(ctx context.Context, nodehost host.Host) *dht.IpfsDHT {
// Create DHT server mode option
dhtmode := dht.Mode(dht.ModeServer)
// Rertieve the list of boostrap peer addresses
bootstrappeers := dht.GetDefaultBootstrapPeerAddrInfos()
// Create the DHT bootstrap peers option
dhtpeers := dht.BootstrapPeers(bootstrappeers...)
// Trace log
logrus.Traceln("Generated DHT Configuration.")
// Start a Kademlia DHT on the host in server mode
kaddht, err := dht.New(ctx, nodehost, dhtmode, dhtpeers)
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Create the Kademlia DHT!")
}
// Return the KadDHT
return kaddht
}
// A function that generates a PubSub Handler object and returns it
// Requires a node host and a routing discovery service.
func setupPubSub(ctx context.Context, nodehost host.Host, routingdiscovery *discoveryRouting.RoutingDiscovery) *pubsub.PubSub {
// Create a new PubSub service which uses a GossipSub router
pubsubhandler, err := pubsub.NewGossipSub(ctx, nodehost, pubsub.WithDiscovery(routingdiscovery))
// Handle any potential error
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
"type": "GossipSub",
}).Fatalln("PubSub Handler Creation Failed!")
}
// Return the PubSub handler
return pubsubhandler
}
// A function that bootstraps a given Kademlia DHT to satisfy the IPFS router
// interface and connects to all the bootstrap peers provided by libp2p
func bootstrapDHT(ctx context.Context, nodehost host.Host, kaddht *dht.IpfsDHT) {
// Bootstrap the DHT to satisfy the IPFS Router interface
if err := kaddht.Bootstrap(ctx); err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Bootstrap the Kademlia!")
}
// Trace log
logrus.Traceln("Set the Kademlia DHT into Bootstrap Mode.")
// Declare a WaitGroup
var wg sync.WaitGroup
// Declare counters for the number of bootstrap peers
var connectedbootpeers int32
var totalbootpeers int32
// Iterate over the default bootstrap peers provided by libp2p
for _, peeraddr := range dht.DefaultBootstrapPeers {
// Retrieve the peer address information
peerinfo, _ := peer.AddrInfoFromP2pAddr(peeraddr)
// Incremenent waitgroup counter
wg.Add(1)
totalbootpeers++
// Start a goroutine to connect to each bootstrap peer
go func() {
// Defer the waitgroup decrement
defer wg.Done()
// Attempt to connect to the bootstrap peer
if err := nodehost.Connect(ctx, *peerinfo); err == nil {
// Increment the connected bootstrap peer count
atomic.AddInt32(&connectedbootpeers, 1)
}
}()
}
// Wait for the waitgroup to complete
wg.Wait()
// Log the number of bootstrap peers connected
logrus.Debugf("Connected to %d out of %d Bootstrap Peers.", connectedbootpeers, totalbootpeers)
}
// A function that connects the given host to all peers recieved from a
// channel of peer address information. Meant to be started as a go routine.
func handlePeerDiscovery(nodehost host.Host, peerchan <-chan peer.AddrInfo) {
// Iterate over the peer channel
for peer := range peerchan {
// Ignore if the discovered peer is the host itself
if peer.ID == nodehost.ID() {
continue
}
// Connect to the peer
err := nodehost.Connect(context.Background(), peer)
if err != nil {
logrus.Debugln("p2p peer connection failed: ", err)
}
logrus.Debugln("p2p peer connection success: ", peer.ID)
}
}
// A function that generates a CID object for a given string and returns it.
// Uses SHA256 to hash the string and generate a multihash from it.
// The mulithash is then base58 encoded and then used to create the CID
func generateCID(namestring string) cid.Cid {
// Hash the service content ID with SHA256
hash := sha256.Sum256([]byte(namestring))
// Append the hash with the hashing codec ID for SHA2-256 (0x12),
// the digest size (0x20) and the hash of the service content ID
finalhash := append([]byte{0x12, 0x20}, hash[:]...)
// Encode the fullhash to Base58
b58string := base58.Encode(finalhash)
// Generate a Multihash from the base58 string
mulhash, err := multihash.FromB58String(string(b58string))
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err.Error(),
}).Fatalln("Failed to Generate Service CID!")
}
// Generate a CID from the Multihash
cidvalue := cid.NewCidV1(12, mulhash)
// Return the CID
return cidvalue
}