Skip to content

Commit dfd8b58

Browse files
committed
[FAB-13178] Remove global leader var in etcdraft chain
This CR removes the global leader var in the etcdraft chain because it is racy in the following case: several requests are about to be enqueued into submitC while the leader loses its leadership. This also removes the lock on rpc.SendSubmit because it's guarded by the channel. Change-Id: If5e785e05dcf9bfc60e403f2d5813baf769ee103 Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent fc7395f commit dfd8b58

File tree

1 file changed

+45
-42
lines changed

1 file changed

+45
-42
lines changed

orderer/consensus/etcdraft/chain.go

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,21 @@ type Options struct {
8585
RaftMetadata *etcdraft.RaftMetadata
8686
}
8787

88+
type submit struct {
89+
req *orderer.SubmitRequest
90+
errC chan error
91+
}
92+
8893
// Chain implements consensus.Chain interface.
8994
type Chain struct {
9095
configurator Configurator
9196

92-
// access to `SendSubmit` should be serialzed because gRPC is not thread-safe
93-
submitLock sync.Mutex
94-
rpc RPC
97+
rpc RPC
9598

9699
raftID uint64
97100
channelID string
98101

99-
submitC chan *orderer.SubmitRequest
102+
submitC chan *submit
100103
applyC chan apply
101104
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
102105
haltC chan struct{} // Signals to goroutines that the chain is halting
@@ -112,7 +115,6 @@ type Chain struct {
112115
support consensus.ConsenterSupport
113116
BlockCreator *blockCreator
114117

115-
leader uint64
116118
appliedIndex uint64
117119

118120
// needed by snapshotting
@@ -165,7 +167,7 @@ func NewChain(
165167
rpc: rpc,
166168
channelID: support.ChainID(),
167169
raftID: opts.RaftID,
168-
submitC: make(chan *orderer.SubmitRequest),
170+
submitC: make(chan *submit),
169171
applyC: make(chan apply),
170172
haltC: make(chan struct{}),
171173
doneC: make(chan struct{}),
@@ -360,25 +362,13 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
360362
return err
361363
}
362364

363-
lead := atomic.LoadUint64(&c.leader)
364-
365-
if lead == raft.None {
366-
return errors.Errorf("no Raft leader")
367-
}
368-
369-
if lead == c.raftID {
370-
select {
371-
case c.submitC <- req:
372-
return nil
373-
case <-c.doneC:
374-
return errors.Errorf("chain is stopped")
375-
}
365+
errC := make(chan error, 1)
366+
select {
367+
case c.submitC <- &submit{req, errC}:
368+
return <-errC
369+
case <-c.doneC:
370+
return errors.Errorf("chain is stopped")
376371
}
377-
378-
c.logger.Debugf("Forwarding submit request to Raft leader %d", lead)
379-
c.submitLock.Lock()
380-
defer c.submitLock.Unlock()
381-
return c.rpc.SendSubmit(lead, req)
382372
}
383373

384374
type apply struct {
@@ -416,43 +406,56 @@ func (c *Chain) serveRequest() {
416406

417407
for {
418408
select {
419-
case msg := <-submitC:
420-
if msg == nil {
409+
case s := <-submitC:
410+
if s == nil {
421411
// polled by `WaitReady`
422412
continue
423413
}
424414

425-
batches, pending, err := c.ordered(msg)
426-
if err != nil {
427-
c.logger.Errorf("Failed to order message: %s", err)
428-
}
429-
if pending {
430-
start() // no-op if timer is already started
431-
} else {
432-
stop()
433-
}
415+
var err error
416+
switch leader {
417+
case raft.None: // no Raft leader
418+
c.logger.Debugf("Request is dropped because there is no Raft leader")
419+
err = errors.Errorf("no Raft leader")
434420

435-
proposedConfigBlock := c.propose(batches...)
436-
if proposedConfigBlock {
437-
submitC = nil // stop accepting new envelopes
421+
case c.raftID: // this is leader
422+
batches, pending, err := c.ordered(s.req)
423+
if err != nil {
424+
c.logger.Errorf("Failed to order message: %s", err)
425+
}
426+
if pending {
427+
start() // no-op if timer is already started
428+
} else {
429+
stop()
430+
}
431+
432+
proposedConfigBlock := c.propose(batches...)
433+
if proposedConfigBlock {
434+
submitC = nil // stop accepting new envelopes
435+
}
436+
437+
default: // this is follower
438+
c.logger.Debugf("Forwarding submit request to raft leader %d", leader)
439+
err = c.rpc.SendSubmit(leader, s.req)
438440
}
439441

442+
s.errC <- err // send error back to submitter
443+
440444
case app := <-c.applyC:
441445
var elected, ready bool
442446

443447
if app.soft != nil {
444448
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
445-
oldLeader := atomic.SwapUint64(&c.leader, newLeader)
446-
if newLeader != oldLeader {
447-
c.logger.Infof("Raft leader changed: %d -> %d", oldLeader, newLeader)
449+
if newLeader != leader {
450+
c.logger.Infof("Raft leader changed: %d -> %d", leader, newLeader)
448451

449452
// follower -> leader
450453
if newLeader == c.raftID {
451454
elected = true
452455
}
453456

454457
// leader -> follower
455-
if oldLeader == c.raftID {
458+
if leader == c.raftID {
456459
_ = c.support.BlockCutter().Cut()
457460
c.BlockCreator.resetCreatedBlocks()
458461
stop()

0 commit comments

Comments
 (0)