Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] state verification pipeline #795

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -135,13 +135,14 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
},
}
if skipHeavyVerify {
if statedb.IsPipeCommit() {
validateFuns = append(validateFuns, func() error {
if err := statedb.WaitPipeVerification(); err != nil {
return err
}
statedb.Finalise(v.config.IsEIP158(header.Number))
statedb.AccountsIntermediateRoot()
//state verification pipeline - accounts root are not calculated here
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
statedb.PopulateSnapAccountAndStorage()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Validate the state using the default validator
substart = time.Now()
if !statedb.IsLightProcessed() {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
return it.index, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
22 changes: 20 additions & 2 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not
unclezoro marked this conversation as resolved.
Show resolved Hide resolved

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
storageList: make(map[common.Hash][]common.Hash),
verifiedCh: verified,
}

switch parent := parent.(type) {
case *diskLayer:
dl.rebloom(parent)
Expand All @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}

// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand Down Expand Up @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool {
}
}

func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
func (dl *diffLayer) Parent() snapshot {
return dl.parent
Expand Down
7 changes: 7 additions & 0 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool {
return true
}

func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down
1 change: 1 addition & 0 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.Stale() {
return common.Hash{}, ErrSnapshotStale
}

// Everything below was journalled, persist this layer too
if err := rlp.Encode(buffer, dl.root); err != nil {
return common.Hash{}, err
Expand Down
17 changes: 16 additions & 1 deletion core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ type Snapshot interface {
// Verified returns whether the snapshot is verified
Verified() bool

// Store the verification result
// MarkValid stores the verification result
MarkValid()

// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand Down Expand Up @@ -240,6 +246,10 @@ func (t *Tree) waitBuild() {
}
}

func (t *Tree) Layers() int {
return len(t.layers)
}

// Disable interrupts any pending snapshot generator, deletes all the snapshot
// layers in memory and marks snapshots disabled globally. In order to resume
// the snapshot functionality, the caller must invoke Rebuild.
Expand Down Expand Up @@ -666,6 +676,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
if snap == nil {
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
}
// Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data
if !snap.WaitAndGetVerifyRes() {
return common.Hash{}, ErrSnapshotStale
}

// Run the journaling
t.lock.Lock()
defer t.lock.Unlock()
Expand Down
24 changes: 20 additions & 4 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func (s Storage) Copy() Storage {
// Account values can be accessed and modified through the object.
// Finally, call CommitTrie to write the modified storage trie into a database.
type StateObject struct {
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode

// DB error.
// State objects are used by the consensus core and VM which are
Expand Down Expand Up @@ -320,6 +321,21 @@ func (s *StateObject) finalise(prefetch bool) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}

// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit {
if s.data.Root == emptyRoot && !s.rootCorrected {
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
if s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
Expand Down
84 changes: 74 additions & 10 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,16 @@ func (s *StateDB) MarkLightProcessed() {

// Enable the pipeline commit function of statedb
func (s *StateDB) EnablePipeCommit() {
if s.snap != nil {
if s.snap != nil && s.snaps.Layers() > 1 {
s.pipeCommit = true
}
}

// IsPipeCommit checks whether pipecommit is enabled on the statedb or not
func (s *StateDB) IsPipeCommit() bool {
return s.pipeCommit
}

// Mark that the block is full processed
func (s *StateDB) MarkFullProcessed() {
s.fullProcessed = true
Expand Down Expand Up @@ -1023,6 +1028,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
return s.StateIntermediateRoot()
}

//PopulateSnapAccountAndStorage tries to populate required accounts and storages for pipecommit
func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil && !obj.deleted {
root := obj.data.Root
storageChanged := s.populateSnapStorage(obj)
if storageChanged {
root = emptyRoot
}
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash)
}
}
}
}

//populateSnapStorage tries to populate required storages for pipecommit, and returns a flag to indicate whether the storage root changed or not
func (s *StateDB) populateSnapStorage(obj *StateObject) bool {
for key, value := range obj.dirtyStorage {
obj.pendingStorage[key] = value
}
if len(obj.pendingStorage) == 0 {
return false
}
var storage map[string][]byte
for key, value := range obj.pendingStorage {
var v []byte
if (value != common.Hash{}) {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
}
// If state snapshotting is active, cache the data til commit
if obj.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = obj.db.snapStorage[obj.address]; storage == nil {
storage = make(map[string][]byte)
obj.db.snapStorage[obj.address] = storage
}
}
storage[string(key[:])] = v // v will be nil if value is 0x00
}
}
return true
}

func (s *StateDB) AccountsIntermediateRoot() {
tasks := make(chan func())
finishCh := make(chan struct{})
Expand Down Expand Up @@ -1108,6 +1159,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
}
s.trie = tr
}

usedAddrs := make([][]byte, 0, len(s.stateObjectsPending))
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; obj.deleted {
Expand All @@ -1120,6 +1172,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
if prefetcher != nil {
prefetcher.used(s.originalRoot, usedAddrs)
}

if len(s.stateObjectsPending) > 0 {
s.stateObjectsPending = make(map[common.Address]struct{})
}
Expand Down Expand Up @@ -1297,6 +1350,7 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
var diffLayer *types.DiffLayer
var verified chan struct{}
var snapUpdated chan struct{}

if s.snap != nil {
diffLayer = &types.DiffLayer{}
}
Expand All @@ -1308,9 +1362,23 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er

commmitTrie := func() error {
commitErr := func() error {
if s.pipeCommit {
<-snapUpdated
// Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct, capture the correct data here
s.AccountsIntermediateRoot()
if parent := s.snap.Root(); parent != s.expectedRoot {
accountData := make(map[common.Hash][]byte)
for k, v := range s.snapAccounts {
accountData[crypto.Keccak256Hash(k[:])] = v
}
s.snaps.Snapshot(s.expectedRoot).CorrectAccounts(accountData)
}
}

if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot {
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot)
}

tasks := make(chan func())
taskResults := make(chan error, len(s.stateObjectsDirty))
tasksNum := 0
Expand Down Expand Up @@ -1399,12 +1467,10 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er

if s.pipeCommit {
if commitErr == nil {
<-snapUpdated
s.snaps.Snapshot(s.stateRoot).MarkValid()
} else {
// The blockchain will do the further rewind if write block not finish yet
if failPostCommitFunc != nil {
<-snapUpdated
failPostCommitFunc()
}
log.Error("state verification failed", "err", commitErr)
Expand Down Expand Up @@ -1448,11 +1514,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
if s.pipeCommit {
defer close(snapUpdated)
}
diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer()
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != s.expectedRoot {
if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil {
err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified)

if err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err)
}

// Keep n diff layers in the memory
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
Expand All @@ -1466,12 +1536,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
}
return nil
},
func() error {
if s.snap != nil {
diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer()
}
return nil
},
}
if s.pipeCommit {
go commmitTrie()
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
}

// Do validate in advance so that we can fall back to full process
if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil {
if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
log.Error("validate state failed during diff sync", "error", err)
return nil, nil, 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Validator interface {

// ValidateState validates the given statedb and optionally the receipts and
// gas used.
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
Expand Down