diff --git a/rs/p2p/consensus_manager/BUILD.bazel b/rs/p2p/consensus_manager/BUILD.bazel index a836f4e88e5..aa17d8e8ca6 100644 --- a/rs/p2p/consensus_manager/BUILD.bazel +++ b/rs/p2p/consensus_manager/BUILD.bazel @@ -22,6 +22,7 @@ DEPENDENCIES = [ "@crate_index//:rand", "@crate_index//:slog", "@crate_index//:tokio", + "@crate_index//:tokio-util", ] DEV_DEPENDENCIES = [ @@ -29,7 +30,6 @@ DEV_DEPENDENCIES = [ "//rs/test_utilities/logger", "//rs/types/types_test_utils", "@crate_index//:mockall", - "@crate_index//:tokio-util", "@crate_index//:turmoil", ] diff --git a/rs/p2p/consensus_manager/Cargo.toml b/rs/p2p/consensus_manager/Cargo.toml index b1afdb86b8f..b4c0afe2d82 100644 --- a/rs/p2p/consensus_manager/Cargo.toml +++ b/rs/p2p/consensus_manager/Cargo.toml @@ -22,11 +22,11 @@ prometheus = { workspace = true } rand = "0.8.5" slog = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } [dev-dependencies] ic-p2p-test-utils = { path = "../test_utils" } ic-test-utilities-logger = { path = "../../test_utilities/logger" } ic-types-test-utils = { path = "../../types/types_test_utils" } mockall = { workspace = true } -tokio-util = { workspace = true } turmoil = { workspace = true } diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 72534fa7fa7..126c1aea8fb 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -27,6 +27,7 @@ use tokio::{ watch, }, }; +use tokio_util::sync::CancellationToken; mod metrics; mod receiver; @@ -40,6 +41,7 @@ pub struct ConsensusManagerBuilder { rt_handle: Handle, clients: Vec, router: Option, + cancellation_token: CancellationToken, } impl ConsensusManagerBuilder { @@ -50,6 +52,7 @@ impl ConsensusManagerBuilder { rt_handle, clients: Vec::new(), router: None, + cancellation_token: CancellationToken::new(), } } @@ -68,6 +71,7 @@ impl ConsensusManagerBuilder { let log = self.log.clone(); let rt_handle = self.rt_handle.clone(); let metrics_registry = self.metrics_registry.clone(); + let cancellation_token = self.cancellation_token.child_token(); let builder = move |transport: Arc, topology_watcher| { start_consensus_manager( @@ -81,6 +85,7 @@ impl ConsensusManagerBuilder { sender, transport, topology_watcher, + cancellation_token, ) }; @@ -97,10 +102,11 @@ impl ConsensusManagerBuilder { self, transport: Arc, topology_watcher: watch::Receiver, - ) { + ) -> CancellationToken { for client in self.clients { client(transport.clone(), topology_watcher.clone()); } + self.cancellation_token } } @@ -117,6 +123,7 @@ fn start_consensus_manager( sender: UnboundedSender>, transport: Arc, topology_watcher: watch::Receiver, + cancellation_token: CancellationToken, ) where Pool: 'static + Send + Sync + ValidatedPoolReader, Artifact: ArtifactKind, @@ -130,6 +137,7 @@ fn start_consensus_manager( raw_pool.clone(), transport.clone(), adverts_to_send, + cancellation_token, ); ConsensusManagerReceiver::run( diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index d3a86b6f02b..2e44e20b6b7 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -1,5 +1,6 @@ use std::{ collections::{hash_map::Entry, HashMap}, + panic, sync::{Arc, RwLock}, time::Duration, }; @@ -19,9 +20,10 @@ use tokio::{ runtime::Handle, select, sync::mpsc::Receiver, - task::{AbortHandle, JoinHandle, JoinSet}, + task::{JoinError, JoinHandle, JoinSet}, time, }; +use tokio_util::sync::CancellationToken; use crate::{ 
metrics::ConsensusManagerMetrics, uri_prefix, CommitId, SlotNumber, SlotUpdate, Update, @@ -38,6 +40,20 @@ const BACKOFF_MULTIPLIER: f64 = 2.0; // Used to log warnings if the slot table grows beyond the threshold. const SLOT_TABLE_THRESHOLD: u64 = 30_000; +// Convenience function to check for join errors and panic on them. +fn panic_on_join_err(result: Result) -> T { + match result { + Ok(value) => value, + Err(err) => { + if err.is_panic() { + panic::resume_unwind(err.into_panic()); + } else { + panic!("Join error: {:?}", err); + } + } + } +} + pub(crate) struct ConsensusManagerSender { log: ReplicaLogger, metrics: ConsensusManagerMetrics, @@ -47,7 +63,9 @@ pub(crate) struct ConsensusManagerSender { adverts_to_send: Receiver>, slot_manager: AvailableSlotSet, current_commit_id: CommitId, - active_adverts: HashMap, SlotNumber)>, + active_adverts: HashMap, + join_set: JoinSet<()>, + cancellation_token: CancellationToken, } impl ConsensusManagerSender { @@ -58,7 +76,8 @@ impl ConsensusManagerSender { pool_reader: Arc + Send + Sync>>, transport: Arc, adverts_to_send: Receiver>, - ) { + cancellation_token: CancellationToken, + ) -> JoinHandle<()> { let slot_manager = AvailableSlotSet::new(log.clone(), metrics.clone(), Artifact::TAG.into()); @@ -72,9 +91,11 @@ impl ConsensusManagerSender { slot_manager, current_commit_id: CommitId::from(0), active_adverts: HashMap::new(), + join_set: JoinSet::new(), + cancellation_token, }; - rt_handle.spawn(manager.start_event_loop()); + rt_handle.spawn(manager.start_event_loop()) } async fn start_event_loop(mut self) { @@ -92,28 +113,62 @@ impl ConsensusManagerSender { self.handle_send_advert(advert); } - while let Some(advert) = self.adverts_to_send.recv().await { - match advert { - ArtifactProcessorEvent::Advert(advert) => self.handle_send_advert(advert), - ArtifactProcessorEvent::Purge(id) => { - self.handle_purge_advert(&id); + loop { + select! { + _ = self.cancellation_token.cancelled() => { + error!( + self.log, + "Sender event loop for the P2P client `{:?}` terminated. No more adverts will be sent for this client.", + uri_prefix::() + ); + break; + } + Some(advert) = self.adverts_to_send.recv() => { + match advert { + ArtifactProcessorEvent::Advert(advert) => self.handle_send_advert(advert), + ArtifactProcessorEvent::Purge(id) => self.handle_purge_advert(&id), + } + + self.current_commit_id.inc_assign(); + } + + Some(result) = self.join_set.join_next() => { + panic_on_join_err(result); } } - self.current_commit_id.inc_assign(); + #[cfg(debug_assertions)] + { + if !(self.join_set.len() >= self.active_adverts.len()) { + // This invariant can be violated if the root cancellation token is cancelled. + // It can be violated because the active_adverts HashMap is only cleared + // when purging artifacts, and not when the tasks join due to a cancellation + // not triggered by the manager. + let is_not_cancelled = + time::timeout(Duration::from_secs(5), self.cancellation_token.cancelled()) + .await + .is_err(); + + if is_not_cancelled { + panic!( + "Invariant violated: join_set.len() {:?} >= active_adverts.len() {:?}.", + self.join_set.len(), + self.active_adverts.len() + ); + } + } + } } - error!( - self.log, - "Sender event loop for the P2P client `{:?}` terminated. 
No more adverts will be sent for this client.", - uri_prefix::() - ); + while let Some(result) = self.join_set.join_next().await { + panic_on_join_err(result); + } } fn handle_purge_advert(&mut self, id: &Artifact::Id) { - if let Some((send_task, free_slot)) = self.active_adverts.remove(id) { + if let Some((cancellation_token, free_slot)) = self.active_adverts.remove(id) { self.metrics.send_view_consensus_purge_active_total.inc(); - send_task.abort(); + cancellation_token.cancel(); self.slot_manager.push(free_slot); } else { self.metrics.send_view_consensus_dup_purge_total.inc(); @@ -128,6 +183,9 @@ impl ConsensusManagerSender { let slot = self.slot_manager.pop(); + let child_token = self.cancellation_token.child_token(); + let child_token_clone = child_token.clone(); + let send_future = Self::send_advert_to_all_peers( self.rt_handle.clone(), self.log.clone(), @@ -137,9 +195,11 @@ impl ConsensusManagerSender { slot, advert, self.pool_reader.clone(), + child_token_clone, ); - entry.insert((self.rt_handle.spawn(send_future), slot)); + self.join_set.spawn_on(send_future, &self.rt_handle); + entry.insert((child_token, slot)); } else { self.metrics.send_view_consensus_dup_adverts_total.inc(); } @@ -160,6 +220,7 @@ impl ConsensusManagerSender { .. }: Advert, pool_reader: Arc + Send + Sync>>, + cancellation_token: CancellationToken, ) { // Try to push artifact if size below threshold && the artifact is not a relay. let push_artifact = size < ARTIFACT_PUSH_THRESHOLD_BYTES; @@ -194,8 +255,9 @@ impl ConsensusManagerSender { let body = Bytes::from(pb::SlotUpdate::proxy_encode(advert_update)); let mut in_progress_transmissions = JoinSet::new(); - // Stores the connection ID and the `AbortHandle` of the last successful transmission task to a peer. - let mut initiated_transmissions: HashMap = HashMap::new(); + // Stores the connection ID and the [`CancellationToken`] of the last successful transmission task to a peer. + let mut initiated_transmissions: HashMap = + HashMap::new(); let mut periodic_check_interval = time::interval(Duration::from_secs(5)); loop { @@ -205,11 +267,11 @@ impl ConsensusManagerSender { // spawn task for peers with higher conn id or not in completed transmissions. // add task to join map for (peer, connection_id) in transport.peers() { - let is_initiated = initiated_transmissions.get(&peer).is_some_and(|(c, abort_handle)| { - if *c == connection_id { + let is_initiated = initiated_transmissions.get(&peer).is_some_and(|(id, token)| { + if *id == connection_id { true } else { - abort_handle.abort(); + token.cancel(); metrics.send_view_resend_reconnect_total.inc(); false } @@ -217,25 +279,34 @@ impl ConsensusManagerSender { if !is_initiated { + let child_token = cancellation_token.child_token(); + let child_token_clone = child_token.clone(); metrics.send_view_send_to_peer_total.inc(); - let task = send_advert_to_peer(transport.clone(), body.clone(), peer, uri_prefix::()); - let abort_handle = in_progress_transmissions.spawn_on(task, &rt_handle); - initiated_transmissions.insert(peer, (connection_id, abort_handle)); + + let transport = transport.clone(); + let body = body.clone(); + + let send_future = async move { + select! 
{ + _ = send_advert_to_peer(transport, body, peer, uri_prefix::()) => {}, + _ = child_token.cancelled() => {}, + } + }; + + in_progress_transmissions.spawn_on(send_future, &rt_handle); + initiated_transmissions.insert(peer, (connection_id, child_token_clone)); } } } Some(result) = in_progress_transmissions.join_next() => { - match result { - Ok(_) => { - metrics.send_view_send_to_peer_delivered_total.inc(); - }, - Err(err) => { - // Cancelling tasks is ok. Panicking tasks are not. - if err.is_panic() { - std::panic::resume_unwind(err.into_panic()); - } - }, + panic_on_join_err(result); + metrics.send_view_send_to_peer_delivered_total.inc(); + } + _ = cancellation_token.cancelled() => { + while let Some(result) = in_progress_transmissions.join_next().await { + panic_on_join_err(result); } + break; } } } @@ -328,8 +399,6 @@ impl AvailableSlotSet { #[cfg(test)] mod tests { - use std::backtrace::Backtrace; - use ic_logger::replica_logger::no_op_logger; use ic_metrics::MetricsRegistry; use ic_p2p_test_utils::{ @@ -341,25 +410,20 @@ mod tests { use ic_test_utilities_logger::with_test_replica_logger; use ic_types_test_utils::ids::{NODE_1, NODE_2}; use mockall::Sequence; + use tokio::{runtime::Handle, time::timeout}; use super::*; /// Verify that initial validated pool is sent to peers. - #[test] - fn initial_validated_pool_is_sent_to_all_peers() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn initial_validated_pool_is_sent_to_all_peers() { + let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + + let consensus_sender_join_handle = with_test_replica_logger(move |log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); mock_transport .expect_peers() .return_const(vec![(NODE_1, ConnId::from(1))]); @@ -382,31 +446,35 @@ mod tests { ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - assert_eq!(push_rx.blocking_recv().unwrap(), NODE_1); + cancel_token_clone, + ) }); + + assert_eq!(push_rx.recv().await.unwrap(), NODE_1); + + cancel_token.cancel(); + + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() } /// Verify that advert is sent to multiple peers. - #[test] - fn send_advert_to_all_peers() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. 
@info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn send_advert_to_all_peers() { + let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); mock_transport .expect_peers() .return_const(vec![(NODE_1, ConnId::from(1)), (NODE_2, ConnId::from(2))]); @@ -424,44 +492,52 @@ mod tests { .expect_get_validated_by_identifier() .returning(|id| Some(*id)); - let (tx, rx) = tokio::sync::mpsc::channel(100); ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - let first_push_node = push_rx.blocking_recv().unwrap(); - let second_push_node = push_rx.blocking_recv().unwrap(); - assert!( - first_push_node == NODE_1 && second_push_node == NODE_2 - || first_push_node == NODE_2 && second_push_node == NODE_1 - ); + cancel_token_clone, + ) }); + + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + + let first_push_node = push_rx.recv().await.unwrap(); + let second_push_node: phantom_newtype::Id< + ic_base_types::NodeTag, + ic_base_types::PrincipalId, + > = push_rx.recv().await.unwrap(); + assert!( + first_push_node == NODE_1 && second_push_node == NODE_2 + || first_push_node == NODE_2 && second_push_node == NODE_1 + ); + + cancel_token.cancel(); + + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() } /// Verify that increasing connection id causes advert to be resent. - #[test] - fn resend_advert_to_reconnected_peer() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. 
@info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn resend_advert_to_reconnected_peer() { + let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); let mut seq = Sequence::new(); mock_transport @@ -489,47 +565,50 @@ mod tests { .expect_get_validated_by_identifier() .returning(|id| Some(*id)); - let (tx, rx) = tokio::sync::mpsc::channel(100); ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - - // Received two messages from NODE_1 because of reconnection. - let pushes = [ - push_rx.blocking_recv().unwrap(), - push_rx.blocking_recv().unwrap(), - push_rx.blocking_recv().unwrap(), - ]; - assert_eq!(pushes.iter().filter(|&&n| n == NODE_1).count(), 2); - assert_eq!(pushes.iter().filter(|&&n| n == NODE_2).count(), 1); + cancel_token_clone, + ) }); + + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + + // Received two messages from NODE_1 because of reconnection. + let pushes = [ + push_rx.recv().await.unwrap(), + push_rx.recv().await.unwrap(), + push_rx.recv().await.unwrap(), + ]; + assert_eq!(pushes.iter().filter(|&&n| n == NODE_1).count(), 2); + assert_eq!(pushes.iter().filter(|&&n| n == NODE_2).count(), 1); + + cancel_token.cancel(); + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() } /// Verify failed send is retried. - #[test] - fn retry_peer_error() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. 
@info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn retry_peer_error() { + let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (push_tx, mut push_rx) = tokio::sync::mpsc::unbounded_channel(); let mut seq = Sequence::new(); mock_transport @@ -555,40 +634,42 @@ mod tests { .expect_get_validated_by_identifier() .returning(|id| Some(*id)); - let (tx, rx) = tokio::sync::mpsc::channel(100); ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - // Verify that we successfully retried. - assert_eq!(push_rx.blocking_recv().unwrap(), NODE_1); + cancel_token_clone, + ) }); + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + // Verify that we successfully retried. + assert_eq!(push_rx.recv().await.unwrap(), NODE_1); + + cancel_token.cancel(); + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() } /// Verify commit id increases with new adverts/purge events. - #[test] - fn increasing_commit_id() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn increasing_commit_id() { + let (commit_id_tx, mut commit_id_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (commit_id_tx, mut commit_id_rx) = tokio::sync::mpsc::unbounded_channel(); mock_transport .expect_peers() @@ -609,54 +690,59 @@ mod tests { .expect_get_validated_by_identifier() .returning(|id| Some(*id)); - let (tx, rx) = tokio::sync::mpsc::channel(100); ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - // Send advert and verify commit it. - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - assert_eq!(commit_id_rx.blocking_recv().unwrap().get(), 0); - - // Send second advert and observe commit id bump. 
- tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&2), - )) - .unwrap(); - assert_eq!(commit_id_rx.blocking_recv().unwrap().get(), 1); - // Send purge and new advert and observe commit id increase by 2. - tx.blocking_send(ArtifactProcessorEvent::Purge(2)).unwrap(); - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&3), - )) - .unwrap(); - assert_eq!(commit_id_rx.blocking_recv().unwrap().get(), 3); + cancel_token_clone, + ) }); + // Send advert and verify commit it. + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + assert_eq!(commit_id_rx.recv().await.unwrap().get(), 0); + + // Send second advert and observe commit id bump. + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&2), + )) + .await + .unwrap(); + assert_eq!(commit_id_rx.recv().await.unwrap().get(), 1); + // Send purge and new advert and observe commit id increase by 2. + tx.send(ArtifactProcessorEvent::Purge(2)).await.unwrap(); + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&3), + )) + .await + .unwrap(); + + assert_eq!(commit_id_rx.recv().await.unwrap().get(), 3); + + cancel_token.cancel(); + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() } /// Verify that duplicate Advert event does not cause sending twice. - #[test] - fn send_same_advert_twice() { - // Abort process if a thread panics. This catches detached tokio tasks that panic. - // https://github.com/tokio-rs/tokio/issues/4516 - std::panic::set_hook(Box::new(|info| { - let stacktrace = Backtrace::force_capture(); - println!("Got panic. @info:{}\n@stackTrace:{}", info, stacktrace); - std::process::abort(); - })); - - with_test_replica_logger(|log| { - let rt = tokio::runtime::Runtime::new().unwrap(); + #[tokio::test] + async fn send_same_advert_twice() { + let (commit_id_tx, mut commit_id_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { let mut mock_reader = MockValidatedPoolReader::new(); let mut mock_transport = MockTransport::new(); - let (commit_id_tx, mut commit_id_rx) = tokio::sync::mpsc::unbounded_channel(); mock_transport .expect_peers() @@ -677,33 +763,104 @@ mod tests { .expect_get_validated_by_identifier() .returning(|id| Some(*id)); - let (tx, rx) = tokio::sync::mpsc::channel(100); ConsensusManagerSender::run( log, ConsensusManagerMetrics::new::(&MetricsRegistry::default()), - rt.handle().clone(), + Handle::current(), Arc::new(RwLock::new(mock_reader)), Arc::new(mock_transport), rx, - ); - // Send advert and verify commit id. - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - assert_eq!(commit_id_rx.blocking_recv().unwrap().get(), 0); - // Send same advert again. This should be noop. - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&1), - )) - .unwrap(); - // Check that new advert is advertised with correct commit id. - tx.blocking_send(ArtifactProcessorEvent::Advert( - U64Artifact::message_to_advert(&2), - )) - .unwrap(); - assert_eq!(commit_id_rx.blocking_recv().unwrap().get(), 2); + cancel_token_clone, + ) + }); + // Send advert and verify commit id. 
+ tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + assert_eq!(commit_id_rx.recv().await.unwrap().get(), 0); + + // Send same advert again. This should be noop. + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + + // Check that new advert is advertised with correct commit id. + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&2), + )) + .await + .unwrap(); + + assert_eq!(commit_id_rx.recv().await.unwrap().get(), 2); + + cancel_token.cancel(); + timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender did not terminate in time.") + .unwrap() + } + + /// Verify that a panic happening in one of the tasks spawned by the ConsensusManagerSender + /// is propagated when awaiting on [`ConsensusManagerSender::run`]. + /// + // This test is ignored because the panic is caught in the panic hook set in /consensus_manager/receiver.rs + #[ignore] + #[tokio::test] + async fn panic_in_task_is_propagated() { + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + + let consensus_sender_join_handle = with_test_replica_logger(|log| { + let mut mock_reader = MockValidatedPoolReader::new(); + let mut mock_transport = MockTransport::new(); + + mock_transport + .expect_peers() + .return_const(vec![(NODE_1, ConnId::from(1))]); + + // We don't create an expectation for `push` here, so that we can trigger a panic + mock_transport + .expect_push() + .times(2) + .returning(move |_, _| { + panic!("Panic in mock transport expectation."); + }); + + mock_reader + .expect_get_all_validated_by_filter() + .returning(|_| Box::new(std::iter::empty())); + mock_reader + .expect_get_validated_by_identifier() + .returning(|id| Some(*id)); + + ConsensusManagerSender::run( + log, + ConsensusManagerMetrics::new::(&MetricsRegistry::default()), + Handle::current(), + Arc::new(RwLock::new(mock_reader)), + Arc::new(mock_transport), + rx, + cancel_token_clone, + ) }); + + tx.send(ArtifactProcessorEvent::Advert( + U64Artifact::message_to_advert(&1), + )) + .await + .unwrap(); + + let join_error = timeout(Duration::from_secs(5), consensus_sender_join_handle) + .await + .expect("ConsensusManagerSender should terminate since the downstream service `transport` panicked.") + .expect_err("Expected a join error"); + + assert!(join_error.is_panic(), "The join error should be a panic."); } /// Test that we can take more slots than SLOT_TABLE_THRESHOLD diff --git a/rs/replica/setup_ic_network/src/lib.rs b/rs/replica/setup_ic_network/src/lib.rs index af27b070d9c..86b14148a4e 100644 --- a/rs/replica/setup_ic_network/src/lib.rs +++ b/rs/replica/setup_ic_network/src/lib.rs @@ -399,7 +399,7 @@ pub fn setup_consensus_and_p2p( state_sync_manager_rx, ); - new_p2p_consensus.run(quic_transport, topology_watcher); + let _cancellation_token = new_p2p_consensus.run(quic_transport, topology_watcher); // Tcp transport let oldest_registry_version_in_use = consensus_pool_cache.get_oldest_registry_version_in_use();
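
Editor's note: the sketch below is not part of the diff. It is a minimal, self-contained illustration of the shutdown pattern this change adopts in ConsensusManagerSender: a root CancellationToken owned by the manager, a child token per active advert (replacing the previous AbortHandle), a JoinSet that tracks every spawned send task, panic propagation through join errors, and a drain of the JoinSet on shutdown. All names here (Event, run_event_loop, send_to_peer) are hypothetical stand-ins; it assumes only the tokio (feature "full") and tokio-util crates.

// Illustrative sketch only, not part of the change set above.
use std::{collections::HashMap, time::Duration};

use tokio::{
    sync::mpsc,
    task::{JoinError, JoinSet},
    time,
};
use tokio_util::sync::CancellationToken;

// Hypothetical stand-in for ArtifactProcessorEvent.
enum Event {
    Advert(u64),
    Purge(u64),
}

// Mirror of `panic_on_join_err` in sender.rs: re-raise panics from joined
// tasks, treat any other join error as fatal.
fn panic_on_join_err<T>(result: Result<T, JoinError>) -> T {
    match result {
        Ok(value) => value,
        Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
        Err(err) => panic!("Join error: {:?}", err),
    }
}

async fn run_event_loop(mut events: mpsc::Receiver<Event>, cancellation_token: CancellationToken) {
    let mut join_set = JoinSet::new();
    let mut active: HashMap<u64, CancellationToken> = HashMap::new();

    loop {
        tokio::select! {
            // Root token cancelled: stop taking work and fall through to the drain below.
            _ = cancellation_token.cancelled() => break,
            Some(event) = events.recv() => match event {
                Event::Advert(id) => {
                    // Each send task gets a child token so it stops either when the
                    // advert is purged individually or when the whole manager shuts down.
                    let child = cancellation_token.child_token();
                    active.insert(id, child.clone());
                    join_set.spawn(async move {
                        tokio::select! {
                            _ = child.cancelled() => {}
                            _ = send_to_peer(id) => {}
                        }
                    });
                }
                Event::Purge(id) => {
                    // Cancelling the child token replaces the old `AbortHandle::abort()`.
                    if let Some(child) = active.remove(&id) {
                        child.cancel();
                    }
                }
            },
            // Reap finished tasks as we go so panics surface promptly.
            Some(result) = join_set.join_next() => panic_on_join_err(result),
        }
    }

    // Graceful shutdown: wait for all spawned tasks, still propagating panics.
    while let Some(result) = join_set.join_next().await {
        panic_on_join_err(result);
    }
}

// Stand-in for the real transport push; the actual code retries with backoff.
async fn send_to_peer(_id: u64) {
    time::sleep(Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let root = CancellationToken::new();
    let handle = tokio::spawn(run_event_loop(rx, root.clone()));

    tx.send(Event::Advert(1)).await.unwrap();
    tx.send(Event::Purge(1)).await.unwrap();

    // Cancel the root token, then await the loop's JoinHandle. This mirrors how the
    // updated tests cancel the token and await the handle returned by
    // ConsensusManagerSender::run, which also surfaces panics from inner tasks.
    root.cancel();
    handle.await.unwrap();
}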