diff --git a/justfile b/justfile index d0b81582..6fad8258 100644 --- a/justfile +++ b/justfile @@ -171,7 +171,7 @@ test-contract-deploy *ARGS: scripts/test-contract-deploy {{ARGS}} test-all: build_release build-test-utils - env RUST_LOG=block_checker=info,warn target/release/run \ + env RUST_LOG=timeboost_builder::submit=debug,block_checker=info,warn target/release/run \ --verbose \ --timeout 120 \ --spawn "1:anvil --port 8545" \ diff --git a/robusta/src/lib.rs b/robusta/src/lib.rs index 88baa567..57a2e17b 100644 --- a/robusta/src/lib.rs +++ b/robusta/src/lib.rs @@ -9,7 +9,7 @@ use std::time::Duration; use either::Either; use espresso_types::{Header, NamespaceId, Transaction}; -use multisig::{Unchecked, Validated}; +use multisig::Validated; use reqwest::{StatusCode, Url}; use serde::{Serialize, de::DeserializeOwned}; use serde_json as json; @@ -18,7 +18,9 @@ use timeboost_types::{BlockNumber, CertifiedBlock}; use tokio::time::sleep; use tracing::{debug, warn}; -use crate::types::{TX, TaggedBase64, TransactionsWithProof, VidCommonResponse}; +use crate::types::{ + RecvBody, SendBody, TX, TaggedBase64, TransactionsWithProof, VidCommonResponse, +}; pub use crate::multiwatcher::Multiwatcher; pub use crate::types::Height; @@ -58,11 +60,15 @@ impl Client { self.get_with_retry(u).await } - pub async fn submit(&self, nsid: N, cb: &CertifiedBlock) -> Result<(), Error> + pub async fn submit( + &self, + nsid: N, + blocks: &[CertifiedBlock], + ) -> Result<(), Error> where N: Into, { - let trx = Transaction::new(nsid.into(), minicbor::to_vec(cb)?); + let trx = Transaction::new(nsid.into(), minicbor::to_vec(SendBody { blocks })?); let url = self.config.base_url.join("submit/submit")?; self.post_with_retry::<_, TaggedBase64>(url, &trx) .await?; @@ -102,9 +108,9 @@ impl Client { warn!(node = %self.config.label, a = %nsid, b = %ns, height = %hdr.height(), "namespace mismatch"); return Either::Left(empty()); } - Either::Right(trxs.into_iter().filter_map(move |t| { - match minicbor::decode::>(t.payload()) { - Ok(b) => { + Either::Right(trxs.into_iter().flat_map(move |t| { + match minicbor::decode::(t.payload()) { + Ok(body) => Either::Right(body.blocks.into_iter().filter_map(|b| { let Some(c) = cvec.get(b.committee()) else { warn!( node = %self.config.label, @@ -120,16 +126,16 @@ impl Client { warn!(node = %self.config.label, height = %hdr.height(), "invalid block"); None } - } + })), Err(err) => { warn!( node = %self.config.label, nsid = %nsid, height = %hdr.height(), err = %err, - "could not deserialize block" + "could not decode transaction payload" ); - None + Either::Left(empty()) } } })) diff --git a/robusta/src/types.rs b/robusta/src/types.rs index d9a3741b..8bb06779 100644 --- a/robusta/src/types.rs +++ b/robusta/src/types.rs @@ -8,7 +8,10 @@ use bon::Builder; use data_encoding::BASE64URL_NOPAD; use espresso_types::{NsProof, Transaction}; use hotshot_query_service::VidCommon; +use minicbor::{Decode, Encode}; +use multisig::{Unchecked, Validated}; use serde::{Deserialize, Deserializer, Serialize, Serializer, de}; +use timeboost_types::CertifiedBlock; #[derive(Debug, Deserialize, Serialize, Builder)] pub(crate) struct TransactionsWithProof { @@ -21,6 +24,20 @@ pub(crate) struct VidCommonResponse { pub(crate) common: VidCommon, } +#[derive(Debug, Decode)] +#[cbor(map)] +pub(crate) struct RecvBody { + #[cbor(n(0))] + pub(crate) blocks: Vec>, +} + +#[derive(Debug, Encode)] +#[cbor(map)] +pub(crate) struct SendBody<'a> { + #[cbor(n(0))] + pub(crate) blocks: &'a [CertifiedBlock], +} + macro_rules! Primitive { ($name:ident, $t:ty) => { #[derive( diff --git a/timeboost-builder/src/lib.rs b/timeboost-builder/src/lib.rs index fffd1b5a..a27e6dfc 100644 --- a/timeboost-builder/src/lib.rs +++ b/timeboost-builder/src/lib.rs @@ -8,4 +8,4 @@ pub use robusta; pub use certifier::{Certifier, CertifierDown, CertifierError, Handle}; pub use config::{CertifierConfig, CertifierConfigBuilder}; pub use config::{SubmitterConfig, SubmitterConfigBuilder}; -pub use submit::Submitter; +pub use submit::{SenderTaskDown, Submitter}; diff --git a/timeboost-builder/src/metrics.rs b/timeboost-builder/src/metrics.rs index 1d7e608f..f4d6311d 100644 --- a/timeboost-builder/src/metrics.rs +++ b/timeboost-builder/src/metrics.rs @@ -1,10 +1,10 @@ -use metrics::{Gauge, Metrics, NoMetrics}; +use metrics::{Counter, Metrics, NoMetrics}; #[derive(Debug)] #[non_exhaustive] pub struct BuilderMetrics { - pub block_submit: Box, - pub submit_tasks: Box, + pub blocks_submitted: Box, + pub blocks_verified: Box, } impl Default for BuilderMetrics { @@ -16,8 +16,8 @@ impl Default for BuilderMetrics { impl BuilderMetrics { pub fn new(m: &M) -> Self { Self { - block_submit: m.create_gauge("block_submit", None), - submit_tasks: m.create_gauge("submit_tasks", None), + blocks_submitted: m.create_counter("blocks_submitted", None), + blocks_verified: m.create_counter("blocks_verified", None), } } } diff --git a/timeboost-builder/src/submit.rs b/timeboost-builder/src/submit.rs index fae0d5d6..c476eaa7 100644 --- a/timeboost-builder/src/submit.rs +++ b/timeboost-builder/src/submit.rs @@ -1,38 +1,40 @@ -use std::{cmp::min, collections::BTreeSet, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, mem, sync::Arc, time::Duration}; +use bon::Builder; use multisig::{Committee, PublicKey, Validated}; -use parking_lot::Mutex; -use robusta::espresso_types::NamespaceId; +use robusta::{Client, espresso_types::NamespaceId}; use timeboost_types::{ - BlockNumber, CertifiedBlock, + CertifiedBlock, sailfish::{CommitteeVec, Empty}, }; use tokio::{ - spawn, - sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore}, + select, spawn, + sync::{Mutex, mpsc}, task::JoinHandle, - time::{Instant, error::Elapsed, sleep, timeout}, + time::{Instant, MissedTickBehavior, interval, sleep}, }; -use tokio_util::task::TaskTracker; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; + +mod verify; use crate::{config::SubmitterConfig, metrics::BuilderMetrics}; +use verify::{Verified, Verifier}; -const CACHE_SIZE: usize = 15_000; -const MAX_TASKS: usize = 1000; +const DELAY: Duration = Duration::from_secs(30); pub struct Submitter { config: SubmitterConfig, + verified: Verified<15_000>, + committees: Arc>>, + sender: mpsc::Sender>, verify_task: JoinHandle, - submitters: TaskTracker, - handler: Handler, - committees: Arc>>, - task_permits: Arc, - metrics: BuilderMetrics, + sender_task: JoinHandle<()>, + metrics: Arc, } impl Drop for Submitter { fn drop(&mut self) { + self.sender_task.abort(); self.verify_task.abort(); } } @@ -42,32 +44,37 @@ impl Submitter { where M: ::metrics::Metrics, { - let client = robusta::Client::new(cfg.robusta.0.clone()); - let verified = Arc::new(Mutex::new(BTreeSet::new())); - let committees = Arc::new(AsyncMutex::new(CommitteeVec::new(cfg.committee.clone()))); - let handler = Handler { - label: cfg.pubkey, - nsid: cfg.namespace, - client: client.clone(), - verified: verified.clone(), - }; - let verifier = Verifier { - label: cfg.pubkey, - nsid: cfg.namespace, - committees: committees.clone(), - client: client.clone(), - verified, - }; + let client = Client::new(cfg.robusta.0.clone()); + let verified = Verified::default(); + let committees = Arc::new(Mutex::new(CommitteeVec::new(cfg.committee.clone()))); + let metrics = Arc::new(BuilderMetrics::new(metrics)); + let verifier = Verifier::builder() + .label(cfg.pubkey) + .nsid(cfg.namespace) + .committees(committees.clone()) + .client(client.clone()) + .verified(verified.clone()) + .metrics(metrics.clone()) + .build(); + let (tx, rx) = mpsc::channel(10_000); + let sender = Sender::builder() + .label(cfg.pubkey) + .nsid(cfg.namespace) + .client(client) + .verified(verified.clone()) + .receiver(rx) + .clock(Instant::now()) + .build(); let mut configs = vec![cfg.robusta.0.clone()]; configs.extend(cfg.robusta.1.iter().cloned()); Submitter { - handler, config: cfg, - verify_task: spawn(verifier.verify(configs)), - submitters: TaskTracker::new(), + verified, committees, - task_permits: Arc::new(Semaphore::new(MAX_TASKS)), - metrics: BuilderMetrics::new(metrics), + metrics, + sender: tx, + verify_task: spawn(verifier.verify(configs)), + sender_task: spawn(sender.go()), } } @@ -75,254 +82,96 @@ impl Submitter { &self.config.pubkey } - pub async fn add_committe(&mut self, c: Committee) { + pub async fn add_committee(&mut self, c: Committee) { self.committees.lock().await.add(c); } - pub async fn submit(&mut self, cb: CertifiedBlock) { - let num = cb.cert().data().num(); - debug!( - node = %self.public_key(), - num = %num, - tasks = %self.submitters.len(), - "creating block handler" - ); - if self.submitters.len() > MAX_TASKS - 10 { - warn!( - node = %self.public_key(), - num = %num, - tasks = %self.submitters.len(), - "approaching task limit" - ); + pub async fn submit(&mut self, cb: CertifiedBlock) -> Result<(), SenderTaskDown> { + self.metrics.blocks_submitted.add(1); + if self.verified.contains(cb.cert().data().num()) { + return Ok(()); } - let Ok(permit) = Semaphore::acquire_owned(self.task_permits.clone()).await else { - return; - }; - self.submitters - .spawn(self.handler.clone().handle(permit, cb)); - self.metrics.block_submit.set(*num as usize); - self.metrics.submit_tasks.set(self.submitters.len()); - } - - pub async fn join(self) { - self.submitters.close(); - self.submitters.wait().await + self.sender.send(cb).await.map_err(|_| SenderTaskDown(())) } } -struct Verifier { - label: PublicKey, - nsid: NamespaceId, - client: robusta::Client, - committees: Arc>>, - verified: Arc>>, -} +#[derive(Debug, thiserror::Error)] +#[error("submit sender task terminated")] +pub struct SenderTaskDown(()); -impl Verifier { - async fn verify(self, configs: Vec) -> Empty { - let mut delays = self.client.config().delay_iter(); - let height = loop { - if let Ok(h) = self.client.height().await { - break h; - }; - let d = delays.next().expect("delay iterator repeats endlessly"); - sleep(d).await; - }; - let threshold = 2 * configs.len() / 3 + 1; - let mut watcher = robusta::Multiwatcher::new(configs, height, self.nsid, threshold); - loop { - let h = watcher.next().await; - let committees = self.committees.lock().await; - let numbers = self.client.verified(self.nsid, &h, &committees).await; - let mut set = self.verified.lock(); - for n in numbers { - info!(node = %self.label, num = %n, "verified"); - if set.len() == CACHE_SIZE { - set.pop_first(); - } - set.insert(n); - } - } - } -} - -#[derive(Clone)] -struct Handler { +#[derive(Builder)] +struct Sender { label: PublicKey, nsid: NamespaceId, - client: robusta::Client, - verified: Arc>>, + client: Client, + verified: Verified<15_000>, + receiver: mpsc::Receiver>, + clock: Instant, + #[builder(default)] + pending: BTreeMap>>, } -impl Handler { - async fn handle(mut self, _: OwnedSemaphorePermit, cb: CertifiedBlock) { - enum State { - Submit(bool), - Wait(Duration), - Verify(Duration), - } - - let num = cb.cert().data().num(); +impl Sender { + async fn go(mut self) { + let mut inbox = Vec::new(); + let mut outbox = Vec::new(); - // Maybe the block has already been verified? - if self.verified.lock().remove(&num) { - debug!(node = %self.label, %num, "block submission verified"); - return; - } + let drop_verified_blocks = |v: &mut Vec>| { + v.retain(|b| !self.verified.contains(b.cert().data().num())); + }; - let max_delay = Duration::from_secs(30); - let mut state = State::Submit(false); + let mut checkpoints = interval(Duration::from_secs(1)); + checkpoints.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { - match state { - State::Submit(force) => { - let now = Instant::now(); - match timeout(max_delay, self.submit_block(&cb, force)).await { - Ok(()) => state = State::Wait(max_delay.saturating_sub(now.elapsed())), - Err(e) => { - debug!(node = %self.label, %num, "block submission timeout"); - let _: Elapsed = e; - state = State::Submit(true) - } + 'main: loop { + select! { + k = self.receiver.recv_many(&mut inbox, 10) => { + if k == 0 { // channel is closed + return } - } - State::Wait(delay) => { - let d = min(Duration::from_secs(3), delay); - sleep(d).await; - state = State::Verify(delay.saturating_sub(d)) - } - State::Verify(delay) => { - if self.verified.lock().remove(&num) { - debug!(node = %self.label, %num, "block submission verified"); - return; - } else { - state = if delay.is_zero() { - warn!(node = %self.label, %num, "block submission verification timeout"); - State::Submit(true) + for b in inbox.drain(..) { + if b.is_leader() { + outbox.push(b) } else { - State::Wait(delay) + self.pending.entry(self.clock + DELAY).or_default().push(b); } } } - } - } - } - - pub async fn submit_block(&mut self, cb: &CertifiedBlock, force: bool) { - if !(cb.is_leader() || force) { - return; - } - let mut delays = self.client.config().delay_iter(); - debug!( - node = %self.label, - is_leader = cb.is_leader(), - force = %force, - num = %cb.cert().data().num(), - round = %cb.cert().data().round(), - "submitting block" - ); - while let Err(err) = self.client.submit(self.nsid, cb).await { - warn!(node = %self.label, %err, "error submitting block"); - let d = delays.next().expect("delay iterator repeats"); - sleep(d).await - } - } -} - -#[cfg(test)] -mod tests { - use bytes::Bytes; - use metrics::NoMetrics; - use multisig::{Committee, Keypair, PublicKey, Signed, VoteAccumulator}; - use timeboost_types::{Block, BlockInfo, BlockNumber, sailfish::Round}; - use tokio::task::JoinSet; - - use super::*; - - struct BlockGen { - p: PublicKey, - r: Round, - i: BlockNumber, - k: Vec, - c: Committee, - } - - impl BlockGen { - fn next(&mut self) -> CertifiedBlock { - let b = Block::new(self.i, self.r.num(), Bytes::new()); - let i = BlockInfo::new(self.i, self.r, b.hash()); - self.i += 1; - self.r.set_num(self.r.num() + 1); - let mut a = VoteAccumulator::new(self.c.clone()); - for k in &self.k { - if a.add(Signed::new(i.clone(), k)).unwrap().is_some() { - break; + t = checkpoints.tick() => { + self.clock = t; + // Move blocks that timed out into `outbox`: + let mut blocks = self.pending.split_off(&self.clock); + mem::swap(&mut blocks, &mut self.pending); + outbox.extend(blocks.into_values().flatten()); } } - let l = self.c.leader(*i.round().num() as usize) == self.p; - CertifiedBlock::v1(a.certificate().cloned().unwrap(), b, l) - } - } - - #[tokio::test] - async fn submit_random_block() { - const NODES: usize = 5; - - let _ = tracing_subscriber::fmt() - .with_env_filter("timeboost_builder=debug,robusta=debug") - .try_init(); - let keys: Vec = (0..NODES).map(|_| Keypair::generate()).collect(); + drop_verified_blocks(&mut outbox); - let committee = Committee::new( - 0, - keys.iter() - .enumerate() - .map(|(i, k)| (i as u8, k.public_key())), - ); - - let mut tasks = JoinSet::new(); - - for k in &keys { - let mut g = BlockGen { - p: k.public_key(), - r: Round::new(1, 0), - i: BlockNumber::from(1), - k: keys.clone(), - c: committee.clone(), - }; + if outbox.is_empty() { + continue; + } - let rcfg = robusta::Config::builder() - .base_url( - "https://query.decaf.testnet.espresso.network/v1/" - .parse() - .unwrap(), - ) - .wss_base_url( - "wss://query.decaf.testnet.espresso.network/v1/" - .parse() - .unwrap(), - ) - .label(k.public_key().to_string()) - .build(); + // TODO: Ensure that the resulting payload size does not exceed the allowed maximum. - let scfg = SubmitterConfig::builder() - .pubkey(k.public_key()) - .robusta((rcfg.clone(), Vec::new())) - .namespace(10_101u64) - .committee(committee.clone()) - .build(); + debug!(node = %self.label, blocks = %outbox.len(), "submitting blocks"); - let mut s = Submitter::new(scfg, &NoMetrics); + let mut delays = self.client.config().delay_iter(); - tasks.spawn(async move { - for _ in 0..NODES { - s.submit(g.next()).await; + while let Err(err) = self.client.submit(self.nsid, &outbox).await { + warn!(node= %self.label, %err, "error submitting blocks"); + let d = delays.next().expect("delay iterator repeats"); + sleep(d).await; + drop_verified_blocks(&mut outbox); + if outbox.is_empty() { + continue 'main; } - s.join().await - }); - } + } - tasks.join_all().await; + self.pending + .entry(self.clock + DELAY) + .or_default() + .append(&mut outbox); + } } } diff --git a/timeboost-builder/src/submit/verify.rs b/timeboost-builder/src/submit/verify.rs new file mode 100644 index 00000000..bccc4f25 --- /dev/null +++ b/timeboost-builder/src/submit/verify.rs @@ -0,0 +1,78 @@ +use std::{collections::BTreeSet, sync::Arc}; + +use bon::Builder; +use multisig::PublicKey; +use parking_lot::RwLock; +use robusta::{Client, Config, Multiwatcher, espresso_types::NamespaceId}; +use timeboost_types::{ + BlockNumber, + sailfish::{CommitteeVec, Empty}, +}; +use tokio::{sync::Mutex, time::sleep}; +use tracing::debug; + +use crate::metrics::BuilderMetrics; + +/// Verifies blocks and updates a sliding window of block numbers. +#[derive(Debug, Builder)] +pub struct Verifier { + label: PublicKey, + nsid: NamespaceId, + client: Client, + committees: Arc>>, + verified: Verified<15_000>, + metrics: Arc, +} + +impl Verifier { + pub async fn verify(self, configs: Vec) -> Empty { + let mut delays = self.client.config().delay_iter(); + let height = loop { + if let Ok(h) = self.client.height().await { + break h; + }; + let d = delays.next().expect("delay iterator repeats endlessly"); + sleep(d).await; + }; + let threshold = 2 * configs.len() / 3 + 1; + let mut watcher = Multiwatcher::new(configs, height, self.nsid, threshold); + loop { + let h = watcher.next().await; + let committees = self.committees.lock().await; + let numbers = self.client.verified(self.nsid, &h, &committees).await; + let len = self.verified.insert(numbers); + debug!(node = %self.label, %len, "blocks verified"); + self.metrics.blocks_verified.add(len); + } + } +} + +/// The sliding window of verified block numbers. +#[derive(Debug, Default, Clone)] +pub struct Verified { + set: Arc>>, +} + +impl Verified { + /// Is the given block number verified? + pub fn contains(&self, n: BlockNumber) -> bool { + self.set.read().contains(&n) + } + + /// Add a sequence of block numbers as verified. + /// + /// Returns the number of (unique) block numbers added. + fn insert(&self, it: I) -> usize + where + I: IntoIterator, + { + let mut set = self.set.write(); + let len = set.len(); + set.extend(it); + let len = set.len() - len; + while set.len() > MAX_SIZE { + set.pop_first(); + } + len + } +} diff --git a/timeboost/src/lib.rs b/timeboost/src/lib.rs index 9f627fff..ea8fbdb8 100644 --- a/timeboost/src/lib.rs +++ b/timeboost/src/lib.rs @@ -7,7 +7,7 @@ use ::metrics::prometheus::PrometheusMetrics; use anyhow::Result; use metrics::TimeboostMetrics; use multisig::PublicKey; -use timeboost_builder::{Certifier, CertifierDown, Submitter}; +use timeboost_builder::{Certifier, CertifierDown, SenderTaskDown, Submitter}; use timeboost_sequencer::{Output, Sequencer}; use timeboost_types::BundleVariant; use tokio::select; @@ -121,7 +121,10 @@ impl Timeboost { blk = self.certifier.next_block() => match blk { Ok(b) => { info!(node = %self.label, block = %b.data().round(), "certified block"); - self.submitter.submit(b).await + if let Err(e) = self.submitter.submit(b).await { + let e: SenderTaskDown = e; + return Err(e.into()) + } } Err(e) => { let e: CertifierDown = e;