-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
162 lines (141 loc) · 3.97 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package filecoin
import (
"context"
"fmt"
"time"
"github.com/gammazero/workerpool"
"github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/frrist/surveyor/core"
)
var log = logging.Logger("surveyor/filecoin")
type Config struct {
Bootstrap []peer.AddrInfo
Datastore datastore.Batching
DHTProtocolPrefix protocol.ID
}
type Peer struct {
core *core.Peer
cfg Config
}
func New(ctx context.Context, cfg Config, opts ...core.ConfigOpt) (*Peer, error) {
if cfg.Bootstrap == nil {
cfg.Bootstrap = MainnetPeers
}
if cfg.Datastore == nil {
cfg.Datastore = dsync.MutexWrap(datastore.NewMapDatastore())
}
if cfg.DHTProtocolPrefix == "" {
cfg.DHTProtocolPrefix = MainnetDHTPrefix
}
p, err := core.New(ctx, cfg.DHTProtocolPrefix, opts...)
if err != nil {
return nil, err
}
return &Peer{
core: p,
cfg: cfg,
}, nil
}
func (p *Peer) Bootstrap(ctx context.Context) error {
return p.core.Bootstrap(ctx, p.cfg.Bootstrap)
}
type PeerAgentProtocols struct {
Peer peer.AddrInfo
Agent string
Protocols []string
}
func (p *Peer) GetAllPeerAgentProtocols(ctx context.Context, whos []peer.AddrInfo, by time.Duration, workers int) chan *PeerAgentProtocols {
out := make(chan *PeerAgentProtocols)
wp := workerpool.New(workers)
for _, who := range whos {
wp.Submit(func() {
found, err := p.GetPeerAgentProtocolsWithTimeout(ctx, who, by)
if err != nil {
return
}
out <- found
})
}
go func() {
wp.StopWait()
close(out)
}()
return out
}
func (p *Peer) GetPeerAgentProtocolsWithTimeout(ctx context.Context, who peer.AddrInfo, by time.Duration) (*PeerAgentProtocols, error) {
ctx, cancel := context.WithTimeout(ctx, by)
defer cancel()
found, err := p.GetPeerAgentProtocols(ctx, who)
if err != nil {
if err == context.DeadlineExceeded {
log.Infow("deadline exceeded for peer", "error", err)
return nil, err
}
log.Infow("connecting to peer failed", "error", err, "peer", who.String())
return nil, err
}
return found, nil
}
func (p *Peer) GetPeerAgentProtocols(ctx context.Context, who peer.AddrInfo) (*PeerAgentProtocols, error) {
if err := p.core.Host().Connect(ctx, who); err != nil {
return nil, fmt.Errorf("connecting to peer %s: %w", who.Addrs, err)
}
protos, err := p.core.Host().Peerstore().GetProtocols(who.ID)
if err != nil {
return nil, err
}
agent, err := p.core.Host().Peerstore().Get(who.ID, "AgentVersion")
if err != nil {
return nil, err
}
return &PeerAgentProtocols{
Peer: who,
Agent: agent.(string),
Protocols: protos,
}, nil
}
// FindAllPeers queries the DHT for all peers in `whos` using `workers` goroutines, each peer has a find timeout of `by`.
// FindAllPeers will close the returned channel when all find operations have completed. Errors for finding peers are ignored.
func (p *Peer) FindAllPeers(ctx context.Context, whos []peer.ID, by time.Duration, workers int) chan peer.AddrInfo {
out := make(chan peer.AddrInfo)
wp := workerpool.New(workers)
for _, who := range whos {
wp.Submit(func() {
found, err := p.FindPeerWithTimeout(ctx, who, by)
if err != nil {
return
}
out <- found
})
}
go func() {
wp.StopWait()
close(out)
}()
return out
}
func (p *Peer) FindPeerWithTimeout(ctx context.Context, who peer.ID, by time.Duration) (peer.AddrInfo, error) {
ctx, cancel := context.WithTimeout(ctx, by)
defer cancel()
found, err := p.FindPeer(ctx, who)
if err != nil {
if err == context.DeadlineExceeded {
log.Infow("deadline exceeded for peer", "error", err)
return peer.AddrInfo{}, err
}
log.Infow("finding peer failed", "error", err, "peer", who.String())
return peer.AddrInfo{}, err
}
return found, nil
}
func (p *Peer) FindPeer(ctx context.Context, who peer.ID) (peer.AddrInfo, error) {
found, err := p.core.DHT().FindPeer(ctx, who)
if err != nil {
return peer.AddrInfo{}, err
}
return found, nil
}