Skip to content

Commit

Permalink
Merge branch 'main' into ss/task-structure-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ss-es committed May 27, 2024
2 parents c3e4bb6 + 4528af7 commit 38918a6
Show file tree
Hide file tree
Showing 16 changed files with 680 additions and 347 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 37 additions & 3 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,59 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
Ok(inner)
}

/// "Starts" consensus by sending a `QcFormed` event
/// "Starts" consensus by sending a `QcFormed`, `ViewChange`, and `ValidatedStateUpdated` events
///
/// # Panics
/// Panics if sending genesis fails
pub async fn start_consensus(&self) {
#[cfg(feature = "dependncy-tasks")]
error!("HotShot is running with the dependency tasks feature enabled!!");
debug!("Starting Consensus");
let consensus = self.consensus.read().await;

#[allow(clippy::panic)]
self.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::ViewChange(self.start_view)))
.await
.expect("Genesis Broadcast failed");
.unwrap_or_else(|_| {
panic!(
"Genesis Broadcast failed; event = ViewChange({:?})",
self.start_view
)
});
#[cfg(feature = "dependency-tasks")]
{
if let Some(validated_state) = consensus.validated_state_map().get(&self.start_view) {
#[allow(clippy::panic)]
self.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::ValidatedStateUpdated(
TYPES::Time::new(*self.start_view),
validated_state.clone(),
)))
.await
.unwrap_or_else(|_| {
panic!(
"Genesis Broadcast failed; event = ValidatedStateUpdated({:?})",
self.start_view,
)
});
}
}
#[allow(clippy::panic)]
self.internal_event_stream
.0
.broadcast_direct(Arc::new(HotShotEvent::QcFormed(either::Left(
consensus.high_qc().clone(),
))))
.await
.expect("Genesis Broadcast failed");
.unwrap_or_else(|_| {
panic!(
"Genesis Broadcast failed; event = QcFormed(either::Left({:?}))",
consensus.high_qc()
)
});

{
// Some applications seem to expect a leaf decide event for the genesis leaf,
Expand Down
24 changes: 11 additions & 13 deletions crates/task-impls/src/consensus/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ use crate::{

/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
/// TODO - This should just take the QuorumProposalRecv task state after
/// we merge the dependency tasks.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
async fn validate_proposal_safety_and_liveness<TYPES: NodeType>(
pub async fn validate_proposal_safety_and_liveness<TYPES: NodeType>(
proposal: Proposal<TYPES, QuorumProposal<TYPES>>,
parent_leaf: Leaf<TYPES>,
consensus: Arc<RwLock<Consensus<TYPES>>>,
Expand Down Expand Up @@ -577,24 +580,16 @@ pub async fn publish_proposal_if_able<TYPES: NodeType>(
}
}

/// TEMPORARY TYPE: Quorum proposal recv task state when using dependency tasks
#[cfg(feature = "dependency-tasks")]
pub(crate) type TemporaryProposalRecvCombinedType<TYPES, I> = QuorumProposalRecvTaskState<TYPES, I>;

/// TEMPORARY TYPE: Consensus task state when not using dependency tasks
#[cfg(not(feature = "dependency-tasks"))]
pub(crate) type TemporaryProposalRecvCombinedType<TYPES, I> = ConsensusTaskState<TYPES, I>;

// TODO: Fix `clippy::too_many_lines`.
/// Handle the received quorum proposal.
///
/// Returns the proposal that should be used to set the `cur_proposal` for other tasks.
#[allow(clippy::too_many_lines)]
pub async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
pub(crate) async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
sender: &TYPES::SignatureKey,
event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
task_state: &mut TemporaryProposalRecvCombinedType<TYPES, I>,
task_state: &mut ConsensusTaskState<TYPES, I>,
version: Version,
) -> Result<Option<QuorumProposal<TYPES>>> {
let sender = sender.clone();
Expand Down Expand Up @@ -623,11 +618,14 @@ pub async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplementation<
}

// NOTE: We could update our view with a valid TC but invalid QC, but that is not what we do here
if let Err(e) = update_view::<TYPES, I>(
task_state,
if let Err(e) = update_view::<TYPES>(
view,
&event_stream,
task_state.timeout,
Arc::clone(&task_state.consensus),
&mut task_state.cur_view,
&mut task_state.timeout_task,
&task_state.output_event_stream,
SEND_VIEW_CHANGE_EVENT,
)
.await
Expand Down
7 changes: 5 additions & 2 deletions crates/task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusTaskState<TYPES, I>
// update the view in state to the one in the message
// Publish a view change event to the application
// Returns if the view does not need updating.
if let Err(e) = update_view::<TYPES, _>(
self,
if let Err(e) = update_view::<TYPES>(
new_view,
&event_stream,
self.timeout,
Arc::clone(&self.consensus),
&mut self.cur_view,
&mut self.timeout_task,
&self.output_event_stream,
DONT_SEND_VIEW_CHANGE_EVENT,
)
.await
Expand Down
42 changes: 23 additions & 19 deletions crates/task-impls/src/consensus/view_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ use anyhow::{ensure, Result};
use async_broadcast::Sender;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::{RwLock, RwLockUpgradableReadGuard};
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
use hotshot_types::{
consensus::Consensus,
event::{Event, EventType},
traits::node_implementation::{ConsensusTime, NodeImplementation, NodeType},
traits::node_implementation::{ConsensusTime, NodeType},
};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error};

use crate::{
consensus::helpers::TemporaryProposalRecvCombinedType,
events::HotShotEvent,
helpers::{broadcast_event, cancel_task},
};
Expand All @@ -29,19 +32,24 @@ pub(crate) const DONT_SEND_VIEW_CHANGE_EVENT: bool = false;
///
/// # Errors
/// Returns an [`anyhow::Error`] when the new view is not greater than the current view.
pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
task_state: &mut TemporaryProposalRecvCombinedType<TYPES, I>,
/// TODO: Remove args when we merge dependency tasks.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn update_view<TYPES: NodeType>(
new_view: TYPES::Time,
event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
timeout: u64,
consensus: Arc<RwLock<Consensus<TYPES>>>,
cur_view: &mut TYPES::Time,
timeout_task: &mut JoinHandle<()>,
output_event_stream: &Sender<Event<TYPES>>,
send_view_change_event: bool,
) -> Result<()> {
ensure!(
new_view > task_state.cur_view,
new_view > *cur_view,
"New view is not greater than our current view"
);

let old_view = task_state.cur_view;
let old_view = *cur_view;

debug!("Updating view from {} to {}", *old_view, *new_view);

Expand All @@ -51,10 +59,10 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
error!("Progress: entered view {:>6}", *new_view);
}

task_state.cur_view = new_view;
*cur_view = new_view;

// The next view is just the current view + 1
let next_view = task_state.cur_view + 1;
let next_view = *cur_view + 1;

if send_view_change_event {
futures::join! {
Expand All @@ -66,7 +74,7 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
view_number: old_view,
},
},
&task_state.output_event_stream,
output_event_stream,
)
};
}
Expand All @@ -77,7 +85,7 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
// Nuance: We timeout on the view + 1 here because that means that we have
// not seen evidence to transition to this new view
let view_number = next_view;
let timeout = Duration::from_millis(task_state.timeout);
let timeout = Duration::from_millis(timeout);
async move {
async_sleep(timeout).await;
broadcast_event(
Expand All @@ -88,26 +96,22 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>>(
}
});

// Cancel the old timeout task
cancel_task(std::mem::replace(
&mut task_state.timeout_task,
new_timeout_task,
))
.await;
// cancel the old timeout task
cancel_task(std::mem::replace(timeout_task, new_timeout_task)).await;

let consensus = consensus.upgradable_read().await;
consensus
.metrics
.current_view
.set(usize::try_from(task_state.cur_view.u64()).unwrap());
.set(usize::try_from(cur_view.u64()).unwrap());

// Do the comparison before the subtraction to avoid potential overflow, since
// `last_decided_view` may be greater than `cur_view` if the node is catching up.
if usize::try_from(task_state.cur_view.u64()).unwrap()
if usize::try_from(cur_view.u64()).unwrap()
> usize::try_from(consensus.last_decided_view().u64()).unwrap()
{
consensus.metrics.number_of_views_since_last_decide.set(
usize::try_from(task_state.cur_view.u64()).unwrap()
usize::try_from(cur_view.u64()).unwrap()
- usize::try_from(consensus.last_decided_view().u64()).unwrap(),
);
}
Expand Down
10 changes: 7 additions & 3 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use either::Either;
use hotshot_task::task::TaskEvent;
use hotshot_types::{
consensus::ProposalDependencyData,
data::{DaProposal, Leaf, QuorumProposal, UpgradeProposal, VidDisperse, VidDisperseShare},
message::Proposal,
simple_certificate::{
Expand Down Expand Up @@ -156,8 +155,13 @@ pub enum HotShotEvent<TYPES: NodeType> {
UpgradeCertificateFormed(UpgradeCertificate<TYPES>),
/// HotShot was upgraded, with a new network version.
VersionUpgrade(Version),
/// Initiate a proposal right now for a provided view.
ProposeNow(TYPES::Time, ProposalDependencyData<TYPES>),

/// Initiate a proposal for a proposal without a parent, but passing the liveness check.
/// This is distinct from `QuorumProposalValidated` due to the fact that it is in a
/// different state than what we'd typically see with a fully validated proposal and,
/// as a result, it need to be its own event.
QuorumProposalLivenessValidated(QuorumProposal<TYPES>),

/// Initiate a vote right now for the designated view.
VoteNow(TYPES::Time, VoteDependencyData<TYPES>),

Expand Down
Loading

0 comments on commit 38918a6

Please sign in to comment.