This repository has been archived by the owner on Nov 19, 2022. It is now read-only.
forked from ipfs/go-unixfs
/
directory.go
609 lines (510 loc) · 18.7 KB
/
directory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
package io
import (
"context"
"fmt"
"os"
"github.com/functionland/go-unixfs/hamt"
"github.com/functionland/go-unixfs/private/linksize"
"github.com/alecthomas/units"
format "github.com/functionland/go-unixfs"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
mdag "github.com/ipfs/go-merkledag"
)
// log is the package-level logger, created under the "unixfs" name.
var log = logging.Logger("unixfs")
// HAMTShardingSize is a global option that allows switching to a HAMTDirectory
// when the BasicDirectory grows above the size (in bytes) signalled by this
// flag. A value of 0 disables the option; the default set here is 256 KiB.
// The size is not the *exact* block size of the encoded BasicDirectory but just
// an estimate based on the byte length of each link's name and CID
// (BasicDirectory's ProtoNode doesn't use the Data field so this estimate is
// pretty accurate).
var HAMTShardingSize = int(256 * units.KiB)
// DefaultShardWidth is the default value used for the HAMT sharding width; it
// is the width passed to hamt.NewShard when a BasicDirectory is converted.
// Needs to be a power of two (shard entry size) and multiple of 8 (bitfield size).
var DefaultShardWidth = 256
// Directory defines a UnixFS directory. It is used for creating, reading and
// editing directories. It allows working with different directory schemes,
// like the basic or the HAMT implementation.
//
// It only allows performing explicit edits on a single directory; working with
// directory trees is out of its scope, they are managed by the MFS layer
// (which is the main consumer of this interface).
type Directory interface {
// SetCidBuilder sets the CID Builder of the root node.
SetCidBuilder(cid.Builder)
// AddChild adds the given node to the directory under the given name.
AddChild(context.Context, string, ipld.Node) error
// ForEachLink applies the given function to Links in the directory.
ForEachLink(context.Context, func(*ipld.Link) error) error
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not guaranteed.
EnumLinksAsync(context.Context) <-chan format.LinkResult
// Links returns all the links in the directory node.
Links(context.Context) ([]*ipld.Link, error)
// Find returns the root node of the file named 'name' within this directory.
// In the case of HAMT-directories, it will traverse the tree.
//
// Returns os.ErrNotExist if the child does not exist.
Find(context.Context, string) (ipld.Node, error)
// RemoveChild removes the child with the given name.
//
// Returns os.ErrNotExist if the child doesn't exist.
RemoveChild(context.Context, string) error
// GetNode returns the root of this directory.
GetNode() (ipld.Node, error)
// GetCidBuilder returns the CID Builder used.
GetCidBuilder() cid.Builder
}
// TODO: Evaluate removing `dserv` from this layer and providing it in MFS.
// (The functions should in that case add a `DAGService` argument.)
// productionLinkSize estimates the size of a directory link as the byte
// length of its name plus the byte length of its CID. It is the estimator
// installed for production use; tests may substitute another one to get
// fixed sizes.
func productionLinkSize(linkName string, linkCid cid.Cid) int {
	nameLen := len(linkName)
	cidLen := linkCid.ByteLen()
	return nameLen + cidLen
}
// init installs productionLinkSize as the link size estimator used by this
// package (linksize.LinkSizeFunction).
func init() {
	linksize.LinkSizeFunction = productionLinkSize
}
// BasicDirectory is the basic implementation of `Directory`. All the entries
// are stored as links of a single node.
type BasicDirectory struct {
// node is the ProtoNode whose links are the directory entries.
node *mdag.ProtoNode
// dserv is the DAGService used by Find to fetch a child node by its CID.
dserv ipld.DAGService
// Internal variable used to cache the estimated size of the basic directory:
// for each link, aggregate link name + link CID. DO NOT CHANGE THIS
// as it will affect the HAMT transition behavior in HAMTShardingSize.
// (We keep this value up to date even if the HAMTShardingSize is off
// since potentially the option could be activated on the fly.)
estimatedSize int
}
// HAMTDirectory is the HAMT implementation of `Directory`.
// (See package `hamt` for more information.)
//
// NOTE: NewDirectoryFromNode builds this struct with a positional literal, so
// the order of the fields below must not change.
type HAMTDirectory struct {
// shard is the HAMT shard holding the directory entries.
shard *hamt.Shard
// dserv is the DAGService used to create shards and to resolve links to nodes.
dserv ipld.DAGService
// Track the changes in size by the AddChild and RemoveChild calls
// for the HAMTShardingSize option.
sizeChange int
}
// newEmptyBasicDirectory creates a BasicDirectory backed by a fresh empty
// UnixFS directory node.
func newEmptyBasicDirectory(dserv ipld.DAGService) *BasicDirectory {
	emptyNode := format.EmptyDirNode()
	return newBasicDirectoryFromNode(dserv, emptyNode)
}
// newBasicDirectoryFromNode wraps an existing ProtoNode in a BasicDirectory.
// The node is used as-is (it is not copied here).
func newBasicDirectoryFromNode(dserv ipld.DAGService, node *mdag.ProtoNode) *BasicDirectory {
	dir := &BasicDirectory{
		node:  node,
		dserv: dserv,
	}

	// Walk the links already present in the node (if any) so the cached
	// size estimate matches its current contents.
	dir.computeEstimatedSize()
	return dir
}
// NewDirectory returns an empty Directory. The concrete value is a
// DynamicDirectory wrapping a BasicDirectory, which can later be converted
// to a HAMTDirectory as entries are added.
func NewDirectory(dserv ipld.DAGService) Directory {
	basicDir := newEmptyBasicDirectory(dserv)
	return &DynamicDirectory{Directory: basicDir}
}
// ErrNotADir is returned by NewDirectoryFromNode when the given node is not a
// UnixFS directory or HAMT shard.
var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard")
// NewDirectoryFromNode loads a UnixFS directory from the given IPLD node and
// DAGService. The result is always a DynamicDirectory; it wraps a
// BasicDirectory (built on a copy of the node) for plain directories and a
// HAMTDirectory for HAMT shards.
//
// It returns ErrNotADir when the node is not a ProtoNode or its UnixFS type
// is neither a directory nor a HAMT shard, and forwards any error from
// decoding the UnixFS data or loading the shard.
func NewDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (Directory, error) {
	pbNode, isProto := node.(*mdag.ProtoNode)
	if !isProto {
		return nil, ErrNotADir
	}

	fsNode, err := format.FSNodeFromBytes(pbNode.Data())
	if err != nil {
		return nil, err
	}

	switch fsNode.Type() {
	case format.TDirectory:
		// Work on a copy so edits do not alter the caller's node.
		nodeCopy := pbNode.Copy().(*mdag.ProtoNode)
		return &DynamicDirectory{newBasicDirectoryFromNode(dserv, nodeCopy)}, nil
	case format.THAMTShard:
		shard, err := hamt.NewHamtFromDag(dserv, node)
		if err != nil {
			return nil, err
		}
		return &DynamicDirectory{&HAMTDirectory{shard: shard, dserv: dserv}}, nil
	default:
		return nil, ErrNotADir
	}
}
// computeEstimatedSize recomputes the cached size estimate from scratch by
// summing the estimated size of every link currently in the node.
func (d *BasicDirectory) computeEstimatedSize() {
	d.estimatedSize = 0

	accumulate := func(l *ipld.Link) error {
		d.addToEstimatedSize(l.Name, l.Cid)
		return nil
	}

	// The result is deliberately discarded: traversing a BasicDirectory
	// cannot fail and the callback above never returns an error.
	_ = d.ForEachLink(context.TODO(), accumulate)
}
// addToEstimatedSize increases the cached size estimate by the estimated
// size of a link with the given name and CID.
func (d *BasicDirectory) addToEstimatedSize(name string, linkCid cid.Cid) {
	delta := linksize.LinkSizeFunction(name, linkCid)
	d.estimatedSize += delta
}
// removeFromEstimatedSize decreases the cached size estimate by the
// estimated size of a link with the given name and CID.
func (d *BasicDirectory) removeFromEstimatedSize(name string, linkCid cid.Cid) {
	delta := linksize.LinkSizeFunction(name, linkCid)
	d.estimatedSize -= delta
	if d.estimatedSize >= 0 {
		return
	}

	// A negative estimate means the bookkeeping is broken. Report it and
	// rebuild the value from the links actually present.
	log.Error("BasicDirectory's estimatedSize went below 0")
	d.computeEstimatedSize()
}
// SetCidBuilder implements the `Directory` interface by setting the CID
// builder on the underlying node.
func (d *BasicDirectory) SetCidBuilder(builder cid.Builder) {
	d.node.SetCidBuilder(builder)
}
// AddChild implements the `Directory` interface. It builds a link to `node`
// and stores it under `name`, replacing any entry already using that name.
func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.Node) error {
	childLink, linkErr := ipld.MakeLink(node)
	if linkErr != nil {
		return linkErr
	}
	return d.addLinkChild(ctx, name, childLink)
}
// needsToSwitchToHAMTDir reports whether storing `nodeToAdd` under `name`
// would bring the estimated directory size to HAMTShardingSize or above.
// The projection subtracts the entry currently stored under `name` (if any)
// and adds the new one (when nodeToAdd is not nil). It always reports false
// when the option is disabled (HAMTShardingSize == 0).
func (d *BasicDirectory) needsToSwitchToHAMTDir(name string, nodeToAdd ipld.Node) (bool, error) {
	if HAMTShardingSize == 0 {
		// Option disabled.
		return false, nil
	}

	projectedSize := d.estimatedSize

	existing, err := d.node.GetNodeLink(name)
	switch {
	case err == mdag.ErrLinkNotFound:
		// No previous entry under that name, nothing to subtract.
	case err != nil:
		return false, err
	default:
		// The old entry will be overwritten, so discount its size.
		projectedSize -= linksize.LinkSizeFunction(name, existing.Cid)
	}

	if nodeToAdd != nil {
		projectedSize += linksize.LinkSizeFunction(name, nodeToAdd.Cid())
	}

	return projectedSize >= HAMTShardingSize, nil
}
// addLinkChild stores `link` in this directory under `name`, replacing any
// previous entry with that name and keeping the size estimate in sync.
// Plumbing function for the AddChild API.
func (d *BasicDirectory) addLinkChild(ctx context.Context, name string, link *ipld.Link) error {
	// Drop the old entry first (this also discounts its size). A missing
	// entry is expected and not an error.
	if err := d.RemoveChild(ctx, name); err != nil && err != os.ErrNotExist {
		return err
	}

	if err := d.node.AddRawLink(name, link); err != nil {
		return err
	}

	d.addToEstimatedSize(name, link.Cid)
	return nil
}
// EnumLinksAsync returns a channel which will receive the Links in the
// directory as they are enumerated; order is not guaranteed. The channel is
// closed once every link was sent or the context is done, whichever happens
// first.
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
	out := make(chan format.LinkResult)

	go func() {
		defer close(out)
		for _, lnk := range d.node.Links() {
			res := format.LinkResult{Link: lnk, Err: nil}
			select {
			case out <- res:
			case <-ctx.Done():
				// The consumer gave up; stop sending.
				return
			}
		}
	}()

	return out
}
// ForEachLink implements the `Directory` interface. It calls `f` on every
// link of the node in order and stops at, and returns, the first error `f`
// reports. The context is not used.
func (d *BasicDirectory) ForEachLink(_ context.Context, f func(*ipld.Link) error) error {
	links := d.node.Links()
	for i := range links {
		if err := f(links[i]); err != nil {
			return err
		}
	}
	return nil
}
// Links implements the `Directory` interface. It returns the links of the
// underlying node; the error is always nil.
func (d *BasicDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
	links := d.node.Links()
	return links, nil
}
// Find implements the `Directory` interface. It looks up the link called
// `name` and fetches the node it points to from the DAGService.
//
// Returns os.ErrNotExist if there is no link with that name.
func (d *BasicDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
	lnk, err := d.node.GetNodeLink(name)
	switch {
	case err == mdag.ErrLinkNotFound:
		return nil, os.ErrNotExist
	case err != nil:
		return nil, err
	}
	return d.dserv.Get(ctx, lnk.Cid)
}
// RemoveChild implements the `Directory` interface. It removes the link
// called `name` and discounts it from the size estimate.
//
// Returns os.ErrNotExist if there is no link with that name.
func (d *BasicDirectory) RemoveChild(ctx context.Context, name string) error {
	// The link has to be *retrieved* before it is removed so its size can be
	// discounted. This may walk the links slice twice; if that traversal ever
	// becomes a problem a factor of 2 will not be what matters, and a cached
	// link resolution map would be the likely fix.
	existing, lookupErr := d.node.GetNodeLink(name)
	if lookupErr != nil {
		if lookupErr == mdag.ErrLinkNotFound {
			return os.ErrNotExist
		}
		// At the moment there is no other error besides ErrLinkNotFound.
		return lookupErr
	}

	// The name exists, so account for its removal.
	d.removeFromEstimatedSize(existing.Name, existing.Cid)

	// GetNodeLink found the link, so this is not expected to fail with
	// ErrLinkNotFound and no error conversion is done here.
	return d.node.RemoveNodeLink(name)
}
// GetNode implements the `Directory` interface. It returns the underlying
// node itself (not a copy); the error is always nil.
func (d *BasicDirectory) GetNode() (ipld.Node, error) {
	return d.node, nil
}
// GetCidBuilder implements the `Directory` interface. It returns the CID
// builder of the underlying node.
func (d *BasicDirectory) GetCidBuilder() cid.Builder {
	builder := d.node.CidBuilder()
	return builder
}
// switchToSharding returns a HAMT implementation of this directory: a new
// shard of DefaultShardWidth that uses the same CID builder and DAGService
// and contains every link of this directory. The receiver is not modified.
// The size-change counter of the returned directory starts at zero.
func (d *BasicDirectory) switchToSharding(ctx context.Context) (*HAMTDirectory, error) {
	shard, err := hamt.NewShard(d.dserv, DefaultShardWidth)
	if err != nil {
		return nil, err
	}
	shard.SetCidBuilder(d.node.CidBuilder())

	for _, lnk := range d.node.Links() {
		if err := shard.SetLink(ctx, lnk.Name, lnk); err != nil {
			return nil, err
		}
	}

	return &HAMTDirectory{shard: shard, dserv: d.dserv}, nil
}
// SetCidBuilder implements the `Directory` interface by setting the CID
// builder on the underlying shard.
func (d *HAMTDirectory) SetCidBuilder(builder cid.Builder) {
	d.shard.SetCidBuilder(builder)
}
// AddChild implements the `Directory` interface. It swaps `nd` into the
// shard under `name` and updates the size-change counter: the replaced
// entry (if the swap returned one) is subtracted and the new entry is added.
func (d *HAMTDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
	replaced, err := d.shard.Swap(ctx, name, nd)
	if err != nil {
		return err
	}

	if replaced != nil {
		d.removeFromSizeChange(replaced.Name, replaced.Cid)
	}
	d.addToSizeChange(name, nd.Cid())
	return nil
}
// ForEachLink implements the `Directory` interface by delegating to the
// underlying shard.
func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
	return d.shard.ForEachLink(ctx, f)
}
// EnumLinksAsync returns a channel which will receive the Links in the
// directory as they are enumerated; order is not guaranteed. The work is
// delegated to the underlying shard.
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
	return d.shard.EnumLinksAsync(ctx)
}
// Links implements the `Directory` interface by enumerating the links of
// the underlying shard.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
	return d.shard.EnumLinks(ctx)
}
// Find implements the `Directory` interface. It will traverse the tree to
// locate the link called `name` and then resolves that link to a node using
// the DAGService. An error from the shard lookup is returned unchanged.
func (d *HAMTDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
	found, err := d.shard.Find(ctx, name)
	if err != nil {
		return nil, err
	}
	return found.GetNode(ctx, d.dserv)
}
// RemoveChild implements the `Directory` interface. It takes the entry
// called `name` out of the shard and, when an entry was returned, subtracts
// it from the size-change counter. An error from the shard is returned
// unchanged.
func (d *HAMTDirectory) RemoveChild(ctx context.Context, name string) error {
	removed, err := d.shard.Take(ctx, name)
	if err != nil {
		return err
	}
	if removed == nil {
		return nil
	}

	d.removeFromSizeChange(removed.Name, removed.Cid)
	return nil
}
// GetNode implements the `Directory` interface by returning the node of the
// underlying shard.
func (d *HAMTDirectory) GetNode() (ipld.Node, error) {
	return d.shard.Node()
}
// GetCidBuilder implements the `Directory` interface. It returns the CID
// builder of the underlying shard.
func (d *HAMTDirectory) GetCidBuilder() cid.Builder {
	builder := d.shard.CidBuilder()
	return builder
}
// switchToBasic returns a BasicDirectory implementation of this directory:
// a new empty directory with the same CID builder and DAGService, filled
// with every link of this one. The receiver is not modified.
//
// Copying enumerates all the links, which requires every shard to be
// accessible. It is only called *after* sizeBelowThreshold returned true,
// which means *all* shards were already enumerated and fetched (that is the
// only way to be really sure the directory is below the threshold).
func (d *HAMTDirectory) switchToBasic(ctx context.Context) (*BasicDirectory, error) {
	basicDir := newEmptyBasicDirectory(d.dserv)
	basicDir.SetCidBuilder(d.GetCidBuilder())

	copyLink := func(lnk *ipld.Link) error {
		return basicDir.addLinkChild(ctx, lnk.Name, lnk)
	}
	if err := d.ForEachLink(ctx, copyLink); err != nil {
		return nil, err
	}

	return basicDir, nil
}
// addToSizeChange increases the size-change counter by the estimated size
// of a link with the given name and CID.
func (d *HAMTDirectory) addToSizeChange(name string, linkCid cid.Cid) {
	delta := linksize.LinkSizeFunction(name, linkCid)
	d.sizeChange += delta
}
// removeFromSizeChange decreases the size-change counter by the estimated
// size of a link with the given name and CID. The counter may go negative.
func (d *HAMTDirectory) removeFromSizeChange(name string, linkCid cid.Cid) {
	delta := linksize.LinkSizeFunction(name, linkCid)
	d.sizeChange -= delta
}
// needsToSwitchToBasicDir evaluates a switch from HAMTDirectory to
// BasicDirectory in case the size will go *below* the HAMTShardingSize
// threshold when adding or removing an entry.
//
// In both operations any old entry under `name` is removed; for an add a new
// entry is stored under that name (for a remove `nodeToAdd` is nil). Both the
// (potential) subtraction and addition are folded into the projected change.
// The full size check only runs when the accumulated change plus this
// operation is negative, i.e. the directory has shrunk overall. It always
// reports false when the option is disabled (HAMTShardingSize == 0).
func (d *HAMTDirectory) needsToSwitchToBasicDir(ctx context.Context, name string, nodeToAdd ipld.Node) (switchToBasic bool, err error) {
	if HAMTShardingSize == 0 {
		// Option disabled.
		return false, nil
	}

	delta := 0

	// Look for an entry under that name that would be overwritten (add) or
	// flat out removed (remove).
	existing, findErr := d.shard.Find(ctx, name)
	switch {
	case findErr == os.ErrNotExist:
		// Nothing stored under that name, nothing to subtract.
	case findErr != nil:
		return false, findErr
	default:
		delta -= linksize.LinkSizeFunction(name, existing.Cid)
	}

	// For an add, account for the size of the new entry.
	if nodeToAdd != nil {
		delta += linksize.LinkSizeFunction(name, nodeToAdd.Cid())
	}

	if d.sizeChange+delta >= 0 {
		// The net size of the HAMT has not been reduced.
		return false, nil
	}

	// The directory shrank; check whether it dropped below the threshold.
	return d.sizeBelowThreshold(ctx, delta)
}
// sizeBelowThreshold evaluates the directory size together with a future
// `sizeChange` and reports whether the result will be below the
// HAMTShardingSize threshold (which triggers a transition to a
// BasicDirectory).
//
// Links are consumed from EnumLinksAsync and counting stops as soon as the
// running total reaches the threshold (no need to keep counting) or an
// enumeration error is received (like the context being canceled if fetching
// the necessary shards takes too long). It reports true only after *all*
// links were enumerated without reaching the threshold.
//
// It panics when called with the option disabled (HAMTShardingSize == 0).
func (d *HAMTDirectory) sizeBelowThreshold(ctx context.Context, sizeChange int) (below bool, err error) {
	if HAMTShardingSize == 0 {
		panic("asked to compute HAMT size with HAMTShardingSize option off (0)")
	}

	// Cancelling on return stops the enumeration once there is enough
	// information to answer.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Running total: the pending change plus the size of the links seen so
	// far. It is not necessarily the full size of *all* shards.
	projected := sizeChange
	for res := range d.EnumLinksAsync(ctx) {
		if res.Err != nil {
			return false, res.Err
		}

		projected += linksize.LinkSizeFunction(res.Link.Name, res.Link.Cid)
		if projected >= HAMTShardingSize {
			// Enough shards were fetched to assert the directory stays at
			// or above the threshold.
			return false, nil
		}
	}

	return true, nil
}
// DynamicDirectory wraps a Directory interface and provides extra logic
// to switch from BasicDirectory to HAMTDirectory and backwards based on
// size. Only AddChild and RemoveChild are overridden; every other method is
// served by the embedded Directory.
type DynamicDirectory struct {
// Directory is the current implementation; AddChild and RemoveChild may
// replace it when the directory is converted.
Directory
}
// Compile-time check that *DynamicDirectory satisfies Directory.
var _ Directory = (*DynamicDirectory)(nil)
// AddChild implements the `Directory` interface. Before adding the entry it
// checks whether the wrapped directory has to change representation
// according to the global option(s):
//
//   - A HAMTDirectory is evaluated for a downgrade even on an add, because
//     the add may overwrite an existing entry and end up reducing the
//     directory size.
//   - A BasicDirectory is upgraded to a HAMTDirectory when the add would
//     bring its estimated size to HAMTShardingSize or above.
//   - Any other wrapped implementation just receives the call.
//
// When a conversion is needed the entry is added to the converted directory
// and the wrapped directory is replaced only after that add succeeded; any
// error from the check, the conversion or the add is returned and the
// wrapped directory is left in place.
func (d *DynamicDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
	if hamtDir, ok := d.Directory.(*HAMTDirectory); ok {
		switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name, nd)
		if err != nil {
			return err
		}
		if !switchToBasic {
			return hamtDir.AddChild(ctx, name, nd)
		}

		basicDir, err := hamtDir.switchToBasic(ctx)
		if err != nil {
			return err
		}
		if err := basicDir.AddChild(ctx, name, nd); err != nil {
			return err
		}
		d.Directory = basicDir
		return nil
	}

	basicDir, ok := d.Directory.(*BasicDirectory)
	if !ok {
		// Unknown implementation: no conversion logic applies, delegate
		// (same policy as RemoveChild) instead of panicking on the assertion.
		return d.Directory.AddChild(ctx, name, nd)
	}

	switchToHAMT, err := basicDir.needsToSwitchToHAMTDir(name, nd)
	if err != nil {
		return err
	}
	if !switchToHAMT {
		return basicDir.AddChild(ctx, name, nd)
	}

	hamtDir, err := basicDir.switchToSharding(ctx)
	if err != nil {
		return err
	}
	// The result of this add must be checked: ignoring it would report
	// success and install a HAMT directory that is missing the new entry.
	if err := hamtDir.AddChild(ctx, name, nd); err != nil {
		return err
	}
	d.Directory = hamtDir
	return nil
}
// RemoveChild implements the `Directory` interface. It matters when the
// wrapped directory is a HAMTDirectory that might need to be downgraded to a
// BasicDirectory (the upgrade path is in AddChild); any other wrapped
// implementation just receives the call.
//
// When a downgrade is needed the entry is removed from the converted
// directory and the wrapped directory is replaced only after that removal
// succeeded. Any error from the check, the conversion or the removal
// (including os.ErrNotExist for a missing child) is returned and the wrapped
// directory is left in place.
func (d *DynamicDirectory) RemoveChild(ctx context.Context, name string) error {
	hamtDir, ok := d.Directory.(*HAMTDirectory)
	if !ok {
		return d.Directory.RemoveChild(ctx, name)
	}

	switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name, nil)
	if err != nil {
		return err
	}
	if !switchToBasic {
		return hamtDir.RemoveChild(ctx, name)
	}

	basicDir, err := hamtDir.switchToBasic(ctx)
	if err != nil {
		return err
	}
	// The result of this removal must be checked: ignoring it would hide
	// failures such as os.ErrNotExist from the caller.
	if err := basicDir.RemoveChild(ctx, name); err != nil {
		return err
	}
	d.Directory = basicDir
	return nil
}