Skip to content

Commit

Permalink
consensus/*, eth, ethstats, miner: add block lock and some features
Browse files Browse the repository at this point in the history
* consensus/istanbul: handle future preprepare
* consensus/istanbul: handle request timeout in evnet loop
* consensus, eth: start/stop core engine while start/stop mining
* eth, ethstats: fix crash while reporting to ethstats
* consensus/istanbul, miner: add new event to trigger new block creation
* eth, consensus/istanbul: improve sending messages
* consensus/istanbul: stop future preprepare timer while stop core
* consensus/istanbul: add cache in ecrecover()
  • Loading branch information
markya0616 committed Aug 1, 2017
1 parent 3730ecf commit 41dfb17
Show file tree
Hide file tree
Showing 30 changed files with 718 additions and 278 deletions.
12 changes: 6 additions & 6 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ type PoW interface {
type Istanbul interface {
Engine

// Handle a message from peer
// HandleMsg handles a message from peer
HandleMsg(pubKey *ecdsa.PublicKey, data []byte) error

// Receive new chain head block
NewChainHead(block *types.Block)
// NewChainHead is called if a new chain head block comes
NewChainHead(block *types.Block) error

// Start the engine
Start(chain ChainReader, inserter func(block *types.Block) error) error
// Start starts the engine
Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error

// Stop the engine
// Stop stops the engine
Stop() error
}
22 changes: 16 additions & 6 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package istanbul

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
)
Expand All @@ -32,26 +35,33 @@ type Backend interface {
// EventMux returns the event mux in backend
EventMux() *event.TypeMux

// Send sends a message to specific target
Send(payload []byte, target common.Address) error

// Broadcast sends a message to all validators
Broadcast(valSet ValidatorSet, payload []byte) error

// Commit delivers an approved proposal to backend.
// The delivered proposal will be put into blockchain.
Commit(proposal Proposal, seals []byte) error
Commit(proposal Proposal, seals [][]byte) error

// NextRound is called when we want to trigger next Seal()
NextRound() error

// Verify verifies the proposal.
Verify(Proposal) error
// Verify verifies the proposal. If a consensus.ErrFutureBlock error is returned,
// the time difference of the proposal and current time is also returned.
Verify(Proposal) (time.Duration, error)

// Sign signs input data with the backend's private key
Sign([]byte) ([]byte, error)

// CheckSignature verifies the signature by checking if it's signed by
// the given validator
CheckSignature(data []byte, addr common.Address, sig []byte) error

// HasBlock checks if the combination of the given hash and height matches any existing blocks
HasBlock(hash common.Hash, number *big.Int) bool

// GetProposer returns the proposer of the given block height
GetProposer(number uint64) common.Address

// ParentValidators returns the validator set of the given proposal's parent block
ParentValidators(proposal Proposal) ValidatorSet
}
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// API is a user facing RPC API to dump Istanbul state
type API struct {
chain consensus.ChainReader
istanbul *simpleBackend
istanbul *backend
}

// GetSnapshot retrieves the state snapshot at a given block.
Expand Down
117 changes: 78 additions & 39 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,65 @@ package backend

import (
"crypto/ecdsa"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
lru "github.com/hashicorp/golang-lru"
)

// New creates an Ethereum backend for Istanbul core engine.
func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
// Allocate the snapshot caches and create the engine
recents, _ := lru.NewARC(inmemorySnapshots)
backend := &simpleBackend{
backend := &backend{
config: config,
eventMux: eventMux,
istanbulEventMux: new(event.TypeMux),
privateKey: privateKey,
address: crypto.PubkeyToAddress(privateKey.PublicKey),
logger: log.New("backend", "simple"),
logger: log.New(),
db: db,
commitCh: make(chan *types.Block, 1),
recents: recents,
candidates: make(map[common.Address]bool),
coreStarted: false,
}
backend.core = istanbulCore.New(backend, backend.config)
return backend
}

// ----------------------------------------------------------------------------

type simpleBackend struct {
type backend struct {
config *istanbul.Config
eventMux *event.TypeMux
istanbulEventMux *event.TypeMux
privateKey *ecdsa.PrivateKey
address common.Address
core istanbulCore.Engine
logger log.Logger
quitSync chan struct{}
db ethdb.Database
timeout uint64
chain consensus.ChainReader
inserter func(block *types.Block) error
inserter func(types.Blocks) (int, error)

// the channels for istanbul engine notifications
commitCh chan *types.Block
proposedBlockHash common.Hash
sealMu sync.Mutex
coreStarted bool
coreMu sync.Mutex

// Current list of candidates we are pushing
candidates map[common.Address]bool
Expand All @@ -83,29 +87,18 @@ type simpleBackend struct {
}

// Address implements istanbul.Backend.Address
func (sb *simpleBackend) Address() common.Address {
func (sb *backend) Address() common.Address {
return sb.address
}

// Validators implements istanbul.Backend.Validators
func (sb *simpleBackend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, proposal.Number().Uint64(), proposal.Hash(), nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
}

func (sb *simpleBackend) Send(payload []byte, target common.Address) error {
go sb.eventMux.Post(istanbul.ConsensusDataEvent{
Target: target,
Data: payload,
})
return nil
func (sb *backend) Validators(proposal istanbul.Proposal) istanbul.ValidatorSet {
return sb.getValidators(proposal.Number().Uint64(), proposal.Hash())
}

// Broadcast implements istanbul.Backend.Send
func (sb *simpleBackend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error {
func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error {
targets := make(map[common.Address]bool)
for _, val := range valSet.List() {
if val.Address() == sb.Address() {
// send to self
Expand All @@ -116,14 +109,21 @@ func (sb *simpleBackend) Broadcast(valSet istanbul.ValidatorSet, payload []byte)

} else {
// send to other peers
sb.Send(payload, val.Address())
targets[val.Address()] = true
}
}

if len(targets) > 0 {
go sb.eventMux.Post(istanbul.ConsensusDataEvent{
Targets: targets,
Data: payload,
})
}
return nil
}

// Commit implements istanbul.Backend.Commit
func (sb *simpleBackend) Commit(proposal istanbul.Proposal, seals []byte) error {
func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
// Check if the proposal is a valid block
block := &types.Block{}
block, ok := proposal.(*types.Block)
Expand Down Expand Up @@ -154,49 +154,58 @@ func (sb *simpleBackend) Commit(proposal istanbul.Proposal, seals []byte) error
// TODO: how do we check the block is inserted correctly?
return nil
}

return sb.inserter(block)
// if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent
if _, err := sb.inserter(types.Blocks{block}); err != nil {
return err
}
msg := istanbul.NewCommittedEvent{
Block: block,
}
go sb.eventMux.Post(msg)
return nil
}

// NextRound will broadcast ChainHeadEvent to trigger next seal()
func (sb *simpleBackend) NextRound() error {
// NextRound will broadcast NewBlockEvent to trigger next seal()
func (sb *backend) NextRound() error {
header := sb.chain.CurrentHeader()
sb.logger.Debug("NextRound", "address", sb.Address(), "current_hash", header.Hash(), "current_number", header.Number)
go sb.eventMux.Post(core.ChainHeadEvent{})
go sb.eventMux.Post(miner.NewBlockEvent{})
return nil
}

// EventMux implements istanbul.Backend.EventMux
func (sb *simpleBackend) EventMux() *event.TypeMux {
func (sb *backend) EventMux() *event.TypeMux {
return sb.istanbulEventMux
}

// Verify implements istanbul.Backend.Verify
func (sb *simpleBackend) Verify(proposal istanbul.Proposal) error {
func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
// Check if the proposal is a valid block
block := &types.Block{}
block, ok := proposal.(*types.Block)
if !ok {
sb.logger.Error("Invalid proposal, %v", proposal)
return errInvalidProposal
return 0, errInvalidProposal
}
// verify the header of proposed block
err := sb.VerifyHeader(sb.chain, block.Header(), false)
// Ignore errEmptyCommittedSeals error because we don't have the committed seals yet
if err != nil && err != errEmptyCommittedSeals {
return err
// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
if err == nil || err == errEmptyCommittedSeals {
return 0, nil
} else if err == consensus.ErrFutureBlock {
return time.Unix(block.Header().Time.Int64(), 0).Sub(now()), consensus.ErrFutureBlock
}
return nil
return 0, err
}

// Sign implements istanbul.Backend.Sign
func (sb *simpleBackend) Sign(data []byte) ([]byte, error) {
func (sb *backend) Sign(data []byte) ([]byte, error) {
hashData := crypto.Keccak256([]byte(data))
return crypto.Sign(hashData, sb.privateKey)
}

// CheckSignature implements istanbul.Backend.CheckSignature
func (sb *simpleBackend) CheckSignature(data []byte, address common.Address, sig []byte) error {
func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byte) error {
signer, err := istanbul.GetSignatureAddress(data, sig)
if err != nil {
log.Error("Failed to get signer address", "err", err)
Expand All @@ -208,3 +217,33 @@ func (sb *simpleBackend) CheckSignature(data []byte, address common.Address, sig
}
return nil
}

// HasBlock implements istanbul.Backend.HashBlock
func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool {
return sb.chain.GetHeader(hash, number.Uint64()) != nil
}

// GetProposer implements istanbul.Backend.GetProposer
func (sb *backend) GetProposer(number uint64) common.Address {
if h := sb.chain.GetHeaderByNumber(number); h != nil {
a, _ := sb.Author(h)
return a
}
return common.Address{}
}

// ParentValidators implements istanbul.Backend.GetParentValidators
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
if block, ok := proposal.(*types.Block); ok {
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
}
return validator.NewSet(nil, sb.config.ProposerPolicy)
}

func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
snap, err := sb.snapshot(sb.chain, number, hash, nil)
if err != nil {
return validator.NewSet(nil, sb.config.ProposerPolicy)
}
return snap.ValSet
}
Loading

0 comments on commit 41dfb17

Please sign in to comment.