Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: clean up the configs for pruner and snapshotter #22396

Merged
merged 2 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 24 additions & 3 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,14 @@ func pruneState(ctx *cli.Context) error {
defer stack.Close()

chaindb := utils.MakeChainDatabase(ctx, stack, false)
pruner, err := pruner.NewPruner(chaindb, stack.ResolvePath(""), stack.ResolvePath(config.Eth.TrieCleanCacheJournal), ctx.Uint64(utils.BloomFilterSizeFlag.Name))
defer chaindb.Close()

prunerconfig := pruner.Config{
Datadir: stack.ResolvePath(""),
Cachedir: stack.ResolvePath(config.Eth.TrieCleanCacheJournal),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
}
pruner, err := pruner.NewPruner(chaindb, prunerconfig)
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
Expand Down Expand Up @@ -199,12 +206,20 @@ func verifyState(ctx *cli.Context) error {
defer stack.Close()

chaindb := utils.MakeChainDatabase(ctx, stack, true)
defer chaindb.Close()

headBlock := rawdb.ReadHeadBlock(chaindb)
if headBlock == nil {
log.Error("Failed to load head block")
return errors.New("no head block")
}
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false, false)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root())
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
Expand Down Expand Up @@ -479,7 +494,13 @@ func dumpState(ctx *cli.Context) error {
if err != nil {
return err
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, root, false, false, false)
snapConfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2198,6 +2198,9 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (*core.BlockChain, ethdb.Data
if !ctx.Bool(SnapshotFlag.Name) {
cache.SnapshotLimit = 0 // Disabled
}
// Disable snapshot generation/wiping by default
cache.SnapshotNoBuild = true

if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cache.TrieCleanLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
}
Expand All @@ -2206,7 +2209,6 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (*core.BlockChain, ethdb.Data
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name)}

// TODO(rjl493456442) disable snapshot generation/wiping if the chain is read only.
// Disable transaction indexing/unindexing by default.
chain, err := core.NewBlockChain(chainDb, cache, gspec, nil, engine, vmcfg, nil, nil)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -399,7 +400,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.stateCache.TrieDB(), head.Root())
}

// Start future block processor.
Expand Down
67 changes: 42 additions & 25 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ var (
emptyCode = crypto.Keccak256(nil)
)

// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
Cachedir string // The directory of state clean cache
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
}

// Pruner is an offline tool to prune the stale state with the
// help of the snapshot. The workflow of pruner is very simple:
//
Expand All @@ -75,40 +82,44 @@ var (
// periodically in order to release the disk usage and improve the
// disk read performance to some extent.
type Pruner struct {
db ethdb.Database
stateBloom *stateBloom
datadir string
trieCachePath string
headHeader *types.Header
snaptree *snapshot.Tree
config Config
chainHeader *types.Header
db ethdb.Database
stateBloom *stateBloom
snaptree *snapshot.Tree
}

// NewPruner creates the pruner instance.
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize uint64) (*Pruner, error) {
func NewPruner(db ethdb.Database, config Config) (*Pruner, error) {
headBlock := rawdb.ReadHeadBlock(db)
if headBlock == nil {
return nil, errors.New("Failed to load head block")
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, false)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: false,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return nil, err // The relevant snapshot(s) might not exist
}
// Sanitize the bloom filter size if it's too small.
if bloomSize < 256 {
log.Warn("Sanitizing bloomfilter size", "provided(MB)", bloomSize, "updated(MB)", 256)
bloomSize = 256
if config.BloomSize < 256 {
log.Warn("Sanitizing bloomfilter size", "provided(MB)", config.BloomSize, "updated(MB)", 256)
config.BloomSize = 256
}
stateBloom, err := newStateBloomWithSize(bloomSize)
stateBloom, err := newStateBloomWithSize(config.BloomSize)
if err != nil {
return nil, err
}
return &Pruner{
db: db,
stateBloom: stateBloom,
datadir: datadir,
trieCachePath: trieCachePath,
headHeader: headBlock.Header(),
snaptree: snaptree,
config: config,
chainHeader: headBlock.Header(),
db: db,
stateBloom: stateBloom,
snaptree: snaptree,
}, nil
}

Expand Down Expand Up @@ -236,12 +247,12 @@ func (p *Pruner) Prune(root common.Hash) error {
// reuse it for pruning instead of generating a new one. It's
// mandatory because a part of state may already be deleted,
// the recovery procedure is necessary.
_, stateBloomRoot, err := findBloomFilter(p.datadir)
_, stateBloomRoot, err := findBloomFilter(p.config.Datadir)
if err != nil {
return err
}
if stateBloomRoot != (common.Hash{}) {
return RecoverPruning(p.datadir, p.db, p.trieCachePath)
return RecoverPruning(p.config.Datadir, p.db, p.config.Cachedir)
}
// If the target state root is not specified, use the HEAD-127 as the
// target. The reason for picking it is:
Expand All @@ -252,7 +263,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// Retrieve all snapshot layers from the current HEAD.
// In theory there are 128 difflayers + 1 disk layer present,
// so 128 diff layers are expected to be returned.
layers = p.snaptree.Snapshots(p.headHeader.Root, 128, true)
layers = p.snaptree.Snapshots(p.chainHeader.Root, 128, true)
if len(layers) != 128 {
// Reject if the accumulated diff layers are less than 128. It
// means in most of normal cases, there is no associated state
Expand Down Expand Up @@ -294,7 +305,7 @@ func (p *Pruner) Prune(root common.Hash) error {
}
} else {
if len(layers) > 0 {
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.headHeader.Number.Uint64()-127)
log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64()-127)
} else {
log.Info("Selecting user-specified state as the pruning target", "root", root)
}
Expand All @@ -303,7 +314,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// It's necessary otherwise in the next restart we will hit the
// deleted state root in the "clean cache" so that the incomplete
// state is picked for usage.
deleteCleanTrieCache(p.trieCachePath)
deleteCleanTrieCache(p.config.Cachedir)

// All the state roots of the middle layer should be forcibly pruned,
// otherwise the dangling state will be left.
Expand All @@ -325,7 +336,7 @@ func (p *Pruner) Prune(root common.Hash) error {
if err := extractGenesis(p.db, p.stateBloom); err != nil {
return err
}
filterName := bloomFilterName(p.datadir, root)
filterName := bloomFilterName(p.config.Datadir, root)

log.Info("Writing state bloom to disk", "name", filterName)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix); err != nil {
Expand Down Expand Up @@ -362,7 +373,13 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// - The state HEAD is rewound already because of multiple incomplete `prune-state`
// In this case, even the state HEAD is not exactly matched with snapshot, it
// still feasible to recover the pruning correctly.
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, true)
snapconfig := snapshot.Config{
CacheSize: 256,
Recovery: true,
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, db, trie.NewDatabase(db), headBlock.Root())
if err != nil {
return err // The relevant snapshot(s) might not exist
}
Expand Down
11 changes: 7 additions & 4 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
}

// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) {
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash, cache int, recovery bool, noBuild bool) (snapshot, bool, error) {
// If snapshotting is disabled (initial sync in progress), don't do anything,
// wait for the chain to permit us to do something meaningful
if rawdb.ReadSnapshotDisabled(diskdb) {
Expand All @@ -140,7 +140,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
}
snapshot, generator, err := loadAndParseJournal(diskdb, base)
if err != nil {
log.Warn("Failed to load new-format journal", "error", err)
log.Warn("Failed to load journal", "error", err)
return nil, false, err
}
// Entire snapshot journal loaded, sanity check the head. If the loaded
Expand All @@ -164,13 +164,16 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
// disk layer.
log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root)
}
// Everything loaded correctly, resume any suspended operations
// Load the disk layer status from the generator if it's not complete
if !generator.Done {
// Whether or not wiping was in progress, load any generator progress too
base.genMarker = generator.Marker
if base.genMarker == nil {
base.genMarker = []byte{}
}
}
// Everything loaded correctly, resume any suspended operations
// if the background generation is allowed
if !generator.Done && !noBuild {
base.genPending = make(chan struct{})
base.genAbort = make(chan chan *generatorStats)

Expand Down
25 changes: 17 additions & 8 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ type snapshot interface {
StorageIterator(account common.Hash, seek common.Hash) (StorageIterator, bool)
}

// Config includes the configurations for snapshots.
type Config struct {
CacheSize int // Megabytes permitted to use for read caches
Recovery bool // Indicator that the snapshots is in the recovery mode
NoBuild bool // Indicator that the snapshots generation is disallowed
AsyncBuild bool // The snapshot generation is allowed to be constructed asynchronously
}

// Tree is an Ethereum state snapshot tree. It consists of one persistent base
// layer backed by a key-value store, on top of which arbitrarily many in-memory
// diff layers are topped. The memory diffs can form a tree with branching, but
Expand All @@ -158,9 +166,9 @@ type snapshot interface {
// storage data to avoid expensive multi-level trie lookups; and to allow sorted,
// cheap iteration of the account/storage tries for sync aid.
type Tree struct {
config Config // Snapshots configurations
diskdb ethdb.KeyValueStore // Persistent database to store the snapshot
triedb *trie.Database // In-memory cache to access the trie through
cache int // Megabytes permitted to use for read caches
layers map[common.Hash]snapshot // Collection of all known layers
lock sync.RWMutex

Expand All @@ -183,26 +191,27 @@ type Tree struct {
// This case happens when the snapshot is 'ahead' of the state trie.
// - otherwise, the entire snapshot is considered invalid and will be recreated on
// a background thread.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) {
func New(config Config, diskdb ethdb.KeyValueStore, triedb *trie.Database, root common.Hash) (*Tree, error) {
// Create a new, empty snapshot tree
snap := &Tree{
config: config,
diskdb: diskdb,
triedb: triedb,
cache: cache,
layers: make(map[common.Hash]snapshot),
}
if !async {
// Create the building waiter iff the background generation is allowed
if !config.NoBuild && !config.AsyncBuild {
defer snap.waitBuild()
}
// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
head, disabled, err := loadSnapshot(diskdb, triedb, root, config.CacheSize, config.Recovery, config.NoBuild)
if disabled {
log.Warn("Snapshot maintenance disabled (syncing)")
return snap, nil
}
if err != nil {
if rebuild {
log.Warn("Failed to load snapshot, regenerating", "err", err)
log.Warn("Failed to load snapshot", "err", err)
if !config.NoBuild {
snap.Rebuild(root)
return snap, nil
}
Expand Down Expand Up @@ -727,7 +736,7 @@ func (t *Tree) Rebuild(root common.Hash) {
// generator will run a wiper first if there's not one running right now.
log.Info("Rebuilding state snapshot")
t.layers = map[common.Hash]snapshot{
root: generateSnapshot(t.diskdb, t.triedb, t.cache, root),
root: generateSnapshot(t.diskdb, t.triedb, t.config.CacheSize, root),
}
}

Expand Down
8 changes: 7 additions & 1 deletion tests/state_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,13 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo

var snaps *snapshot.Tree
if snapshotter {
snaps, _ = snapshot.New(db, sdb.TrieDB(), 1, root, false, true, false)
snapconfig := snapshot.Config{
CacheSize: 1,
Recovery: false,
NoBuild: false,
AsyncBuild: false,
}
snaps, _ = snapshot.New(snapconfig, db, sdb.TrieDB(), root)
}
statedb, _ = state.New(root, sdb, snaps)
return snaps, statedb
Expand Down