-
Notifications
You must be signed in to change notification settings - Fork 8
/
lp2p.go
130 lines (111 loc) 路 3.36 KB
/
lp2p.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package lp2p
import (
"context"
"fmt"
"sync"
"time"
hook "github.com/alanshaw/ipfs-hookds"
"github.com/alanshaw/libp2p-dht-scrape-aas/version"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
noise "github.com/libp2p/go-libp2p-noise"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
quic "github.com/libp2p/go-libp2p-quic-transport"
tls "github.com/libp2p/go-libp2p-tls"
"github.com/libp2p/go-tcp-transport"
ws "github.com/libp2p/go-ws-transport"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multiaddr"
secio "github.com/libp2p/go-libp2p-secio"
)
// log is the package-level logger, registered under the
// "dht_scrape_aas_lp2p" subsystem name.
var log = logging.Logger("dht_scrape_aas_lp2p")

// peersRoot is the datastore key "/peers"; the example peerstore record keys
// documented alongside pstoreKeyToPeerID all live beneath it.
var peersRoot = ds.NewKey("/peers")
// PeerUpdatedF is a function called when a peer is updated in the peerstore.
// It receives the peerstore that was written to and the ID of the peer whose
// record changed. New invokes it in a separate goroutine for each hooked put,
// so implementations may run concurrently with one another.
type PeerUpdatedF func(peerstore.Peerstore, peer.ID)
// New creates a new libp2p host and DHT for use by a scraper.
//
// The host's peerstore is backed by an in-memory datastore wrapped with an
// after-put hook: following every successful put whose key carries a decodable
// peer ID, peerUpdated is invoked in its own goroutine with the peerstore and
// that peer ID. A nil peerUpdated disables these notifications.
//
// Before returning, New tries to connect to every address in bootstrapAddrs
// and waits for all attempts to complete; individual connection failures are
// logged rather than returned.
//
// A non-nil error is returned only when the peerstore or the host cannot be
// created, in which case the host and DHT results are nil.
func New(ctx context.Context, bootstrapAddrs []string, peerUpdated PeerUpdatedF) (host.Host, *dht.IpfsDHT, error) {
	// Declared before the hook so the closure can capture it; it is assigned
	// below, before the host (the source of puts) is created.
	var pstore peerstore.Peerstore

	afterPut := func(k ds.Key, v []byte, err error) error {
		// The hook only observes: the put's own error is always handed back
		// unchanged. A failed put did not update anything, so do not notify.
		if err != nil || peerUpdated == nil {
			return err
		}
		// Not every key is guaranteed to embed a peer ID, so a key that fails
		// to parse is deliberately skipped rather than treated as an error.
		if peerID, parseErr := pstoreKeyToPeerID(k); parseErr == nil && peerID != "" {
			go peerUpdated(pstore, peerID)
		}
		return err
	}

	pstoreDs := hook.NewBatching(dssync.MutexWrap(ds.NewMapDatastore()), hook.WithAfterPut(afterPut))

	var err error
	// Plain assignment (not :=) so it is obvious that the captured pstore
	// variable is the one being set.
	pstore, err = pstoreds.NewPeerstore(ctx, pstoreDs, pstoreds.Options{
		CacheSize:           0,
		GCPurgeInterval:     0,
		GCLookaheadInterval: 0,
		GCInitialDelay:      60 * time.Second,
	})
	if err != nil {
		return nil, nil, fmt.Errorf("creating peerstore: %w", err)
	}

	h, err := libp2p.New(
		ctx,
		libp2p.UserAgent(version.UserAgent),
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
		libp2p.Peerstore(pstore),
		libp2p.Transport(quic.NewTransport),
		libp2p.Transport(tcp.NewTCPTransport),
		libp2p.Transport(ws.New),
		libp2p.Security(tls.ID, tls.New),
		libp2p.Security(noise.ID, noise.New),
		libp2p.Security(secio.ID, secio.New),
	)
	if err != nil {
		return nil, nil, fmt.Errorf("creating libp2p host: %w", err)
	}

	log.Infof("created peer with addrs %v", h.Addrs())

	// Named kadDHT so the local does not shadow the imported dht package.
	kadDHT := dht.NewDHT(ctx, h, ds.NewMapDatastore())

	bootstrap(ctx, h, bootstrapAddrs)

	return h, kadDHT, nil
}
// bootstrap connects h to each of the peers named in addrs, concurrently, and
// blocks until every attempt has finished. Failures are logged together with
// the offending address and are otherwise ignored, so one bad bootstrap
// address does not prevent the others from being used.
func bootstrap(ctx context.Context, h host.Host, addrs []string) {
	var wg sync.WaitGroup
	wg.Add(len(addrs))
	for _, addr := range addrs {
		// addr is passed as an argument so each goroutine gets its own copy.
		go func(addr string) {
			defer wg.Done()
			if err := connectBootstrapPeer(ctx, h, addr); err != nil {
				// Errorf rather than Error(addr, err): the latter concatenates
				// the address and the error text without a separator.
				log.Errorf("bootstrap peer %s: %v", addr, err)
			}
		}(addr)
	}
	wg.Wait()
}

// connectBootstrapPeer parses addr as a multiaddr, extracts the peer address
// info from it and connects h to that peer. It returns an error describing
// which of those steps failed.
func connectBootstrapPeer(ctx context.Context, h host.Host, addr string) error {
	ma, err := multiaddr.NewMultiaddr(addr)
	if err != nil {
		return fmt.Errorf("parsing multiaddr: %w", err)
	}
	ai, err := peer.AddrInfoFromP2pAddr(ma)
	if err != nil {
		return fmt.Errorf("extracting peer info: %w", err)
	}
	if err := h.Connect(ctx, *ai); err != nil {
		return fmt.Errorf("connecting: %w", err)
	}
	return nil
}
// errInvalidKeyNamespaces is returned by pstoreKeyToPeerID when a datastore
// key has fewer than three namespaces and therefore cannot contain a peer ID.
var errInvalidKeyNamespaces = fmt.Errorf("not enough namespaces in peerstore record key")
// pstoreKeyToPeerID extracts the peer ID embedded in a peerstore datastore
// key. The ID is taken from the key's third namespace, which is decoded with
// base32.RawStdEncoding and then parsed as a peer ID. Example keys:
//
//	/peers/keys/CIQMTANQSIA5TBRC6KMBKUPIVFYZ6MOQNY4233JOXZ37FY52H7KW3YY/pub
//	/peers/metadata/CIQB62TDWSJAVTIVR3Z3LXCZVGVOOL56IWEQUY5F2HX4QTSPLOQHEXA/protocols
//	/peers/addrs/CIQB62TDWSJAVTIVR3Z3LXCZVGVOOL56IWEQUY5F2HX4QTSPLOQHEXA
//	etc...
//
// It returns errInvalidKeyNamespaces when the key has fewer than three
// namespaces, or the decoding/parsing error when the third namespace is not a
// valid encoded peer ID. The returned ID is empty on decode failure.
func pstoreKeyToPeerID(k ds.Key) (peer.ID, error) {
	// Position of the encoded peer ID among the key's namespaces.
	const peerIDIndex = 2

	segments := k.Namespaces()
	if len(segments) <= peerIDIndex {
		return "", errInvalidKeyNamespaces
	}

	raw, decodeErr := base32.RawStdEncoding.DecodeString(segments[peerIDIndex])
	if decodeErr != nil {
		return "", decodeErr
	}

	return peer.IDFromBytes(raw)
}