This repository has been archived by the owner on Mar 4, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 136
/
tick.c
241 lines (209 loc) · 7.32 KB
/
tick.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#include "../include/raft.h"
#include "assert.h"
#include "configuration.h"
#include "convert.h"
#include "election.h"
#include "membership.h"
#include "progress.h"
#include "replication.h"
#include "tracing.h"
/* Set to 1 to enable tracing.
 *
 * When enabled, tracef() forwards its arguments to Tracef() using the tracer
 * of the `r` variable that must be in scope at the call site. When disabled,
 * tracef() expands to nothing, so its arguments are not evaluated at all. */
#if 0
#define tracef(...) Tracef(r->tracer, __VA_ARGS__)
#else
#define tracef(...)
#endif
/* Apply the time-dependent rules that govern followers (Figure 3.1).
 *
 * A follower whose election timer has expired and which is a voting member of
 * its current configuration converts to candidate. A follower that has no
 * entry in the configuration (it was removed, or has not received one yet),
 * or whose entry is not a voter, just stays follower.
 *
 * Returns 0 on success, or the non-zero code from convertToCandidate(). */
static int tickFollower(struct raft *r)
{
    const struct raft_server *self;
    int err;

    assert(r != NULL);
    assert(r->state == RAFT_FOLLOWER);

    self = configurationGet(&r->configuration, r->id);

    /* Removed from the configuration, or none received yet: stay follower. */
    if (self == NULL) {
        return 0;
    }

    /* Decide whether an election must be started.
     *
     * From Section 3.3:
     *
     *   If a follower receives no communication over a period of time called
     *   the election timeout, then it assumes there is no viable leader and
     *   begins an election to choose a new leader.
     *
     * Figure 3.1:
     *
     *   If election timeout elapses without receiving AppendEntries RPC from
     *   current leader or granting vote to candidate, convert to candidate.
     *
     * The timer is checked before the role, as in the original condition. */
    if (!electionTimerExpired(r) || self->role != RAFT_VOTER) {
        return 0;
    }

    tracef("convert to candidate and start new election");
    err = convertToCandidate(r, false /* disrupt leader */);
    if (err != 0) {
        tracef("convert to candidate: %s", raft_strerror(err));
    }
    return err;
}
/* Apply the time-dependent rules that govern candidates (Figure 3.1).
 *
 * If the election timer has expired without this candidate winning or losing,
 * a fresh election is started. Returns 0 when the timer has not expired,
 * otherwise whatever electionStart() returns. */
static int tickCandidate(struct raft *r)
{
    assert(r != NULL);
    assert(r->state == RAFT_CANDIDATE);

    /* From Section 3.4:
     *
     *   The third possible outcome is that a candidate neither wins nor loses
     *   the election: if many followers become candidates at the same time,
     *   votes could be split so that no candidate obtains a majority. When
     *   this happens, each candidate will time out and start a new election
     *   by incrementing its term and initiating another round of RequestVote
     *   RPCs
     */
    if (!electionTimerExpired(r)) {
        return 0;
    }

    tracef("start new election");
    return electionStart(r);
}
/* Return true if we received an AppendEntries RPC result from a majority of
* voting servers since we became leaders or since the last time this function
* was called.
*
* For each server the function checks the recent_recv flag of the associated
* progress object, and resets the flag after the check. It returns true if a
* majority of voting server had the flag set to true. */
static bool checkContactQuorum(struct raft *r)
{
unsigned i;
unsigned contacts = 0;
assert(r->state == RAFT_LEADER);
for (i = 0; i < r->configuration.n; i++) {
struct raft_server *server = &r->configuration.servers[i];
bool recent_recv = progressResetRecentRecv(r, i);
if ((server->role == RAFT_VOTER && recent_recv) ||
server->id == r->id) {
contacts++;
}
}
return contacts > configurationVoterCount(&r->configuration) / 2;
}
/* Abort an in-progress promotion whose catch-up is taking too long.
 *
 * From Section 4.2.1:
 *
 *   The algorithm waits a fixed number of rounds (such as 10). If the last
 *   round lasts less than an election timeout, then the leader adds the new
 *   server to the cluster, under the assumption that there are not enough
 *   unreplicated entries to create a significant availability
 *   gap. Otherwise, the leader aborts the configuration change with an
 *   error.
 *
 * Here the number of rounds is r->max_catch_up_rounds. The promotion is
 * aborted in either of two cases:
 *   - the current round is that last round and has lasted longer than an
 *     election timeout;
 *   - the current round has lasted longer than
 *     r->max_catch_up_round_duration, meaning the promotee is unresponsive.
 *
 * Aborting clears the promotion state and completes the pending change
 * request, if any, with RAFT_NOCONNECTION. Does nothing when no promotion is
 * in progress (promotee_id == 0).
 *
 * @param r    the leader
 * @param now  the time sampled at the start of the current tick */
static void tickLeaderPromotion(struct raft *r, raft_time now)
{
    raft_id id = r->leader_state.promotee_id;
    unsigned server_index;
    raft_time round_duration;
    bool is_too_slow;
    bool is_unresponsive;
    struct raft_change *change;

    if (id == 0) {
        return;
    }

    round_duration = now - r->leader_state.round_start;

    /* If a promotion is in progress, we expect that our configuration
     * contains an entry for the server being promoted, and that the server
     * is not yet considered as voting. */
    server_index = configurationIndexOf(&r->configuration, id);
    assert(server_index < r->configuration.n);
    assert(r->configuration.servers[server_index].role != RAFT_VOTER);

    is_too_slow = (r->leader_state.round_number == r->max_catch_up_rounds &&
                   round_duration > r->election_timeout);
    is_unresponsive = round_duration > r->max_catch_up_round_duration;

    if (!is_too_slow && !is_unresponsive) {
        return;
    }

    r->leader_state.promotee_id = 0;
    r->leader_state.round_index = 0;
    r->leader_state.round_number = 0;
    r->leader_state.round_start = 0;

    /* Detach the change from the leader state before running its callback,
     * so the leader state no longer references it while the callback runs. */
    change = r->leader_state.change;
    r->leader_state.change = NULL;
    if (change != NULL && change->cb != NULL) {
        change->cb(change, RAFT_NOCONNECTION);
    }
}

/* Apply time-dependent rules for leaders (Figure 3.1).
 *
 * Steps, in order:
 *   1. Once an election timeout has elapsed since the last check, step down
 *      to follower if a majority of voters has not been heard from;
 *      otherwise restart the check period.
 *   2. Possibly send heartbeats.
 *   3. Abort a promotion whose catch-up is too slow.
 *
 * Always returns 0. */
static int tickLeader(struct raft *r)
{
    raft_time now = r->io->time(r->io);
    assert(r->state == RAFT_LEADER);

    /* Check if we still can reach a majority of servers.
     *
     * From Section 6.2:
     *
     *   A leader in Raft steps down if an election timeout elapses without a
     *   successful round of heartbeats to a majority of its cluster; this
     *   allows clients to retry their requests with another server.
     */
    if (now - r->election_timer_start >= r->election_timeout) {
        if (!checkContactQuorum(r)) {
            tracef("unable to contact majority of cluster -> step down");
            convertToFollower(r);
            return 0;
        }
        /* Start a new contact-check period. */
        r->election_timer_start = r->io->time(r->io);
    }

    /* Possibly send heartbeats.
     *
     * From Figure 3.1:
     *
     *   Send empty AppendEntries RPC during idle periods to prevent election
     *   timeouts.
     */
    replicationHeartbeat(r);

    tickLeaderPromotion(r, now);

    return 0;
}
/* Dispatch to the time-dependent rules of the server's current state.
 *
 * Returns 0 when the server is unavailable, since there is nothing to do.
 * Otherwise returns the result of the state-specific tick function. -1 is
 * returned only for a state other than the four handled here, which the
 * assertion excludes whenever assertions are enabled. */
static int tick(struct raft *r)
{
    assert(r->state == RAFT_UNAVAILABLE || r->state == RAFT_FOLLOWER ||
           r->state == RAFT_CANDIDATE || r->state == RAFT_LEADER);

    switch (r->state) {
        case RAFT_UNAVAILABLE:
            /* Not available: do nothing. */
            return 0;
        case RAFT_FOLLOWER:
            return tickFollower(r);
        case RAFT_CANDIDATE:
            return tickCandidate(r);
        case RAFT_LEADER:
            return tickLeader(r);
        default:
            return -1;
    }
}
/* Tick callback invoked through the raft_io interface; io->data holds the
 * struct raft instance.
 *
 * It runs the state-specific tick rules. If they fail, the server is
 * converted to unavailable and nothing else happens. Otherwise, in every
 * state, a pending leadership transfer that started at least an election
 * timeout ago is closed. */
void tickCb(struct raft_io *io)
{
    struct raft *r = io->data;
    raft_time current;

    if (tick(r) != 0) {
        convertToUnavailable(r);
        return;
    }

    /* No leadership transfer in progress: nothing left to check. */
    if (r->transfer == NULL) {
        return;
    }

    current = r->io->time(r->io);
    if (current - r->transfer->start >= r->election_timeout) {
        membershipLeadershipTransferClose(r);
    }
}
#undef tracef