Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #195 from MathieuBordere/appendentries_while_takin…
Browse files Browse the repository at this point in the history
…g_snapshot

snapshot: Accept AppendEntries RPCs while taking snapshot
  • Loading branch information
stgraber committed Apr 22, 2021
2 parents b1ddac7 + 3069586 commit 269ac8b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/recv_append_entries.c
Expand Up @@ -112,7 +112,7 @@ int recvAppendEntries(struct raft *r,
/* If we are installing a snapshot, ignore these entries. TODO: we should do
* something smarter, e.g. buffering the entries in the I/O backend, which
* should be in charge of serializing everything. */
if (r->snapshot.put.data != NULL && args->n_entries > 0) {
if (replicationInstallSnapshotBusy(r) && args->n_entries > 0) {
tracef("ignoring AppendEntries RPC during snapshot install");
entryBatchesDestroy(args->entries, args->n_entries);
return 0;
Expand Down
15 changes: 8 additions & 7 deletions src/replication.c
Expand Up @@ -33,11 +33,6 @@
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif

/* Tell whether a snapshot install is in flight on this raft instance: true
 * only when nothing has been stored yet (last_stored is 0) and the snapshot
 * put request carries non-NULL data. */
static inline bool snapshotInstallBusy(struct raft *r)
{
    if (r->last_stored != 0) {
        return false;
    }
    return r->snapshot.put.data != NULL;
}

/* Context of a RAFT_IO_APPEND_ENTRIES request that was submitted with
* raft_io->send(). */
struct sendAppendEntries
Expand Down Expand Up @@ -855,7 +850,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)

/* We received an InstallSnapshot RPC while these entries were being
* persisted to disk */
if (snapshotInstallBusy(r)) {
if (replicationInstallSnapshotBusy(r)) {
goto out;
}

Expand Down Expand Up @@ -1086,7 +1081,8 @@ int replicationAppend(struct raft *r,
* entry).
*/
if (n == 0) {
if ((args->leader_commit > r->commit_index) && !snapshotInstallBusy(r)) {
if ((args->leader_commit > r->commit_index)
&& !replicationInstallSnapshotBusy(r)) {
r->commit_index = min(args->leader_commit, r->last_stored);
rv = replicationApply(r);
if (rv != 0) {
Expand Down Expand Up @@ -1577,4 +1573,9 @@ void replicationQuorum(struct raft *r, const raft_index index)
return;
}

/* Report whether the raft instance is currently installing a snapshot.
 *
 * Returns true when r->last_stored is 0 and r->snapshot.put.data is non-NULL,
 * false otherwise. The instance is only read, never modified.
 *
 * This is deliberately a plain external function rather than `inline`: in
 * C99/C11 a definition whose file-scope declarations are all `inline` (without
 * `extern`) is only an inline definition and emits no external symbol, so
 * callers in other translation units would depend on the non-inline prototype
 * in the header being included here. Dropping the specifier removes that
 * dependency; the compiler remains free to inline calls within this file. */
bool replicationInstallSnapshotBusy(struct raft *r)
{
    return r->last_stored == 0 && r->snapshot.put.data != NULL;
}

#undef tracef
3 changes: 3 additions & 0 deletions src/replication.h
Expand Up @@ -76,6 +76,9 @@ int replicationInstallSnapshot(struct raft *r,
raft_index *rejected,
bool *async);

/* Returns `true` if the raft instance is currently installing a snapshot */
bool replicationInstallSnapshotBusy(struct raft *r);

/* Apply any committed entry that was not applied yet.
*
* It must be called by leaders or followers. */
Expand Down
46 changes: 45 additions & 1 deletion test/integration/test_snapshot.c
Expand Up @@ -252,7 +252,13 @@ TEST(snapshot, installMultipleTimeOutAppendAfter, setUp, tearDown, 0, NULL)
/* Step predicate with the same signature as the ones handed to
 * CLUSTER_STEP_UNTIL in this file: `data` is read as a `const struct raft *`
 * and the fixture argument is ignored.
 *
 * NOTE(review): two return statements appear back to back here. This looks
 * like the pre-change line followed by its replacement from the diff (the
 * second one additionally requires last_stored == 0) -- confirm which one is
 * actually in the file. As literally written, only the first is reachable. */
static bool server_installing_snapshot(struct raft_fixture *f, void* data) {
(void) f;
const struct raft *r = data;
return r->snapshot.put.data != NULL;
return r->snapshot.put.data != NULL && r->last_stored == 0;
}

/* Step predicate: true while the raft instance passed through `data` has
 * non-NULL snapshot put data and a non-zero last_stored index. The non-zero
 * last_stored is what separates this from server_installing_snapshot(). The
 * fixture argument is unused. */
static bool server_taking_snapshot(struct raft_fixture *f, void* data) {
    const struct raft *raft = data;
    (void) f;
    if (raft->snapshot.put.data == NULL) {
        return false;
    }
    return raft->last_stored != 0;
}

static bool server_snapshot_done(struct raft_fixture *f, void *data) {
Expand Down Expand Up @@ -339,3 +345,41 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL)
CLUSTER_STEP_UNTIL_APPLIED(1, 7, 5000);
return MUNIT_OK;
}

/* Follower receives AppendEntries RPCs while taking a snapshot.
 *
 * Outline: make snapshots trigger quickly, slow down server 1's disk, wait
 * until server_taking_snapshot() holds for server 1, submit more entries
 * through the leader while that is the case, then check that server 1 catches
 * up and the cluster keeps making progress. */
TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, NULL)
{
/* NOTE(review): `f` is not referenced directly below; presumably the
 * CLUSTER_* / SET_* macros use it -- confirm against their definitions. */
struct fixture *f = data;
(void)params;

/* Set very low threshold and trailing entries number */
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);

/* Set a large disk latency on the follower, this will allow AppendEntries
* to be sent while a snapshot is taken */
CLUSTER_SET_DISK_LATENCY(1, 2000);

/* Apply a few entries, to force a snapshot to be taken. */
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Step the cluster until server 1 takes a snapshot */
const struct raft *r = CLUSTER_RAFT(1);
CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 2000);

/* Send AppendEntries RPCs while server 1 is taking a snapshot.
 * NOTE(review): `reqs` has static storage, presumably so the request objects
 * stay valid until the cluster has processed them -- confirm. */
static struct raft_apply reqs[5];
for (int i = 0; i < 5; i++) {
CLUSTER_APPLY_ADD_X(CLUSTER_LEADER, &reqs[i], 1, NULL);
}
/* Keep stepping until server_snapshot_done() reports true for server 1 */
CLUSTER_STEP_UNTIL(server_snapshot_done, (void*) r, 5000);

/* Make sure the AppendEntries are applied and we can make progress */
CLUSTER_STEP_UNTIL_APPLIED(1, 9, 5000);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_STEP_UNTIL_APPLIED(1, 11, 5000);
return MUNIT_OK;
}

0 comments on commit 269ac8b

Please sign in to comment.