package raft

import (
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/HelloCodeMing/raft-rocks/pb"
	"github.com/HelloCodeMing/raft-rocks/utils"
	"github.com/golang/glog"
	"golang.org/x/net/context"
	"golang.org/x/net/trace"
)

// DumpRPCAppendEntries, when true, mirrors every AppendEntries trace line to the standard logger.
const DumpRPCAppendEntries = false

// heartbeatTO is the leader heartbeat interval, configurable via -leader_heartbeat.
var heartbeatTO time.Duration

func init() {
	flag.DurationVar(&heartbeatTO, "leader_heartbeat", 500*time.Millisecond, "leader heartbeat interval")
}

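// AppendEntriesSession carries one incoming AppendEntries RPC (request,
// reply, and its trace) through the main loop; done is signalled once the
// RPC has been processed.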
type AppendEntriesSession struct {
	args  *pb.AppendEntriesReq
	reply *pb.AppendEntriesRes
	tr    trace.Trace
	done  chan bool
	me    int
}

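// NewAppendEntriesSession builds a session around the trace attached to ctx;
// it is a fatal error for an RPC to arrive without a trace in its context.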
func NewAppendEntriesSession(me int, ctx context.Context, req *pb.AppendEntriesReq, res *pb.AppendEntriesRes) *AppendEntriesSession {
	tr, ok := trace.FromContext(ctx)
	if !ok {
		// tr = trace.New("Raft.AppendEntries", fmt.Sprintf("peer<%d,%d>", me, term))
		glog.Fatal("no trace from context")
	}
	return &AppendEntriesSession{
		args:  req,
		reply: res,
		tr:    tr,
		done:  make(chan bool, 1),
		me:    me,
	}
}

func (session *AppendEntriesSession) trace(format string, arg ...interface{}) {
	session.tr.LazyPrintf(format, arg...)
	if DumpRPCAppendEntries {
		log.Printf("RPCAppendEntries<%d>: %s", session.me, fmt.Sprintf(format, arg...))
	}
}

func (session *AppendEntriesSession) finish() {
	// session.tr.Finish()
}

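// sendAppendEntries issues one AppendEntries RPC to a peer with a 100ms
// timeout, then passes the reply's term to checkNewTerm so a newer leader
// is noticed immediately.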
func (rf *Raft) sendAppendEntries(peer int, req *pb.AppendEntriesReq) (*pb.AppendEntriesRes, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	res, err := rf.peers[peer].AppendEntries(ctx, req)
	if err != nil {
		return res, err
	}
	rf.checkNewTerm(int32(peer), res.Term)
	return res, nil
}

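// AppendEntries is the RPC handler. It hands the request to the main loop
// via appendEntriesCh and blocks until the session is marked done, so all
// incoming RPCs are processed serially on a single goroutine.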
func (rf *Raft) AppendEntries(ctx context.Context, req *pb.AppendEntriesReq) (*pb.AppendEntriesRes, error) {
	res := &pb.AppendEntriesRes{}
	session := NewAppendEntriesSession(rf.me, ctx, req, res)
	rf.appendEntriesCh <- session
	<-session.done
	session.finish()
	return res, nil
}

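// processAppendEntries applies the Raft receiver rules on the main loop:
// reject a request with a stale term, reject when the local log has no entry
// matching (prevLogIndex, prevLogTerm), otherwise append the new entries and
// advance commitIndex toward leaderCommit.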
func (rf *Raft) processAppendEntries(session *AppendEntriesSession) {
	args := session.args
	reply := session.reply
	rf.checkNewTerm(args.LeaderId, args.Term)
	if len(args.LogEntries) == 0 {
		session.trace("receive heartbeat")
	}

	term := rf.state.getTerm()
	reply.Term = term
	reply.Success = false
	if args.Term < term {
		session.trace("Reject due to stale term: %d<%d", args.Term, term)
	} else if int(args.PrevLogIndex) <= rf.log.LastIndex() {
		t := rf.log.At(int(args.PrevLogIndex)).Term
		if t == args.PrevLogTerm {
			reply.Success = true
			session.trace("Accept")
			if len(args.LogEntries) > 0 {
				// Overwrite any conflicting suffix with the leader's entries.
				rf.log.AppendAt(int(args.PrevLogIndex)+1, args.LogEntries)
				session.trace("Append log")
			}
		} else {
			session.trace("Reject due to term mismatch at log[%d]: %d!=%d", args.PrevLogIndex, args.PrevLogTerm, t)
		}
	} else {
		session.trace("Reject due to prevLogIndex > lastLogIndex: %d>%d", args.PrevLogIndex, rf.log.LastIndex())
	}

	if reply.Success {
		lastNewEntry := int(args.PrevLogIndex) + len(args.LogEntries)
		if lastNewEntry > rf.state.getCommited() && rf.state.checkFollowerCommit(int(args.LeaderCommit)) {
			session.trace("Follower advanced commitIndex to %d", rf.state.getCommited())
		}
	}
	session.done <- true
}

// replicateLog sends any pending entries to the peer, or an empty heartbeat
// if the peer is already up to date.
func (rf *Raft) replicateLog(peer int, retreatCnt *int32) {
	// If rf.nextIndex[peer] <= lastLogIndex, send entries up to lastLogIndex;
	// otherwise send a heartbeat carrying no entries.
	const BatchSize = 100
	peerStr := fmt.Sprintf("peer<%d>", peer)
	rf.state.RLock()
	isHeartBeat := false
	lastIndex := rf.log.LastIndex()
	toReplicate := rf.state.toReplicate(peer)
	prevIndex := toReplicate - 1
	prevTerm := rf.log.At(prevIndex).Term
	args := &pb.AppendEntriesReq{
		Term:         rf.state.CurrentTerm,
		LeaderId:     int32(rf.me),
		PrevLogIndex: int32(prevIndex),
		PrevLogTerm:  prevTerm,
		LeaderCommit: int32(rf.state.CommitIndex),
	}
	if toReplicate <= lastIndex {
		// Big batches to catch up, single entries while retreating over an inconsistency.
		if *retreatCnt > 0 {
			args.LogEntries = rf.log.Slice(toReplicate, toReplicate+1)
		} else {
			args.LogEntries = rf.log.Slice(toReplicate, toReplicate+BatchSize)
		}
	} else {
		isHeartBeat = true
	}
	rf.state.RUnlock()

	res, err := rf.sendAppendEntries(peer, args)
	if err != nil {
		glog.Warningf("%s Replicate to %s failed due to rpc error: %s", rf, peerStr, err)
		return
	}
	if res.Success {
		*retreatCnt = 0
		if !isHeartBeat {
			rf.state.replicatedToPeer(peer, int(args.PrevLogIndex)+len(args.LogEntries))
			glog.V(utils.VDebug).Infof("%s Replicate to %s succeed, entries range [%d:%d]", rf, peerStr, toReplicate, toReplicate+len(args.LogEntries))
		}
	} else {
		index := rf.state.retreatForPeer(peer)
		*retreatCnt++
		glog.Warningf("%s Replicate to %s failed due to inconsistency, backoff to %d", rf, peerStr, index)
	}
}
