Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Don't convert to candidate while entries are being persisted #464

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,8 @@ struct raft
raft_id id;
char *address;
} current_leader;
uint64_t reserved[8]; /* Future use */
uint64_t append_in_flight_count;
uint64_t reserved[7]; /* Future use */
} follower_state;
struct
{
Expand Down
24 changes: 11 additions & 13 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -851,24 +851,21 @@ static void appendFollowerCb(struct raft_io_append *req, int status)
assert(args->entries != NULL);
assert(args->n_entries > 0);

assert(r->state == RAFT_FOLLOWER || r->state == RAFT_UNAVAILABLE);
if (r->state == RAFT_UNAVAILABLE) {
goto out;
}
assert(r->follower_state.append_in_flight_count > 0);
r->follower_state.append_in_flight_count -= 1;

result.term = r->current_term;
result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result.features = RAFT_DEFAULT_FEATURE_FLAGS;
if (status != 0) {
if (r->state != RAFT_FOLLOWER) {
tracef("local server is not follower -> ignore I/O failure");
goto out;
}
result.rejected = args->prev_log_index + 1;
goto respond;
}

/* If we're shutting down or have errored, ignore the result. */
if (r->state == RAFT_UNAVAILABLE) {
tracef("local server is unavailable -> ignore I/O result");
goto out;
}

/* We received an InstallSnapshot RPC while these entries were being
* persisted to disk */
if (replicationInstallSnapshotBusy(r)) {
Expand All @@ -880,7 +877,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)
/* If none of the entries that we persisted is present anymore in our
* in-memory log, there's nothing to report or to do. We just discard
* them. */
if (i == 0 || r->state != RAFT_FOLLOWER) {
if (i == 0) {
goto out;
}

Expand Down Expand Up @@ -915,10 +912,10 @@ static void appendFollowerCb(struct raft_io_append *req, int status)
}
}

/* If our state or term number has changed since receiving these entries,
/* If our term number has changed since receiving these entries,
* our current_leader may have changed as well, so don't send a response
* to that server. */
if (r->state != RAFT_FOLLOWER || r->current_term != args->term) {
if (r->current_term != args->term) {
tracef("new role or term since receiving entries -> don't respond");
goto out;
}
Expand Down Expand Up @@ -1176,6 +1173,7 @@ int replicationAppend(struct raft *r,
ErrMsgTransfer(r->io->errmsg, r->errmsg, "io");
goto err_after_acquire_entries;
}
r->follower_state.append_in_flight_count += 1;

entryBatchesDestroy(args->entries, args->n_entries);
return 0;
Expand Down
5 changes: 5 additions & 0 deletions src/tick.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ static int tickFollower(struct raft *r)
electionResetTimer(r);
return 0;
}
if (r->follower_state.append_in_flight_count > 0) {
tracef("append in progress -> don't convert to candidate");
electionResetTimer(r);
return 0;
}
tracef("convert to candidate and start new election");
rv = convertToCandidate(r, false /* disrupt leader */);
if (rv != 0) {
Expand Down
116 changes: 20 additions & 96 deletions test/integration/test_election.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,113 +770,37 @@ TEST(election, preVoteNoStaleVotes, setUp, tearDown, 0, cluster_3_params)
return MUNIT_OK;
}

static char *unpersisted_entries_n[] = {"4", NULL};
static char *unpersisted_entries_n_voting[] = {"3", NULL};

static MunitParameterEnum unpersisted_entries_params[] = {
{CLUSTER_N_PARAM, unpersisted_entries_n},
{CLUSTER_N_VOTING_PARAM, unpersisted_entries_n_voting},
{NULL, NULL},
};

/* When starting an election and sending RequestVote messages, the candidate
* node reports the index and term of its last persisted entry, not of the last
* entry in its in-memory cache of the log, which might contain entries that are
* still being persisted.
*
* In particular, this test exercises the case where the candidate has not yet
* persisted a configuration change entry in which the candidate is actually not
* a voter anymore. Since we apply new pending configuration entries only once
* persisted, the node is still using the old configuration, where it is a voter
* and this is the reason why it converted to candidate despite having in its
* in-memory log also an entry where it's not a voter anymore. That is all fine,
* however if this candidate reported the index of the last entry in its
* in-memory log cache as opposed to the last persisted one two bad things would
* happen.
*/
TEST(election,
startElectionWithUnpersistedEntries,
setUp,
tearDown,
0,
unpersisted_entries_params)
/* A follower doesn't convert to candidate while waiting for log entries to be
* persisted. */
TEST(election, inFlightAppendBlocksCandidacy, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct raft_change req;
int rv;
struct raft_apply req;

/* Server 1 takes a very long time to persist entries. */
/* Server 1 takes a long time to persist entries. */
CLUSTER_SET_DISK_LATENCY(1, 10000);

/* Disconnect server 1 from server 0, so it won't vote for it. */
CLUSTER_SATURATE(0, 1);
CLUSTER_SATURATE(1, 0);

CLUSTER_START;

/* Server 0 wins elections for term 2, with vote from server 2. */
CLUSTER_STEP_UNTIL_HAS_LEADER(1500);
munit_assert_int(CLUSTER_LEADER, ==, 0);

/* Demote server 1 to stand-by. */
rv = raft_assign(CLUSTER_RAFT(0), &req, 2 /* ID */, RAFT_STANDBY, NULL);
munit_assert_int(rv, ==, 0);
CLUSTER_STEP_UNTIL_APPLIED(0, 3 /* entry index */, 1000);
/* Server 0 is the leader. It replicates a log entry. */
CLUSTER_ELECT(0);
CLUSTER_APPLY_ADD_X(0, &req, 1, NULL);

/* Promote server 3 to voter. */
rv = raft_assign(CLUSTER_RAFT(0), &req, 4, RAFT_VOTER, NULL);
munit_assert_int(rv, ==, 0);
/* Server 1 receives the entry. */
CLUSTER_STEP_UNTIL_DELIVERED(0, 1, 1000);

/* Wait for server 3 to become aware that it's a voter. */
CLUSTER_STEP_UNTIL_APPLIED(3, 4 /* entry index */, 1000);

/* In the meantime server 1 has timed out and has started an (unsuccessful)
* election. */
munit_assert_int(CLUSTER_STATE(1), ==, RAFT_CANDIDATE);

/* Reconnect server 1 to server 0, so it will receive up to index 4,
* although it won't persist it since it has a high disk latency. */
CLUSTER_DESATURATE(0, 1);
CLUSTER_DESATURATE(1, 0);

/* Wait for server 1 to get contacted by server 0, step down, and receive
* entries from it. */
CLUSTER_STEP_UNTIL_STATE_IS(1, RAFT_FOLLOWER, 500);
munit_assert_int(raft_last_index(CLUSTER_RAFT(1)), ==, 4);

/* Create a network partition, with server 0 and 3 in one partition and
* server 1 and 2 in another partition. */
/* Contact is lost between servers 0 and 1. */
CLUSTER_SATURATE(0, 1);
CLUSTER_SATURATE(1, 0);
CLUSTER_SATURATE(0, 2);
CLUSTER_SATURATE(2, 0);
CLUSTER_SATURATE(3, 1);
CLUSTER_SATURATE(1, 3);
CLUSTER_SATURATE(3, 2);
CLUSTER_SATURATE(2, 3);

/* Eventually both server 1 and server 2 time out and start elections,
* because they have been disconnected from the leader.
*
* Server 1 is not a voter in the latest configuration at index 4, but it
* nevertheless converts to candidate as it's still using the original
* configuration at index 3, because it did receive the configuration at
* index 4, but hasn't persisted it yet. */
CLUSTER_STEP_UNTIL_STATE_IS(1, RAFT_CANDIDATE, 1500);
CLUSTER_STEP_UNTIL_STATE_IS(2, RAFT_CANDIDATE, 1500);

/* Server 2 can't win the election, because it does not consider server 1 a
* voter, according to the configuration at index 4.
*
* Server 1 also can't win the election, because the last index it sends is
* the index of its last persisted entry (entry 1), and so server 2 doesn't
* grant its vote. */
CLUSTER_STEP_UNTIL_ELAPSED(3000);
CLUSTER_STEP_UNTIL_STATE_IS(1, RAFT_CANDIDATE, 1500);
CLUSTER_STEP_UNTIL_STATE_IS(2, RAFT_CANDIDATE, 1500);

/* Server 0 is still leader, since it can contact server 3. */
munit_assert_int(CLUSTER_STATE(0), ==, RAFT_LEADER);

/* Several election timeouts lapse, but server 1 does not become a
* candidate, because it's waiting for the entry to be persisted. */
CLUSTER_STEP_UNTIL_ELAPSED(5000);
munit_assert_int(CLUSTER_STATE(1), ==, RAFT_FOLLOWER);

/* Eventually, server 1 finishes persisting the entry and becomes a
* candidate. */
CLUSTER_STEP_UNTIL_STATE_IS(1, RAFT_CANDIDATE, 10000);

return MUNIT_OK;
}
Loading