diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 0d6217ec35..56ec2489c0 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -1,9 +1,3 @@ -use std::{ - collections::{BTreeMap, HashMap}, - marker::PhantomData, - sync::{atomic::AtomicBool, Arc}, -}; - use async_trait::async_trait; use hotshot_task_impls::{ builder::BuilderClient, consensus::ConsensusTaskState, consensus2::Consensus2TaskState, @@ -16,6 +10,12 @@ use hotshot_types::traits::{ consensus_api::ConsensusApi, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, }; +use std::{ + collections::{BTreeMap, HashMap}, + marker::PhantomData, + sync::{atomic::AtomicBool, Arc}, +}; + use vbs::version::StaticVersionType; use crate::types::SystemContextHandle; @@ -184,6 +184,7 @@ impl> CreateTaskState { async fn create_from(handle: &SystemContextHandle) -> ConsensusTaskState { let consensus = handle.hotshot.consensus(); + let timeout_task = handle.spawn_initial_timeout_task(); ConsensusTaskState { consensus, @@ -194,7 +195,7 @@ impl> CreateTaskState payload_commitment_and_metadata: None, vote_collector: None.into(), timeout_vote_collector: None.into(), - timeout_task: None, + timeout_task, spawned_tasks: BTreeMap::new(), formed_upgrade_certificate: None, proposal_cert: None, @@ -248,6 +249,8 @@ impl> CreateTaskState handle: &SystemContextHandle, ) -> QuorumProposalTaskState { let consensus = handle.hotshot.consensus(); + let timeout_task = handle.spawn_initial_timeout_task(); + QuorumProposalTaskState { latest_proposed_view: handle.cur_view().await, propose_dependencies: HashMap::new(), @@ -262,7 +265,7 @@ impl> CreateTaskState private_key: handle.private_key().clone(), storage: Arc::clone(&handle.storage), timeout: handle.hotshot.config.next_view_timeout, - timeout_task: None, + timeout_task, round_start_delay: handle.hotshot.config.round_start_delay, id: handle.hotshot.id, version: *handle.hotshot.version.read().await, @@ -278,6 +281,8 @@ impl> CreateTaskState handle: &SystemContextHandle, ) -> QuorumProposalRecvTaskState { let consensus = handle.hotshot.consensus(); + let timeout_task = handle.spawn_initial_timeout_task(); + QuorumProposalRecvTaskState { public_key: handle.public_key().clone(), private_key: handle.private_key().clone(), @@ -286,7 +291,7 @@ impl> CreateTaskState quorum_network: Arc::clone(&handle.hotshot.networks.quorum_network), quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(), timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(), - timeout_task: None, + timeout_task, timeout: handle.hotshot.config.next_view_timeout, round_start_delay: handle.hotshot.config.round_start_delay, output_event_stream: handle.hotshot.external_event_stream.0.clone(), @@ -308,6 +313,8 @@ impl> CreateTaskState { async fn create_from(handle: &SystemContextHandle) -> Consensus2TaskState { let consensus = handle.hotshot.consensus(); + let timeout_task = handle.spawn_initial_timeout_task(); + Consensus2TaskState { public_key: handle.public_key().clone(), private_key: handle.private_key().clone(), @@ -322,7 +329,7 @@ impl> CreateTaskState storage: Arc::clone(&handle.storage), cur_view: handle.cur_view().await, output_event_stream: handle.hotshot.external_event_stream.0.clone(), - timeout_task: None, + timeout_task, timeout: handle.hotshot.config.next_view_timeout, consensus, last_decided_view: handle.cur_view().await, diff --git a/crates/hotshot/src/types/handle.rs b/crates/hotshot/src/types/handle.rs index a35fc43e76..cb859b659a 100644 --- a/crates/hotshot/src/types/handle.rs +++ b/crates/hotshot/src/types/handle.rs @@ -3,10 +3,13 @@ use std::sync::Arc; use async_broadcast::{InactiveReceiver, Receiver, Sender}; +use async_compatibility_layer::art::{async_sleep, async_spawn}; use async_lock::RwLock; +#[cfg(async_executor_impl = "async-std")] +use async_std::task::JoinHandle; use futures::Stream; use hotshot_task::task::TaskRegistry; -use hotshot_task_impls::events::HotShotEvent; +use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event}; use hotshot_types::{ boxed_sync, consensus::Consensus, @@ -15,6 +18,9 @@ use hotshot_types::{ traits::{election::Membership, node_implementation::NodeType}, BoxSyncFuture, }; +use std::time::Duration; +#[cfg(async_executor_impl = "tokio")] +use tokio::task::JoinHandle; use crate::{traits::NodeImplementation, types::Event, SystemContext}; @@ -190,4 +196,26 @@ impl + 'static> SystemContextHandl pub fn storage(&self) -> Arc> { Arc::clone(&self.storage) } + + /// A helper function to spawn the initial timeout task from a given `SystemContextHandle`. + #[must_use] + pub fn spawn_initial_timeout_task(&self) -> JoinHandle<()> { + // Clone the event stream that we send the timeout event to + let event_stream = self.internal_event_stream.0.clone(); + let next_view_timeout = self.hotshot.config.next_view_timeout; + let start_view = self.hotshot.start_view; + + // Spawn a task that will sleep for the next view timeout and then send a timeout event + // if not cancelled + async_spawn({ + async move { + async_sleep(Duration::from_millis(next_view_timeout)).await; + broadcast_event( + Arc::new(HotShotEvent::Timeout(start_view + 1)), + &event_stream, + ) + .await; + } + }) + } } diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs index 31eb95ec3e..6a3a43bbc7 100644 --- a/crates/task-impls/src/consensus/mod.rs +++ b/crates/task-impls/src/consensus/mod.rs @@ -108,7 +108,7 @@ pub struct ConsensusTaskState> { RwLock, TimeoutCertificate>>, /// timeout task handle - pub timeout_task: Option>, + pub timeout_task: JoinHandle<()>, /// Spawned tasks related to a specific view, so we can cancel them when /// they are stale diff --git a/crates/task-impls/src/consensus/view_change.rs b/crates/task-impls/src/consensus/view_change.rs index 0269cb997d..4037693a4f 100644 --- a/crates/task-impls/src/consensus/view_change.rs +++ b/crates/task-impls/src/consensus/view_change.rs @@ -51,11 +51,6 @@ pub(crate) async fn update_view>( error!("Progress: entered view {:>6}", *new_view); } - // cancel the old timeout task - if let Some(timeout_task) = task_state.timeout_task.take() { - cancel_task(timeout_task).await; - } - task_state.cur_view = new_view; // The next view is just the current view + 1 @@ -77,7 +72,7 @@ pub(crate) async fn update_view>( } // Spawn a timeout task if we did actually update view - task_state.timeout_task = Some(async_spawn({ + let new_timeout_task = async_spawn({ let stream = event_stream.clone(); // Nuance: We timeout on the view + 1 here because that means that we have // not seen evidence to transition to this new view @@ -91,7 +86,15 @@ pub(crate) async fn update_view>( ) .await; } - })); + }); + + // Cancel the old timeout task + cancel_task(std::mem::replace( + &mut task_state.timeout_task, + new_timeout_task, + )) + .await; + let consensus = consensus.upgradable_read().await; consensus .metrics diff --git a/crates/task-impls/src/consensus2/handlers.rs b/crates/task-impls/src/consensus2/handlers.rs index 2a1f751043..673f49079c 100644 --- a/crates/task-impls/src/consensus2/handlers.rs +++ b/crates/task-impls/src/consensus2/handlers.rs @@ -130,17 +130,12 @@ pub(crate) async fn handle_view_change> { pub output_event_stream: async_broadcast::Sender>, /// Timeout task handle - pub timeout_task: Option>, + pub timeout_task: JoinHandle<()>, /// View timeout from config. pub timeout: u64, diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 1ff461fe5d..4f261346ae 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -80,7 +80,7 @@ pub struct QuorumProposalTaskState pub round_start_delay: u64, /// timeout task handle - pub timeout_task: Option>, + pub timeout_task: JoinHandle<()>, /// This node's storage ref pub storage: Arc>, diff --git a/crates/task-impls/src/quorum_proposal_recv.rs b/crates/task-impls/src/quorum_proposal_recv.rs index e9cca7f355..9405bed8b1 100644 --- a/crates/task-impls/src/quorum_proposal_recv.rs +++ b/crates/task-impls/src/quorum_proposal_recv.rs @@ -55,7 +55,7 @@ pub struct QuorumProposalRecvTaskState, /// timeout task handle - pub timeout_task: Option>, + pub timeout_task: JoinHandle<()>, /// View timeout from config. pub timeout: u64, diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 2d197a3d80..83905e5e9c 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -333,6 +333,11 @@ where let (mut builder_task, builder_url) = B::start(config.num_nodes_with_stake.into(), B::Config::default()).await; + + // Collect uninitialized nodes because we need to wait for all networks to be ready before starting the tasks + let mut uninitialized_nodes = Vec::new(); + let mut networks_ready = Vec::new(); + for i in 0..total { let mut config = config.clone(); let node_id = self.next_node_id; @@ -366,6 +371,17 @@ where let networks = (self.launcher.resource_generator.channel_generator)(node_id).await; let storage = (self.launcher.resource_generator.storage)(node_id); + // Create a future that waits for the networks to be ready + let network0 = networks.0.clone(); + let network1 = networks.1.clone(); + let networks_ready_future = async move { + network0.wait_for_ready().await; + network1.wait_for_ready().await; + }; + + // Collect it so we can wait for all networks to be ready before starting the tasks + networks_ready.push(networks_ready_future); + if self.launcher.metadata.skip_late && late_start.contains(&node_id) { self.late_start.insert( node_id, @@ -403,23 +419,32 @@ where }, ); } else { - let handle = hotshot.run_tasks().await; - if node_id == 1 { - if let Some(task) = builder_task.take() { - task.start(Box::new(handle.event_stream())) - } - } - - self.nodes.push(Node { - node_id, - networks, - handle, - }); + uninitialized_nodes.push((node_id, networks, hotshot)); } } + results.push(node_id); } + // Wait for all networks to be ready + join_all(networks_ready).await; + + // Then start the necessary tasks + for (node_id, networks, hotshot) in uninitialized_nodes { + let handle = hotshot.run_tasks().await; + if node_id == 1 { + if let Some(task) = builder_task.take() { + task.start(Box::new(handle.event_stream())) + } + } + + self.nodes.push(Node { + node_id, + networks, + handle, + }); + } + results }