Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rotation fix and update. #4516

Merged
merged 6 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ travis_go_checker:

travis_rpc_checker:
bash ./scripts/travis_rpc_checker.sh

travis_rosetta_checker:
bash ./scripts/travis_rosetta_checker.sh

debug_external: clean
bash test/debug-external.sh

build_localnet_validator:
bash test/build-localnet-validator.sh
8 changes: 7 additions & 1 deletion consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (consensus *Consensus) getLogger() *zerolog.Logger {
func VerifyNewBlock(hooks *webhooks.Hooks, blockChain core.BlockChain, beaconChain core.BlockChain) func(*types.Block) error {
return func(newBlock *types.Block) error {
if err := blockChain.ValidateNewBlock(newBlock, beaconChain); err != nil {
switch {
case errors.Is(err, core.ErrKnownBlock):
return nil
default:
}

if hooks := hooks; hooks != nil {
if p := hooks.ProtocolIssues; p != nil {
url := p.OnCannotCommit
Expand All @@ -680,7 +686,7 @@ func VerifyNewBlock(hooks *webhooks.Hooks, blockChain core.BlockChain, beaconCha
Int("numStakingTx", len(newBlock.StakingTransactions())).
Err(err).
Msgf("[VerifyNewBlock] Cannot Verify New Block!!!, blockHeight %d, myHeight %d", newBlock.NumberU64(), blockChain.CurrentHeader().NumberU64())
return errors.Errorf(
return errors.WithMessagef(err,
"[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d",
newBlock.Hash().Hex(),
len(newBlock.Transactions()),
Expand Down
17 changes: 17 additions & 0 deletions consensus/consensus_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
Expand Down Expand Up @@ -73,3 +75,18 @@ func TestSetViewID(t *testing.T) {
t.Errorf("Cannot set consensus ID. Got: %v, Expected: %v", consensus.GetCurBlockViewID(), height)
}
}

func TestErrors(t *testing.T) {
e1 := errors.New("e1")
require.True(t, errors.Is(e1, e1))

t.Run("wrap", func(t *testing.T) {
e2 := errors.Wrap(e1, "e2")
require.True(t, errors.Is(e2, e1))
})

t.Run("withMessage", func(t *testing.T) {
e2 := errors.WithMessage(e1, "e2")
require.True(t, errors.Is(e2, e1))
})
}
36 changes: 17 additions & 19 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog"
Expand All @@ -31,7 +32,6 @@ import (
var (
errSenderPubKeyNotLeader = errors.New("sender pubkey doesn't match leader")
errVerifyMessageSignature = errors.New("verify message signature failed")
errParsingFBFTMessage = errors.New("failed parsing FBFT message")
)

// timeout constant
Expand Down Expand Up @@ -578,8 +578,13 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
}

if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
switch {
case errors.Is(err, core.ErrKnownBlock):
consensus.getLogger().Info().Msg("[preCommitAndPropose] Block already known")
default:
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
}

consensus.getLogger().Info().Msg("[preCommitAndPropose] Start consensus timer")
Expand Down Expand Up @@ -693,7 +698,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
prev = consensus.getLeaderPubKey()
leader = consensus.getLeaderPubKey()
)
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotation(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID))
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch))
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
Expand Down Expand Up @@ -721,24 +726,17 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
// mine no less than 3 blocks in a row
numBlocksProducedByLeader = minimumBlocksForLeaderInRow
}
type stored struct {
pub []byte
epoch uint64
count uint64
shifts uint64 // count how much changes validator per epoch
}
var s stored
s.pub, s.epoch, s.count, s.shifts, _ = bc.LeaderRotationMeta()
if !bytes.Equal(leader.Bytes[:], s.pub) {
s := bc.LeaderRotationMeta()
if !bytes.Equal(leader.Bytes[:], s.Pub) {
// Another leader.
return
}
// if it is the first validator which produce blocks, then it should produce `rest` blocks too.
if s.shifts == 0 {
// If it is the first validator producing blocks, it should also produce the remaining 'rest' of the blocks.
if s.Shifts == 0 {
numBlocksProducedByLeader += rest
}
if s.count < numBlocksProducedByLeader {
// Not enough blocks produced by the leader.
if s.Count < numBlocksProducedByLeader {
// Not enough blocks produced by the leader, continue producing by the same leader.
return
}
// Passed all checks, we can change leader.
Expand All @@ -748,7 +746,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
wasFound bool
next *bls.PublicKeyWrapper
)
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID) {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, 1)
} else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, 1)
Expand Down Expand Up @@ -778,7 +776,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
} else {
epoch = blk.Epoch()
}
if consensus.Blockchain().Config().IsLeaderRotation(epoch) {
if consensus.Blockchain().Config().IsLeaderRotationInternalValidators(epoch) {
consensus.rotateLeader(epoch)
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// FIXME: rotate leader on harmony nodes only before fully externalization
var wasFound bool
var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotation(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID) {
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(
committee.Slots,
lastLeaderPubKey,
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ type BlockChain interface {
//
// After insertion is done, all accumulated events will be fired.
InsertChain(chain types.Blocks, verifyHeaders bool) (int, error)
// LeaderRotationMeta returns the number of continuous blocks by the leader.
LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error)
// LeaderRotationMeta returns info about leader rotation.
LeaderRotationMeta() LeaderRotationMeta
// BadBlocks returns a list of the last 'bad blocks' that
// the client has seen on the network.
BadBlocks() []BadBlock
Expand Down
89 changes: 17 additions & 72 deletions core/blockchain_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ var (
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

// ErrNoGenesis is the error when there is no genesis.
ErrNoGenesis = errors.New("Genesis not found in chain")
ErrNoGenesis = errors.New("Genesis not found in chain")
ErrEmptyChain = errors.New("empty chain")
// errExceedMaxPendingSlashes ..
errExceedMaxPendingSlashes = errors.New("exceeed max pending slashes")
errNilEpoch = errors.New("nil epoch for voting power computation")
Expand Down Expand Up @@ -222,7 +223,7 @@ type BlockChainImpl struct {
badBlocks *lru.Cache // Bad block cache
pendingSlashes slash.Records
maxGarbCollectedBlkNum int64
leaderRotationMeta leaderRotationMeta
leaderRotationMeta LeaderRotationMeta

options Options
}
Expand Down Expand Up @@ -1632,9 +1633,20 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i
return len(chain), nil
}

for _, b := range chain {
// check if blocks already exist
if bc.HasBlock(b.Hash(), b.NumberU64()) {
return 0, errors.Wrapf(ErrKnownBlock, "block %s %d already exists", b.Hash().Hex(), b.NumberU64())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you look at all the method that called Insertchain and the potential impact on the protocol to return an error if the block already exist ? cc @GheisMohammadi

also I assumed this happened in testnet for you because of the DB folder of the external validator that wasn't deleted, is that correct ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I am thinking that fix could be in a different PR so it deserve a proper code analysis of the impacted method called in the protocol logic

}
}

prevHash := bc.CurrentBlock().Hash()
n, events, logs, err := bc.insertChain(chain, verifyHeaders)
bc.PostChainEvents(events, logs)
if err == nil {
if prevHash == bc.CurrentBlock().Hash() {
panic("insertChain failed to update current block")
}
// there should be only 1 block.
for _, b := range chain {
if b.Epoch().Uint64() > 0 {
Expand All @@ -1653,75 +1665,8 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i
return n, err
}

// buildLeaderRotationMeta builds leader rotation meta if feature is activated.
func (bc *BlockChainImpl) buildLeaderRotationMeta(curHeader *block.Header) error {
if !bc.chainConfig.IsLeaderRotation(curHeader.Epoch()) {
return nil
}
if curHeader.NumberU64() == 0 {
return errors.New("current header is genesis")
}
curPubKey, err := bc.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil {
return err
}
for i := curHeader.NumberU64() - 1; i >= 0; i-- {
header := bc.GetHeaderByNumber(i)
if header == nil {
return errors.New("header is nil")
}
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(header)
if err != nil {
return err
}
if curPubKey.Bytes != blockPubKey.Bytes || curHeader.Epoch().Uint64() != header.Epoch().Uint64() {
for j := i; j <= curHeader.NumberU64(); j++ {
header := bc.GetHeaderByNumber(j)
if header == nil {
return errors.New("header is nil")
}
err := bc.saveLeaderRotationMeta(header)
if err != nil {
utils.Logger().Error().Err(err).Msg("save leader continuous blocks count error")
return err
}
}
return nil
}
}
return errors.New("no leader rotation meta to save")
}

func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error {
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(h)
if err != nil {
return err
}

var s = bc.leaderRotationMeta

// increase counter only if the same leader and epoch
if bytes.Equal(s.pub, blockPubKey.Bytes[:]) && s.epoch == h.Epoch().Uint64() {
s.count++
} else {
s.count = 1
}
// we should increase shifts if the leader is changed.
if !bytes.Equal(s.pub, blockPubKey.Bytes[:]) {
s.shifts++
}
// but set to zero if new
if s.epoch != h.Epoch().Uint64() {
s.shifts = 0
}
s.epoch = h.Epoch().Uint64()
bc.leaderRotationMeta = s

return nil
}

func (bc *BlockChainImpl) LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error) {
return rawdb.ReadLeaderRotationMeta(bc.db)
func (bc *BlockChainImpl) LeaderRotationMeta() LeaderRotationMeta {
return bc.leaderRotationMeta.Clone()
}

// insertChain will execute the actual chain insertion and event aggregation. The
Expand All @@ -1730,7 +1675,7 @@ func (bc *BlockChainImpl) LeaderRotationMeta() (publicKeyBytes []byte, epoch, co
func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
return 0, nil, nil, ErrEmptyChain
}
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
Expand Down
Loading