From 3069586d23bf20951129e7b554d44785d24ec5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Border=C3=A9?= Date: Thu, 22 Apr 2021 13:18:29 +0200 Subject: [PATCH] snapshot: Accept AppendEntries RPCs while taking snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mathieu Borderé --- src/recv_append_entries.c | 2 +- src/replication.c | 15 ++++++----- src/replication.h | 3 +++ test/integration/test_snapshot.c | 46 +++++++++++++++++++++++++++++++- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/recv_append_entries.c b/src/recv_append_entries.c index 8d6cc6e4b..e9aa52270 100644 --- a/src/recv_append_entries.c +++ b/src/recv_append_entries.c @@ -112,7 +112,7 @@ int recvAppendEntries(struct raft *r, /* If we are installing a snapshot, ignore these entries. TODO: we should do * something smarter, e.g. buffering the entries in the I/O backend, which * should be in charge of serializing everything. */ - if (r->snapshot.put.data != NULL && args->n_entries > 0) { + if (replicationInstallSnapshotBusy(r) && args->n_entries > 0) { tracef("ignoring AppendEntries RPC during snapshot install"); entryBatchesDestroy(args->entries, args->n_entries); return 0; diff --git a/src/replication.c b/src/replication.c index 3e76329c9..f4672c7d9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -33,11 +33,6 @@ #define min(a, b) ((a) < (b) ? (a) : (b)) #endif -static inline bool snapshotInstallBusy(struct raft *r) -{ - return r->last_stored == 0 && r->snapshot.put.data != NULL; -} - /* Context of a RAFT_IO_APPEND_ENTRIES request that was submitted with * raft_io_>send(). */ struct sendAppendEntries @@ -855,7 +850,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status) /* We received an InstallSnapshot RCP while these entries were being * persisted to disk */ - if (snapshotInstallBusy(r)) { + if (replicationInstallSnapshotBusy(r)) { goto out; } @@ -1086,7 +1081,8 @@ int replicationAppend(struct raft *r, * entry). */ if (n == 0) { - if ((args->leader_commit > r->commit_index) && !snapshotInstallBusy(r)) { + if ((args->leader_commit > r->commit_index) + && !replicationInstallSnapshotBusy(r)) { r->commit_index = min(args->leader_commit, r->last_stored); rv = replicationApply(r); if (rv != 0) { @@ -1577,4 +1573,9 @@ void replicationQuorum(struct raft *r, const raft_index index) return; } +inline bool replicationInstallSnapshotBusy(struct raft *r) +{ + return r->last_stored == 0 && r->snapshot.put.data != NULL; +} + #undef tracef diff --git a/src/replication.h b/src/replication.h index 5d74eaa33..dc2a41cac 100644 --- a/src/replication.h +++ b/src/replication.h @@ -76,6 +76,9 @@ int replicationInstallSnapshot(struct raft *r, raft_index *rejected, bool *async); +/* Returns `true` if the raft instance is currently installing a snapshot */ +bool replicationInstallSnapshotBusy(struct raft *r); + /* Apply any committed entry that was not applied yet. * * It must be called by leaders or followers. */ diff --git a/test/integration/test_snapshot.c b/test/integration/test_snapshot.c index 966f6adcc..96ac9385a 100644 --- a/test/integration/test_snapshot.c +++ b/test/integration/test_snapshot.c @@ -252,7 +252,13 @@ TEST(snapshot, installMultipleTimeOutAppendAfter, setUp, tearDown, 0, NULL) static bool server_installing_snapshot(struct raft_fixture *f, void* data) { (void) f; const struct raft *r = data; - return r->snapshot.put.data != NULL; + return r->snapshot.put.data != NULL && r->last_stored == 0; +} + +static bool server_taking_snapshot(struct raft_fixture *f, void* data) { + (void) f; + const struct raft *r = data; + return r->snapshot.put.data != NULL && r->last_stored != 0; } static bool server_snapshot_done(struct raft_fixture *f, void *data) { @@ -339,3 +345,41 @@ TEST(snapshot, installSnapshotDuringEntriesWrite, setUp, tearDown, 0, NULL) CLUSTER_STEP_UNTIL_APPLIED(1, 7, 5000); return MUNIT_OK; } + +/* Follower receives AppendEntries RPCs while taking a snapshot */ +TEST(snapshot, takeSnapshotAppendEntries, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + (void)params; + + /* Set very low threshold and trailing entries number */ + SET_SNAPSHOT_THRESHOLD(3); + SET_SNAPSHOT_TRAILING(1); + + /* Set a large disk latency on the follower, this will allow AppendEntries + * to be sent while a snapshot is taken */ + CLUSTER_SET_DISK_LATENCY(1, 2000); + + /* Apply a few of entries, to force a snapshot to be taken. */ + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + + /* Step the cluster until server 1 takes a snapshot */ + const struct raft *r = CLUSTER_RAFT(1); + CLUSTER_STEP_UNTIL(server_taking_snapshot, (void*) r, 2000); + + /* Send AppendEntries RPCs while server 1 is taking a snapshot */ + static struct raft_apply reqs[5]; + for (int i = 0; i < 5; i++) { + CLUSTER_APPLY_ADD_X(CLUSTER_LEADER, &reqs[i], 1, NULL); + } + CLUSTER_STEP_UNTIL(server_snapshot_done, (void*) r, 5000); + + /* Make sure the AppendEntries are applied and we can make progress */ + CLUSTER_STEP_UNTIL_APPLIED(1, 9, 5000); + CLUSTER_MAKE_PROGRESS; + CLUSTER_MAKE_PROGRESS; + CLUSTER_STEP_UNTIL_APPLIED(1, 11, 5000); + return MUNIT_OK; +}