Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow genesis to time out #3206

Merged
merged 7 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 17 additions & 10 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::{atomic::AtomicBool, Arc},
};

use async_trait::async_trait;
use hotshot_task_impls::{
builder::BuilderClient, consensus::ConsensusTaskState, consensus2::Consensus2TaskState,
Expand All @@ -16,6 +10,12 @@ use hotshot_types::traits::{
consensus_api::ConsensusApi,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
};
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::{atomic::AtomicBool, Arc},
};

use vbs::version::StaticVersionType;

use crate::types::SystemContextHandle;
Expand Down Expand Up @@ -184,6 +184,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
{
async fn create_from(handle: &SystemContextHandle<TYPES, I>) -> ConsensusTaskState<TYPES, I> {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

ConsensusTaskState {
consensus,
Expand All @@ -194,7 +195,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
payload_commitment_and_metadata: None,
vote_collector: None.into(),
timeout_vote_collector: None.into(),
timeout_task: None,
timeout_task,
spawned_tasks: BTreeMap::new(),
formed_upgrade_certificate: None,
proposal_cert: None,
Expand Down Expand Up @@ -248,6 +249,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
handle: &SystemContextHandle<TYPES, I>,
) -> QuorumProposalTaskState<TYPES, I> {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

QuorumProposalTaskState {
latest_proposed_view: handle.cur_view().await,
propose_dependencies: HashMap::new(),
Expand All @@ -262,7 +265,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
private_key: handle.private_key().clone(),
storage: Arc::clone(&handle.storage),
timeout: handle.hotshot.config.next_view_timeout,
timeout_task: None,
timeout_task,
round_start_delay: handle.hotshot.config.round_start_delay,
id: handle.hotshot.id,
version: *handle.hotshot.version.read().await,
Expand All @@ -278,6 +281,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
handle: &SystemContextHandle<TYPES, I>,
) -> QuorumProposalRecvTaskState<TYPES, I> {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

QuorumProposalRecvTaskState {
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
Expand All @@ -286,7 +291,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
quorum_network: Arc::clone(&handle.hotshot.networks.quorum_network),
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
timeout_task: None,
timeout_task,
timeout: handle.hotshot.config.next_view_timeout,
round_start_delay: handle.hotshot.config.round_start_delay,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
Expand All @@ -308,6 +313,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
{
async fn create_from(handle: &SystemContextHandle<TYPES, I>) -> Consensus2TaskState<TYPES, I> {
let consensus = handle.hotshot.consensus();
let timeout_task = handle.spawn_initial_timeout_task();

Consensus2TaskState {
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
Expand All @@ -322,7 +329,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
storage: Arc::clone(&handle.storage),
cur_view: handle.cur_view().await,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
timeout_task: None,
timeout_task,
timeout: handle.hotshot.config.next_view_timeout,
consensus,
last_decided_view: handle.cur_view().await,
Expand Down
30 changes: 29 additions & 1 deletion crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
use std::sync::Arc;

use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use futures::Stream;
use hotshot_task::task::TaskRegistry;
use hotshot_task_impls::events::HotShotEvent;
use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
use hotshot_types::{
boxed_sync,
consensus::Consensus,
Expand All @@ -15,6 +18,9 @@ use hotshot_types::{
traits::{election::Membership, node_implementation::NodeType},
BoxSyncFuture,
};
use std::time::Duration;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;

use crate::{traits::NodeImplementation, types::Event, SystemContext};

Expand Down Expand Up @@ -190,4 +196,26 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> SystemContextHandl
pub fn storage(&self) -> Arc<RwLock<I::Storage>> {
Arc::clone(&self.storage)
}

/// A helper function to spawn the initial timeout task from a given `SystemContextHandle`.
#[must_use]
pub fn spawn_initial_timeout_task(&self) -> JoinHandle<()> {
    // Sender half of the internal event stream; the timeout event is published here.
    let sender = self.internal_event_stream.0.clone();
    // View timeout (milliseconds) and the view we start from, read up front so the
    // spawned future owns plain values instead of borrowing `self`.
    let timeout_ms = self.hotshot.config.next_view_timeout;
    let first_view = self.hotshot.start_view;

    // Sleep through one full view timeout, then broadcast a timeout for the view
    // after the start view. Cancelling the returned handle suppresses the event.
    async_spawn(async move {
        async_sleep(Duration::from_millis(timeout_ms)).await;
        broadcast_event(Arc::new(HotShotEvent::Timeout(first_view + 1)), &sender).await;
    })
}
}
2 changes: 1 addition & 1 deletion crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
RwLock<VoteCollectorOption<TYPES, TimeoutVote<TYPES>, TimeoutCertificate<TYPES>>>,

/// timeout task handle
pub timeout_task: Option<JoinHandle<()>>,
pub timeout_task: JoinHandle<()>,

/// Spawned tasks related to a specific view, so we can cancel them when
/// they are stale
Expand Down
17 changes: 10 additions & 7 deletions crates/task-impls/src/consensus/view_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
error!("Progress: entered view {:>6}", *new_view);
}

// cancel the old timeout task
if let Some(timeout_task) = task_state.timeout_task.take() {
cancel_task(timeout_task).await;
}

task_state.cur_view = new_view;

// The next view is just the current view + 1
Expand All @@ -77,7 +72,7 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
}

// Spawn a timeout task if we did actually update view
task_state.timeout_task = Some(async_spawn({
let new_timeout_task = async_spawn({
let stream = event_stream.clone();
// Nuance: We timeout on the view + 1 here because that means that we have
// not seen evidence to transition to this new view
Expand All @@ -91,7 +86,15 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
)
.await;
}
}));
});

// Cancel the old timeout task
// Cancel the old timeout task
cancel_task(std::mem::replace(
    &mut task_state.timeout_task,
    new_timeout_task,
))
.await;

let consensus = consensus.upgradable_read().await;
consensus
.metrics
Expand Down
16 changes: 9 additions & 7 deletions crates/task-impls/src/consensus2/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,12 @@ pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TY
let old_view_number = task_state.cur_view;
debug!("Updating view from {old_view_number:?} to {new_view_number:?}");

// Cancel the old timeout task
if let Some(timeout_task) = task_state.timeout_task.take() {
cancel_task(timeout_task).await;
}

// Move this node to the next view
task_state.cur_view = new_view_number;

// Spawn a timeout task if we did actually update view
let timeout = task_state.timeout;
task_state.timeout_task = Some(async_spawn({
let new_timeout_task = async_spawn({
let stream = sender.clone();
// Nuance: We timeout on the view + 1 here because that means that we have
// not seen evidence to transition to this new view
Expand All @@ -153,7 +148,14 @@ pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TY
)
.await;
}
}));
});

// Cancel the old timeout task
cancel_task(std::mem::replace(
&mut task_state.timeout_task,
new_timeout_task,
))
.await;

let consensus = task_state.consensus.read().await;
consensus
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/consensus2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct Consensus2TaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

/// Timeout task handle
pub timeout_task: Option<JoinHandle<()>>,
pub timeout_task: JoinHandle<()>,

/// View timeout from config.
pub timeout: u64,
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>
pub round_start_delay: u64,

/// timeout task handle
pub timeout_task: Option<JoinHandle<()>>,
pub timeout_task: JoinHandle<()>,

/// This node's storage ref
pub storage: Arc<RwLock<I::Storage>>,
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/quorum_proposal_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct QuorumProposalRecvTaskState<TYPES: NodeType, I: NodeImplementation<TY
pub timeout_membership: Arc<TYPES::Membership>,

/// timeout task handle
pub timeout_task: Option<JoinHandle<()>>,
pub timeout_task: JoinHandle<()>,

/// View timeout from config.
pub timeout: u64,
Expand Down
49 changes: 37 additions & 12 deletions crates/testing/src/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ where

let (mut builder_task, builder_url) =
B::start(config.num_nodes_with_stake.into(), B::Config::default()).await;

// Collect uninitialized nodes because we need to wait for all networks to be ready before starting the tasks
let mut uninitialized_nodes = Vec::new();
let mut networks_ready = Vec::new();

for i in 0..total {
let mut config = config.clone();
let node_id = self.next_node_id;
Expand Down Expand Up @@ -366,6 +371,17 @@ where
let networks = (self.launcher.resource_generator.channel_generator)(node_id).await;
let storage = (self.launcher.resource_generator.storage)(node_id);

// Create a future that waits for the networks to be ready
let network0 = networks.0.clone();
let network1 = networks.1.clone();
let networks_ready_future = async move {
network0.wait_for_ready().await;
network1.wait_for_ready().await;
};

// Collect it so we can wait for all networks to be ready before starting the tasks
networks_ready.push(networks_ready_future);

if self.launcher.metadata.skip_late && late_start.contains(&node_id) {
self.late_start.insert(
node_id,
Expand Down Expand Up @@ -403,23 +419,32 @@ where
},
);
} else {
let handle = hotshot.run_tasks().await;
if node_id == 1 {
if let Some(task) = builder_task.take() {
task.start(Box::new(handle.event_stream()))
}
}

self.nodes.push(Node {
node_id,
networks,
handle,
});
uninitialized_nodes.push((node_id, networks, hotshot));
}
}

results.push(node_id);
}

// Wait for all networks to be ready
join_all(networks_ready).await;

// Then start the necessary tasks
for (node_id, networks, hotshot) in uninitialized_nodes {
let handle = hotshot.run_tasks().await;
if node_id == 1 {
if let Some(task) = builder_task.take() {
task.start(Box::new(handle.event_stream()))
}
}

self.nodes.push(Node {
node_id,
networks,
handle,
});
}

results
}

Expand Down