-
Notifications
You must be signed in to change notification settings - Fork 1
/
helper.go
127 lines (104 loc) · 3.33 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package libs
import (
"context"
"io"
"sync"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs/importer/helpers"
pb "github.com/ipfs/go-unixfs/pb"
)
// HelperAction is the callback invoked by WrapDagBuilder.Add for a node that
// has cached slice metadata, just before the node is handed to the DAGService.
// It receives the node together with the path, offset and size recorded in the
// node's SliceMeta, so the receiver can record the node's mapping information
// (the original note names a MappingService as the intended consumer).
type HelperAction func(node ipld.Node, srcPath string, offset uint64, size uint64)
// DefaultHelperAction is a HelperAction that does nothing. It can be passed
// where a callback is required but no mapping information needs recording.
func DefaultHelperAction(node ipld.Node, srcPath string, offset uint64, size uint64) {
}
// WrapDagBuilder wraps a helpers.Helper so that chunk data is pulled from an
// EnhancedSplitter and the SliceMeta of every leaf node is remembered and
// reported through a HelperAction when the node is added.
type WrapDagBuilder struct {
// Embedded helper; methods not redefined on WrapDagBuilder are promoted from it.
helpers.Helper
// spl is the splitter queried through NextBytesWithMeta in prepareNext.
spl EnhancedSplitter
// hcb is the callback invoked from Add for nodes found in metas.
hcb HelperAction
// dserv is the DAG service that Add stores nodes into.
dserv ipld.DAGService
// metas maps a leaf node's CID to the SliceMeta of the chunk it was built
// from; written by NewLeafDataNode and read by Add.
metas map[cid.Cid]*SliceMeta
// recvdErr is the error from the last splitter read; io.EOF is stored as nil.
recvdErr error
nextData []byte // the next item to return.
// nextMeta is the SliceMeta returned by the splitter alongside nextData.
nextMeta *SliceMeta
// lk guards metas.
lk sync.RWMutex
}
// WrappedDagBuilder creates a WrapDagBuilder on top of the helper returned by
// params.New(spl).
//
// spl supplies chunk data together with its SliceMeta. hcb is the callback
// invoked from Add for nodes that have cached metadata; a nil hcb is replaced
// by DefaultHelperAction so that Add never calls a nil function value. Nodes
// are stored into params.Dagserv. An error from params.New is returned
// unchanged together with a nil builder.
func WrappedDagBuilder(params *helpers.DagBuilderParams, spl EnhancedSplitter, hcb HelperAction) (*WrapDagBuilder, error) {
	db, err := params.New(spl)
	if err != nil {
		return nil, err
	}
	if hcb == nil {
		// Calling a nil func panics at runtime; fall back to the no-op callback.
		hcb = DefaultHelperAction
	}
	return &WrapDagBuilder{
		Helper: db,
		spl:    spl,
		dserv:  params.Dagserv,
		hcb:    hcb,
		metas:  make(map[cid.Cid]*SliceMeta),
	}, nil
}
// NewLeafDataNode overrides the embedded helper's method of the same name.
// It takes the next chunk from the EnhancedSplitter, builds a leaf node of the
// given type from it, and remembers the chunk's SliceMeta under the node's CID
// so that Add can report it later. It returns the node and the chunk length.
func (w *WrapDagBuilder) NewLeafDataNode(fsNodeType pb.Data_DataType) (node ipld.Node, dataSize uint64, err error) {
	chunk, sliceMeta, err := w.next()
	if err != nil {
		return nil, 0, err
	}
	dataSize = uint64(len(chunk))

	// Build the leaf that carries the chunk bytes.
	if node, err = w.NewLeafNode(chunk, fsNodeType); err != nil {
		return nil, 0, err
	}

	// Let the embedded helper turn the leaf into a `FilestoreNode` if needed.
	node = w.ProcessFileStore(node, dataSize)

	// Record the metadata for this leaf; the map is shared with Add.
	w.lk.Lock()
	defer w.lk.Unlock()
	key := node.Cid()
	w.metas[key] = sliceMeta

	return node, dataSize, nil
}
// Add overrides the embedded helper's Add. If SliceMeta was cached for the
// node's CID by NewLeafDataNode, the callback is invoked with the node and the
// meta's path, offset and size; the node is then stored in the DAG service.
// The error from the DAG service is returned as-is.
func (w *WrapDagBuilder) Add(node ipld.Node) error {
	// Hold the read lock only for the map lookup. Keeping it across the
	// callback and the DAG service call would block writers for the duration
	// of that I/O, and would deadlock if the callback re-entered
	// NewLeafDataNode on this builder.
	w.lk.RLock()
	meta, ok := w.metas[node.Cid()]
	w.lk.RUnlock()

	// Skip the callback for a nil meta entry or a nil callback rather than
	// panicking on a nil dereference or nil function call.
	if ok && meta != nil && w.hcb != nil {
		w.hcb(node, meta.Path, meta.Offset, meta.Size)
	}

	// The method signature carries no context, so a placeholder is used.
	return w.dserv.Add(context.TODO(), node)
}
// prepareNext makes sure a chunk (or an error) is buffered, reading from the
// EnhancedSplitter when nothing is pending. An io.EOF from the splitter is
// not kept as an error; it is recorded as nil.
func (w *WrapDagBuilder) prepareNext() {
	// Something is already buffered and not yet consumed: nothing to do.
	pending := w.nextData != nil || w.recvdErr != nil
	if pending {
		return
	}

	data, meta, err := w.spl.NextBytesWithMeta()
	if err == io.EOF {
		err = nil
	}
	w.nextData, w.nextMeta, w.recvdErr = data, meta, err
}
// Done overrides the embedded helper's Done so that it reflects the state of
// the EnhancedSplitter. It reports true only when no error is pending and no
// chunk is buffered; a pending error yields false.
func (w *WrapDagBuilder) Done() bool {
	// Done may be called before Next, so refresh the buffer first
	// (prepareNext is a no-op when something is already pending).
	w.prepareNext()
	return w.recvdErr == nil && w.nextData == nil
}
// Next overrides the embedded helper's Next so that data comes from the
// EnhancedSplitter. It returns the buffered chunk and clears the buffer, or
// returns the pending error with nil data.
func (w *WrapDagBuilder) Next() ([]byte, error) {
	w.prepareNext() // no-op when something is already buffered

	chunk, err := w.nextData, w.recvdErr
	w.nextData = nil // mark the chunk as consumed
	if err != nil {
		return nil, err
	}
	return chunk, nil
}
// next is Next plus the SliceMeta that the splitter returned with the chunk.
// The meta is read after Next has run, so it belongs to the returned chunk.
func (w *WrapDagBuilder) next() ([]byte, *SliceMeta, error) {
	data, err := w.Next()
	meta := w.nextMeta
	return data, meta, err
}
// Compile-time check that *WrapDagBuilder satisfies helpers.Helper.
var _ helpers.Helper = (*WrapDagBuilder)(nil)