-
Notifications
You must be signed in to change notification settings - Fork 0
/
storageproviders.go
191 lines (158 loc) · 5.52 KB
/
storageproviders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package filclient
import (
"context"
"errors"
"fmt"
"time"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/types"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)
// storageproviders.go - code for managing information about individual storage providers
// Sentinel errors wrapped (via %w) by the storage provider helpers in this
// file; match them with errors.Is.
var (
	// ErrMinerConnectionFailed is wrapped by Connect when the miner info
	// yields no usable multiaddrs or the host fails to connect.
	ErrMinerConnectionFailed = errors.New("miner connection failed")

	// ErrMinerStreamFailed is wrapped when opening a new stream to the
	// provider fails.
	ErrMinerStreamFailed = errors.New("stream failed")

	// ErrCBORWriteFailed is wrapped when writing a CBOR RPC request to a
	// stream fails.
	ErrCBORWriteFailed = errors.New("CBOR write failed")

	// ErrCBORReadFailed is wrapped when reading a CBOR RPC response from a
	// stream fails.
	ErrCBORReadFailed = errors.New("CBOR read failed")
)
// StorageProviderHandle bundles the operations used to interact with a single
// storage provider and facilitates mapping between its address and its peer
// ID.
//
// Handles are created with Client.StorageProviderByAddress or
// Client.StorageProviderByPeerID; only the identifier passed to the
// constructor is set up front.
type StorageProviderHandle struct {
	// WARNING: may be uninitialized (address.Undef) - use .Address()
	addr address.Address

	// WARNING: may be uninitialized (empty) - use .PeerID()
	//
	// Filled in by StorageProviderByPeerID, and by PeerID and Connect after
	// an on-chain lookup.
	peerID peer.ID

	// The client whose chain API and libp2p host the handle operates through.
	client *Client
}
// StorageProviderByAddress creates a handle for the storage provider
// identified by addr. The handle's peer ID is left unset.
func (client *Client) StorageProviderByAddress(addr address.Address) *StorageProviderHandle {
	handle := new(StorageProviderHandle)
	handle.client = client
	handle.addr = addr
	return handle
}
// StorageProviderByPeerID creates a handle for the storage provider
// identified by peerID. The handle's address is left unset.
func (client *Client) StorageProviderByPeerID(peerID peer.ID) *StorageProviderHandle {
	handle := new(StorageProviderHandle)
	handle.client = client
	handle.peerID = peerID
	return handle
}
// Address reports the address stored in the handle. If no address is stored
// (the field equals address.Undef), it returns address.Undef together with an
// error, because deriving an address from a peer ID is not implemented yet.
//
// ctx is currently unused.
func (handle *StorageProviderHandle) Address(ctx context.Context) (address.Address, error) {
	if known := handle.addr; known != address.Undef {
		return known, nil
	}

	return address.Undef, fmt.Errorf("peer ID to address mapping is not yet implemented")
}
// PeerID returns the peer ID of the provider.
//
// If the handle already holds a peer ID (it was created with
// StorageProviderByPeerID, or an earlier lookup populated it), that value is
// returned without touching the chain. Otherwise the miner info is looked up
// on chain using the handle's address and the result is stored on the handle
// for later calls.
//
// Errors from the chain lookup, and the case where the on-chain miner info
// carries no peer ID, are reported wrapped in ErrLotusError.
func (handle *StorageProviderHandle) PeerID(ctx context.Context) (peer.ID, error) {
	// Already known: no lookup needed. This is also the only way a handle
	// created from a peer ID (which has no address to query with) can answer.
	if handle.peerID != "" {
		return handle.peerID, nil
	}

	info, err := handle.client.api.StateMinerInfo(ctx, handle.addr, types.EmptyTSK)
	if err != nil {
		return "", fmt.Errorf("%w: %v", ErrLotusError, err)
	}

	if info.PeerId == nil {
		return "", fmt.Errorf("%w: miner info has no peer ID set on chain", ErrLotusError)
	}

	handle.peerID = *info.PeerId

	return handle.peerID, nil
}
// Version looks up the agent version string of the storage provider.
//
// It first makes sure the provider is connected (see Connect), then reads the
// "AgentVersion" entry for the provider's peer ID from the host's peerstore.
//
// Connection errors are returned as-is. A failed peerstore lookup, or a stored
// value that is not a string, is reported wrapped in ErrLotusError.
func (handle *StorageProviderHandle) Version(ctx context.Context) (string, error) {
	// Named peerID rather than peer so the peer package is not shadowed.
	peerID, err := handle.Connect(ctx)
	if err != nil {
		return "", err
	}

	raw, err := handle.client.host.Peerstore().Get(peerID, "AgentVersion")
	if err != nil {
		return "", fmt.Errorf("%w: %v", ErrLotusError, err)
	}

	// The peerstore hands back an untyped value; check the type instead of
	// panicking if it is not a string.
	version, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("%w: agent version has unexpected type %T", ErrLotusError, raw)
	}

	return version, nil
}
// stream connects to the storage provider (see Connect) and opens a new
// stream to it on the client's host, offering the given protocols.
//
// Connection errors are returned unchanged; a failure to open the stream is
// wrapped in ErrMinerStreamFailed.
func (handle *StorageProviderHandle) stream(ctx context.Context, protocols ...protocol.ID) (network.Stream, error) {
	target, connErr := handle.Connect(ctx)
	if connErr != nil {
		return nil, connErr
	}

	host := handle.client.host
	s, openErr := host.NewStream(ctx, target, protocols...)
	if openErr != nil {
		return nil, fmt.Errorf("%w: %v", ErrMinerStreamFailed, openErr)
	}

	return s, nil
}
// runSingleRPC performs one request/response exchange with the storage
// provider: it opens a fresh stream for the given protocols, writes req as a
// CBOR RPC message, and decodes the reply into resp. Handy, but not ideal
// when several requests are needed, since every call opens its own stream.
//
// If ctx carries a deadline it is applied to the stream for the duration of
// the call and cleared again before the stream is closed.
//
// Errors from opening the stream are returned unchanged; write and read
// failures are wrapped in ErrCBORWriteFailed and ErrCBORReadFailed.
//
// TODO(@elijaharita): generics
func (handle *StorageProviderHandle) runSingleRPC(ctx context.Context, req interface{}, resp interface{}, protocols ...protocol.ID) error {
	s, openErr := handle.stream(ctx, protocols...)
	if openErr != nil {
		return openErr
	}
	defer s.Close()

	// Deferred calls run last-in-first-out, so the deadline reset below
	// happens before the Close above. The SetDeadline results are not checked.
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		s.SetDeadline(deadline)
		defer s.SetDeadline(time.Time{})
	}

	writeErr := cborutil.WriteCborRPC(s, req)
	if writeErr != nil {
		return fmt.Errorf("%w: %v", ErrCBORWriteFailed, writeErr)
	}

	readErr := cborutil.ReadCborRPC(s, resp)
	if readErr != nil {
		return fmt.Errorf("%w: %v", ErrCBORReadFailed, readErr)
	}

	return nil
}
// Connect makes sure that the storage provider is connected and returns its
// peer ID.
//
// The miner info is fetched from chain using the handle's address. The peer
// ID found there is stored on the handle, the advertised multiaddr bytes are
// parsed, and the client's host is asked to connect to the provider at the
// addresses that parsed successfully.
//
// BEHAVIOR CHANGE - no longer errors on invalid multiaddr if at least one valid
// multiaddr exists
//
// Errors:
//   - chain lookup failures and a missing on-chain peer ID are wrapped in
//     ErrLotusError;
//   - having no usable multiaddr, or a failed host connect, is wrapped in
//     ErrMinerConnectionFailed.
func (handle *StorageProviderHandle) Connect(ctx context.Context) (peer.ID, error) {
	info, err := handle.client.api.StateMinerInfo(ctx, handle.addr, types.EmptyTSK)
	if err != nil {
		return "", fmt.Errorf("%w: %v", ErrLotusError, err)
	}

	// The nil check must come before any dereference of info.PeerId,
	// otherwise a miner without a peer ID on chain causes a nil-pointer panic
	// instead of an error.
	if info.PeerId == nil {
		return "", fmt.Errorf("%w: miner info has no peer ID set on chain", ErrLotusError)
	}
	peerID := *info.PeerId

	// We have to find the peer ID here anyway, so populate it
	handle.peerID = peerID

	// Parse the multiaddr bytes
	multiaddrs := make([]multiaddr.Multiaddr, 0, len(info.Multiaddrs))
	hadInvalid := false
	for _, addrBytes := range info.Multiaddrs {
		// Named parsed rather than multiaddr so the multiaddr package is not
		// shadowed inside the loop.
		parsed, err := multiaddr.NewMultiaddrBytes(addrBytes)
		if err != nil {
			// If an address failed to parse, keep going but make note
			hadInvalid = true
			continue
		}
		multiaddrs = append(multiaddrs, parsed)
	}

	fmt.Printf("Connecting to %v (%s)\n", multiaddrs, handle.peerID)

	// FIXME - lotus-client-proper falls back on the DHT when it has a peerid but no multiaddr
	// filc should do the same
	if len(multiaddrs) == 0 {
		// If there were addresses and they were all invalid (hadInvalid marked
		// true and multiaddrs length 0), specifically mention that
		if hadInvalid {
			return "", fmt.Errorf("%w: miner info has only invalid multiaddrs", ErrMinerConnectionFailed)
		}

		// Otherwise, just mention no multiaddrs available
		return "", fmt.Errorf("%w: miner info has no multiaddrs", ErrMinerConnectionFailed)
	}

	if err := handle.client.host.Connect(ctx, peer.AddrInfo{
		ID:    peerID,
		Addrs: multiaddrs,
	}); err != nil {
		return "", fmt.Errorf("%w: %v", ErrMinerConnectionFailed, err)
	}

	return peerID, nil
}