/
import.go
110 lines (91 loc) · 2.55 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package cmd
import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"github.com/filecoin-project/venus/pkg/consensus/chainselector"
"github.com/filecoin-project/venus/pkg/httpreader"
"github.com/DataDog/zstd"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/repo"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"gopkg.in/cheggaaa/pb.v1"
)
// logImport is the logger used by the chain import command; it is registered
// under the "commands/import" subsystem name.
var logImport = logging.Logger("commands/import")
// Import loads the chain data referenced by fileName into the repo r.
//
// It is the exported entry point of this file and hands all of the work to
// importChain, returning that call's error unchanged.
// The pre-existing note on this function states that the cached tipset CIDs
// are used as the check-point when running `venus daemon`.
func Import(ctx context.Context, r repo.Repo, fileName string) error {
	if err := importChain(ctx, r, fileName); err != nil {
		return err
	}
	return nil
}
// importChain reads a chain snapshot from fname, imports it into a chain store
// built on top of r, sets the imported tipset as the new head and persists the
// genesis CID.
//
// fname is either an "http://" / "https://" URL, which is streamed through a
// resumable HTTP reader, or a local path that is first passed through
// homedir.Expand. When bytes 1..3 of the stream match the tail of the zstd
// frame magic, the stream is wrapped in a zstd decompressor. A progress bar
// sized by the source's reported byte length is displayed while importing.
//
// Every failure is returned wrapped with %w so callers can inspect the cause
// with errors.Is / errors.As.
func importChain(ctx context.Context, r repo.Repo, fname string) error {
	var (
		rd io.Reader
		l  int64 // total number of source bytes, used to size the progress bar
	)

	if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") {
		rrd, err := httpreader.NewResumableReader(ctx, fname)
		if err != nil {
			return fmt.Errorf("fetching chain CAR failed: setting up resumable reader: %w", err)
		}
		rd = rrd
		l = rrd.ContentLength()
	} else {
		path, err := homedir.Expand(fname)
		if err != nil {
			return fmt.Errorf("expanding path %q: %w", fname, err)
		}
		fi, err := os.Open(path)
		if err != nil {
			return err
		}
		defer fi.Close() //nolint:errcheck

		// Stat the open handle instead of the path: it avoids a second path
		// lookup and always describes the file that is actually being read.
		st, err := fi.Stat()
		if err != nil {
			return err
		}
		rd = fi
		l = st.Size()
	}

	bs := r.Datastore()
	// Build a chain store on top of the repo's local datastores.
	chainStore := chain.NewStore(r.ChainDatastore(), bs, cid.Undef, chain.NewMockCirculatingSupplyCalculator(), chainselector.Weight)

	bufr := bufio.NewReaderSize(rd, 1<<20)

	// Peek does not consume input, so the importer still sees the full stream.
	header, err := bufr.Peek(4)
	if err != nil {
		return fmt.Errorf("peek header: %w", err)
	}

	bar := pb.New64(l)
	br := bar.NewProxyReader(bufr)
	bar.ShowTimeLeft = true
	bar.ShowPercent = true
	bar.ShowSpeed = true
	bar.Units = pb.U_BYTES

	var ir io.Reader = br
	if string(header[1:]) == "\xB5\x2F\xFD" { // zstd
		zr := zstd.NewReader(br)
		defer func() {
			if err := zr.Close(); err != nil {
				logImport.Errorw("closing zstd reader", "error", err)
			}
		}()
		ir = zr
	}

	bar.Start()
	tip, genesisBlk, err := chainStore.Import(ctx, ir)
	// Stop the progress bar on both the success and the failure path so it is
	// not left running when the import aborts.
	bar.Finish()
	if err != nil {
		return fmt.Errorf("importing chain failed: %w", err)
	}

	// Use the caller's context so cancellation is honoured here as well.
	if err := chainStore.SetHead(ctx, tip); err != nil {
		return fmt.Errorf("importing chain failed: %w", err)
	}
	logImport.Infof("accepting %s as new head", tip.Key().String())

	if err := chainStore.PersistGenesisCID(ctx, genesisBlk); err != nil {
		return fmt.Errorf("persist genesis failed: %w", err)
	}
	return nil
}