Skip to content

Commit

Permalink
Merge branch 'main' into ss/task-structure-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ss-es committed May 28, 2024
2 parents 85e7bf0 + 4bdfda5 commit 2c98abb
Show file tree
Hide file tree
Showing 28 changed files with 421 additions and 285 deletions.
136 changes: 61 additions & 75 deletions crates/task-impls/src/consensus/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ use async_std::task::JoinHandle;
use chrono::Utc;
use committable::Committable;
use futures::FutureExt;
#[cfg(not(feature = "dependency-tasks"))]
use hotshot_types::simple_vote::QuorumData;
use hotshot_types::{
consensus::{CommitmentAndMetadata, Consensus, View},
data::{null_block, Leaf, QuorumProposal, ViewChangeEvidence},
event::{Event, EventType, LeafInfo},
message::{GeneralConsensusMessage, Proposal},
message::Proposal,
simple_certificate::UpgradeCertificate,
traits::{
block_contents::BlockHeader,
Expand All @@ -34,6 +32,8 @@ use hotshot_types::{
utils::{Terminator, ViewInner},
vote::{Certificate, HasViewNumber},
};
#[cfg(not(feature = "dependency-tasks"))]
use hotshot_types::{message::GeneralConsensusMessage, simple_vote::QuorumData};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -1014,14 +1014,10 @@ pub async fn handle_quorum_proposal_validated<TYPES: NodeType, I: NodeImplementa
Ok(())
}

/// TEMPORARY TYPE: Dummy type for sending the vote.
#[cfg(feature = "dependency-tasks")]
type TemporaryVoteInfo<TYPES> = PhantomData<TYPES>;

/// TEMPORARY TYPE: Private key, latest decided upgrade certificate, committee membership, and
/// event stream, for sending the vote.
/// Private key, latest decided upgrade certificate, committee membership, and event stream, for
/// sending the vote.
#[cfg(not(feature = "dependency-tasks"))]
type TemporaryVoteInfo<TYPES> = (
type VoteInfo<TYPES> = (
<<TYPES as NodeType>::SignatureKey as SignatureKey>::PrivateKey,
Option<UpgradeCertificate<TYPES>>,
Arc<<TYPES as NodeType>::Membership>,
Expand All @@ -1031,6 +1027,7 @@ type TemporaryVoteInfo<TYPES> = (
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
#[allow(unused_variables)]
#[cfg(not(feature = "dependency-tasks"))]
/// Check if we are able to vote, like whether the proposal is valid,
/// whether we have DAC and VID share, and if so, vote.
pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementation<TYPES>>(
Expand All @@ -1041,10 +1038,9 @@ pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementatio
storage: Arc<RwLock<I::Storage>>,
quorum_membership: Arc<TYPES::Membership>,
instance_state: Arc<TYPES::InstanceState>,
vote_info: TemporaryVoteInfo<TYPES>,
vote_info: VoteInfo<TYPES>,
version: Version,
) -> bool {
#[cfg(not(feature = "dependency-tasks"))]
use hotshot_types::simple_vote::QuorumVote;

if !quorum_membership.has_stake(&public_key) {
Expand All @@ -1066,16 +1062,13 @@ pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementatio
return false;
};

#[cfg(not(feature = "dependency-tasks"))]
{
if let Some(upgrade_cert) = &vote_info.1 {
if upgrade_cert.in_interim(cur_view)
&& Some(proposal.block_header.payload_commitment())
!= null_block::commitment(quorum_membership.total_nodes())
{
info!("Refusing to vote on proposal because it does not have a null commitment, and we are between versions. Expected:\n\n{:?}\n\nActual:{:?}", null_block::commitment(quorum_membership.total_nodes()), Some(proposal.block_header.payload_commitment()));
return false;
}
if let Some(upgrade_cert) = &vote_info.1 {
if upgrade_cert.in_interim(cur_view)
&& Some(proposal.block_header.payload_commitment())
!= null_block::commitment(quorum_membership.total_nodes())
{
info!("Refusing to vote on proposal because it does not have a null commitment, and we are between versions. Expected:\n\n{:?}\n\nActual:{:?}", null_block::commitment(quorum_membership.total_nodes()), Some(proposal.block_header.payload_commitment()));
return false;
}
}

Expand Down Expand Up @@ -1130,41 +1123,37 @@ pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementatio
return false;
}

let message: GeneralConsensusMessage<TYPES>;

#[cfg(not(feature = "dependency-tasks"))]
{
// Validate the DAC.
message = if cert.is_valid_cert(vote_info.2.as_ref()) {
// Validate the block payload commitment for non-genesis DAC.
if cert.date().payload_commit != proposal.block_header.payload_commitment() {
warn!(
"Block payload commitment does not equal da cert payload commitment. View = {}",
*view
);
return false;
}
if let Ok(vote) = QuorumVote::<TYPES>::create_signed_vote(
QuorumData {
leaf_commit: proposed_leaf.commit(),
},
view,
&public_key,
&vote_info.0,
) {
GeneralConsensusMessage::<TYPES>::Vote(vote)
} else {
error!("Unable to sign quorum vote!");
return false;
}
} else {
error!(
"Invalid DAC in proposal! Skipping proposal. {:?} cur view is: {:?}",
cert, cur_view
// Validate the DAC.
let message = if cert.is_valid_cert(vote_info.2.as_ref()) {
// Validate the block payload commitment for non-genesis DAC.
if cert.date().payload_commit != proposal.block_header.payload_commitment() {
warn!(
"Block payload commitment does not equal da cert payload commitment. View = {}",
*view
);
return false;
};
}
}
if let Ok(vote) = QuorumVote::<TYPES>::create_signed_vote(
QuorumData {
leaf_commit: proposed_leaf.commit(),
},
view,
&public_key,
&vote_info.0,
) {
GeneralConsensusMessage::<TYPES>::Vote(vote)
} else {
error!("Unable to sign quorum vote!");
return false;
}
} else {
error!(
"Invalid DAC in proposal! Skipping proposal. {:?} cur view is: {:?}",
cert, cur_view
);
return false;
};

let mut consensus = consensus.write().await;
consensus.update_validated_state_map(
cur_view,
Expand All @@ -1190,28 +1179,25 @@ pub async fn update_state_and_vote_if_able<TYPES: NodeType, I: NodeImplementatio
error!("Couldn't store undecided state. Error: {:?}", e);
}

#[cfg(not(feature = "dependency-tasks"))]
{
if let GeneralConsensusMessage::Vote(vote) = message {
debug!(
"Sending vote to next quorum leader {:?}",
vote.view_number() + 1
);
// Add to the storage that we have received the VID disperse for a specific view
if let Err(e) = storage.write().await.append_vid(&vid_share).await {
warn!(
"Failed to store VID Disperse Proposal with error {:?}, aborting vote",
e
);
return false;
}
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &vote_info.3).await;
return true;
}
if let GeneralConsensusMessage::Vote(vote) = message {
debug!(
"Received VID share, but couldn't find DAC cert for view {:?}",
*proposal.view_number(),
"Sending vote to next quorum leader {:?}",
vote.view_number() + 1
);
// Add to the storage that we have received the VID disperse for a specific view
if let Err(e) = storage.write().await.append_vid(&vid_share).await {
warn!(
"Failed to store VID Disperse Proposal with error {:?}, aborting vote",
e
);
return false;
}
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &vote_info.3).await;
return true;
}
debug!(
"Received VID share, but couldn't find DAC cert for view {:?}",
*proposal.view_number(),
);
false
}
15 changes: 15 additions & 0 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{marker::PhantomData, sync::Arc};

use anyhow::Result;
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::spawn_blocking;
Expand All @@ -17,6 +18,7 @@ use hotshot_types::{
traits::{
block_contents::vid_commitment,
election::Membership,
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeImplementation, NodeType},
signature_key::SignatureKey,
storage::Storage,
Expand Down Expand Up @@ -215,6 +217,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
{
tracing::trace!("{e:?}");
}
// Optimistically calculate and update VID if we know that the primary network is down.
if self.da_network.is_primary_down() {
let consensus = Arc::clone(&self.consensus);
let membership = Arc::clone(&self.quorum_membership);
let pk = self.private_key.clone();
async_spawn(async move {
consensus
.write()
.await
.calculate_and_update_vid(view, membership, &pk)
.await;
});
}
}
HotShotEvent::DaVoteRecv(ref vote) => {
debug!("DA vote recv, Main Task {:?}", vote.view_number());
Expand Down
4 changes: 2 additions & 2 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ impl<TYPES: NodeType> TaskEvent for HotShotEvent<TYPES> {
}

/// Marker that the task completed
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct HotShotTaskCompleted;

/// All of the possible events that can be passed between Sequecning `HotShot` tasks
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
#[derive(Eq, PartialEq, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum HotShotEvent<TYPES: NodeType> {
/// Shutdown the task
Expand Down
42 changes: 2 additions & 40 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
use std::sync::Arc;

use async_broadcast::{SendError, Sender};
#[cfg(async_executor_impl = "async-std")]
use async_std::task::{spawn_blocking, JoinHandle};
use hotshot_types::{
data::VidDisperse,
traits::{election::Membership, node_implementation::NodeType},
vid::{vid_scheme, VidPrecomputeData},
};
use jf_vid::{precomputable::Precomputable, VidScheme};
use async_std::task::JoinHandle;
#[cfg(async_executor_impl = "tokio")]
use tokio::task::{spawn_blocking, JoinHandle};
use tokio::task::JoinHandle;

/// Cancel a task
pub async fn cancel_task<T>(task: JoinHandle<T>) {
Expand Down Expand Up @@ -39,36 +31,6 @@ pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Send
}
}

/// Calculate the vid disperse information from the payload given a view and membership,
/// optionally using precompute data from builder
///
/// # Panics
/// Panics if the VID calculation fails, this should not happen.
#[allow(clippy::panic)]
pub async fn calculate_vid_disperse<TYPES: NodeType>(
txns: Arc<[u8]>,
membership: &Arc<TYPES::Membership>,
view: TYPES::Time,
precompute_data: Option<VidPrecomputeData>,
) -> VidDisperse<TYPES> {
let num_nodes = membership.total_nodes();

let vid_disperse = spawn_blocking(move || {
precompute_data
.map_or_else(
|| vid_scheme(num_nodes).disperse(Arc::clone(&txns)),
|data| vid_scheme(num_nodes).disperse_precompute(Arc::clone(&txns), &data)
)
.unwrap_or_else(|err| panic!("VID disperse failure:(num_storage nodes,payload_byte_len)=({num_nodes},{}) error: {err}", txns.len()))
}).await;
#[cfg(async_executor_impl = "tokio")]
// Tokio's JoinHandle's `Output` is `Result<T, JoinError>`, while in async-std it's just `T`
// Unwrap here will just propagate any panic from the spawned task, it's not a new place we can panic.
let vid_disperse = vid_disperse.unwrap();

VidDisperse::from_membership(view, vid_disperse, membership.as_ref())
}

/// Utilities to print anyhow logs.
pub trait AnyhowTracing {
/// Print logs as debug
Expand Down
4 changes: 2 additions & 2 deletions crates/task-impls/src/quorum_proposal_recv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalRecvTaskState<
);
return;
};
let Some(disperse_share) = vid_shares.get(&self.public_key) else {
let Some(vid_share) = vid_shares.get(&self.public_key) else {
error!("Did not get a VID share for our public key, aborting vote");
return;
};
Expand All @@ -175,7 +175,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalRecvTaskState<
VoteDependencyData {
quorum_proposal: current_proposal,
parent_leaf,
disperse_share: disperse_share.clone(),
vid_share: vid_share.clone(),
da_cert: da_cert.clone(),
},
)),
Expand Down
Loading

0 comments on commit 2c98abb

Please sign in to comment.