fix: after a 2nd log compaction, membership should still be extractable from the prev compaction log

drmingdrmer committed Jul 13, 2021
1 parent 447dc11 commit dba2403
Showing 2 changed files with 126 additions and 1 deletion.
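For context on the bug this commit addresses: when memstore compacts the log, the purged entries are replaced by a single EntryPayload::SnapshotPointer entry which, as the memstore diff below shows, carries the membership config in effect at snapshot time. A membership lookup that matches only ConfigChange entries therefore finds nothing once compaction has purged the original ConfigChange entry, and falls back to the initial single-node config. Below is a minimal, self-contained sketch of that failure mode; the types are simplified stand-ins for illustration, not the real async-raft definitions.

    use std::collections::HashSet;

    // Simplified stand-ins for the async-raft types involved (sketch only).
    #[derive(Clone, Debug, PartialEq)]
    struct Membership(HashSet<u64>);

    enum Payload {
        Normal,
        ConfigChange(Membership),
        // Left in place of purged logs by compaction; carries the membership
        // that was in effect when the snapshot was taken.
        SnapshotPointer(Membership),
    }

    fn membership_buggy(log: &[Payload]) -> Option<Membership> {
        // Matches only ConfigChange, as memstore did before this commit.
        log.iter().rev().find_map(|p| match p {
            Payload::ConfigChange(m) => Some(m.clone()),
            _ => None,
        })
    }

    fn membership_fixed(log: &[Payload]) -> Option<Membership> {
        // Also accepts the SnapshotPointer left by a previous compaction.
        log.iter().rev().find_map(|p| match p {
            Payload::ConfigChange(m) | Payload::SnapshotPointer(m) => Some(m.clone()),
            _ => None,
        })
    }

    fn main() {
        let members = Membership([0, 1].into_iter().collect());
        // After compaction the ConfigChange entry is gone; only the
        // snapshot pointer remains at the head of the log.
        let compacted = vec![Payload::SnapshotPointer(members.clone()), Payload::Normal];

        assert_eq!(membership_buggy(&compacted), None); // membership lost
        assert_eq!(membership_fixed(&compacted), Some(members)); // recovered
        println!("sketch ok");
    }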
124 changes: 124 additions & 0 deletions async-raft/tests/snapshot_uses_prev_snap_membership.rs
@@ -0,0 +1,124 @@
mod fixtures;

use std::sync::Arc;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftStorage;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;

/// Test that a second compaction does not lose membership.
///
/// What does this test do?
///
/// - build a cluster of 2 nodes.
/// - send enough requests to trigger a snapshot.
/// - send just enough requests to trigger another snapshot.
/// - ensure membership is still valid.
///
/// export RUST_LOG=async_raft,memstore,snapshot_uses_prev_snap_membership=trace
/// cargo test -p async-raft --test snapshot_uses_prev_snap_membership
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn snapshot_uses_prev_snap_membership() -> Result<()> {
    fixtures::init_tracing();

    let snapshot_threshold: u64 = 10;

    let config = Arc::new(
        Config::build("test".into())
            .snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold))
            .validate()
            .expect("failed to build Raft config"),
    );
    let router = Arc::new(RaftRouter::new(config.clone()));

    let mut want = 0;

    tracing::info!("--- initializing cluster of 2");
    {
        router.new_raft_node(0).await;

        router.wait_for_log(&hashset![0], want, None, "empty").await?;
        router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
        router.initialize_from_single_node(0).await?;
        want += 1;

        router.wait_for_log(&hashset![0], want, None, "init").await?;
        router.wait_for_state(&hashset![0], State::Leader, None, "empty").await?;

        router.new_raft_node(1).await;
        router.add_non_voter(0, 1).await?;

        router.change_membership(0, hashset![0, 1]).await?;
        want += 2;

        router.wait_for_log(&hashset![0, 1], want, None, "cluster of 2").await?;
    }

    let sto = router.get_storage_handle(&0).await?;

    tracing::info!("--- send just enough logs to trigger snapshot");
    {
        router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
        want = snapshot_threshold;

        router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?;
        router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;

        let m = sto.get_membership_config().await?;
        assert_eq!(
            MembershipConfig {
                members: hashset![0, 1],
                members_after_consensus: None
            },
            m,
            "membership"
        );

        // TODO(xp): this assertion fails because when changing membership, an append-entries
        // request does not update voted_for and does not call save_hard_state.
        // Thus the storage layer does not know that leader == Some(0).
        // Updating voted_for whenever a new leader is seen would solve this issue.
        // router
        //     .assert_storage_state(
        //         1,
        //         want,
        //         Some(0),
        //         want,
        //         Some((want.into(), 1, MembershipConfig {
        //             members: hashset![0, 1],
        //             members_after_consensus: None,
        //         })),
        //     )
        //     .await;
    }

    tracing::info!("--- send just enough logs to trigger the 2nd snapshot");
    {
        router.client_request_many(0, "0", (snapshot_threshold * 2 - want) as usize).await;
        want = snapshot_threshold * 2;

        router.wait_for_log(&hashset![0, 1], want, None, "send log to trigger snapshot").await?;
        router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
    }

    tracing::info!("--- check membership");
    {
        let m = sto.get_membership_config().await?;
        assert_eq!(
            MembershipConfig {
                members: hashset![0, 1],
                members_after_consensus: None
            },
            m,
            "membership"
        );
    }

    Ok(())
}
3 changes: 2 additions & 1 deletion memstore/src/lib.rs
@@ -202,7 +202,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
         }
     }

-    #[tracing::instrument(level = "trace", skip(self, hs))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn save_hard_state(&self, hs: &HardState) -> Result<()> {
         *self.hs.write().await = Some(hs.clone());
         Ok(())
@@ -305,6 +305,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
             .skip_while(|entry| entry.log_id.index > last_applied_log)
             .find_map(|entry| match &entry.payload {
                 EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
+                EntryPayload::SnapshotPointer(cfg) => Some(cfg.membership.clone()),
                 _ => None,
             })
             .unwrap_or_else(|| MembershipConfig::new_initial(self.id));
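Read as a whole, the fixed lookup scans the log from newest to oldest, skips entries beyond last_applied_log, and returns the first membership it finds, now taken from either a ConfigChange entry or the SnapshotPointer left behind by a prior compaction. Below is a hedged reconstruction of the surrounding method for illustration; the iterator chain is taken from the diff above, while the locking and the last_applied_log lookup are assumptions about memstore's internals:

    // Reconstruction for illustration, not a verbatim copy of memstore.
    async fn get_membership_config(&self) -> Result<MembershipConfig> {
        // Assumed fields: `log` as an RwLock<BTreeMap<u64, Entry<_>>` and an
        // RwLock-guarded last-applied index, as suggested by the diff context.
        let log = self.log.read().await;
        let last_applied_log = *self.last_applied_log.read().await;

        let cfg = log
            .values()
            .rev()
            // Ignore entries that have not been applied yet.
            .skip_while(|entry| entry.log_id.index > last_applied_log)
            .find_map(|entry| match &entry.payload {
                EntryPayload::ConfigChange(cfg) => Some(cfg.membership.clone()),
                // The fix: a snapshot pointer from a previous compaction also
                // carries the effective membership, so a 2nd compaction no
                // longer loses it.
                EntryPayload::SnapshotPointer(cfg) => Some(cfg.membership.clone()),
                _ => None,
            })
            // Only if no membership exists anywhere in the log do we fall
            // back to the initial single-node config.
            .unwrap_or_else(|| MembershipConfig::new_initial(self.id));

        Ok(cfg)
    }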
