/
dagbuilder.go
168 lines (138 loc) · 4.04 KB
/
dagbuilder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package helpers
import (
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
)
// NodeCB is callback function for dag generation
// the `last` flag signifies whether or not this is the last
// (top-most root) node being added. useful for things like
// only pinning the first node recursively.
type NodeCB func(node *dag.Node, last bool) error
var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil }
// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
mp pin.ManualPinner
in <-chan []byte
errs <-chan error
recvdErr error
nextData []byte // the next item to return.
maxlinks int
ncb NodeCB
batch *dag.Batch
}
type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int
// DAGService to write blocks to (required)
Dagserv dag.DAGService
// Callback for each block added
NodeCB NodeCB
}
// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source
func (dbp *DagBuilderParams) New(in <-chan []byte, errs <-chan error) *DagBuilderHelper {
ncb := dbp.NodeCB
if ncb == nil {
ncb = nilFunc
}
return &DagBuilderHelper{
dserv: dbp.Dagserv,
in: in,
errs: errs,
maxlinks: dbp.Maxlinks,
ncb: ncb,
batch: dbp.Dagserv.Batch(),
}
}
// prepareNext consumes the next item from the channel and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are done yet?" (i.e. not consume it from the channel)
func (db *DagBuilderHelper) prepareNext() {
if db.in == nil {
// if our input is nil, there is "nothing to do". we're done.
// as if there was no data at all. (a sort of zero-value)
return
}
// if we already have data waiting to be consumed, we're ready.
if db.nextData != nil {
return
}
// if it's closed, nextData will be correctly set to nil, signaling
// that we're done consuming from the channel.
db.nextData = <-db.in
}
// Done returns whether or not we're done consuming the incoming data.
func (db *DagBuilderHelper) Done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
return db.nextData == nil
}
// Next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *DagBuilderHelper) Next() []byte {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
}
// GetDagServ returns the dagservice object this Helper is using
func (db *DagBuilderHelper) GetDagServ() dag.DAGService {
return db.dserv
}
// FillNodeLayer will add datanodes as children to the give node until
// at most db.indirSize ndoes are added
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
// while we have room AND we're not done
for node.NumChildren() < db.maxlinks && !db.Done() {
child := NewUnixfsBlock()
if err := db.FillNodeWithData(child); err != nil {
return err
}
if err := node.AddChild(child, db); err != nil {
return err
}
}
return nil
}
func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
data := db.Next()
if data == nil { // we're done!
return nil
}
if len(data) > BlockSizeLimit {
return ErrSizeLimitExceeded
}
node.SetData(data)
return nil
}
func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
return nil, err
}
_, err = db.dserv.Add(dn)
if err != nil {
return nil, err
}
// node callback
err = db.ncb(dn, true)
if err != nil {
return nil, err
}
return dn, nil
}
func (db *DagBuilderHelper) Maxlinks() int {
return db.maxlinks
}
func (db *DagBuilderHelper) Close() error {
return db.batch.Commit()
}