/
sync.go
281 lines (248 loc) · 7.99 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package httpsync
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
maurl "github.com/filecoin-project/go-legs/httpsync/multiaddr"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"golang.org/x/time/rate"
)
// defaultHttpTimeout is the Timeout set on the http.Client that NewSync
// creates when the caller passes a nil client.
const defaultHttpTimeout = 10 * time.Second
// log is the package-level logger, registered under "go-legs-httpsync".
var log = logging.Logger("go-legs-httpsync")
// Sync provides sync functionality for use with all http syncs. It carries the
// state shared by every Syncer created through NewSyncer.
type Sync struct {
	// blockHook, when non-nil, is called by Syncer.Sync for each CID recorded
	// during a traversal.
	blockHook func(peer.ID, cid.Cid)
	// client executes the HTTP requests issued while syncing.
	client *http.Client
	// lsys is the link system that blocks are read from and written to.
	lsys ipld.LinkSystem
}
// NewSync builds a Sync that reads and writes blocks through lsys and issues
// its HTTP requests with client.
//
// A nil client is replaced by an http.Client whose Timeout is
// defaultHttpTimeout. blockHook may be nil; when set, Syncer.Sync calls it for
// each block recorded during a traversal.
func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID, cid.Cid)) *Sync {
	httpClient := client
	if httpClient == nil {
		httpClient = &http.Client{Timeout: defaultHttpTimeout}
	}

	result := new(Sync)
	result.lsys = lsys
	result.client = httpClient
	result.blockHook = blockHook
	return result
}
// NewSyncer creates a new Syncer to use for a single sync operation against a
// peer.
//
// peerAddr is converted to a URL, which becomes the root that all fetched
// resources are resolved against; a conversion failure is returned unchanged.
// rateLimiter may be nil, in which case requests are not rate limited.
func (s *Sync) NewSyncer(peerID peer.ID, peerAddr multiaddr.Multiaddr, rateLimiter *rate.Limiter) (*Syncer, error) {
	base, err := maurl.ToURL(peerAddr)
	if err != nil {
		return nil, err
	}

	var syncer Syncer
	syncer.sync = s
	syncer.peerID = peerID
	syncer.rateLimiter = rateLimiter
	// Store a copy of the URL so later path edits never touch the value
	// returned by the converter.
	syncer.rootURL = *base
	return &syncer, nil
}
// Close closes any idle connections held by the HTTP client.
func (s *Sync) Close() {
	s.client.CloseIdleConnections()
}
// errHeadFromUnexpectedPeer is returned by GetHead when the peer ID derived
// from the public key included with the head does not match the Syncer's peer.
var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")
// Syncer performs HTTP sync operations against a single peer. Instances are
// created with Sync.NewSyncer.
type Syncer struct {
	// peerID is the peer being synced from; it is checked against the head
	// signer and passed to the block hook.
	peerID peer.ID
	// rateLimiter, when non-nil, is waited on before every HTTP request.
	rateLimiter *rate.Limiter
	// rootURL is the base URL that resource names are joined onto.
	rootURL url.URL
	// sync is the parent Sync supplying the HTTP client, link system and hook.
	sync *Sync
}
// GetHead fetches the "head" resource from the peer and returns the CID it
// carries.
//
// The response is decoded with openSignedHeadWithIncludedPubKey, and the peer
// ID derived from the included public key must equal the Syncer's peer ID;
// otherwise errHeadFromUnexpectedPeer is returned. On any error the returned
// CID is cid.Undef.
func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
	var (
		signerKey ic.PubKey
		headCid   cid.Cid
	)

	// The callback stores its results in the enclosing variables because fetch
	// only propagates an error.
	decodeHead := func(body io.Reader) error {
		var decodeErr error
		signerKey, headCid, decodeErr = openSignedHeadWithIncludedPubKey(body)
		return decodeErr
	}
	if err := s.fetch(ctx, "head", decodeHead); err != nil {
		return cid.Undef, err
	}

	signerID, err := peer.IDFromPublicKey(signerKey)
	switch {
	case err != nil:
		return cid.Undef, err
	case signerID != s.peerID:
		return cid.Undef, errHeadFromUnexpectedPeer
	}
	return headCid, nil
}
// Sync traverses the DAG rooted at nextCid using the selector sel, fetching
// over HTTP any block that is not available locally.
//
// It returns an error if sel cannot be compiled or if the traversal fails; both
// errors wrap the underlying cause. On success the block hook, if set, is
// called once per block in traversal order, and the HTTP client's idle
// connections are closed.
func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error {
	xsel, err := selector.CompileSelector(sel)
	if err != nil {
		log.Errorw("failed to compile selector", "err", err, "selector", sel)
		// Wrap the cause so callers can inspect it, matching the traversal
		// error below.
		return fmt.Errorf("failed to compile selector: %w", err)
	}

	cids, err := s.walkFetch(ctx, nextCid, xsel)
	if err != nil {
		log.Errorw("failed to traverse requested dag", "err", err, "root", nextCid)
		return fmt.Errorf("failed to traverse requested dag: %w", err)
	}

	// We run the block hook to emulate the behavior of graphsync's
	// `OnIncomingBlockHook` callback (gets called even if block is already stored
	// locally).
	//
	// We are purposefully not doing this in the StorageReadOpener because the
	// hook can do anything, including deleting the block from the block store. If
	// it did that then we would not be able to continue our traversal. So instead
	// we remember the blocks seen during traversal and then call the hook at the
	// end when we no longer care what it does with the blocks.
	if s.sync.blockHook != nil {
		for _, c := range cids {
			s.sync.blockHook(s.peerID, c)
		}
	}

	s.sync.client.CloseIdleConnections()
	return nil
}
// walkFetch is run by a traversal of the selector. For each block that the
// selector walks over, walkFetch will look to see if it can find it in the
// local data store. If it cannot, it will then go and get it over HTTP. This
// emulates the way libp2p/graphsync fetches data, but the actual fetch of data
// is done over HTTP.
//
// It returns the CIDs of the blocks that were opened, in the order they were
// opened, or the first error hit while loading the root or walking the
// selector.
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) {
	// Track the order of cids we've seen during our traversal so we can call the
	// block hook function in the same order. We emulate the behavior of
	// graphsync's `OnIncomingBlockHook`, this means we call the blockhook even if
	// we have the block locally.
	var traversalOrder []cid.Cid

	getMissingLs := cidlink.DefaultLinkSystem()
	// trusted because it'll be hashed/verified on the way into the link system when fetched.
	getMissingLs.TrustedStorage = true
	getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) {
		c := l.(cidlink.Link).Cid
		r, err := s.sync.lsys.StorageReadOpener(lc, l)
		if err == nil {
			// Found block read opener, so return it.
			traversalOrder = append(traversalOrder, c)
			return r, nil
		}

		// Did not find block read opener, so fetch block via HTTP.
		//
		// There is deliberately no retry here. A rate-limit failure from fetch
		// means the limiter's Wait gave up (for example because the context
		// ended), so calling again cannot succeed and would only spin.
		if err = s.fetchBlock(ctx, c); err != nil {
			log.Errorw("Failed to fetch block", "err", err, "cid", c)
			return nil, err
		}

		// The block should now be in local storage; open it from there.
		r, err = s.sync.lsys.StorageReadOpener(lc, l)
		if err == nil {
			traversalOrder = append(traversalOrder, c)
		}
		return r, err
	}

	progress := traversal.Progress{
		Cfg: &traversal.Config{
			Ctx:                            ctx,
			LinkSystem:                     getMissingLs,
			LinkTargetNodePrototypeChooser: basicnode.Chooser,
		},
		Path: datamodel.NewPath([]datamodel.PathSegment{}),
	}

	// get the direct node.
	rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any)
	if err != nil {
		log.Errorw("Failed to load node", "err", err, "root", rootCid)
		return nil, err
	}

	// The visit callback is a no-op: the walk is performed only for the block
	// loads it triggers through the read opener above.
	if err := progress.WalkMatching(rootNode, sel, func(p traversal.Progress, n datamodel.Node) error {
		return nil
	}); err != nil {
		return nil, err
	}
	return traversalOrder, nil
}
// rateLimitErr reports that waiting on the rate limiter failed before a
// resource could be requested. fetch returns it as a *rateLimitErr.
type rateLimitErr struct {
	// resource is the name of the resource that was about to be fetched.
	resource string
	// rootURL is the root URL of the peer being fetched from.
	rootURL url.URL
	// source is the ID of the peer being fetched from.
	source peer.ID
}
// Error implements the error interface, naming the resource, the source peer
// and the root URL involved in the failed fetch.
func (r rateLimitErr) Error() string {
	endpoint := r.rootURL.String()
	return fmt.Sprintf(
		"rate limit reached when fetching %s from %s at %s",
		r.resource, r.source, endpoint,
	)
}
// fetch issues an HTTP GET for the resource rsrc, resolved against the
// Syncer's root URL, and passes the response body to cb.
//
// If a rate limiter is configured it is waited on first; a failed wait is
// reported as a *rateLimitErr. A request error, or any status other than
// 200 OK, is returned as an error without calling cb. Otherwise the result of
// cb is returned. The response body is always closed before fetch returns.
func (s *Syncer) fetch(ctx context.Context, rsrc string, cb func(io.Reader) error) error {
	// Work on a copy so the Syncer's root URL is never modified.
	localURL := s.rootURL
	localURL.Path = path.Join(s.rootURL.Path, rsrc)

	if s.rateLimiter != nil {
		if err := s.rateLimiter.Wait(ctx); err != nil {
			return &rateLimitErr{
				resource: rsrc,
				rootURL:  s.rootURL,
				source:   s.peerID,
			}
		}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, localURL.String(), nil)
	if err != nil {
		return err
	}

	resp, err := s.sync.client.Do(req)
	if err != nil {
		log.Errorw("Failed to execute fetch request", "err", err)
		return err
	}
	// Close the body on every path, including the non-200 return below;
	// otherwise the underlying connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		err := fmt.Errorf("non success http code at %s: %d", localURL.String(), resp.StatusCode)
		log.Errorw("Fetch was not successful", "err", err)
		return err
	}

	return cb(resp.Body)
}
// fetchBlock makes sure the block identified by c is in the local link system.
//
// If the block already loads locally nothing is fetched. Otherwise it is
// downloaded over HTTP and streamed into the link system's writer while being
// hashed; it is committed only when the computed digest equals the hash in c.
// A digest mismatch, or any storage or transport failure, is returned as an
// error.
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error {
	lnk := cidlink.Link{Cid: c}

	// node is already present.
	if node, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, lnk, basicnode.Prototype.Any); err == nil && node != nil {
		return nil
	}

	store := func(data io.Reader) error {
		w, commit, err := s.sync.lsys.StorageWriteOpener(ipld.LinkContext{Ctx: ctx})
		if err != nil {
			log.Errorw("Failed to get write opener", "err", err)
			return err
		}

		// Hash the stream while copying it into the writer, so the payload is
		// read only once.
		prefix := c.Prefix()
		digest, err := multihash.SumStream(io.TeeReader(data, w), prefix.MhType, prefix.MhLength)
		if err != nil {
			return err
		}

		if want := c.Hash(); !bytes.Equal(want, digest) {
			err := fmt.Errorf("hash digest mismatch; expected %s but got %s", want.B58String(), digest.B58String())
			log.Errorw("Failed to persist fetched block with mismatching digest", "cid", c, "err", err)
			return err
		}

		if err := commit(lnk); err != nil {
			log.Errorw("Failed to commit", "err", err)
			return err
		}
		return nil
	}

	return s.fetch(ctx, c.String(), store)
}