Skip to content

Commit 5816af0

Browse files
guogeryacovm
authored andcommitted
[FAB-11162] Simplify clock management in chain.
This CR also adds tests for etcdraft chain. Change-Id: I8f86a117a9b1642df2745ef7e48e8bddf8f898e3 Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent 757a4cf commit 5816af0

File tree

2 files changed

+228
-62
lines changed

2 files changed

+228
-62
lines changed

orderer/consensus/etcdraft/chain.go

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,16 @@ import (
2424
"github.com/pkg/errors"
2525
)
2626

27-
// Storage essentially represents etcd/raft.MemoryStorage.
28-
//
29-
// This interface is defined to expose dependencies of fsm
30-
// so that it may be swapped in the future.
31-
//
32-
// TODO(jay) add other necessary methods to this interface
33-
// once we need them in implementation, e.g. ApplySnapshot
27+
// Storage is currently backed by etcd/raft.MemoryStorage. This interface is
28+
// defined to expose dependencies of fsm so that it may be swapped in the
29+
// future. TODO(jay) Add other necessary methods to this interface once we need
30+
// them in implementation, e.g. ApplySnapshot.
3431
type Storage interface {
3532
raft.Storage
3633
Append(entries []raftpb.Entry) error
3734
}
3835

39-
// Options contains necessary artifacts to start raft-based chain
36+
// Options contains all the configurations relevant to the chain.
4037
type Options struct {
4138
RaftID uint64
4239

@@ -53,18 +50,17 @@ type Options struct {
5350
Peers []raft.Peer
5451
}
5552

56-
// Chain implements consensus.Chain interface with raft-based consensus
53+
// Chain implements consensus.Chain interface.
5754
type Chain struct {
5855
raftID uint64
5956

6057
submitC chan *orderer.SubmitRequest
6158
commitC chan *common.Block
62-
observeC chan<- uint64 // notify external observer on leader change
59+
observeC chan<- uint64 // Notifies external observer on leader change
60+
haltC chan struct{}
61+
doneC chan struct{}
6362

64-
haltC chan struct{}
65-
doneC chan struct{}
66-
67-
clock clock.Clock // test could inject a fake clock
63+
clock clock.Clock
6864

6965
support consensus.ConsenterSupport
7066

@@ -79,7 +75,7 @@ type Chain struct {
7975
logger *flogging.FabricLogger
8076
}
8177

82-
// NewChain constructs a chain object
78+
// NewChain returns a new chain.
8379
func NewChain(support consensus.ConsenterSupport, opts Options, observe chan<- uint64) (*Chain, error) {
8480
return &Chain{
8581
raftID: opts.RaftID,
@@ -96,7 +92,7 @@ func NewChain(support consensus.ConsenterSupport, opts Options, observe chan<- u
9692
}, nil
9793
}
9894

99-
// Start starts the chain
95+
// Start instructs the orderer to begin serving the chain and keep it current.
10096
func (c *Chain) Start() {
10197
config := &raft.Config{
10298
ID: c.raftID,
@@ -114,28 +110,28 @@ func (c *Chain) Start() {
114110
go c.serveRequest()
115111
}
116112

117-
// Order submits normal type transactions
113+
// Order submits normal type transactions for ordering.
118114
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
119115
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Content: env}, 0)
120116
}
121117

122-
// Configure submits config type transactins
118+
// Configure submits config type transactions for ordering.
123119
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
124120
c.logger.Panicf("Configure not implemented yet")
125121
return nil
126122
}
127123

128-
// WaitReady is currently no-op
124+
// WaitReady is currently a no-op.
129125
func (c *Chain) WaitReady() error {
130126
return nil
131127
}
132128

133-
// Errored indicates if chain is still running
129+
// Errored returns a channel that closes when the chain stops.
134130
func (c *Chain) Errored() <-chan struct{} {
135131
return c.doneC
136132
}
137133

138-
// Halt stops chain
134+
// Halt stops the chain.
139135
func (c *Chain) Halt() {
140136
select {
141137
case c.haltC <- struct{}{}:
@@ -145,10 +141,10 @@ func (c *Chain) Halt() {
145141
<-c.doneC
146142
}
147143

148-
// Submit submits requests to
149-
// - local serveRequest go routine if this is leader
150-
// - actual leader via transport
151-
// - fails if there's no leader elected yet
144+
// Submit forwards the incoming request to:
145+
// - the local serveRequest goroutine if this is leader
146+
// - the actual leader via the transport mechanism
147+
// The call fails if there's no leader elected yet.
152148
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
153149
c.leaderLock.RLock()
154150
defer c.leaderLock.RUnlock()
@@ -171,13 +167,30 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
171167
}
172168

173169
func (c *Chain) serveRequest() {
174-
clocking := false
175-
timer := c.clock.NewTimer(c.support.SharedConfig().BatchTimeout())
170+
ticking := false
171+
timer := c.clock.NewTimer(time.Second)
172+
// we need a stopped timer rather than nil,
173+
// because we will be select waiting on timer.C()
176174
if !timer.Stop() {
177-
// drain the channel. see godoc of time#Timer.Stop
178175
<-timer.C()
179176
}
180177

178+
// if timer is already started, this is a no-op
179+
start := func() {
180+
if !ticking {
181+
ticking = true
182+
timer.Reset(c.support.SharedConfig().BatchTimeout())
183+
}
184+
}
185+
186+
stop := func() {
187+
if !timer.Stop() && ticking {
188+
// we only need to drain the channel if the timer expired (not explicitly stopped)
189+
<-timer.C()
190+
}
191+
ticking = false
192+
}
193+
181194
for {
182195
seq := c.support.Sequence()
183196

@@ -196,24 +209,18 @@ func (c *Chain) serveRequest() {
196209

197210
batches, _ := c.support.BlockCutter().Ordered(msg.Content)
198211
if len(batches) == 0 {
199-
if !clocking {
200-
clocking = true
201-
timer.Reset(c.support.SharedConfig().BatchTimeout())
202-
}
212+
start()
203213
continue
204214
}
205215

206-
if !timer.Stop() && clocking {
207-
<-timer.C()
208-
}
209-
clocking = false
216+
stop()
210217

211218
if err := c.commitBatches(batches...); err != nil {
212219
c.logger.Errorf("Failed to commit block: %s", err)
213220
}
214221

215222
case <-timer.C():
216-
clocking = false
223+
ticking = false
217224

218225
batch := c.support.BlockCutter().Cut()
219226
if len(batch) == 0 {

0 commit comments

Comments
 (0)