diff --git a/Cargo.lock b/Cargo.lock index 95a190337dd..b53573171d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11265,6 +11265,7 @@ dependencies = [ "ic-test-utilities-logger", "ic-types", "ic-types-test-utils", + "mockall", "prometheus", "prost", "rand 0.8.5", diff --git a/rs/p2p/state_sync_manager/BUILD.bazel b/rs/p2p/state_sync_manager/BUILD.bazel index 17dc76b1166..1fed9d66691 100644 --- a/rs/p2p/state_sync_manager/BUILD.bazel +++ b/rs/p2p/state_sync_manager/BUILD.bazel @@ -35,6 +35,7 @@ DEV_DEPENDENCIES = [ "//rs/p2p/test_utils", "//rs/test_utilities/logger", "//rs/types/types_test_utils", + "@crate_index//:mockall", "@crate_index//:turmoil", ] diff --git a/rs/p2p/state_sync_manager/Cargo.toml b/rs/p2p/state_sync_manager/Cargo.toml index a720204428b..a0769a85bfd 100644 --- a/rs/p2p/state_sync_manager/Cargo.toml +++ b/rs/p2p/state_sync_manager/Cargo.toml @@ -34,5 +34,5 @@ ic-p2p-test-utils = { path = "../test_utils" } ic-state-manager = { path = "../../state_manager" } ic-test-utilities-logger = { path = "../../test_utilities/logger" } ic-types-test-utils = { path = "../../types/types_test_utils" } +mockall = { workspace = true } turmoil = { workspace = true } - diff --git a/rs/p2p/state_sync_manager/src/lib.rs b/rs/p2p/state_sync_manager/src/lib.rs index 06a7ae98dbd..2213730a7a8 100644 --- a/rs/p2p/state_sync_manager/src/lib.rs +++ b/rs/p2p/state_sync_manager/src/lib.rs @@ -150,8 +150,12 @@ impl StateSyncManager { self.metrics.adverts_received_total.inc(); // Remove ongoing state sync if finished or try to add peer if ongoing. if let Some(ongoing) = &mut self.ongoing_state_sync { - // Try to add peer to state sync peer set. - let _ = ongoing.sender.try_send(peer_id); + if ongoing.artifact_id == artifact_id { + // `try_send` is used beacuse the ongoing state sync can be blocked. This can, for example happen because of + // file system operations. In that case we don't want to block the main event loop here. It is also fine + // to drop adverts since peers will readvertise anyway. + let _ = ongoing.sender.try_send(peer_id); + } if ongoing.jh.is_finished() { info!(self.log, "Cleaning up state sync {}", artifact_id.height); self.ongoing_state_sync = None; @@ -178,7 +182,7 @@ impl StateSyncManager { &self.rt, self.metrics.ongoing_state_sync_metrics.clone(), Arc::new(Mutex::new(chunkable)), - artifact_id, + artifact_id.clone(), self.state_sync.clone(), self.transport.clone(), self.cancellation.child_token(), @@ -234,3 +238,119 @@ impl StateSyncManager { } } } + +#[cfg(test)] +mod tests { + use std::backtrace::Backtrace; + + use axum::{http::StatusCode, response::Response}; + use bytes::{Bytes, BytesMut}; + use ic_interfaces::p2p::state_sync::ChunkId; + use ic_metrics::MetricsRegistry; + use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport}; + use ic_test_utilities_logger::with_test_replica_logger; + use ic_types::{crypto::CryptoHash, CryptoHashOfState, Height}; + use ic_types_test_utils::ids::{NODE_1, NODE_2}; + use mockall::Sequence; + use prost::Message; + use tokio::{runtime::Runtime, sync::Notify}; + + use super::*; + + #[derive(Clone)] + struct TestMessage; + + fn compress_empty_bytes() -> Bytes { + let mut raw = BytesMut::new(); + Bytes::new() + .encode(&mut raw) + .expect("Allocated enough memory"); + Bytes::from(zstd::bulk::compress(&raw, zstd::DEFAULT_COMPRESSION_LEVEL).unwrap()) + } + + /// Don't add peers that advertise a state that differs from the current sync. + #[test] + fn test_reject_peer_with_different_state() { + // Abort process if a thread panics. This catches detached tokio tasks that panic. + // https://github.com/tokio-rs/tokio/issues/4516 + std::panic::set_hook(Box::new(|info| { + let stacktrace = Backtrace::force_capture(); + println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace); + std::process::abort(); + })); + with_test_replica_logger(|log| { + let finished = Arc::new(Notify::new()); + let finished_c = finished.clone(); + let mut s = MockStateSync::::default(); + let mut seq = Sequence::new(); + let mut seq2 = Sequence::new(); + s.expect_should_cancel().returning(move |_| false); + s.expect_deliver_state_sync().return_once(move |_| { + finished_c.notify_waiters(); + }); + s.expect_available_states().return_const(vec![]); + let mut t = MockTransport::default(); + t.expect_rpc().returning(|p, _| { + if p == &NODE_2 { + panic!("NODE 2 should not be added to the state sync") + } + Ok(Response::builder() + .status(StatusCode::OK) + .extension(NODE_1) + .body(compress_empty_bytes()) + .unwrap()) + }); + let mut c = MockChunkable::::default(); + // Endless iterator + c.expect_chunks_to_download() + .once() + .returning(|| Box::new((0..50).map(ChunkId::from))); + c.expect_chunks_to_download() + .returning(|| Box::new(std::iter::empty())); + c.expect_add_chunk() + .times(49) + .return_const(Ok(())) + .in_sequence(&mut seq); + c.expect_add_chunk() + .once() + .return_once(|_, _| Ok(())) + .in_sequence(&mut seq); + c.expect_completed() + .times(49) + .return_const(None) + .in_sequence(&mut seq2); + c.expect_completed() + .once() + .return_once(|| Some(TestMessage)) + .in_sequence(&mut seq2); + s.expect_start_state_sync() + .once() + .return_once(|_| Some(Box::new(c))); + + let rt = Runtime::new().unwrap(); + let old_id = StateSyncArtifactId { + height: Height::from(0), + hash: CryptoHashOfState::new(CryptoHash(vec![])), + }; + let id = StateSyncArtifactId { + height: Height::from(1), + hash: CryptoHashOfState::new(CryptoHash(vec![])), + }; + + let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(100); + start_state_sync_manager( + &log, + &MetricsRegistry::default(), + rt.handle(), + Arc::new(t) as Arc<_>, + Arc::new(s) as Arc<_>, + handler_rx, + ); + rt.block_on(async move { + handler_tx.send((id, NODE_1)).await.unwrap(); + handler_tx.send((old_id, NODE_2)).await.unwrap(); + finished.notified().await; + }); + }); + } +} diff --git a/rs/p2p/state_sync_manager/src/ongoing.rs b/rs/p2p/state_sync_manager/src/ongoing.rs index 07e2bcee78f..0bd3a8e5686 100644 --- a/rs/p2p/state_sync_manager/src/ongoing.rs +++ b/rs/p2p/state_sync_manager/src/ongoing.rs @@ -72,6 +72,7 @@ struct OngoingStateSync { pub(crate) struct OngoingStateSyncHandle { pub sender: Sender, + pub artifact_id: StateSyncArtifactId, pub jh: JoinHandle<()>, } @@ -94,7 +95,7 @@ pub(crate) fn start_ongoing_state_sync( let ongoing = OngoingStateSync { log, rt: rt.clone(), - artifact_id, + artifact_id: artifact_id.clone(), metrics, transport, cancellation, @@ -111,6 +112,7 @@ pub(crate) fn start_ongoing_state_sync( let jh = rt.spawn(ongoing.run()); OngoingStateSyncHandle { sender: new_peers_tx, + artifact_id, jh, } } @@ -148,7 +150,6 @@ impl OngoingStateSync { // Usually it is discouraged to use await in the event loop. // In this case it is ok because the function only is async if state sync completed. self.handle_downloaded_chunk_result(result).await; - self.spawn_chunk_downloads(); } Err(err) => { @@ -383,7 +384,6 @@ mod tests { use tokio::runtime::Runtime; use super::*; - #[derive(Clone)] struct TestMessage;