Skip to content

Commit

Permalink
Transfer leadership before submitting to raft library (#3651) (#3810)
Browse files Browse the repository at this point in the history
* Transfer leadership before submitting to raft library

If transaction request rotates the current leader cert or
removes it then transfer the leadership to another node
before proposing the transaction for orderering. It would
help to avoid the leadership transfer while committing
the block

Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>

* invoke abdicate leader only on current leader node

Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>

* Leader addicates when removing itself from config

Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>

* abdicate leader function updated to return values

Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>

* Abdication retries when possible

Change-Id: Ib6d8d319443b735078edfb69fdad4f7b3db10dd2
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>

* Address review comments from Yoav

Change-Id: I61be008c1d4e676316e6e3b778aef9e75eaec56c
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>

Signed-off-by: Parameswaran Selvam <parselva@in.ibm.com>
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Co-authored-by: Yacov Manevich <yacovm@il.ibm.com>
(cherry picked from commit 1514fa2)
  • Loading branch information
Param-S committed Nov 28, 2022
1 parent 824ea23 commit f945a9a
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 246 deletions.
118 changes: 96 additions & 22 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/bccsp"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/types"
Expand Down Expand Up @@ -59,6 +60,10 @@ const (
// DefaultLeaderlessCheckInterval is the interval that a chain checks
// its own leadership status.
DefaultLeaderlessCheckInterval = time.Second * 10

// AbdicationMaxAttempts determines how many retries of leadership abdication we do
// for a transaction that removes ourselves from reconfiguration.
AbdicationMaxAttempts = 5
)

//go:generate counterfeiter -o mocks/configurator.go . Configurator
Expand Down Expand Up @@ -204,6 +209,8 @@ type Chain struct {

// BCCSP instance
CryptoProvider bccsp.BCCSP

leadershipTransferInProgress uint32
}

// NewChain constructs a chain object.
Expand Down Expand Up @@ -712,6 +719,11 @@ func (c *Chain) run() {
c.logger.Errorf("Failed to order message: %s", err)
continue
}

if !pending && len(batches) == 0 {
continue
}

if pending {
startTimer() // no-op if timer is already started
} else {
Expand Down Expand Up @@ -912,6 +924,45 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelop
}
}

if c.checkForEvictionNCertRotation(msg.Payload) {

if !atomic.CompareAndSwapUint32(&c.leadershipTransferInProgress, 0, 1) {
c.logger.Warnf("A reconfiguration transaction is already in progress, ignoring a subsequent transaction")
return
}

go func() {
defer atomic.StoreUint32(&c.leadershipTransferInProgress, 0)

for attempt := 1; attempt <= AbdicationMaxAttempts; attempt++ {
if err := c.Node.abdicateLeadership(); err != nil {
// If there is no leader, abort and do not retry.
// Return early to prevent re-submission of the transaction
if err == ErrNoLeader || err == ErrChainHalting {
return
}

// If the error isn't any of the below, it's a programming error, so panic.
if err != ErrNoAvailableLeaderCandidate && err != ErrTimedOutLeaderTransfer {
c.logger.Panicf("Programming error, abdicateLeader() returned with an unexpected error: %v", err)
}

// Else, it's one of the errors above, so we retry.
continue
} else {
// Else, abdication succeeded, so we submit the transaction (which forwards to the leader)
if err := c.Submit(msg, 0); err != nil {
c.logger.Warnf("Reconfiguration transaction forwarding failed with error: %v", err)
}
return
}
}

c.logger.Warnf("Abdication failed too many times consecutively (%d), aborting retries", AbdicationMaxAttempts)
}()
return nil, false, nil
}

batch := c.support.BlockCutter().Cut()
batches = [][]*common.Envelope{}
if len(batch) != 0 {
Expand Down Expand Up @@ -1114,17 +1165,9 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
}

lead := atomic.LoadUint64(&c.lastKnownLeader)
removeLeader := cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == lead
shouldHalt := cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID

// unblock `run` go routine so it can still consume Raft messages
go func() {
if removeLeader {
c.logger.Infof("Current leader is being removed from channel, attempt leadership transfer")
c.Node.abdicateLeader(lead)
}

if configureComm && !shouldHalt { // no need to configure comm if this node is going to halt
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
Expand Down Expand Up @@ -1292,20 +1335,9 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
}

c.configInflight = true
} else if configMembership.Rotated() {
lead := atomic.LoadUint64(&c.lastKnownLeader)
if configMembership.RotatedNode == lead {
c.logger.Infof("Certificate of Raft leader is being rotated, attempt leader transfer before reconfiguring communication")
go func() {
c.Node.abdicateLeader(lead)
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}()
} else {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
} else {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}

Expand Down Expand Up @@ -1485,3 +1517,45 @@ func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) {
case <-c.doneC:
}
}

// checkForEvictionNCertRotation checks for node eviction and
// certificate rotation, return true if request includes it
// otherwise returns false
func (c *Chain) checkForEvictionNCertRotation(env *common.Envelope) bool {
payload, err := protoutil.UnmarshalPayload(env.Payload)
if err != nil {
c.logger.Warnf("failed to extract payload from config envelope: %s", err)
return false
}

configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
if err != nil {
c.logger.Warnf("could not read config update: %s", err)
return false
}

configMeta, err := MetadataFromConfigUpdate(configUpdate)
if err != nil || configMeta == nil {
c.logger.Warnf("could not read config metadata: %s", err)
return false
}

membershipUpdates, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMeta.Consenters)
if err != nil {
c.logger.Warnf("illegal configuration change detected: %s", err)
return false
}

if membershipUpdates.RotatedNode == c.raftID {
c.logger.Infof("Detected certificate rotation of our node")
return true
}

if _, exists := membershipUpdates.NewConsenters[c.raftID]; !exists {
c.logger.Infof("Detected eviction of ourselves from the configuration")
return true
}

c.logger.Debugf("Node %d is still part of the consenters set", c.raftID)
return false
}

0 comments on commit f945a9a

Please sign in to comment.