Skip to content

Commit

Permalink
Merge pull request #3876 from filecoin-project/feat/pruning-command
Browse files Browse the repository at this point in the history
add command to (slowly) prune lotus chain datastore
  • Loading branch information
magik6k committed Oct 2, 2020
2 parents 43bd7cb + 1319d70 commit 44e352c
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 17 deletions.
46 changes: 29 additions & 17 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,13 +1191,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.
}

func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}

seen := cid.NewSet()
walked := cid.NewSet()

h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
Expand All @@ -1207,6 +1200,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}

return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
blk, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}

if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}

return nil
})
}

func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}

seen := cid.NewSet()
walked := cid.NewSet()

blocksToWalk := ts.Cids()
currentMinHeight := ts.Height()

Expand All @@ -1215,15 +1230,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return nil
}

if err := cb(blk); err != nil {
return err
}

data, err := cs.bs.Get(blk)
if err != nil {
return xerrors.Errorf("getting block: %w", err)
}

if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}

var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
Expand Down Expand Up @@ -1270,14 +1285,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
if c.Prefix().Codec != cid.DagCBOR {
continue
}
data, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
}

if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write out car object: %w", err)
if err := cb(c); err != nil {
return err
}

}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-shed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func main() {
consensusCmd,
serveDealStatsCmd,
syncCmd,
stateTreePruneCmd,
}

app := &cli.App{
Expand Down
289 changes: 289 additions & 0 deletions cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
package main

import (
"context"
"fmt"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/bbloom"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)

type cidSet interface {
Add(cid.Cid)
Has(cid.Cid) bool
HasRaw([]byte) bool
Len() int
}

type bloomSet struct {
bloom *bbloom.Bloom
}

func newBloomSet(size int64) (*bloomSet, error) {
b, err := bbloom.New(float64(size), 3)
if err != nil {
return nil, err
}

return &bloomSet{bloom: b}, nil
}

func (bs *bloomSet) Add(c cid.Cid) {
bs.bloom.Add(c.Hash())

}

func (bs *bloomSet) Has(c cid.Cid) bool {
return bs.bloom.Has(c.Hash())
}

func (bs *bloomSet) HasRaw(b []byte) bool {
return bs.bloom.Has(b)
}

func (bs *bloomSet) Len() int {
return int(bs.bloom.ElementsAdded())
}

type mapSet struct {
m map[string]struct{}
}

func newMapSet() *mapSet {
return &mapSet{m: make(map[string]struct{})}
}

func (bs *mapSet) Add(c cid.Cid) {
bs.m[string(c.Hash())] = struct{}{}
}

func (bs *mapSet) Has(c cid.Cid) bool {
_, ok := bs.m[string(c.Hash())]
return ok
}

func (bs *mapSet) HasRaw(b []byte) bool {
_, ok := bs.m[string(b)]
return ok
}

func (bs *mapSet) Len() int {
return len(bs.m)
}

var stateTreePruneCmd = &cli.Command{
Name: "state-prune",
Description: "Deletes old state root data from local chainstore",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.Int64Flag{
Name: "keep-from-lookback",
Usage: "keep stateroots at or newer than the current height minus this lookback",
Value: 1800, // 2 x finality
},
&cli.IntFlag{
Name: "delete-up-to",
Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)",
},
&cli.BoolFlag{
Name: "use-bloom-set",
Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much",
},
&cli.BoolFlag{
Name: "dry-run",
Usage: "only enumerate the good set, don't do any deletions",
},
&cli.BoolFlag{
Name: "only-ds-gc",
Usage: "Only run datastore GC",
},
&cli.IntFlag{
Name: "gc-count",
Usage: "number of times to run gc",
Value: 20,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()

fsrepo, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return err
}

lkrepo, err := fsrepo.Lock(repo.FullNode)
if err != nil {
return err
}

defer lkrepo.Close() //nolint:errcheck

ds, err := lkrepo.Datastore("/chain")
if err != nil {
return err
}

defer ds.Close() //nolint:errcheck

mds, err := lkrepo.Datastore("/metadata")
if err != nil {
return err
}
defer mds.Close() //nolint:errcheck

if cctx.Bool("only-ds-gc") {
gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
return nil
}
return fmt.Errorf("datastore doesnt support gc")
}

bs := blockstore.NewBlockstore(ds)

cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}

var goodSet cidSet
if cctx.Bool("use-bloom-set") {
bset, err := newBloomSet(10000000)
if err != nil {
return err
}
goodSet = bset
} else {
goodSet = newMapSet()
}

ts := cs.GetHeaviestTipSet()

rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))

if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
if goodSet.Len()%20 == 0 {
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
}
goodSet.Add(c)
return nil
}); err != nil {
return fmt.Errorf("snapshot walk failed: %w", err)
}

fmt.Println()
fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len())

if cctx.Bool("dry-run") {
return nil
}

var b datastore.Batch
var batchCount int
markForRemoval := func(c cid.Cid) error {
if b == nil {
nb, err := ds.Batch()
if err != nil {
return fmt.Errorf("opening batch: %w", err)
}

b = nb
}
batchCount++

if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil {
return err
}

if batchCount > 100 {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit batch deletes: %w", err)
}
b = nil
batchCount = 0
}
return nil
}

res, err := ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("failed to query datastore: %w", err)
}

dupTo := cctx.Int("delete-up-to")

var deleteCount int
var goodHits int
for {
v, ok := res.NextSync()
if !ok {
break
}

bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):]))
if err != nil {
return xerrors.Errorf("failed to parse key: %w", err)
}

if goodSet.HasRaw(bk) {
goodHits++
continue
}

nc := cid.NewCidV1(cid.Raw, bk)

deleteCount++
if err := markForRemoval(nc); err != nil {
return fmt.Errorf("failed to remove cid %s: %w", nc, err)
}

if deleteCount%20 == 0 {
fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits)
}

if dupTo != 0 && deleteCount > dupTo {
break
}
}

if b != nil {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit final batch delete: %w", err)
}
}

gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
}

return nil
},
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/go-bitswap v0.2.20
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
Expand Down

0 comments on commit 44e352c

Please sign in to comment.