/
raft.go
179 lines (154 loc) · 4.68 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package raft
import (
"context"
"math/rand"
"net/http"
"time"
"github.com/HelloCodeMing/raft-rocks/pb"
"github.com/HelloCodeMing/raft-rocks/store"
"github.com/HelloCodeMing/raft-rocks/utils"
"github.com/golang/glog"
)
type ApplyMsg struct {
Command *pb.KVCommand
UseSnapshot bool // ignore for lab2; only used in lab3
Snapshot []byte // ignore for lab2; only used in lab3
}
// Raft A Go object implementing a single Raft peer.
type Raft struct {
peers []*utils.ClientEnd
me int // index into peers[]
log *store.LogStorage
state *raftState
// raft send apply message to RaftKV
applyCh chan *ApplyMsg
// rpc channel
appendEntriesCh chan *AppendEntriesSession
requestVoteChan chan *RequestVoteSession
// once submit a command, send lastLogIndex into this chan,
// the replicator will try best replicate all logEntries until lastLogIndex
submitedCh chan int
termChangedCh chan bool
shutdownCh chan bool // shutdown all components
}
func (rf *Raft) majority() int {
return len(rf.peers)/2 + 1
}
func (rf *Raft) String() string {
return rf.state.String()
}
// exists a new term
func (rf *Raft) checkNewTerm(candidateID int32, newTerm int32) (beFollower bool) {
if rf.state.checkNewTerm(newTerm) {
glog.Infof("%s RuleForAll: find new term<%d,%d>, become follower", rf, candidateID, newTerm)
rf.termChangedCh <- true
return true
}
return false
}
var (
r = rand.New(rand.NewSource(time.Now().UnixNano()))
)
func (rf *Raft) electionTO() time.Duration {
return time.Duration(r.Int63()%((electionTimeoutMax - electionTimeoutMin).Nanoseconds())) + electionTimeoutMin
}
func (rf *Raft) foreachPeer(f func(peer int)) {
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
f(i)
}
}
}
// Start raft state machine.
func (rf *Raft) startStateMachine() {
for {
select {
case <-rf.shutdownCh:
glog.Info(rf, "Stop state machine")
return
default:
}
switch rf.state.Role {
case pb.RaftRole_Follower:
rf.doFollower()
case pb.RaftRole_Candidate:
rf.doCandidate()
case pb.RaftRole_Leader:
rf.doLeader()
}
}
}
func (rf *Raft) Kill() {
glog.Info(rf, " Killing raft, wait for goroutines quit")
close(rf.shutdownCh)
}
func (rf *Raft) UpdateReadLease(term int32, lease time.Time) {
rf.state.updateReadLease(term, lease)
}
// SubmitCommand submit a command to raft
// The command will be replicated to followers, then leader commmit, applied to the state machine by raftKV,
// and finally response to the client
func (rf *Raft) SubmitCommand(ctx context.Context, command *pb.KVCommand) (isLeader bool, readOnly bool) {
term := rf.state.getTerm()
isLeader = rf.state.getRole() == pb.RaftRole_Leader
if !isLeader {
return false, false
}
isLeader = true
command.Timestamp = time.Now().UnixNano()
command.Term = term
if command.GetCmdType() == pb.CommandType_Get {
if time.Now().Before(rf.state.readLease) {
readIndex := rf.state.getCommited()
glog.V(utils.VDebug).Infof("Get with lease read readIndex=%d,command=%v", readIndex, command)
for rf.state.LastApplied < readIndex {
glog.V(utils.VDebug).Infof("Lease read: lastApplied=%d < readIndex=%d, wait for a moment", rf.state.LastApplied, readIndex)
time.Sleep(time.Millisecond)
}
readOnly = true
return
}
}
// append to local log
index := rf.log.Append(command)
go func() {
rf.submitedCh <- index
}()
glog.Infof("%s SubmitCommand by leader %v", rf, command)
return
}
func (rf *Raft) IsLeader() bool {
return rf.state.getRole() == pb.RaftRole_Leader
}
func (rf *Raft) registerDebugHandler() {
// a http interface to dump raft state, for debug purpose
http.HandleFunc("/raft/meta", func(res http.ResponseWriter, req *http.Request) {
rf.state.dump(res)
req.Body.Close()
})
http.HandleFunc("/raft/log", func(res http.ResponseWriter, req *http.Request) {
rf.log.Dump(res)
req.Body.Close()
})
}
// NewRaft create a raft instance
// peers used to communicate with other peers in this raft group, need to be construct in advance
// persister used to store metadata of raft, and log used for WAL
// applyCh, apply a command to state machine through this channel
func NewRaft(peers []*utils.ClientEnd, me int, persister store.Persister, log *store.LogStorage, applyCh chan *ApplyMsg) *Raft {
rf := new(Raft)
rf.peers = peers
rf.me = me
rf.applyCh = applyCh
rf.log = log
rf.requestVoteChan = make(chan *RequestVoteSession, 10)
rf.appendEntriesCh = make(chan *AppendEntriesSession, 100)
rf.shutdownCh = make(chan bool)
rf.termChangedCh = make(chan bool, 10)
rf.submitedCh = make(chan int, 10)
rf.state = makeRaftState(log, persister, applyCh, len(peers), me)
glog.Infof("%s Created raft instance: %s", rf, rf.String())
go rf.startStateMachine()
rf.registerDebugHandler()
return rf
}