Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] state verification pipeline #795

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -135,13 +135,15 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
},
}
if skipHeavyVerify {
if statedb.IsPipeCommit() {
validateFuns = append(validateFuns, func() error {
if err := statedb.WaitPipeVerification(); err != nil {
return err
}
statedb.CorrectAccountsRoot()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
statedb.Finalise(v.config.IsEIP158(header.Number))
statedb.AccountsIntermediateRoot()
// State verification pipeline - accounts root are not calculated here, just populate needed fields for process
statedb.PopulateSnapAccountAndStorage()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Validate the state using the default validator
substart = time.Now()
if !statedb.IsLightProcessed() {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
return it.index, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
40 changes: 38 additions & 2 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not
unclezoro marked this conversation as resolved.
Show resolved Hide resolved

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -182,6 +183,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
storageList: make(map[common.Hash][]common.Hash),
verifiedCh: verified,
}

switch parent := parent.(type) {
case *diskLayer:
dl.rebloom(parent)
Expand All @@ -190,6 +192,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}

// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand Down Expand Up @@ -286,6 +289,21 @@ func (dl *diffLayer) Verified() bool {
}
}

func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
func (dl *diffLayer) Parent() snapshot {
return dl.parent
Expand Down Expand Up @@ -314,6 +332,24 @@ func (dl *diffLayer) Account(hash common.Hash) (*Account, error) {
return account, nil
}

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
func (dl *diffLayer) Accounts() (map[common.Hash]*Account, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()

accounts := make(map[common.Hash]*Account, len(dl.accountData))
for hash, data := range dl.accountData {
account := new(Account)
if err := rlp.DecodeBytes(data, account); err != nil {
return nil, err
}
accounts[hash] = account
}

return accounts, nil
}

// AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format.
//
Expand Down
13 changes: 13 additions & 0 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (dl *diskLayer) Verified() bool {
return true
}

func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand All @@ -73,6 +80,12 @@ func (dl *diskLayer) Stale() bool {
return dl.stale
}

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
func (dl *diskLayer) Accounts() (map[common.Hash]*Account, error) {
return nil, nil
}

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
func (dl *diskLayer) Account(hash common.Hash) (*Account, error) {
Expand Down
1 change: 1 addition & 0 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.Stale() {
return common.Hash{}, ErrSnapshotStale
}

// Everything below was journalled, persist this layer too
if err := rlp.Encode(buffer, dl.root); err != nil {
return common.Hash{}, err
Expand Down
22 changes: 21 additions & 1 deletion core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,23 @@ type Snapshot interface {
// Verified returns whether the snapshot is verified
Verified() bool

// Store the verification result
// MarkValid stores the verification result
MarkValid()

// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)

// Accounts directly retrieves all accounts in current snapshot in
// the snapshot slim data format.
Accounts() (map[common.Hash]*Account, error)

// AccountRLP directly retrieves the account RLP associated with a particular
// hash in the snapshot slim data format.
AccountRLP(hash common.Hash) ([]byte, error)
Expand Down Expand Up @@ -240,6 +250,11 @@ func (t *Tree) waitBuild() {
}
}

// Layers returns the number of layers
func (t *Tree) Layers() int {
return len(t.layers)
}

// Disable interrupts any pending snapshot generator, deletes all the snapshot
// layers in memory and marks snapshots disabled globally. In order to resume
// the snapshot functionality, the caller must invoke Rebuild.
Expand Down Expand Up @@ -666,6 +681,11 @@ func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
if snap == nil {
return common.Hash{}, fmt.Errorf("snapshot [%#x] missing", root)
}
// Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data
if !snap.WaitAndGetVerifyRes() {
return common.Hash{}, ErrSnapshotStale
}

// Run the journaling
t.lock.Lock()
defer t.lock.Unlock()
Expand Down
21 changes: 16 additions & 5 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (s Storage) Copy() Storage {
// Account values can be accessed and modified through the object.
// Finally, call CommitTrie to write the modified storage trie into a database.
type StateObject struct {
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data Account
db *StateDB
rootCorrected bool // To indicate whether the root has been corrected in pipecommit mode

// DB error.
// State objects are used by the consensus core and VM which are
Expand Down Expand Up @@ -355,7 +356,17 @@ func (s *StateObject) finalise(prefetch bool) {
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
if len(s.dirtyStorage) > 0 {
Expand Down
Loading