Skip to content

Commit

Permalink
[CATCHUP] Fetch Proposals in Dependecny Task (#3339)
Browse files Browse the repository at this point in the history
* fetch proposal from dep task

* emit validated event from fetch_proposal

* fetch proposal when trying to propose

* fix dep build

* fix dep unit test
  • Loading branch information
bfish713 committed Jun 19, 2024
1 parent e234926 commit 4c506d1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 60 deletions.
37 changes: 19 additions & 18 deletions crates/task-impls/src/consensus/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::{
sync::Arc,
};

use crate::{events::ProposalMissing, request::REQUEST_TIMEOUT};
use anyhow::bail;
use anyhow::{ensure, Context, Result};
use async_broadcast::Sender;
use async_broadcast::{broadcast, Sender};
use async_compatibility_layer::art::async_timeout;
use async_lock::RwLock;
#[cfg(async_executor_impl = "async-std")]
use async_std::task::JoinHandle;
Expand Down Expand Up @@ -32,10 +35,6 @@ use {
consensus::{update_view, view_change::SEND_VIEW_CHANGE_EVENT},
helpers::AnyhowTracing,
},
crate::{events::ProposalMissing, request::REQUEST_TIMEOUT},
anyhow::bail,
async_broadcast::broadcast,
async_compatibility_layer::art::async_timeout,
async_compatibility_layer::art::{async_sleep, async_spawn},
chrono::Utc,
core::time::Duration,
Expand Down Expand Up @@ -535,8 +534,7 @@ pub async fn publish_proposal_if_able<TYPES: NodeType>(
}

/// Trigger a request to the network for a proposal for a view and wait for the response
#[cfg(not(feature = "dependency-tasks"))]
async fn fetch_proposal<TYPES: NodeType>(
pub(crate) async fn fetch_proposal<TYPES: NodeType>(
view: TYPES::Time,
event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
quorum_membership: Arc<TYPES::Membership>,
Expand All @@ -555,32 +553,35 @@ async fn fetch_proposal<TYPES: NodeType>(
let Ok(Ok(Some(proposal))) = async_timeout(REQUEST_TIMEOUT, rx.recv_direct()).await else {
bail!("Request for proposal failed");
};
let view = proposal.data.view_number();
let view_number = proposal.data.view_number();
let justify_qc = proposal.data.justify_qc.clone();

if !justify_qc.is_valid_cert(quorum_membership.as_ref()) {
bail!("Invalid justify_qc in proposal for view {}", *view);
bail!("Invalid justify_qc in proposal for view {}", *view_number);
}
let mut consensus_write = consensus.write().await;
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let state = Arc::new(
<TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(&proposal.data.block_header),
);

if let Err(e) = consensus_write.update_validated_state_map(
view,
View {
view_inner: ViewInner::Leaf {
leaf: leaf.commit(),
state,
delta: None,
},
let view = View {
view_inner: ViewInner::Leaf {
leaf: leaf.commit(),
state,
delta: None,
},
) {
};
if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) {
tracing::trace!("{e:?}");
}

consensus_write.update_saved_leaves(leaf.clone());
broadcast_event(
HotShotEvent::ValidatedStateUpdated(view_number, view).into(),
&event_stream,
)
.await;
Ok(leaf)
}

Expand Down
13 changes: 11 additions & 2 deletions crates/task-impls/src/quorum_proposal/dependency_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};

use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_compatibility_layer::art::async_sleep;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_lock::RwLock;
use committable::Committable;
use hotshot_task::{
Expand All @@ -24,7 +24,9 @@ use tracing::{debug, error};
use vbs::version::Version;

use crate::{
consensus::helpers::parent_leaf_and_state, events::HotShotEvent, helpers::broadcast_event,
consensus::helpers::{fetch_proposal, parent_leaf_and_state},
events::HotShotEvent,
helpers::broadcast_event,
};

/// Proposal dependency types. These types represent events that precipitate a proposal.
Expand Down Expand Up @@ -185,6 +187,13 @@ impl<TYPES: NodeType> HandleDepOutput for ProposalDependencyHandle<TYPES> {
.validated_state_map()
.contains_key(&high_qc_view_number)
{
// The proposal for the high qc view is missing, try to get it asynchronously
let memberhsip = Arc::clone(&self.quorum_membership);
let sender = self.sender.clone();
let consensus = Arc::clone(&self.consensus);
async_spawn(async move {
fetch_proposal(high_qc_view_number, sender, memberhsip, consensus).await
});
// Block on receiving the event from the event stream.
EventDependency::new(
self.receiver.clone(),
Expand Down
76 changes: 45 additions & 31 deletions crates/task-impls/src/quorum_proposal_recv/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use tracing::{debug, error, warn};
use super::QuorumProposalRecvTaskState;
use crate::{
consensus::{
helpers::{validate_proposal_safety_and_liveness, validate_proposal_view_and_certs},
helpers::{
fetch_proposal, validate_proposal_safety_and_liveness, validate_proposal_view_and_certs,
},
view_change::{update_view, SEND_VIEW_CHANGE_EVENT},
},
events::HotShotEvent,
Expand Down Expand Up @@ -154,40 +156,52 @@ pub(crate) async fn handle_quorum_proposal_recv<TYPES: NodeType, I: NodeImplemen
debug!("Failed to update view; error = {e:#}");
}

let parent = {
let consensus_read = task_state.consensus.read().await;

// Get the parent leaf and state.
let parent = match consensus_read
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned()
{
Some(leaf) => {
if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) {
Some((leaf, Arc::clone(&state)))
} else {
bail!("Parent state not found! Consensus internally inconsistent");
}
}
None => None,
};

if justify_qc.view_number() > consensus_read.high_qc().view_number {
if let Err(e) = task_state
.storage
.write()
.await
.update_high_qc(justify_qc.clone())
.await
{
bail!("Failed to store High QC, not voting; error = {:?}", e);
// Get the parent leaf and state.
let mut parent_leaf = task_state
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned();

parent_leaf = match parent_leaf {
Some(p) => Some(p),
None => fetch_proposal(
justify_qc.view_number(),
event_sender.clone(),
Arc::clone(&task_state.quorum_membership),
Arc::clone(&task_state.consensus),
)
.await
.ok(),
};
let consensus_read = task_state.consensus.read().await;

let parent = match parent_leaf {
Some(leaf) => {
if let (Some(state), _) = consensus_read.state_and_delta(leaf.view_number()) {
Some((leaf, Arc::clone(&state)))
} else {
bail!("Parent state not found! Consensus internally inconsistent");
}
}

parent
None => None,
};

if justify_qc.view_number() > consensus_read.high_qc().view_number {
if let Err(e) = task_state
.storage
.write()
.await
.update_high_qc(justify_qc.clone())
.await
{
bail!("Failed to store High QC, not voting; error = {:?}", e);
}
}
drop(consensus_read);

let mut consensus_write = task_state.consensus.write().await;
if let Err(e) = consensus_write.update_high_qc(justify_qc.clone()) {
tracing::trace!("{e:?}");
Expand Down
31 changes: 23 additions & 8 deletions crates/task-impls/src/quorum_vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tracing::{debug, error, instrument, trace, warn};
use vbs::version::Version;

use crate::{
consensus::helpers::fetch_proposal,
events::HotShotEvent,
helpers::{broadcast_event, cancel_task},
quorum_vote::handlers::handle_quorum_proposal_validated,
Expand Down Expand Up @@ -92,19 +93,33 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> VoteDependencyHand
proposed_leaf: &Leaf<TYPES>,
vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
) -> Result<()> {
let consensus_reader = self.consensus.read().await;
let justify_qc = &proposed_leaf.justify_qc();

// Justify qc's leaf commitment should be the same as the parent's leaf commitment.
let parent = consensus_reader
let mut maybe_parent = self
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.date().leaf_commit)
.cloned()
.context(format!(
"Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}",
justify_qc.date().leaf_commit,
proposed_leaf.view_number(),
))?;
.cloned();
maybe_parent = match maybe_parent {
Some(p) => Some(p),
None => fetch_proposal(
justify_qc.view_number(),
self.sender.clone(),
Arc::clone(&self.quorum_membership),
Arc::clone(&self.consensus),
)
.await
.ok(),
};
let parent = maybe_parent.context(format!(
"Proposal's parent missing from storage with commitment: {:?}, proposal view {:?}",
justify_qc.date().leaf_commit,
proposed_leaf.view_number(),
))?;
let consensus_reader = self.consensus.read().await;

let (Some(parent_state), _) = consensus_reader.state_and_delta(parent.view_number()) else {
bail!("Parent state not found! Consensus internally inconsistent")
Expand Down
11 changes: 11 additions & 0 deletions crates/testing/src/predicates/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ where
Box::new(EventPredicate { check, info })
}

pub fn quorum_proposal_missing<TYPES>() -> Box<EventPredicate<TYPES>>
where
TYPES: NodeType,
{
let info = "QuorumProposalRequest".to_string();
let check: EventCallback<TYPES> = Arc::new(move |e: Arc<HotShotEvent<TYPES>>| {
matches!(*e.clone(), QuorumProposalRequest(..))
});
Box::new(EventPredicate { check, info })
}

pub fn quorum_proposal_send<TYPES>() -> Box<EventPredicate<TYPES>>
where
TYPES: NodeType,
Expand Down
3 changes: 2 additions & 1 deletion crates/testing/tests/tests_1/quorum_proposal_recv_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use hotshot_task_impls::{
};
use hotshot_testing::{
helpers::{build_fake_view_with_leaf_and_state, build_system_handle},
predicates::event::{all_predicates, exact, vote_now},
predicates::event::{all_predicates, quorum_proposal_missing, exact, vote_now},
script::InputOrder,
serial,
view_generator::TestViewGenerator,
Expand Down Expand Up @@ -191,6 +191,7 @@ async fn test_quorum_proposal_recv_task_liveness_check() {
),
),
)),
quorum_proposal_missing(),
exact(UpdateHighQc(proposals[2].data.justify_qc.clone())),
vote_now(),
])];
Expand Down

0 comments on commit 4c506d1

Please sign in to comment.