Skip to content

Commit

Permalink
[R4R] state verification pipeline (#795)
Browse files Browse the repository at this point in the history
* pipeline state verification

* update codes and add logs for debug

* refactor

* update and add logs

* refactor

* refactor

* remove unneeded logs

* fix a blocking issue

* fix sync issue when force kill

* remove logs

* refactor based on comments

* refactor based on comments

* refactor based on comments

* refactor based on comments

* refactor based on comments

* fix a deadlock issue

* fix merkle root mismatch issue during sync

* refactor based on review comments

* remove unnecessary code

* remove unnecessary code

* refactor based on review comments

* change based on comments

* refactor

* uew dummyRoot to replace emptyRoot

* add nil check

* add comments

* remove unneeded codes

* format comments

Co-authored-by: forcodedancing <liguo.fudan@gmail.com>
  • Loading branch information
2 people authored and unclezoro committed Mar 29, 2022
1 parent 4ff9697 commit d894987
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 25 deletions.
8 changes: 5 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -135,13 +135,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
},
}
if skipHeavyVerify {
if statedb.IsPipeCommit() {
validateFuns = append(validateFuns, func() error {
if err := statedb.WaitPipeVerification(); err != nil {
return err
}
statedb.CorrectAccountsRoot()
statedb.Finalise(v.config.IsEIP158(header.Number))
statedb.AccountsIntermediateRoot()
// State verification pipeline - accounts root are not calculated here, just populate needed fields for process
statedb.PopulateSnapAccountAndStorage()
return nil
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Validate the state using the default validator
substart = time.Now()
if !statedb.IsLightProcessed() {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
return it.index, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
40 changes: 38 additions & 2 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
storageList: make(map[common.Hash][]common.Hash),
verifiedCh: verified,
}

switch parent := parent.(type) {
case *diskLayer:
dl.rebloom(parent)
Expand All @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}

// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand Down Expand Up @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool {
}
}

func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
func (dl *diffLayer) Parent() snapshot {
return dl.parent
Expand Down Expand Up @@ -314,6 +332,24 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) {
return account, nil
}

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
func (dl *diffLayer) Accounts() (map[common.Hash]*Account, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()

accounts := make(map[common.Hash]*Account, len(dl.accountData))
for hash, data := range dl.accountData {
account := new(Account)
if err := rlp.DecodeBytes(data, account); err != nil {
return nil, err
}
accounts[hash] = account
}

return accounts, nil
}

// AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format.
//
Expand Down
13 changes: 13 additions & 0 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool {
return true
}

func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand All @@ -73,6 +80,12 @@ func (dl *diskLayer) Stale() bool {
return dl.stale
}

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
func (dl *diskLayer) Accounts() (map[common.Hash]*Account, error) {
return nil, nil
}

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
func (dl *diskLayer) Account(hash common.Hash) (*Account, error) {
Expand Down
1 change: 1 addition & 0 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.Stale() {
return common.Hash{}, ErrSnapshotStale
}

// Everything below was journalled, persist this layer too
if err := rlp.Encode(buffer, dl.root); err != nil {
return common.Hash{}, err
Expand Down
22 changes: 21 additions & 1 deletion core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,23 @@ type Snapshot interface {
// Verified returns whether the snapshot is verified
Verified() bool

// Store the verification result
// MarkValid stores the verification result
MarkValid()

// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
Accounts() (map[common.Hash]*Account, error)

// AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format.
AccountRLP(hash common.Hash) ([]byte, error)
Expand Down Expand Up @@ -240,6 +250,11 @@ func (t *Tree) waitBuild() {
}
}

// Layers returns the number of layers
func (t *Tree) Layers() int {
return len(t.layers)
}

// Disable interrupts any pending snapshot generator, deletes all the snapshot
// layers in memory and marks snapshots disabled globally. In order to resume
// the snapshot functionality, the caller must invoke Rebuild.
Expand Down Expand Up @@ -666,6 +681,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
if snap == nil {
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
}
// Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data
if !snap.WaitAndGetVerifyRes() {
return common.Hash{}, ErrSnapshotStale
}

// Run the journaling
t.lock.Lock()
defer t.lock.Unlock()
Expand Down
21 changes: 16 additions & 5 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (s Storage) Copy() Storage {
// Account values can be accessed and modified through the object.
// Finally, call CommitTrie to write the modified storage trie into a database.
type StateObject struct {
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode

// DB error.
// State objects are used by the consensus core and VM which are
Expand Down Expand Up @@ -355,7 +356,17 @@ func (s *StateObject) finalise(prefetch bool) {
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
if len(s.dirtyStorage) > 0 {
Expand Down
Loading

0 comments on commit d894987

Please sign in to comment.