Skip to content
Permalink
Browse files

Merge branch 'hotfix/1.2.1' into develop

  • Loading branch information...
graup committed Aug 22, 2019
2 parents 354f27f + 65a5aca commit 5e1997ce55c67118b85851af1fe11a3ac39fa8d4
Showing with 1,018 additions and 119 deletions.
  1. +4 −4 chain/chaindb.go
  2. +2 −2 chain/chaindbForRaft.go
  3. +1 −0 config/types.go
  4. +95 −18 consensus/impl/raftv2/blockfactory.go
  5. +5 −2 consensus/impl/raftv2/cluster.go
  6. +7 −2 consensus/impl/raftv2/config.go
  7. +35 −0 consensus/impl/raftv2/debug.go
  8. +78 −7 consensus/impl/raftv2/raftserver.go
  9. +2 −0 consensus/impl/raftv2/test/config/BP11001.toml
  10. +2 −0 consensus/impl/raftv2/test/config/BP11002.toml
  11. +2 −0 consensus/impl/raftv2/test/config/BP11003.toml
  12. +2 −0 consensus/impl/raftv2/test/config/BP11004.toml
  13. +2 −0 consensus/impl/raftv2/test/config/BP11005.toml
  14. +2 −0 consensus/impl/raftv2/test/config/BP11006.toml
  15. +2 −0 consensus/impl/raftv2/test/config/BP11007.toml
  16. +3 −3 consensus/impl/raftv2/test/test_leader_change.sh
  17. +10 −7 mempool/mempool.go
  18. +8 −6 p2p/actorwork.go
  19. +17 −12 p2p/mofactory.go
  20. +114 −0 p2p/mofactory_test.go
  21. +20 −3 p2p/{protobufHelper.go → msgorder.go}
  22. +23 −10 p2p/{protobufHelper_test.go → msgorder_test.go}
  23. +18 −3 p2p/p2p.go
  24. +1 −0 p2p/p2pcommon/internalmsg.go
  25. +24 −0 p2p/p2pcommon/txnotice.go
  26. +12 −0 p2p/p2pmock/mock_msgorder.go
  27. +59 −0 p2p/p2pmock/mock_txnotice.go
  28. +2 −0 p2p/p2putil/util_test.go
  29. +3 −3 p2p/peermanager.go
  30. +1 −0 p2p/raftsupport/status.go
  31. +11 −11 p2p/remotepeer.go
  32. +1 −1 p2p/subproto/block.go
  33. +4 −0 p2p/subproto/blockhash_test.go
  34. +2 −25 p2p/syncmanager.go
  35. +184 −0 p2p/txnoticetracer.go
  36. +237 −0 p2p/txnoticetracer_test.go
  37. +23 −0 types/p2plogging.go
@@ -458,17 +458,17 @@ type txInfo struct {
}

func (cdb *ChainDB) addTxsOfBlock(dbTx *db.Transaction, txs []*types.Tx, blockHash []byte) error {
if err := TestDebugger.Check(DEBUG_CHAIN_STOP, 4, nil); err != nil {
return err
}

for i, txEntry := range txs {
if err := cdb.addTx(dbTx, txEntry, blockHash, i); err != nil {
logger.Error().Err(err).Str("hash", enc.ToString(blockHash)).Int("txidx", i).
Msg("failed to add tx")

return err
}

if err := TestDebugger.Check(DEBUG_CHAIN_STOP, 4, nil); err != nil {
return err
}
}

return nil
@@ -124,7 +124,7 @@ func (cdb *ChainDB) WriteHardState(hardstate *raftpb.HardState) error {
var data []byte
var err error

logger.Info().Uint64("term", hardstate.Term).Uint64("vote", hardstate.Vote).Uint64("commit", hardstate.Commit).Msg("save hard state")
logger.Info().Uint64("term", hardstate.Term).Str("vote", types.Uint64ToHexaString(hardstate.Vote)).Uint64("commit", hardstate.Commit).Msg("save hard state")

if data, err = proto.Marshal(hardstate); err != nil {
logger.Panic().Msg("failed to marshal raft state")
@@ -149,7 +149,7 @@ func (cdb *ChainDB) GetHardState() (*raftpb.HardState, error) {
return nil, ErrInvalidHardState
}

logger.Info().Uint64("term", state.Term).Uint64("vote", state.Vote).Uint64("commit", state.Commit).Msg("load hard state")
logger.Info().Uint64("term", state.Term).Str("vote", types.Uint64ToHexaString(state.Vote)).Uint64("commit", state.Commit).Msg("load hard state")

return state, nil
}
@@ -126,6 +126,7 @@ type RaftConfig struct {
SnapFrequency uint64 `mapstructure:"snapfrequency" description:"frequency which raft make snapshot with log"`
SlowNodeGap uint `mapstructure:"slownodegap" description:"frequency which raft make snapshot with log"`
RecoverBP *RaftBPConfig `mapstructure:"recoverbp" description:"bp info for creating a new cluster from backup"`
StopDupCommit bool `mapstructure:"stopdupcommit" description:"stop server when commit of duplicate height block occurs. use this only for debugging'"`
}

type RaftBPConfig struct {
@@ -62,6 +62,7 @@ func (te *txExec) Apply(bState *state.BlockState, tx types.Transaction) error {

// Work is a unit of block production handed to the block-factory worker.
// It embeds the candidate best (parent) block and carries the raft term in
// which the job was queued, so stale work can be detected after a leader
// change (see QueueJob / worker — TODO confirm against full file).
type Work struct {
	*types.Block
	term uint64 // raft term at the time this work was queued
}

func (work *Work) GetTimeout() time.Duration {
@@ -72,6 +73,43 @@ func (work *Work) ToString() string {
return fmt.Sprintf("bestblock=%s", work.BlockID())
}

// leaderReady tracks whether this node, as raft leader, is ready to produce
// blocks for the current term. It stores the ready-marker commit entry (the
// empty log entry raft emits for a new term); the embedded RWMutex guards ce.
type leaderReady struct {
	sync.RWMutex

	ce *commitEntry // last received ready marker; nil until one arrives
}

// set records ce as the current ready marker under the write lock.
func (ready *leaderReady) set(ce *commitEntry) {
	logger.Debug().Uint64("term", ce.term).Msg("set ready marker")

	ready.Lock()
	ready.ce = ce
	ready.Unlock()
}

// isReady reports whether the stored ready marker belongs to curTerm,
// i.e. whether this leader may start producing blocks for that term.
func (ready *leaderReady) isReady(curTerm uint64) bool {
	ready.RLock()
	defer ready.RUnlock()

	switch {
	case curTerm <= 0:
		logger.Fatal().Msg("failed to get status of raft")
		return false
	case ready.ce == nil:
		logger.Debug().Msg("not exist ready marker")
		return false
	case ready.ce.term == curTerm:
		return true
	default:
		logger.Debug().Uint64("ready-term", ready.ce.term).Uint64("cur-term", curTerm).Msg("not ready for producing")
		return false
	}
}

// BlockFactory implements a raft block factory which generates a block every cfg.Consensus.BlockIntervalMs if this node is the leader of raft
//
// This can be used for testing purposes.
@@ -87,6 +125,8 @@ type BlockFactory struct {
bpTimeoutC chan interface{}
quit chan interface{}

ready leaderReady

maxBlockBodySize uint32
ID string
privKey crypto.PrivKey
@@ -182,8 +222,13 @@ func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{}) {
bf.jobLock.Lock()
defer bf.jobLock.Unlock()

if !bf.raftServer.IsLeader() {
//logger.Debug().Msg("skip producing block because this bp is not leader")
var (
isReady bool
term uint64
)

if isReady, term = bf.isLeaderReady(); !isReady {
logger.Debug().Msg("skip producing block because this bp is leader but it's not ready to produce new block")
return
}

@@ -201,10 +246,25 @@ func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{}) {
}

bf.prevBlock = b
jq <- &Work{b}
jq <- &Work{Block: b, term: term}
}
}

// isLeaderReady reports whether this node is the raft leader and has received
// the ready marker (empty raft log entry) of the current term, and returns
// that term. A true result means all blocks of the previous term have been
// connected. Callers must hold bf.jobLock.
// TODO) term may be set when raft leader is changed from hardstate
func (bf *BlockFactory) isLeaderReady() (bool, uint64) {
	status := bf.raftServer.GetLeaderStatus()
	if !status.IsLeader {
		return false, 0
	}

	return bf.ready.isReady(status.Term), status.Term
}

// GetType returns the consensus type implemented by this block factory,
// which is always consensus.ConsensusRAFT.
func (bf *BlockFactory) GetType() consensus.ConsensusType {
	return consensus.ConsensusRAFT
}
@@ -339,7 +399,8 @@ func (bf *BlockFactory) worker() {
err error
)

if block, blockState, err = bf.generateBlock(work.Block); err != nil {
if block, blockState, err = bf.generateBlock(work); err != nil {
logger.Error().Err(err).Msg("failed to generate block")
if err == chain.ErrQuit {
logger.Info().Msg("quit worker of block factory")
return
@@ -349,7 +410,7 @@ func (bf *BlockFactory) worker() {
continue
}

if err = bf.raftOp.propose(block, blockState); err != nil {
if err = bf.raftOp.propose(block, blockState, work.term); err != nil {
logger.Error().Err(err).Msg("failed to propose block")
bf.reset()
}
@@ -364,11 +425,9 @@ func (bf *BlockFactory) worker() {

// RaftEmptyBlockLog: When the leader changes, the new raft leader creates an empty data log with a new term and index.
// When block factory receives empty block log, the blockfactory that is running as the leader should reset the proposal in progress.
// This proposal may have been dropped on the raft.
// Warning : There may be timing issue when reseting. If empty log of buffered channel is processed after propose,
// the proposal you just submitted may be canceled incorrectly.
if cEntry.block == nil {
bf.reset()
// since it may have been dropped on the raft. Block factory must produce new block after all blocks of previous term are connected. Empty log can be a marker for that.
if cEntry.IsReadyMarker() {
bf.handleReadyMarker(cEntry)
continue
}

@@ -385,8 +444,23 @@ func (bf *BlockFactory) worker() {
}
}

func (bf *BlockFactory) generateBlock(bestBlock *types.Block) (*types.Block, *state.BlockState, error) {
var err error
// @ReadyMarker: the leader may produce a new block only after receiving an
// empty commit entry (the ready marker) for the current term. Receiving the
// marker ensures that all blocks of the previous term have been connected
// to the chain.
func (bf *BlockFactory) handleReadyMarker(ce *commitEntry) {
	logger.Debug().Uint64("index", ce.index).Uint64("term", ce.term).Msg("set raft marker(empty block)")

	// set ready to produce block for this term
	bf.ready.set(ce)
	// discard previous work of the block factory (stale proposal of the old term)
	bf.reset()
}

func (bf *BlockFactory) generateBlock(work *Work) (*types.Block, *state.BlockState, error) {
var (
bestBlock *types.Block
err error
)

bestBlock = work.Block

defer func() {
if panicMsg := recover(); panicMsg != nil {
@@ -395,7 +469,7 @@ func (bf *BlockFactory) generateBlock(bestBlock *types.Block) (*types.Block, *st
}()

checkCancel := func() bool {
if !bf.raftServer.IsLeader() {
if !bf.raftServer.IsLeaderOfTerm(work.term) {
logger.Debug().Msg("cancel because no more leader")
return true
}
@@ -452,9 +526,10 @@ func (bf *BlockFactory) reset() {
bf.jobLock.Lock()
defer bf.jobLock.Unlock()

logger.Info().Str("prev proposed", bf.raftOp.toString()).Msg("reset prev work of block factory")

bf.prevBlock = nil
if bf.prevBlock != nil {
logger.Info().Str("prev proposed", bf.raftOp.toString()).Msg("reset previous work of block factory")
bf.prevBlock = nil
}
bf.bpc.resetSavedConfChangePropose()
}

@@ -744,12 +819,14 @@ func newRaftOperator(rs *raftServer, cl *Cluster) *RaftOperator {
return &RaftOperator{commitC: commitC, rs: rs, cl: cl}
}

func (rop *RaftOperator) propose(block *types.Block, blockState *state.BlockState) error {
if !rop.rs.IsLeader() {
func (rop *RaftOperator) propose(block *types.Block, blockState *state.BlockState, term uint64) error {
if !rop.rs.IsLeaderOfTerm(term) {
logger.Info().Msg("dropped produced block because this bp became no longer leader")
return ErrNotRaftLeader
}

debugRaftProposeSleep()

rop.proposed = &Proposed{block: block, blockState: blockState}

if err := rop.rs.Propose(block); err != nil {
@@ -1026,10 +1026,13 @@ func (cl *Cluster) saveConfChangePropose(ccPropose *consensus.ConfChangePropose)

func (cl *Cluster) resetSavedConfChangePropose() {
var ccid uint64
if cl.savedChange != nil {
ccid = cl.savedChange.Cc.ID

if cl.savedChange == nil {
return
}

ccid = cl.savedChange.Cc.ID

logger.Debug().Uint64("requestID", ccid).Msg("reset saved conf change propose")

cl.savedChange = nil
@@ -3,11 +3,12 @@ package raftv2
import (
"errors"
"fmt"
"strings"
"time"

"github.com/aergoio/aergo/message"
"github.com/aergoio/aergo/p2p/p2pkey"
"github.com/aergoio/aergo/types"
"strings"
"time"

"github.com/aergoio/aergo/chain"
"github.com/aergoio/aergo/config"
@@ -46,6 +47,7 @@ var (

ElectionTickCount = DefaultElectionTickCount
MaxSlowNodeGap uint64 = DefaultSlowNodeGap // Criteria for determining whether the server is in a slow state
StopDupCommit = false
)

func Init(raftCfg *config.RaftConfig) {
@@ -87,6 +89,9 @@ func Init(raftCfg *config.RaftConfig) {
MaxSlowNodeGap = uint64(raftCfg.SlowNodeGap)
}

if raftCfg.StopDupCommit {
StopDupCommit = true
}
logger.Info().Int64("factory tick(ms)", BlockFactoryTickMs.Nanoseconds()/int64(time.Millisecond)).
Int64("interval(ms)", BlockIntervalMs.Nanoseconds()/int64(time.Millisecond)).Msg("set block factory tick/interval")
}
@@ -0,0 +1,35 @@
package raftv2

import (
"os"
"strconv"
"time"
)

var (
	// DEBUG_PROPOSE_SLEEP names the environment variable that, when set to a
	// positive integer N, makes raft propose sleep for N seconds.
	// Debugging aid only; see debugRaftProposeSleep.
	DEBUG_PROPOSE_SLEEP = "DEBUG_PROPOSE_SLEEP"
)

// checkEnv reads the environment variable envName and returns its integer
// value. It returns 0 when the variable is unset or cannot be parsed as an
// integer (logging an error in the parse-failure case).
func checkEnv(envName string) int {
	envStr := os.Getenv(envName)
	if len(envStr) == 0 {
		return 0
	}

	val, err := strconv.Atoi(envStr)
	if err != nil {
		// fixed typo in the original message: "varialble" -> "variable"
		logger.Error().Err(err).Msgf("%s environment variable must be integer", envName)
		return 0
	}

	logger.Debug().Int("value", val).Msgf("env variable[%s] is set", envName)

	return val
}

// debugRaftProposeSleep delays raft propose by DEBUG_PROPOSE_SLEEP seconds
// when that environment variable is set to a positive integer. It is a
// debugging aid and does nothing otherwise.
func debugRaftProposeSleep() {
	if val := checkEnv(DEBUG_PROPOSE_SLEEP); val > 0 {
		logger.Debug().Int("sleep", val).Msg("sleep raft propose")
		time.Sleep(time.Duration(val) * time.Second)
	}
}

0 comments on commit 5e1997c

Please sign in to comment.
You can’t perform that action at this time.