Skip to content

Commit 710b14c

Browse files
committed
raft: support safe readonly request
Implement raft readonly request described in raft thesis 6.4 along with the existing clock/lease based approach.
1 parent e53b995 commit 710b14c

File tree

6 files changed

+297
-68
lines changed

6 files changed

+297
-68
lines changed

raft/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ This raft implementation is a full feature implementation of Raft protocol. Feat
2626
- Log compaction
2727
- Membership changes
2828
- Leadership transfer extension
29-
- Lease-based linearizable read-only queries served by both the leader and followers
29+
- Efficient linearizable read-only queries served by both the leader and followers
30+
- leader checks with quorum and bypasses Raft log before processing read-only queries
31+
- followers asks leader to get a safe read index before processing read-only queries
32+
- More efficient lease-based linearizable read-only queries served by both the leader and followers
33+
- leader bypasses Raft log and processing read-only queries locally
34+
- followers asks leader to get a safe read index before processing read-only queries
35+
- this approach relies on the clock of the all the machines in raft group
3036

3137
This raft implementation also includes a few optional enhancements:
3238

raft/node.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ type Ready struct {
6060
// HardState will be equal to empty state if there is no update.
6161
pb.HardState
6262

63-
// ReadState can be used for node to serve linearizable read requests locally
63+
// ReadStates can be used for node to serve linearizable read requests locally
6464
// when its applied index is greater than the index in ReadState.
6565
// Note that the readState will be returned when raft receives msgReadIndex.
6666
// The returned is only valid for the request that requested to read.
67-
ReadState
67+
ReadStates []ReadState
6868

6969
// Entries specifies entries to be saved to stable storage BEFORE
7070
// Messages are sent.
@@ -102,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool {
102102
func (rd Ready) containsUpdates() bool {
103103
return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
104104
!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
105-
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None
105+
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
106106
}
107107

108108
// Node represents a node in a raft cluster.
@@ -151,11 +151,6 @@ type Node interface {
151151
// Read state has a read index. Once the application advances further than the read
152152
// index, any linearizable read requests issued before the read request can be
153153
// processed safely. The read state will have the same rctx attached.
154-
//
155-
// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
156-
// leader might keep the lease longer than it should (clock can move backward/pause without any bound).
157-
// ReadIndex is not safe in that case.
158-
// TODO: add clock drift bound into raft configuration.
159154
ReadIndex(ctx context.Context, rctx []byte) error
160155

161156
// Status returns the current status of the raft state machine.
@@ -370,8 +365,7 @@ func (n *node) run(r *raft) {
370365
}
371366

372367
r.msgs = nil
373-
r.readState.Index = None
374-
r.readState.RequestCtx = nil
368+
r.readStates = nil
375369
advancec = n.advancec
376370
case <-advancec:
377371
if prevHardSt.Commit != 0 {
@@ -516,12 +510,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
516510
if r.raftLog.unstable.snapshot != nil {
517511
rd.Snapshot = *r.raftLog.unstable.snapshot
518512
}
519-
if r.readState.Index != None {
520-
c := make([]byte, len(r.readState.RequestCtx))
521-
copy(c, r.readState.RequestCtx)
522-
523-
rd.Index = r.readState.Index
524-
rd.RequestCtx = c
513+
if len(r.readStates) != 0 {
514+
rd.ReadStates = r.readStates
525515
}
526516
return rd
527517
}

raft/node_test.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,24 +150,19 @@ func TestNodeReadIndex(t *testing.T) {
150150
appendStep := func(r *raft, m raftpb.Message) {
151151
msgs = append(msgs, m)
152152
}
153-
wreadIndex := uint64(1)
154-
wrequestCtx := []byte("somedata")
153+
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
155154

156155
n := newNode()
157156
s := NewMemoryStorage()
158157
r := newTestRaft(1, []uint64{1}, 10, 1, s)
159-
r.readState.Index = wreadIndex
160-
r.readState.RequestCtx = wrequestCtx
158+
r.readStates = wrs
159+
161160
go n.run(r)
162161
n.Campaign(context.TODO())
163162
for {
164163
rd := <-n.Ready()
165-
if rd.Index != wreadIndex {
166-
t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex)
167-
}
168-
169-
if !bytes.Equal(rd.RequestCtx, wrequestCtx) {
170-
t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx)
164+
if !reflect.DeepEqual(rd.ReadStates, wrs) {
165+
t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
171166
}
172167

173168
s.Append(rd.Entries)
@@ -180,7 +175,7 @@ func TestNodeReadIndex(t *testing.T) {
180175
}
181176

182177
r.step = appendStep
183-
wrequestCtx = []byte("somedata2")
178+
wrequestCtx := []byte("somedata2")
184179
n.ReadIndex(context.TODO(), wrequestCtx)
185180
n.Stop()
186181

raft/raft.go

Lines changed: 89 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ const (
3737
StateLeader
3838
)
3939

40+
type ReadOnlyOption int
41+
42+
const (
43+
// ReadOnlySafe guarantees the linearizability of the read only request by
44+
// communicating with the quorum. It is the default and suggested option.
45+
ReadOnlySafe ReadOnlyOption = iota
46+
// ReadOnlyLeaseBased ensures linearizability of the read only request by
47+
// relying on the leader lease. It can be affected by clock drift.
48+
// If the clock drift is unbounded, leader might keep the lease longer than it
49+
// should (clock can move backward/pause without any bound). ReadIndex is not safe
50+
// in that case.
51+
ReadOnlyLeaseBased
52+
)
53+
4054
// Possible values for CampaignType
4155
const (
4256
// campaignElection represents the type of normal election
@@ -114,6 +128,18 @@ type Config struct {
114128
// steps down when quorum is not active for an electionTimeout.
115129
CheckQuorum bool
116130

131+
// ReadOnlyOption specifies how the read only request is processed.
132+
//
133+
// ReadOnlySafe guarantees the linearizability of the read only request by
134+
// communicating with the quorum. It is the default and suggested option.
135+
//
136+
// ReadOnlyLeaseBased ensures linearizability of the read only request by
137+
// relying on the leader lease. It can be affected by clock drift.
138+
// If the clock drift is unbounded, leader might keep the lease longer than it
139+
// should (clock can move backward/pause without any bound). ReadIndex is not safe
140+
// in that case.
141+
ReadOnlyOption ReadOnlyOption
142+
117143
// Logger is the logger used for raft log. For multinode which can host
118144
// multiple raft group, each raft group can have its own logger
119145
Logger Logger
@@ -147,23 +173,13 @@ func (c *Config) validate() error {
147173
return nil
148174
}
149175

150-
// ReadState provides state for read only query.
151-
// It's caller's responsibility to send MsgReadIndex first before getting
152-
// this state from ready, It's also caller's duty to differentiate if this
153-
// state is what it requests through RequestCtx, eg. given a unique id as
154-
// RequestCtx
155-
type ReadState struct {
156-
Index uint64
157-
RequestCtx []byte
158-
}
159-
160176
type raft struct {
161177
id uint64
162178

163179
Term uint64
164180
Vote uint64
165181

166-
readState ReadState
182+
readStates []ReadState
167183

168184
// the log
169185
raftLog *raftLog
@@ -186,6 +202,8 @@ type raft struct {
186202
// New configuration is ignored if there exists unapplied configuration.
187203
pendingConf bool
188204

205+
readOnly *readOnly
206+
189207
// number of ticks since it reached last electionTimeout when it is leader
190208
// or candidate.
191209
// number of ticks since it reached last electionTimeout or received a
@@ -234,7 +252,6 @@ func newRaft(c *Config) *raft {
234252
r := &raft{
235253
id: c.ID,
236254
lead: None,
237-
readState: ReadState{Index: None, RequestCtx: nil},
238255
raftLog: raftlog,
239256
maxMsgSize: c.MaxSizePerMsg,
240257
maxInflight: c.MaxInflightMsgs,
@@ -243,6 +260,7 @@ func newRaft(c *Config) *raft {
243260
heartbeatTimeout: c.HeartbeatTick,
244261
logger: c.Logger,
245262
checkQuorum: c.CheckQuorum,
263+
readOnly: newReadOnly(c.ReadOnlyOption),
246264
}
247265
r.rand = rand.New(rand.NewSource(int64(c.ID)))
248266
for _, p := range peers {
@@ -361,7 +379,7 @@ func (r *raft) sendAppend(to uint64) {
361379
}
362380

363381
// sendHeartbeat sends an empty MsgApp
364-
func (r *raft) sendHeartbeat(to uint64) {
382+
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
365383
// Attach the commit as min(to.matched, r.committed).
366384
// When the leader sends out heartbeat message,
367385
// the receiver(follower) might not be matched with the leader
@@ -370,10 +388,12 @@ func (r *raft) sendHeartbeat(to uint64) {
370388
// an unmatched index.
371389
commit := min(r.prs[to].Match, r.raftLog.committed)
372390
m := pb.Message{
373-
To: to,
374-
Type: pb.MsgHeartbeat,
375-
Commit: commit,
391+
To: to,
392+
Type: pb.MsgHeartbeat,
393+
Commit: commit,
394+
Context: ctx,
376395
}
396+
377397
r.send(m)
378398
}
379399

@@ -390,11 +410,20 @@ func (r *raft) bcastAppend() {
390410

391411
// bcastHeartbeat sends RPC, without entries to all the peers.
392412
func (r *raft) bcastHeartbeat() {
413+
lastCtx := r.readOnly.lastPendingRequestCtx()
414+
if len(lastCtx) == 0 {
415+
r.bcastHeartbeatWithCtx(nil)
416+
} else {
417+
r.bcastHeartbeatWithCtx([]byte(lastCtx))
418+
}
419+
}
420+
421+
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
393422
for id := range r.prs {
394423
if id == r.id {
395424
continue
396425
}
397-
r.sendHeartbeat(id)
426+
r.sendHeartbeat(id, ctx)
398427
r.prs[id].resume()
399428
}
400429
}
@@ -434,6 +463,7 @@ func (r *raft) reset(term uint64) {
434463
}
435464
}
436465
r.pendingConf = false
466+
r.readOnly = newReadOnly(r.readOnly.option)
437467
}
438468

439469
func (r *raft) appendEntry(es ...pb.Entry) {
@@ -680,16 +710,29 @@ func stepLeader(r *raft, m pb.Message) {
680710
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
681711
return
682712
case pb.MsgReadIndex:
683-
ri := None
684-
if r.checkQuorum {
685-
ri = r.raftLog.committed
686-
}
687-
if m.From == None || m.From == r.id { // from local member
688-
r.readState.Index = ri
689-
r.readState.RequestCtx = m.Entries[0].Data
713+
if r.quorum() > 1 {
714+
// thinking: use an interally defined context instead of the user given context.
715+
// We can express this in terms of the term and index instead of a user-supplied value.
716+
// This would allow multiple reads to piggyback on the same message.
717+
switch r.readOnly.option {
718+
case ReadOnlySafe:
719+
r.readOnly.addRequest(r.raftLog.committed, m)
720+
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
721+
case ReadOnlyLeaseBased:
722+
var ri uint64
723+
if r.checkQuorum {
724+
ri = r.raftLog.committed
725+
}
726+
if m.From == None || m.From == r.id { // from local member
727+
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
728+
} else {
729+
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
730+
}
731+
}
690732
} else {
691-
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
733+
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
692734
}
735+
693736
return
694737
}
695738

@@ -750,6 +793,25 @@ func stepLeader(r *raft, m pb.Message) {
750793
if pr.Match < r.raftLog.lastIndex() {
751794
r.sendAppend(m.From)
752795
}
796+
797+
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
798+
return
799+
}
800+
801+
ackCount := r.readOnly.recvAck(m)
802+
if ackCount < r.quorum() {
803+
return
804+
}
805+
806+
rss := r.readOnly.advance(m)
807+
for _, rs := range rss {
808+
req := rs.req
809+
if req.From == None || req.From == r.id { // from local member
810+
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
811+
} else {
812+
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
813+
}
814+
}
753815
case pb.MsgSnapStatus:
754816
if pr.State != ProgressStateSnapshot {
755817
return
@@ -891,9 +953,7 @@ func stepFollower(r *raft, m pb.Message) {
891953
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
892954
return
893955
}
894-
895-
r.readState.Index = m.Index
896-
r.readState.RequestCtx = m.Entries[0].Data
956+
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
897957
}
898958
}
899959

@@ -914,7 +974,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
914974

915975
func (r *raft) handleHeartbeat(m pb.Message) {
916976
r.raftLog.commitTo(m.Commit)
917-
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
977+
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
918978
}
919979

920980
func (r *raft) handleSnapshot(m pb.Message) {

0 commit comments

Comments
 (0)