Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] the Implementation of Parallel EVM 2.0(v1.1.16 rebased) #1130

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ var (
utils.GpoPercentileFlag,
utils.GpoMaxGasPriceFlag,
utils.GpoIgnoreGasPriceFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
utils.MinerNotifyFullFlag,
configFileFlag,
utils.BlockAmountReserved,
Expand Down
29 changes: 29 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -860,6 +861,14 @@ var (
Usage: "InfluxDB bucket name to push reported metrics to (v2 only)",
Value: metrics.DefaultConfig.InfluxDBBucket,
}
ParallelTxFlag = cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
}
ParallelTxNumFlag = cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
}

// Init network
InitNetworkSize = cli.IntFlag{
Expand Down Expand Up @@ -1696,6 +1705,26 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
}
// Read the value from the flag no matter if it's set or not.
cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name)
if cfg.NoPruning && !cfg.Preimages {
Expand Down
54 changes: 37 additions & 17 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var (
errInsertionInterrupted = errors.New("insertion is interrupted")
errStateRootVerificationFailed = errors.New("state root verification failed")
errChainStopped = errors.New("blockchain is stopped")
ParallelTxMode = false // parallel transaction execution
)

const (
Expand Down Expand Up @@ -248,13 +249,14 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
pipeCommit bool
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
pipeCommit bool
parallelExecution bool

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -1878,27 +1880,32 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
statedb.StartPrefetcher("chain")
interruptCh := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !bc.parallelExecution {
if len(block.Transactions()) >= prefetchTxNumber {
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go throwaway.TriePrefetchInAdvance(block, signer)
}
// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go throwaway.TriePrefetchInAdvance(block, signer)
}
}
//Process block using the parent state as reference point
substart := time.Now()
if bc.pipeCommit {
statedb.EnablePipeCommit()
}
statedb.SetExpectedStateRoot(block.Root())

statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
close(interruptCh) // state prefetch can be stopped

if err != nil {
bc.reportBlock(block, receipts, err)
statedb.StopPrefetcher()
Expand Down Expand Up @@ -3064,3 +3071,16 @@ func CalculateDiffHash(d *types.DiffLayer) (common.Hash, error) {
hasher.Sum(hash[:0])
return hash, nil
}

func EnableParallelProcessor(parallelNum int) BlockChainOption {
return func(chain *BlockChain) (*BlockChain, error) {
if chain.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return chain, nil
}
chain.parallelExecution = true
chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum)
return chain, nil
}
}
2 changes: 1 addition & 1 deletion core/state/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (s *StateDB) DumpToCollector(c DumpCollector, conf *DumpConfig) (nextKey []
account.SecureKey = it.Key
}
addr := common.BytesToAddress(addrBytes)
obj := newObject(s, addr, data)
obj := newObject(s, s.isParallel, addr, data)
if !conf.SkipCode {
account.Code = obj.Code(s.db)
}
Expand Down
82 changes: 82 additions & 0 deletions core/state/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package state

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// StateDBer is copied from vm/interface.go
// It is used by StateObject & Journal right now, to abstract StateDB & ParallelStateDB
type StateDBer interface {
getBaseStateDB() *StateDB
getStateObject(common.Address) *StateObject // only accessible for journal
storeStateObj(common.Address, *StateObject) // only accessible for journal

CreateAccount(common.Address)

SubBalance(common.Address, *big.Int)
AddBalance(common.Address, *big.Int)
GetBalance(common.Address) *big.Int

GetNonce(common.Address) uint64
SetNonce(common.Address, uint64)

GetCodeHash(common.Address) common.Hash
GetCode(common.Address) []byte
SetCode(common.Address, []byte)
GetCodeSize(common.Address) int

AddRefund(uint64)
SubRefund(uint64)
GetRefund() uint64

GetCommittedState(common.Address, common.Hash) common.Hash
GetState(common.Address, common.Hash) common.Hash
SetState(common.Address, common.Hash, common.Hash)

Suicide(common.Address) bool
HasSuicided(common.Address) bool

// Exist reports whether the given account exists in state.
// Notably this should also return true for suicided accounts.
Exist(common.Address) bool
// Empty returns whether the given account is empty. Empty
// is defined according to EIP161 (balance = nonce = code = 0).
Empty(common.Address) bool

PrepareAccessList(sender common.Address, dest *common.Address, precompiles []common.Address, txAccesses types.AccessList)
AddressInAccessList(addr common.Address) bool
SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool)
// AddAddressToAccessList adds the given address to the access list. This operation is safe to perform
// even if the feature/fork is not active yet
AddAddressToAccessList(addr common.Address)
// AddSlotToAccessList adds the given (address,slot) to the access list. This operation is safe to perform
// even if the feature/fork is not active yet
AddSlotToAccessList(addr common.Address, slot common.Hash)

RevertToSnapshot(int)
Snapshot() int

AddLog(*types.Log)
AddPreimage(common.Hash, []byte)

ForEachStorage(common.Address, func(common.Hash, common.Hash) bool) error
}
Loading