diff --git a/core/block_validator.go b/core/block_validator.go index b109c1e54b..c6a35f1fdf 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // transition, such as amount of used gas, the receipt roots and the state root // itself. ValidateState returns a database batch if the validation was a success // otherwise nil and an error is returned. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -135,13 +135,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return nil }, } - if skipHeavyVerify { + if statedb.IsPipeCommit() { validateFuns = append(validateFuns, func() error { if err := statedb.WaitPipeVerification(); err != nil { return err } + statedb.CorrectAccountsRoot() statedb.Finalise(v.config.IsEIP158(header.Number)) - statedb.AccountsIntermediateRoot() + // State verification pipeline - accounts root are not calculated here, just populate needed fields for process + statedb.PopulateSnapAccountAndStorage() return nil }) } else { diff --git a/core/blockchain.go b/core/blockchain.go index fbf9af9db7..47ac859441 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Validate the state using the default validator substart = time.Now() if !statedb.IsLightProcessed() { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) return it.index, err diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 50d02e0acc..07cb51933a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index 65b2729d9c..d2b1b2778b 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,8 +118,9 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) - verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed - valid bool // mark the difflayer is valid or not. + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. + accountCorrected bool // mark the accountData has been corrected ort not diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s storageList: make(map[common.Hash][]common.Hash), verifiedCh: verified, } + switch parent := parent.(type) { case *diskLayer: dl.rebloom(parent) @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s default: panic("unknown parent type") } + // Sanity check that accounts or storage slots are never nil for accountHash, blob := range accounts { if blob == nil { @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool { } } +func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) { + dl.lock.Lock() + defer dl.lock.Unlock() + + dl.accountData = accounts + dl.accountCorrected = true +} + +func (dl *diffLayer) AccountsCorrected() bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.accountCorrected +} + // Parent returns the subsequent layer of a diff layer. func (dl *diffLayer) Parent() snapshot { return dl.parent @@ -314,6 +332,24 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) { return account, nil } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diffLayer) Accounts() (map[common.Hash]*Account, error) { + dl.lock.RLock() + defer dl.lock.RUnlock() + + accounts := make(map[common.Hash]*Account, len(dl.accountData)) + for hash, data := range dl.accountData { + account := new(Account) + if err := rlp.DecodeBytes(data, account); err != nil { + return nil, err + } + accounts[hash] = account + } + + return accounts, nil +} + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. // diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index c1de41782c..6d46496a71 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool { return true } +func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) { +} + +func (dl *diskLayer) AccountsCorrected() bool { + return true +} + // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil @@ -73,6 +80,12 @@ func (dl *diskLayer) Stale() bool { return dl.stale } +// Accounts directly retrieves all accounts in current snapshot in +// the snapshot slim data format. +func (dl *diskLayer) Accounts() (map[common.Hash]*Account, error) { + return nil, nil +} + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. func (dl *diskLayer) Account(hash common.Hash) (*Account, error) { diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 35c69cfd6b..587f78a474 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -288,6 +288,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { if dl.Stale() { return common.Hash{}, ErrSnapshotStale } + // Everything below was journalled, persist this layer too if err := rlp.Encode(buffer, dl.root); err != nil { return common.Hash{}, err diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 8ac93f28e4..7ad4bcc91b 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -107,13 +107,23 @@ type Snapshot interface { // Verified returns whether the snapshot is verified Verified() bool - // Store the verification result + // MarkValid stores the verification result MarkValid() + // CorrectAccounts updates account data for storing the correct data during pipecommit + CorrectAccounts(map[common.Hash][]byte) + + // AccountsCorrected checks whether the account data has been corrected during pipecommit + AccountsCorrected() bool + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) + // Accounts directly retrieves all accounts in current snapshot in + // the snapshot slim data format. + Accounts() (map[common.Hash]*Account, error) + // AccountRLP directly retrieves the account RLP associated with a particular // hash in the snapshot slim data format. AccountRLP(hash common.Hash) ([]byte, error) @@ -240,6 +250,11 @@ func (t *Tree) waitBuild() { } } +// Layers returns the number of layers +func (t *Tree) Layers() int { + return len(t.layers) +} + // Disable interrupts any pending snapshot generator, deletes all the snapshot // layers in memory and marks snapshots disabled globally. In order to resume // the snapshot functionality, the caller must invoke Rebuild. @@ -666,6 +681,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) { if snap == nil { return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root) } + // Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data + if !snap.WaitAndGetVerifyRes() { + return common.Hash{}, ErrSnapshotStale + } + // Run the journaling t.lock.Lock() defer t.lock.Unlock() diff --git a/core/state/state_object.go b/core/state/state_object.go index c5212e91cc..a89feebf28 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -64,10 +64,11 @@ func (s Storage) Copy() Storage { // Account values can be accessed and modified through the object. // Finally, call CommitTrie to write the modified storage trie into a database. type StateObject struct { - address common.Address - addrHash common.Hash // hash of ethereum address of the account - data Account - db *StateDB + address common.Address + addrHash common.Hash // hash of ethereum address of the account + data Account + db *StateDB + rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode // DB error. // State objects are used by the consensus core and VM which are @@ -355,7 +356,17 @@ func (s *StateObject) finalise(prefetch bool) { } } - if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { + // The account root need to be updated before prefetch, otherwise the account root is empty + if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() { + if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil { + if acc != nil && len(acc.Root) != 0 { + s.data.Root = common.BytesToHash(acc.Root) + s.rootCorrected = true + } + } + } + + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot { s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) } if len(s.dirtyStorage) > 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index c8cb410b0e..2621907f6c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -50,6 +50,10 @@ var ( // emptyRoot is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + // dummyRoot is the dummy account root before corrected in pipecommit sync mode, + // the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28 + dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root")) + emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes()) ) @@ -229,11 +233,16 @@ func (s *StateDB) MarkLightProcessed() { // Enable the pipeline commit function of statedb func (s *StateDB) EnablePipeCommit() { - if s.snap != nil { + if s.snap != nil && s.snaps.Layers() > 1 { s.pipeCommit = true } } +// IsPipeCommit checks whether pipecommit is enabled on the statedb or not +func (s *StateDB) IsPipeCommit() bool { + return s.pipeCommit +} + // Mark that the block is full processed func (s *StateDB) MarkFullProcessed() { s.fullProcessed = true @@ -965,6 +974,65 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { return s.StateIntermediateRoot() } +//CorrectAccountsRoot will fix account roots in pipecommit mode +func (s *StateDB) CorrectAccountsRoot() { + if accounts, err := s.snap.Accounts(); err == nil && accounts != nil { + for _, obj := range s.stateObjects { + if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot { + if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist && len(account.Root) != 0 { + obj.data.Root = common.BytesToHash(account.Root) + } + } + } + } +} + +//PopulateSnapAccountAndStorage tries to populate required accounts and storages for pipecommit +func (s *StateDB) PopulateSnapAccountAndStorage() { + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + if s.snap != nil && !obj.deleted { + root := obj.data.Root + storageChanged := s.populateSnapStorage(obj) + if storageChanged { + root = dummyRoot + } + s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash) + } + } + } +} + +//populateSnapStorage tries to populate required storages for pipecommit, and returns a flag to indicate whether the storage root changed or not +func (s *StateDB) populateSnapStorage(obj *StateObject) bool { + for key, value := range obj.dirtyStorage { + obj.pendingStorage[key] = value + } + if len(obj.pendingStorage) == 0 { + return false + } + var storage map[string][]byte + for key, value := range obj.pendingStorage { + var v []byte + if (value != common.Hash{}) { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + } + // If state snapshotting is active, cache the data til commit + if obj.db.snap != nil { + if storage == nil { + // Retrieve the old storage map, if available, create a new one otherwise + if storage = obj.db.snapStorage[obj.address]; storage == nil { + storage = make(map[string][]byte) + obj.db.snapStorage[obj.address] = storage + } + } + storage[string(key[:])] = v // v will be nil if value is 0x00 + } + } + return true +} + func (s *StateDB) AccountsIntermediateRoot() { tasks := make(chan func()) finishCh := make(chan struct{}) @@ -1050,6 +1118,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { } s.trie = tr } + usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; obj.deleted { @@ -1062,6 +1131,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { if prefetcher != nil { prefetcher.used(s.originalRoot, usedAddrs) } + if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) } @@ -1239,6 +1309,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er var diffLayer *types.DiffLayer var verified chan struct{} var snapUpdated chan struct{} + if s.snap != nil { diffLayer = &types.DiffLayer{} } @@ -1250,9 +1321,24 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er commmitTrie := func() error { commitErr := func() error { + if s.pipeCommit { + <-snapUpdated + // Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct, capture the correct data here + s.AccountsIntermediateRoot() + if parent := s.snap.Root(); parent != s.expectedRoot { + accountData := make(map[common.Hash][]byte) + for k, v := range s.snapAccounts { + accountData[crypto.Keccak256Hash(k[:])] = v + } + s.snaps.Snapshot(s.expectedRoot).CorrectAccounts(accountData) + } + } + if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot { + log.Error("Invalid merkle root", "remote", s.expectedRoot, "local", s.stateRoot) return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) } + tasks := make(chan func()) taskResults := make(chan error, len(s.stateObjectsDirty)) tasksNum := 0 @@ -1328,12 +1414,10 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { if commitErr == nil { - <-snapUpdated s.snaps.Snapshot(s.stateRoot).MarkValid() } else { // The blockchain will do the further rewind if write block not finish yet if failPostCommitFunc != nil { - <-snapUpdated failPostCommitFunc() } log.Error("state verification failed", "err", commitErr) @@ -1383,11 +1467,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er if s.pipeCommit { defer close(snapUpdated) } + diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != s.expectedRoot { - if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil { + err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified) + + if err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) } + // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state @@ -1401,12 +1489,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er } return nil }, - func() error { - if s.snap != nil { - diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer() - } - return nil - }, } if s.pipeCommit { go commmitTrie() diff --git a/core/state_processor.go b/core/state_processor.go index ef077e6135..f78c2e38c4 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -309,7 +309,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } // Do validate in advance so that we can fall back to full process - if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil { + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } diff --git a/core/types.go b/core/types.go index 61722aea74..c9061233e6 100644 --- a/core/types.go +++ b/core/types.go @@ -31,7 +31,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error } // Prefetcher is an interface for pre-caching transaction signatures and state.