forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pin.go
632 lines (536 loc) · 15.4 KB
/
pin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
// Package pin implements structures and methods to keep track of
// which objects a user wants to keep stored locally.
package pin
import (
"context"
"fmt"
"os"
"sync"
"time"
mdag "github.com/ipfs/go-ipfs/merkledag"
dutils "github.com/ipfs/go-ipfs/merkledag/utils"
ds "gx/ipfs/QmPpegoMqhAEqjncrzArm7KVWAkCm78rqL2DPuNjhPrshg/go-datastore"
logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)
// log is the package logger, created with the name "pin".
var log = logging.Logger("pin")
// pinDatastoreKey is the datastore key under which Flush stores, and
// LoadPinner reads back, the CID of the root node of the pin state.
var pinDatastoreKey = ds.NewKey("/local/pins")
// emptyKey is assigned in init by decoding a hard-coded CID string.
// NOTE(review): presumably the CID of the empty node; confirm against its users.
var emptyKey *cid.Cid
// init decodes the hard-coded empty-key CID into emptyKey. If the constant
// cannot be decoded the error is logged and the process exits with status 1.
func init() {
	decoded, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
	if err == nil {
		emptyKey = decoded
		return
	}

	log.Error("failed to decode empty key constant")
	os.Exit(1)
}
// String names of the pin modes. ModeToString and StringToMode translate
// between these names and the Mode constants ("all" is accepted as an alias
// of "any"). linkRecursive and linkDirect are also the link names used on the
// root node of the stored pin state.
const (
linkRecursive = "recursive"
linkDirect = "direct"
linkIndirect = "indirect"
linkInternal = "internal"
linkNotPinned = "not pinned"
linkAny = "any"
linkAll = "all"
)
// Mode specifies the type of a pin (recursive, direct, etc.).
// See the Pin Modes constants for the full list.
type Mode int
// Pin Modes. The values are assigned with iota, so the declaration order
// below defines their numeric values.
const (
// Recursive pins pin the target cids along with any reachable children.
Recursive Mode = iota
// Direct pins pin just the target cid.
Direct
// Indirect pins are cids that have some ancestor pinned recursively.
Indirect
// Internal pins are cids used to keep the internal state of the pinner.
Internal
// NotPinned is the mode reported for cids that are not pinned at all.
NotPinned
// Any refers to any pinned cid.
Any
)
// ModeToString returns the human-readable name of the given Mode.
// The boolean result is false, and the string empty, when the mode is unknown.
func ModeToString(mode Mode) (string, bool) {
	switch mode {
	case Recursive:
		return linkRecursive, true
	case Direct:
		return linkDirect, true
	case Indirect:
		return linkIndirect, true
	case Internal:
		return linkInternal, true
	case NotPinned:
		return linkNotPinned, true
	case Any:
		return linkAny, true
	default:
		return "", false
	}
}
// StringToMode parses a name produced by ModeToString back into a Mode.
// The name "all" is accepted as a synonym of "any". The boolean result is
// false when the name is unknown; the returned Mode is then the zero value
// and must not be used.
func StringToMode(s string) (Mode, bool) {
	switch s {
	case linkRecursive:
		return Recursive, true
	case linkDirect:
		return Direct, true
	case linkIndirect:
		return Indirect, true
	case linkInternal:
		return Internal, true
	case linkNotPinned:
		return NotPinned, true
	case linkAny, linkAll: // "all" and "any" mean the same thing
		return Any, true
	default:
		var unknown Mode
		return unknown, false
	}
}
// A Pinner provides the necessary methods to keep track of Nodes which are
// to be kept locally, according to a pin mode. In practice, a Pinner is in
// charge of keeping the list of items from the local storage that should
// not be garbage-collected.
type Pinner interface {
// IsPinned returns whether or not the given cid is pinned
// and an explanation of why it is pinned.
IsPinned(*cid.Cid) (string, bool, error)
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin it is pinned with.
IsPinnedWithType(*cid.Cid, Mode) (string, bool, error)
// Pin the given node, optionally recursively.
Pin(ctx context.Context, node ipld.Node, recursive bool) error
// Unpin the given cid. If recursive is true, removes either a recursive or
// a direct pin. If recursive is false, only removes a direct pin.
Unpin(ctx context.Context, cid *cid.Cid, recursive bool) error
// Update updates a recursive pin from one cid to another.
// This is more efficient than simply pinning the new one and unpinning the
// old one.
Update(ctx context.Context, from, to *cid.Cid, unpin bool) error
// CheckIfPinned checks whether a set of keys are pinned. It is more
// efficient than calling IsPinned for each key.
CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error)
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(*cid.Cid, Mode)
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
RemovePinWithMode(*cid.Cid, Mode)
// Flush writes the pin state to the backing datastore.
Flush() error
// DirectKeys returns all directly pinned cids.
DirectKeys() []*cid.Cid
// RecursiveKeys returns all recursively pinned cids.
RecursiveKeys() []*cid.Cid
// InternalPins returns all cids kept pinned for the internal state of the
// pinner.
InternalPins() []*cid.Cid
}
// Pinned represents a CID together with its pin status.
// The Via field identifies the recursively pinned root through which this
// CID is pinned, in the case that the item is not pinned directly (but rather
// pinned indirectly by some ancestor).
type Pinned struct {
// Key is the cid whose pin status is described.
Key *cid.Cid
// Mode is the pin mode found for Key (NotPinned when it is not pinned).
Mode Mode
// Via is only set for Indirect pins: the recursive pin that reaches Key.
Via *cid.Cid
}
// Pinned reports whether the cid is pinned, i.e. whether its Mode is anything
// other than NotPinned.
func (p Pinned) Pinned() bool {
	isPinned := p.Mode != NotPinned
	return isPinned
}
// String returns the pin status as a human-readable string: "not pinned",
// "pinned via <cid>" for indirect pins, or "pinned: <mode name>" otherwise.
func (p Pinned) String() string {
	if p.Mode == NotPinned {
		return "not pinned"
	}
	if p.Mode == Indirect {
		return fmt.Sprintf("pinned via %s", p.Via)
	}

	// An unknown mode yields an empty name; the lookup flag is not needed here.
	name, _ := ModeToString(p.Mode)
	return fmt.Sprintf("pinned: %s", name)
}
// pinner implements the Pinner interface on top of in-memory cid sets that
// are persisted by Flush and restored by LoadPinner.
type pinner struct {
// lock is taken by the methods that read or modify the pin sets below.
lock sync.RWMutex
// recursePin holds the roots of recursive pins.
recursePin *cid.Set
// directPin holds the directly pinned cids.
directPin *cid.Set
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin *cid.Set
// dserv is the DAG service used to add, fetch and walk pinned nodes.
dserv ipld.DAGService
internal ipld.DAGService // dagservice used to store internal objects
// dstore stores the CID of the pin-state root under pinDatastoreKey.
dstore ds.Datastore
}
// NewPinner creates a new, empty pinner that uses dstore as its backend.
// serv is the DAG service holding pinned content and internal is the DAG
// service used for the pinner's own state objects.
func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner {
	p := new(pinner)
	p.recursePin = cid.NewSet()
	p.directPin = cid.NewSet()
	p.internalPin = cid.NewSet()
	p.dserv = serv
	p.internal = internal
	p.dstore = dstore
	return p
}
// Pin pins the given node, recursively if recurse is true.
//
// The node is first added to the DAG service. For a recursive pin the whole
// graph below the node is fetched before the pin is recorded; an existing
// direct pin on the same cid is replaced by the recursive pin, and pinning an
// already recursively pinned cid is a no-op. For a direct pin the node must
// be retrievable, and an error is returned if the cid is already pinned
// recursively.
//
// The in-memory pin sets are only modified once every fallible step has
// succeeded, so a failed Pin leaves the existing pins untouched.
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if err := p.dserv.Add(ctx, node); err != nil {
		return err
	}

	c := node.Cid()

	if recurse {
		if p.recursePin.Has(c) {
			return nil
		}

		// fetch entire graph
		if err := mdag.FetchGraph(ctx, c, p.dserv); err != nil {
			return err
		}

		// Only drop the direct pin after the fetch succeeded. Removing it
		// earlier would leave the cid completely unpinned when the fetch
		// fails.
		if p.directPin.Has(c) {
			p.directPin.Remove(c)
		}
		p.recursePin.Add(c)
		return nil
	}

	if _, err := p.dserv.Get(ctx, c); err != nil {
		return err
	}

	if p.recursePin.Has(c) {
		return fmt.Errorf("%s already pinned recursively", c.String())
	}

	p.directPin.Add(c)
	return nil
}
// ErrNotPinned is returned by Unpin when the given cid is not pinned in any
// way (neither recursively, directly, internally nor indirectly).
var ErrNotPinned = fmt.Errorf("not pinned")
// Unpin removes the pin on the given cid.
//
// A recursive pin is only removed when recursive is true; a direct pin is
// removed regardless of the flag. ErrNotPinned is returned when the cid is
// not pinned at all, and an error is returned when the cid is only pinned
// through something else (the reason is included in the message).
func (p *pinner) Unpin(ctx context.Context, c *cid.Cid, recursive bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	why, found, err := p.isPinnedWithType(c, Any)
	if err != nil {
		return err
	}
	if !found {
		return ErrNotPinned
	}

	if why == "recursive" {
		if !recursive {
			return fmt.Errorf("%s is pinned recursively", c)
		}
		p.recursePin.Remove(c)
		return nil
	}

	if why == "direct" {
		p.directPin.Remove(c)
		return nil
	}

	// Any other reason means the cid is held by something other than its own
	// direct or recursive pin.
	return fmt.Errorf("%s is pinned indirectly under %s", c, why)
}
// isInternalPin reports whether c is one of the cids recorded in the
// internal pin set. It does not take the lock; callers are expected to hold it.
func (p *pinner) isInternalPin(c *cid.Cid) bool {
	internal := p.internalPin.Has(c)
	return internal
}
// IsPinned reports whether the given cid is pinned in any mode, together
// with an explanation of why it is pinned. It holds the read lock while
// checking.
func (p *pinner) IsPinned(c *cid.Cid) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	reason, pinned, err := p.isPinnedWithType(c, Any)
	return reason, pinned, err
}
// IsPinnedWithType reports whether the given cid is pinned with the given
// pin mode, and returns the kind of pin it was found under. It holds the
// read lock while checking.
func (p *pinner) IsPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	reason, pinned, err := p.isPinnedWithType(c, mode)
	return reason, pinned, err
}
// isPinnedWithType is the lock-free implementation behind IsPinnedWithType,
// meant for pinner methods that already hold the lock.
//
// The returned string is the pin kind ("recursive", "direct", "internal") or,
// for an indirect pin, the string form of the recursive root that reaches c.
// With mode Any the kinds are tried in that order, indirect last. Any mode
// other than Any, Direct, Indirect, Recursive or Internal is an error.
func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) {
	switch mode {
	case Any, Direct, Indirect, Recursive, Internal:
		// supported modes
	default:
		return "", false, fmt.Errorf("Invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
			mode, Direct, Indirect, Recursive, Internal, Any)
	}

	anyMode := mode == Any

	if anyMode || mode == Recursive {
		if p.recursePin.Has(c) {
			return linkRecursive, true, nil
		}
		if !anyMode {
			return "", false, nil
		}
	}

	if anyMode || mode == Direct {
		if p.directPin.Has(c) {
			return linkDirect, true, nil
		}
		if !anyMode {
			return "", false, nil
		}
	}

	if anyMode || mode == Internal {
		if p.isInternalPin(c) {
			return linkInternal, true, nil
		}
		if !anyMode {
			return "", false, nil
		}
	}

	// Only Any and Indirect get this far: look for c below each recursive
	// pin, sharing one visited set so a sub-DAG is walked at most once.
	seen := cid.NewSet()
	for _, root := range p.recursePin.Keys() {
		found, err := hasChild(p.dserv, root, c, seen.Visit)
		if err != nil {
			return "", false, err
		}
		if found {
			return root.String(), true, nil
		}
	}
	return "", false, nil
}
// CheckIfPinned checks whether a set of cids are pinned. It is more
// efficient than calling IsPinned for each cid, because the DAGs below the
// recursive pins are walked at most once for the whole batch.
//
// One Pinned entry is returned per checked cid: Recursive, Direct or
// Internal for cids found in the respective set, Indirect (with Via set to
// the recursive root that reaches it) for cids found below a recursive pin,
// and NotPinned for everything else. The order of the result does not follow
// the order of the arguments. An error from fetching links aborts the check.
func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	pinned := make([]Pinned, 0, len(cids))
	toCheck := cid.NewSet()

	// First check for non-Indirect pins directly
	for _, c := range cids {
		switch {
		case p.recursePin.Has(c):
			pinned = append(pinned, Pinned{Key: c, Mode: Recursive})
		case p.directPin.Has(c):
			pinned = append(pinned, Pinned{Key: c, Mode: Direct})
		case p.isInternalPin(c):
			pinned = append(pinned, Pinned{Key: c, Mode: Internal})
		default:
			toCheck.Add(c)
		}
	}

	// Now walk all recursive pins to check for indirect pins.
	//
	// visited is shared across all roots so that a sub-DAG reachable through
	// several paths is only traversed once. This is safe: a node is only
	// skipped after its whole subtree has been examined, so every cid from
	// toCheck inside it has already been reported.
	visited := cid.NewSet()

	var checkChildren func(rk, parentKey *cid.Cid) error
	checkChildren = func(rk, parentKey *cid.Cid) error {
		links, err := ipld.GetLinks(context.TODO(), p.dserv, parentKey)
		if err != nil {
			return err
		}
		for _, lnk := range links {
			c := lnk.Cid

			if toCheck.Has(c) {
				pinned = append(pinned,
					Pinned{Key: c, Mode: Indirect, Via: rk})
				toCheck.Remove(c)
			}

			// Stop as soon as everything has been resolved, before
			// descending any further.
			if toCheck.Len() == 0 {
				return nil
			}

			if !visited.Visit(c) {
				continue
			}

			if err := checkChildren(rk, c); err != nil {
				return err
			}
		}
		return nil
	}

	for _, rk := range p.recursePin.Keys() {
		// Nothing left to look for: avoid touching the DAG at all.
		if toCheck.Len() == 0 {
			break
		}
		// A root already seen as a descendant of an earlier root has had its
		// whole subtree examined.
		if !visited.Visit(rk) {
			continue
		}
		if err := checkChildren(rk, rk); err != nil {
			return nil, err
		}
	}

	// Anything left in toCheck is not pinned
	for _, k := range toCheck.Keys() {
		pinned = append(pinned, Pinned{Key: k, Mode: NotPinned})
	}

	return pinned, nil
}
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
//
// Only the Direct and Recursive modes are supported; any other mode panics.
func (p *pinner) RemovePinWithMode(c *cid.Cid, mode Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if mode == Direct {
		p.directPin.Remove(c)
		return
	}
	if mode == Recursive {
		p.recursePin.Remove(c)
		return
	}

	// programmer error, panic OK
	panic("unrecognized pin type")
}
// cidSetWithValues returns a new cid set containing every cid from the
// given slice.
func cidSetWithValues(cids []*cid.Cid) *cid.Set {
	set := cid.NewSet()
	for i := range cids {
		set.Add(cids[i])
	}
	return set
}
// LoadPinner loads a pinner and its key sets from the given datastore.
//
// The CID of the pin-state root is read from d under pinDatastoreKey, the
// root node is fetched from the internal DAG service (with a five second
// timeout covering the whole load), and the recursive and direct sets are
// loaded from its links. Every cid visited while loading, including the root
// itself, is recorded as an internal pin.
func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) {
	rawRoot, err := d.Get(pinDatastoreKey)
	if err != nil {
		return nil, fmt.Errorf("cannot load pin state: %v", err)
	}
	rootBytes, isBytes := rawRoot.([]byte)
	if !isBytes {
		return nil, fmt.Errorf("cannot load pin state: %s was not bytes", pinDatastoreKey)
	}

	rootCid, err := cid.Cast(rootBytes)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
	defer cancel()

	rootNode, err := internal.Get(ctx, rootCid)
	if err != nil {
		return nil, fmt.Errorf("cannot find pinning root object: %v", err)
	}

	rootpb, isProto := rootNode.(*mdag.ProtoNode)
	if !isProto {
		return nil, mdag.ErrNotProtobuf
	}

	// The root and every node touched while loading the sets belong to the
	// pinner's own state.
	internalset := cid.NewSet()
	internalset.Add(rootCid)

	recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, internalset.Add)
	if err != nil {
		return nil, fmt.Errorf("cannot load recursive pins: %v", err)
	}

	directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, internalset.Add)
	if err != nil {
		return nil, fmt.Errorf("cannot load direct pins: %v", err)
	}

	return &pinner{
		recursePin:  cidSetWithValues(recurseKeys),
		directPin:   cidSetWithValues(directKeys),
		internalPin: internalset,
		dserv:       dserv,
		dstore:      d,
		internal:    internal,
	}, nil
}
// DirectKeys returns a slice containing the directly pinned keys.
// The read lock is held while the set is read, because other methods modify
// the set under the same lock.
func (p *pinner) DirectKeys() []*cid.Cid {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.directPin.Keys()
}
// RecursiveKeys returns a slice containing the recursively pinned keys.
// The read lock is held while the set is read, because other methods modify
// the set under the same lock.
func (p *pinner) RecursiveKeys() []*cid.Cid {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.recursePin.Keys()
}
// Update updates a recursive pin from one cid to another. This is more
// efficient than simply pinning the new one and unpinning the old one,
// because only the difference between the two graphs is enumerated.
//
// from must already be recursively pinned, otherwise an error is returned.
// On success to is recursively pinned and, if unpin is true, the pin on from
// is removed. Updating a pin to itself is a no-op.
func (p *pinner) Update(ctx context.Context, from, to *cid.Cid, unpin bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.recursePin.Has(from) {
		return fmt.Errorf("'from' cid was not recursively pinned already")
	}

	// Nothing to do when both cids are the same. Without this check the
	// "add to, then remove from" sequence below would end up removing the
	// pin entirely when unpin is set.
	if from.Equals(to) {
		return nil
	}

	if err := dutils.DiffEnumerate(ctx, p.dserv, from, to); err != nil {
		return err
	}

	p.recursePin.Add(to)
	if unpin {
		p.recursePin.Remove(from)
	}
	return nil
}
// Flush encodes the pinner's key sets and writes them to the datastore.
//
// The direct and recursive sets are each stored through the internal DAG
// service and linked from a fresh root node. The CID of that root is then
// saved under pinDatastoreKey. Every cid produced while storing, plus the
// root itself, becomes the new internal pin set once the write succeeds.
func (p *pinner) Flush() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	ctx := context.TODO()
	internalset := cid.NewSet()
	root := &mdag.ProtoNode{}

	// Direct first, then recursive: each set is stored and linked under its
	// mode name.
	sets := []struct {
		link string
		keys []*cid.Cid
	}{
		{link: linkDirect, keys: p.directPin.Keys()},
		{link: linkRecursive, keys: p.recursePin.Keys()},
	}
	for _, s := range sets {
		n, err := storeSet(ctx, p.internal, s.keys, internalset.Add)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(s.link, n); err != nil {
			return err
		}
	}

	// add the empty node, it's referenced by the pin sets but never created
	if err := p.internal.Add(ctx, new(mdag.ProtoNode)); err != nil {
		return err
	}

	if err := p.internal.Add(ctx, root); err != nil {
		return err
	}

	rootKey := root.Cid()
	internalset.Add(rootKey)

	if err := p.dstore.Put(pinDatastoreKey, rootKey.Bytes()); err != nil {
		return fmt.Errorf("cannot store pin state: %v", err)
	}

	p.internalPin = internalset
	return nil
}
// InternalPins returns all cids kept pinned for the internal state of the
// pinner. The result is a fresh slice built while holding the lock.
func (p *pinner) InternalPins() []*cid.Cid {
	p.lock.Lock()
	defer p.lock.Unlock()

	keys := p.internalPin.Keys()
	return append([]*cid.Cid(nil), keys...)
}
// PinWithMode gives the caller fine-grained control over the pin sets by
// adding c straight to the recursive or direct set, without fetching or
// checking anything. Modes other than Recursive and Direct are silently
// ignored.
func (p *pinner) PinWithMode(c *cid.Cid, mode Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if mode == Recursive {
		p.recursePin.Add(c)
	} else if mode == Direct {
		p.directPin.Add(c)
	}
}
// hasChild reports whether child is reachable from root by following links,
// searching depth first. The root itself is never compared against child.
//
// visit is called for every link that is not the child; a branch is only
// descended into when visit returns true, which lets callers skip branches
// that were already explored. Errors from fetching links abort the search.
func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) {
	links, err := ipld.GetLinks(context.TODO(), ng, root)
	if err != nil {
		return false, err
	}

	for _, lnk := range links {
		next := lnk.Cid
		if next.Equals(child) {
			return true, nil
		}
		if !visit(next) {
			continue
		}

		found, err := hasChild(ng, next, child, visit)
		if err != nil {
			return false, err
		}
		if found {
			return true, nil
		}
	}
	return false, nil
}