Skip to content

Commit

Permalink
Fix: replication should be able to shutdown when replicating snapshot…
Browse files Browse the repository at this point in the history
… to unreachable node

If a replication is sending a snapshot, it should
periodically verify the input channel's status. When the input channel
is closed during replication rebuilding, it should immediately exit the
loop instead of attempting retries indefinitely.

- Fix: #808
  • Loading branch information
drmingdrmer committed May 1, 2023
1 parent 60d49fd commit d012705
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
6 changes: 6 additions & 0 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,9 @@ where
Err(err) => {
tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target");

// If sender is closed, return at once
self.try_drain_events().await?;

// Sleep a short time otherwise in test environment it is a dead-loop that
// never yields. Because network implementation does
// not yield.
Expand All @@ -594,6 +597,9 @@ where
Err(err) => {
tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target");

// If sender is closed, return at once
self.try_drain_events().await?;

// Sleep a short time otherwise in test environment it is a dead-loop that never
// yields. Because network implementation does not yield.
sleep(Duration::from_millis(10)).await;
Expand Down
1 change: 1 addition & 0 deletions tests/tests/snapshot/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ mod t50_snapshot_line_rate_to_snapshot;
mod t50_snapshot_when_lacking_log;
mod t51_after_snapshot_add_learner_and_request_a_log;
mod t60_snapshot_chunk_size;
mod t90_issue_808_snapshot_to_unreachable_node_should_not_block;
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::testing::log_id;
use openraft::Config;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// When transferring snapshot to unreachable node, it should not block for ever.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> {
let config = Arc::new(
Config {
purge_batch_size: 1,
max_in_snapshot_log_to_keep: 0,
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0,1}, btreeset! {2}).await?;

tracing::info!(log_index, "--- isolate replication 0 -> 2");
router.isolate_node(2);

let n = 10;
tracing::info!(log_index, "--- write {} logs", n);
{
log_index += router.client_request_many(0, "0", n).await?;
router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", n)).await?;
}

let n0 = router.get_raft_handle(&0)?;

tracing::info!(log_index, "--- build a snapshot");
{
n0.trigger_snapshot().await?;

n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "snapshot").await?;
n0.wait(timeout()).purged(Some(log_id(1, 0, log_index)), "logs in snapshot are purged").await?;
}

tracing::info!(
log_index,
"--- change membership to {{0}}, replication should be closed and re-spawned, snapshot streaming should stop at once"
);
{
n0.change_membership(btreeset! {0}, true).await?;
n0.wait(timeout()).members(btreeset! {0}, "change membership to {{0}}").await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit d012705

Please sign in to comment.