diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 473362cad..0707dac76 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -584,6 +584,9 @@ where Err(err) => { tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target"); + // If sender is closed, return at once + self.try_drain_events().await?; + // Sleep a short time otherwise in test environment it is a dead-loop that // never yields. Because network implementation does // not yield. @@ -594,6 +597,9 @@ where Err(err) => { tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target"); + // If sender is closed, return at once + self.try_drain_events().await?; + // Sleep a short time otherwise in test environment it is a dead-loop that never // yields. Because network implementation does not yield. sleep(Duration::from_millis(10)).await; diff --git a/tests/tests/snapshot/main.rs b/tests/tests/snapshot/main.rs index 6f7b24c93..812c6cdab 100644 --- a/tests/tests/snapshot/main.rs +++ b/tests/tests/snapshot/main.rs @@ -17,3 +17,4 @@ mod t50_snapshot_line_rate_to_snapshot; mod t50_snapshot_when_lacking_log; mod t51_after_snapshot_add_learner_and_request_a_log; mod t60_snapshot_chunk_size; +mod t90_issue_808_snapshot_to_unreachable_node_should_not_block; diff --git a/tests/tests/snapshot/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs new file mode 100644 index 000000000..11f4427c2 --- /dev/null +++ b/tests/tests/snapshot/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::testing::log_id; +use openraft::Config; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// When transferring snapshot to unreachable node, it should not block for ever. 
+///
+/// Regression test for issue 808. Scenario:
+///
+/// 1. Build a cluster from the node sets `{0,1}` and `{2}`, then isolate node 2 in the router.
+/// 2. Write 10 client requests through node 0 and wait for node 0 to reach that log index.
+/// 3. Trigger a snapshot on node 0 and wait until the snapshot exists and the covered logs are
+///    purged.
+/// 4. Change membership to `{0}` and wait for node 0 to report it.
+///
+/// Every wait is bounded by [`timeout`], so a replication task that keeps retrying the snapshot
+/// transfer to the isolated node makes the test fail instead of hanging.
+#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
+async fn snapshot_to_unreachable_node_should_not_block() -> Result<()> {
+    // NOTE(review): these settings presumably force the purged logs to be shipped to node 2 as a
+    // snapshot and keep heartbeats from interfering -- confirm against the `Config` docs.
+    let config = Arc::new(
+        Config {
+            purge_batch_size: 1,
+            max_in_snapshot_log_to_keep: 0,
+            enable_heartbeat: false,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+    let mut router = RaftRouter::new(config.clone());
+
+    tracing::info!("--- initializing cluster");
+    let mut log_index = router.new_cluster(btreeset! {0,1}, btreeset! {2}).await?;
+
+    tracing::info!(log_index, "--- isolate replication 0 -> 2");
+    router.isolate_node(2);
+
+    let n = 10;
+    tracing::info!(log_index, "--- write {} logs", n);
+    {
+        log_index += router.client_request_many(0, "0", n).await?;
+        router.wait(&0, timeout()).log(Some(log_index), format!("{} writes", n)).await?;
+    }
+
+    let n0 = router.get_raft_handle(&0)?;
+
+    tracing::info!(log_index, "--- build a snapshot");
+    {
+        n0.trigger_snapshot().await?;
+
+        n0.wait(timeout()).snapshot(log_id(1, 0, log_index), "snapshot").await?;
+        n0.wait(timeout()).purged(Some(log_id(1, 0, log_index)), "logs in snapshot are purged").await?;
+    }
+
+    tracing::info!(
+        log_index,
+        "--- change membership to {{0}}, replication should be closed and re-spawned, snapshot streaming should stop at once"
+    );
+    {
+        // This is the step under test: it must complete within `timeout()` even though node 2
+        // is still isolated.
+        n0.change_membership(btreeset! {0}, true).await?;
+        n0.wait(timeout()).members(btreeset! {0}, "change membership to {{0}}").await?;
+    }
+
+    Ok(())
+}
+
+/// Upper bound passed to every `wait(..)` call in this test: one second.
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(1_000))
+}