Skip to content

Commit 1cdca57

Browse files
committed
[FAB-13178] Use MaxInflightMsgs to throttle requests
If there are MaxInflightMsgs blocks proposed but not committed, chain blocks further incoming requests. Change-Id: I58c84e23c882ccc152e5c9a248434e466a8b5266 Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent f59d398 commit 1cdca57

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

orderer/consensus/etcdraft/chain.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type Chain struct {
114114
confChangeInProgress *raftpb.ConfChange
115115
justElected bool // this is true when node has just been elected
116116
configInflight bool // this is true when there is config block or ConfChange in flight
117+
blockInflight int // number of in flight blocks
117118

118119
clock clock.Clock // Tests can inject a fake clock
119120

@@ -415,6 +416,7 @@ func (c *Chain) serveRequest() {
415416
var bc *blockCreator
416417

417418
becomeLeader := func() {
419+
c.blockInflight = 0
418420
c.justElected = true
419421
submitC = nil
420422

@@ -431,6 +433,7 @@ func (c *Chain) serveRequest() {
431433
}
432434

433435
becomeFollower := func() {
436+
c.blockInflight = 0
434437
_ = c.support.BlockCutter().Cut()
435438
stop()
436439
submitC = c.submitC
@@ -468,7 +471,7 @@ func (c *Chain) serveRequest() {
468471
}
469472

470473
c.propose(bc, batches...)
471-
if c.configInflight {
474+
if c.configInflight || c.blockInflight >= c.opts.MaxInflightMsgs {
472475
submitC = nil // stop accepting new envelopes
473476
}
474477

@@ -541,7 +544,7 @@ func (c *Chain) serveRequest() {
541544
} else if c.configInflight {
542545
c.logger.Debugf("Config block or ConfChange in flight, pause accepting transaction")
543546
submitC = nil
544-
} else {
547+
} else if c.blockInflight < c.opts.MaxInflightMsgs {
545548
submitC = c.submitC
546549
}
547550

@@ -587,6 +590,10 @@ func (c *Chain) serveRequest() {
587590
}
588591

589592
func (c *Chain) writeBlock(block *common.Block, index uint64) {
593+
if c.blockInflight > 0 {
594+
c.blockInflight-- // only reduce on leader
595+
}
596+
590597
if utils.IsConfigBlock(block) {
591598
c.writeConfigBlock(block, index)
592599
return
@@ -649,6 +656,8 @@ func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) {
649656
if utils.IsConfigBlock(b) {
650657
c.configInflight = true
651658
}
659+
660+
c.blockInflight++
652661
}
653662

654663
return

orderer/consensus/etcdraft/chain_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1830,6 +1830,75 @@ var _ = Describe("Chain", func() {
18301830
})
18311831
})
18321832

1833+
When("MaxInflightMsgs is reached", func() {
1834+
BeforeEach(func() {
1835+
network.exec(func(c *chain) { c.opts.MaxInflightMsgs = 1 })
1836+
})
1837+
1838+
It("waits for in flight blocks to be committed", func() {
1839+
c1.cutter.CutNext = true
1840+
// disconnect c1 to disrupt consensus
1841+
network.disconnect(1)
1842+
1843+
Expect(c1.Order(env, 0)).To(Succeed())
1844+
1845+
doneProp := make(chan struct{})
1846+
go func() {
1847+
Expect(c1.Order(env, 0)).To(Succeed())
1848+
close(doneProp)
1849+
}()
1850+
// expect second `Order` to block
1851+
Consistently(doneProp).ShouldNot(BeClosed())
1852+
network.exec(func(c *chain) {
1853+
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
1854+
})
1855+
1856+
network.connect(1)
1857+
c1.clock.Increment(interval)
1858+
1859+
Eventually(doneProp).Should(BeClosed())
1860+
network.exec(func(c *chain) {
1861+
Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
1862+
})
1863+
})
1864+
1865+
It("resets block in flight when steps down from leader", func() {
1866+
c1.cutter.CutNext = true
1867+
c2.cutter.CutNext = true
1868+
// disconnect c1 to disrupt consensus
1869+
network.disconnect(1)
1870+
1871+
Expect(c1.Order(env, 0)).To(Succeed())
1872+
1873+
doneProp := make(chan struct{})
1874+
go func() {
1875+
defer GinkgoRecover()
1876+
1877+
Expect(c1.Order(env, 0)).To(Succeed())
1878+
close(doneProp)
1879+
}()
1880+
// expect second `Order` to block
1881+
Consistently(doneProp).ShouldNot(BeClosed())
1882+
network.exec(func(c *chain) {
1883+
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
1884+
})
1885+
1886+
network.elect(2)
1887+
Expect(c3.Order(env, 0)).To(Succeed())
1888+
Eventually(c1.support.WriteBlockCallCount).Should(Equal(0))
1889+
Eventually(c2.support.WriteBlockCallCount).Should(Equal(1))
1890+
Eventually(c3.support.WriteBlockCallCount).Should(Equal(1))
1891+
1892+
network.connect(1)
1893+
c2.clock.Increment(interval)
1894+
1895+
Eventually(doneProp).Should(BeClosed())
1896+
network.exec(func(c *chain) {
1897+
Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
1898+
})
1899+
})
1900+
})
1901+
18331902
When("follower is disconnected", func() {
18341903
It("should return error when receiving an env", func() {
18351904
network.disconnect(2)

0 commit comments

Comments
 (0)