/
copy.go
181 lines (156 loc) · 3.92 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package blockstore
import (
"bytes"
"context"
"fmt"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/trace"
)
// CopyBlockstore copies every block whose key is reported by
// from.AllKeysChan into to.
//
// All blocks are fetched into memory first and then written with a single
// to.PutMany call, so peak memory grows with the number of blocks in from.
// The first error from AllKeysChan, Get or PutMany is returned unchanged.
func CopyBlockstore(ctx context.Context, from, to Blockstore) error {
	ctx, span := trace.StartSpan(ctx, "copyBlockstore")
	defer span.End()

	// Cancel the derived context on every return path. If a Get fails we stop
	// receiving from the key channel; cancelling lets an AllKeysChan producer
	// that honours ctx stop instead of blocking on a send nobody will read.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	cids, err := from.AllKeysChan(ctx)
	if err != nil {
		return err
	}

	// TODO: should probably expose better methods on the blockstore for this operation
	var blks []blocks.Block
	for c := range cids {
		b, err := from.Get(ctx, c)
		if err != nil {
			return err
		}
		blks = append(blks, b)
	}

	return to.PutMany(ctx, blks)
}
// linksForObj reports the links contained in blk by calling cb for each one.
//
// Only two codecs are accepted: DagCBOR blocks are scanned with
// cbg.ScanForLinks, and raw blocks succeed without calling cb. Any other
// codec yields an error.
func linksForObj(blk blocks.Block, cb func(cid.Cid)) error {
	switch blk.Cid().Prefix().Codec {
	case cid.DagCBOR:
		if err := cbg.ScanForLinks(bytes.NewReader(blk.RawData()), cb); err != nil {
			// %w keeps the scanner's error reachable through errors.Is/As.
			return fmt.Errorf("cbg.ScanForLinks: %w", err)
		}
		return nil
	case cid.Raw:
		// We implicitly have all children of raw blocks.
		return nil
	default:
		return fmt.Errorf("vm flush copy method only supports dag cbor")
	}
}
// CopyParticial copies the block at root, together with the linked blocks
// that copyRec decides to visit, from `from` into `to`.
//
// Blocks are gathered into batches of batchSize and written with to.PutMany
// by a background flusher goroutine, so walking `from` and writing to `to`
// overlap. A fixed pool of bufCount batch buffers is recycled between the
// walker and the flusher.
//
// An error from the walk is returned prefixed with "copyRec: ". An error from
// a batch write carries the prefix "batch put in copy: " (further wrapped by
// the walk prefixes when it surfaces while the walk is still running). On
// success the number of blocks and bytes copied are attached to the trace
// span. The flusher goroutine has always exited before this function returns.
func CopyParticial(ctx context.Context, from, to Blockstore, root cid.Cid) error {
	ctx, span := trace.StartSpan(ctx, "vm.Copy") // nolint
	defer span.End()

	const (
		batchSize = 128
		bufCount  = 3
	)

	var (
		numBlocks     int
		totalCopySize int
	)

	// Both channels can hold every buffer in circulation, so handing a buffer
	// over never blocks either side.
	freeBufs := make(chan []blocks.Block, bufCount)
	toFlush := make(chan []blocks.Block, bufCount)
	for i := 0; i < bufCount; i++ {
		freeBufs <- make([]blocks.Block, 0, batchSize)
	}

	// Buffered (size 1) so the flusher can always deposit its single error and
	// exit, even if nobody is receiving yet. The channel is closed on every
	// exit path, so any receive after the first one yields nil.
	errFlushChan := make(chan error, 1)

	go func() {
		// Deferred calls run in reverse order: errFlushChan is closed before
		// freeBufs, so a walker that observes freeBufs closed is guaranteed to
		// find the error (if any) already waiting in errFlushChan.
		defer close(freeBufs)
		defer close(errFlushChan)

		for b := range toFlush {
			if err := to.PutMany(ctx, b); err != nil {
				errFlushChan <- fmt.Errorf("batch put in copy: %w", err)
				return
			}
			freeBufs <- b[:0]
		}
	}()

	batch := <-freeBufs
	batchCp := func(blk blocks.Block) error {
		numBlocks++
		totalCopySize += len(blk.RawData())

		batch = append(batch, blk)
		if len(batch) >= batchSize {
			toFlush <- batch

			var ok bool
			batch, ok = <-freeBufs
			if !ok {
				// freeBufs is only closed while the walk is running when the
				// flusher has failed; surface that failure.
				return <-errFlushChan
			}
		}
		return nil
	}

	copyErr := copyRec(ctx, from, to, root, batchCp)

	// Hand over the trailing partial batch only when the walk succeeded.
	if copyErr == nil && len(batch) > 0 {
		toFlush <- batch
	}

	// Always shut the flusher down and wait for it, including when the walk
	// failed; otherwise it would stay blocked on toFlush forever.
	close(toFlush)
	flushErr := <-errFlushChan // nil if the flusher finished cleanly or its error was already consumed

	if copyErr != nil {
		return fmt.Errorf("copyRec: %w", copyErr)
	}
	if flushErr != nil {
		return flushErr
	}

	span.AddAttributes(
		trace.Int64Attribute("numBlocks", int64(numBlocks)),
		trace.Int64Attribute("copySize", int64(totalCopySize)),
	)

	return nil
}
// copyRec walks the DAG rooted at root in `from` and hands blocks to cp.
//
// The walk is depth-first: every linked block that is visited is passed to cp
// before the block that references it, and root's own block is passed last.
// A root whose multihash type is 0 (identity) is skipped entirely.
//
// While scanning a block's links, the following links are not followed:
//   - links with a Filecoin sealed/unsealed commitment codec;
//   - identity-hashed links whose codec is raw or cbor;
//   - non-identity links that `to` already reports via Has.
//
// NOTE(review): identity-hashed links with any other codec are passed to a
// recursive copyRec call, which returns immediately because of the identity
// guard at the top, so their children are not visited here. Confirm this is
// intended.
//
// Errors from Get, Has, link scanning and cp are wrapped with %w so callers
// can inspect the underlying cause with errors.Is/As.
func copyRec(ctx context.Context, from, to Blockstore, root cid.Cid, cp func(blocks.Block) error) error {
	if root.Prefix().MhType == 0 {
		// identity cid, skip
		return nil
	}

	blk, err := from.Get(ctx, root)
	if err != nil {
		return fmt.Errorf("get %s failed: %w", root, err)
	}

	// The link callback cannot return an error, so the first failure is
	// parked here and every later callback invocation becomes a no-op.
	var lerr error
	err = linksForObj(blk, func(link cid.Cid) {
		if lerr != nil {
			// There's no error return on the linksForObj callback :(
			return
		}

		prefix := link.Prefix()
		codec := multicodec.Code(prefix.Codec)
		switch codec {
		case multicodec.FilCommitmentSealed, cid.FilCommitmentUnsealed:
			return
		}

		// We always have blocks inlined into CIDs, but we may not have their children.
		if multicodec.Code(prefix.MhType) == multicodec.Identity {
			// Unless the inlined block has no children.
			switch codec {
			case multicodec.Raw, multicodec.Cbor:
				return
			}
		} else {
			// If we have an object, we already have its children, skip the object.
			has, err := to.Has(ctx, link)
			if err != nil {
				lerr = fmt.Errorf("has: %w", err)
				return
			}
			if has {
				return
			}
		}

		if err := copyRec(ctx, from, to, link, cp); err != nil {
			lerr = err
			return
		}
	})
	if err != nil {
		return fmt.Errorf("linksForObj (%x): %w", blk.RawData(), err)
	}
	if lerr != nil {
		return lerr
	}

	if err := cp(blk); err != nil {
		return fmt.Errorf("copy: %w", err)
	}
	return nil
}