From e1a91249b3f02b768066fd2716df94b8f17859a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 28 Jul 2023 19:11:14 +0800 Subject: [PATCH] Improvements for `bdk_bitcoind_rpc::Emitter` * Have separate methods for emitting block vs mempool updates. * `into_update` (which has been renamed to `into_tx_graph_update`) can now filter out irrelevant transactions. * Update docs. Add tests: * `test_sync_local_chain` ensures that `Emitter::emit_block` emits blocks in order, even after reorg. * `test_into_tx_graph` ensures that `into_tx_graph` behaves appropriately for both mempool and block updates. It should also filter txs and map anchors correctly. --- crates/bitcoind_rpc/Cargo.toml | 5 +- crates/bitcoind_rpc/src/lib.rs | 604 +++++++++++++--------- crates/bitcoind_rpc/tests/test_emitter.rs | 332 ++++++++++++ example-crates/example_rpc/src/main.rs | 32 +- 4 files changed, 731 insertions(+), 242 deletions(-) create mode 100644 crates/bitcoind_rpc/tests/test_emitter.rs diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index c5dff5ceda..2faf16eb13 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -7,5 +7,8 @@ edition = "2021" [dependencies] bdk_chain = { path = "../chain", version = "0.5.0", features = ["serde", "miniscript"] } -bitcoincore-rpc = { version = "0.16" } +bitcoincore-rpc = { version = "0.17.0" } + +[dev-dependencies] +bitcoind = { version = "0.32.0", features = ["25_0"] } anyhow = { version = "1" } diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 3b0f075eba..d35fc35d9d 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -1,43 +1,214 @@ //! This crate is used for updating [`bdk_chain`] structures with data from the `bitcoind` RPC //! interface. +//! +//! The main structure is [`Emitter`], which sources blockchain data from +//! [`bitcoincore_rpc::Client`]. +//! +//! To only get block updates (exlude mempool transactions), the caller can use +//! [`Emitter::emit_block`] until it returns `Ok(None)` (which means the chain tip is reached). A +//! separate method, [`Emitter::emit_mempool`] can be used to emit the whole mempool. Another +//! method, [`Emitter::emit_update`] is avaliable, which emits block updates until the block tip is +//! reached, then the next update will be the mempool. +//! +//! # [`IntoIterator`] implementation +//! +//! [`Emitter`] implements [`IntoIterator`] which transforms itself into [`UpdateIter`]. The +//! iterator is implemented in a way that even after a call to [`Iterator::next`] returns [`None`], +//! subsequent calls may resume returning [`Some`]. +//! +//! The iterator initially returns blocks in increasing height order. After the chain tip is +//! reached, the next update is the mempool. After the mempool update is released, the first +//! succeeding call to [`Iterator::next`] will return [`None`]. +//! +//! This logic is useful if the caller wishes to "update once". +//! +//! ```rust,no_run +//! use bdk_bitcoind_rpc::{EmittedUpdate, Emitter}; +//! # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +//! +//! for r in Emitter::new(&client, 709_632, None) { +//! let update = r.expect("todo: deal with the error properly"); +//! +//! if update.is_block() { +//! let cp = update.checkpoint(); +//! println!("block {}:{}", cp.height(), cp.hash()); +//! } else { +//! println!("mempool!"); +//! } +//! } +//! ``` +//! +//! Alternatively, if the caller wishes to keep [`Emitter`] in a dedicated update-thread, the caller +//! can continue to poll [`Iterator::next`] with a delay. #![warn(missing_docs)] use bdk_chain::{ - bitcoin::{Block, Transaction, Txid}, + bitcoin::{Block, Transaction}, + indexed_tx_graph::Indexer, local_chain::CheckPoint, - BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, + Append, BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph, }; pub use bitcoincore_rpc; -use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi}; -use std::collections::HashSet; +use bitcoincore_rpc::{json::GetBlockResult, RpcApi}; +use std::fmt::Debug; -/// An update emitted from [`BitcoindRpcEmitter`]. This can either be of a block or a subset of +/// An update emitted from [`Emitter`]. This can either be of a block or a subset of /// mempool transactions. #[derive(Debug, Clone)] -pub enum BitcoindRpcUpdate { +pub enum EmittedUpdate { /// An emitted block. - Block { - /// The checkpoint constructed from the block's height/hash and connected to the previous - /// block. - cp: CheckPoint, - /// The actual block of the blockchain. - block: Box, - }, + Block(EmittedBlock), /// An emitted subset of mempool transactions. /// - /// [`BitcoindRpcEmitter`] attempts to avoid re-emitting transactions. - Mempool { - /// The checkpoint of the last-seen tip. - cp: CheckPoint, - /// Subset of mempool transactions. - txs: Vec<(Transaction, u64)>, - }, + /// [`Emitter`] attempts to avoid re-emitting transactions. + Mempool(EmittedMempool), } -/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationHeightAnchor`]. +impl EmittedUpdate { + /// Returns whether the update is of a subset of the mempool. + pub fn is_mempool(&self) -> bool { + matches!(self, Self::Mempool { .. }) + } + + /// Returns whether the update is of a block. + pub fn is_block(&self) -> bool { + matches!(self, Self::Block { .. }) + } + + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + match self { + EmittedUpdate::Block(e) => e.checkpoint(), + EmittedUpdate::Mempool(e) => e.checkpoint(), + } + } + + /// Transforms the emitted update into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + pub fn into_tx_graph_update(self, tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + match self { + EmittedUpdate::Block(e) => e.into_tx_graph_update(tx_filter, anchor_map), + EmittedUpdate::Mempool(e) => e.into_tx_graph_update(tx_filter), + } + } +} + +/// An emitted block. +#[derive(Debug, Clone)] +pub struct EmittedBlock { + /// The checkpoint constructed from the block's height/hash and connected to the previous block. + pub cp: CheckPoint, + /// The actual block of the chain. + pub block: Block, +} + +impl EmittedBlock { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted update into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + /// + /// The `anchor_map` parameter takes in a closure that creates anchors of a specific type. + /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create + /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. + pub fn into_tx_graph_update(self, mut tx_filter: F, anchor_map: M) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + M: Fn(&CheckPoint, &Block, usize) -> A, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self + .block + .txdata + .iter() + .enumerate() + .filter(move |(_, tx)| tx_filter(tx)); + for (tx_pos, tx) in tx_iter { + let txid = tx.txid(); + let _ = tx_graph.insert_anchor(txid, anchor_map(&self.cp, &self.block, tx_pos)); + let _ = tx_graph.insert_tx(tx.clone()); + } + tx_graph + } +} + +/// An emitted subset of mempool transactions. +#[derive(Debug, Clone)] +pub struct EmittedMempool { + /// The checkpoint of the last-seen tip. + pub cp: CheckPoint, + /// Subset of mempool transactions. + pub txs: Vec<(Transaction, u64)>, +} + +impl EmittedMempool { + /// Get the emission's checkpoint. + pub fn checkpoint(&self) -> CheckPoint { + self.cp.clone() + } + + /// Transforms the emitted mempool into a [`TxGraph`] update. + /// + /// The `tx_filter` parameter takes in a closure that filters out irrelevant transactions so + /// they do not get included in the [`TxGraph`] update. We have provided two closures; + /// [`empty_filter`] and [`indexer_filter`] for this purpose. + pub fn into_tx_graph_update(self, mut tx_filter: F) -> TxGraph + where + F: FnMut(&Transaction) -> bool, + A: Clone + Ord + PartialOrd, + { + let mut tx_graph = TxGraph::default(); + let tx_iter = self.txs.into_iter().filter(move |(tx, _)| tx_filter(tx)); + for (tx, seen_at) in tx_iter { + let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); + let _ = tx_graph.insert_tx(tx); + } + tx_graph + } +} + +/// Creates a closure that filters transactions based on an [`Indexer`] implementation. +pub fn indexer_filter<'i, I: Indexer>( + indexer: &'i mut I, + changeset: &'i mut I::Additions, +) -> impl FnMut(&Transaction) -> bool + 'i +where + I::Additions: bdk_chain::Append, +{ + |tx| { + changeset.append(indexer.index_tx(tx)); + indexer.is_tx_relevant(tx) + } +} + +/// Returns an empty filter-closure. +pub fn empty_filter() -> impl FnMut(&Transaction) -> bool { + |_| true +} + +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationHeightAnchor`]. /// -/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`]. +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. pub fn confirmation_height_anchor( cp: &CheckPoint, _block: &Block, @@ -50,9 +221,9 @@ pub fn confirmation_height_anchor( } } -/// A closure that transforms a [`BitcoindRpcUpdate`] into a [`ConfirmationTimeAnchor`]. +/// A closure that transforms a [`EmittedUpdate`] into a [`ConfirmationTimeAnchor`]. /// -/// This is to be used as an input to [`BitcoindRpcUpdate::into_update`]. +/// This is to be used as an input to [`EmittedUpdate::into_tx_graph_update`]. pub fn confirmation_time_anchor( cp: &CheckPoint, block: &Block, @@ -66,249 +237,183 @@ pub fn confirmation_time_anchor( } } -impl BitcoindRpcUpdate { - /// Returns whether the update is of a subset of the mempool. - pub fn is_mempool(&self) -> bool { - matches!(self, Self::Mempool { .. }) - } - - /// Returns whether the update is of a block. - pub fn is_block(&self) -> bool { - matches!(self, Self::Block { .. }) - } - - /// Transforms the [`BitcoindRpcUpdate`] into a [`TxGraph`] update. - /// - /// The [`CheckPoint`] tip is also returned. This is the block height and hash that the - /// [`TxGraph`] update is created from. - /// - /// The `anchor` parameter specifies the anchor type of the update. - /// [`confirmation_height_anchor`] and [`confirmation_time_anchor`] are avaliable to create - /// updates with [`ConfirmationHeightAnchor`] and [`ConfirmationTimeAnchor`] respectively. - pub fn into_update(self, anchor: F) -> (CheckPoint, TxGraph) - where - A: Clone + Ord + PartialOrd, - F: Fn(&CheckPoint, &Block, usize) -> A, - { - let mut tx_graph = TxGraph::default(); - match self { - BitcoindRpcUpdate::Block { cp, block } => { - for (tx_pos, tx) in block.txdata.iter().enumerate() { - let txid = tx.txid(); - let _ = tx_graph.insert_anchor(txid, anchor(&cp, &block, tx_pos)); - let _ = tx_graph.insert_tx(tx.clone()); - } - (cp, tx_graph) - } - BitcoindRpcUpdate::Mempool { cp, txs } => { - for (tx, seen_at) in txs { - let _ = tx_graph.insert_seen_at(tx.txid(), seen_at); - let _ = tx_graph.insert_tx(tx); - } - (cp, tx_graph) - } - } - } -} - /// A structure that emits updates for [`bdk_chain`] structures, sourcing blockchain data from /// [`bitcoincore_rpc::Client`]. /// -/// Updates are of type [`BitcoindRpcUpdate`], where each update can either be of a whole block, or -/// a subset of the mempool. -/// -/// A [`BitcoindRpcEmitter`] emits updates starting from the `fallback_height` provided in [`new`], -/// or if `last_cp` is provided, we start from the height above the agreed-upon blockhash (between -/// `last_cp` and the state of `bitcoind`). Blocks are emitted in sequence (ascending order), and -/// the mempool contents emitted if the last emission is the chain tip. -/// -/// # [`Iterator`] implementation -/// -/// [`BitcoindRpcEmitter`] implements [`Iterator`] in a way such that even after [`Iterator::next`] -/// returns [`None`], subsequent calls may resume returning [`Some`]. -/// -/// Returning [`None`] means that the previous call to [`next`] is the mempool. This is useful if -/// the caller wishes to update once. -/// -/// ```rust,no_run -/// use bdk_bitcoind_rpc::{BitcoindRpcEmitter, BitcoindRpcUpdate}; -/// # let client = todo!(); -/// -/// for update in BitcoindRpcEmitter::new(&client, 709_632, None) { -/// match update.expect("todo: deal with the error properly") { -/// BitcoindRpcUpdate::Block { cp, .. } => println!("block {}:{}", cp.height(), cp.hash()), -/// BitcoindRpcUpdate::Mempool { .. } => println!("mempool"), -/// } -/// } -/// ``` -/// -/// Alternatively, if the caller wishes to keep [`BitcoindRpcEmitter`] in a dedicated update-thread, -/// the caller can continue to poll [`next`] (potentially with a delay). +/// Refer to [module-level documentation] for more. /// -/// [`new`]: BitcoindRpcEmitter::new -/// [`next`]: Iterator::next -pub struct BitcoindRpcEmitter<'a> { - client: &'a Client, +/// [module-level documentation]: crate +pub struct Emitter<'c, C> { + client: &'c C, fallback_height: u32, last_cp: Option, last_info: Option, - - seen_txids: HashSet, - last_emission_was_mempool: bool, } -impl<'a> Iterator for BitcoindRpcEmitter<'a> { - /// Represents an emitted item. - type Item = Result; +impl<'c, C: RpcApi> IntoIterator for Emitter<'c, C> { + type Item = as Iterator>::Item; + type IntoIter = UpdateIter<'c, C>; - fn next(&mut self) -> Option { - if self.last_emission_was_mempool { - self.last_emission_was_mempool = false; - None - } else { - Some(self.next_update()) + fn into_iter(self) -> Self::IntoIter { + UpdateIter { + emitter: self, + last_emission_was_mempool: false, } } } -impl<'a> BitcoindRpcEmitter<'a> { - /// Constructs a new [`BitcoindRpcEmitter`] with the provided [`bitcoincore_rpc::Client`]. +impl<'c, C: RpcApi> Emitter<'c, C> { + /// Constructs a new [`Emitter`] with the provided [`bitcoincore_rpc::Client`]. /// /// * `fallback_height` is the block height to start from if `last_cp` is not provided, or a /// point of agreement is not found. /// * `last_cp` is the last known checkpoint to build updates on (if any). - pub fn new(client: &'a Client, fallback_height: u32, last_cp: Option) -> Self { + pub fn new(client: &'c C, fallback_height: u32, last_cp: Option) -> Self { Self { client, fallback_height, last_cp, last_info: None, - seen_txids: HashSet::new(), - last_emission_was_mempool: false, } } - /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. - pub fn next_update(&mut self) -> Result { - loop { - match self.poll()? { - Some(item) => return Ok(item), - None => continue, - }; - } + /// Emits the whole mempool contents. + pub fn emit_mempool(&self) -> Result { + let txs = self + .client + .get_raw_mempool()? + .into_iter() + .map( + |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { + let first_seen = self + .client + .get_mempool_entry(&txid) + .map(|entry| entry.time)?; + let tx = self.client.get_raw_transaction(&txid, None)?; + Ok((tx, first_seen)) + }, + ) + .collect::, _>>()?; + let cp = match &self.last_cp { + Some(cp) => cp.clone(), + None => { + let hash = self.client.get_best_block_hash()?; + let height = self.client.get_block_info(&hash)?.height as u32; + CheckPoint::new(BlockId { height, hash }) + } + }; + Ok(EmittedMempool { cp, txs }) } - /// Performs a single round of polling [`bitcoincore_rpc::Client`] and updating the internal - /// state. This returns [`Ok(Some(BitcoindRpcUpdate))`] if an update is found. - pub fn poll(&mut self) -> Result, bitcoincore_rpc::Error> { - let client = self.client; - self.last_emission_was_mempool = false; - - match (&mut self.last_cp, &mut self.last_info) { - // If `last_cp` and `last_info` are both none, we need to emit from the - // `fallback_height`. `last_cp` and `last_info` will both be updated to the emitted - // block. - (last_cp @ None, last_info @ None) => { - let info = - client.get_block_info(&client.get_block_hash(self.fallback_height as _)?)?; - let block = self.client.get_block(&info.hash)?; - let cp = CheckPoint::new(BlockId { - height: info.height as _, - hash: info.hash, - }); - *last_cp = Some(cp.clone()); - *last_info = Some(info); - Ok(Some(BitcoindRpcUpdate::Block { - cp, - block: Box::new(block), - })) - } - // If `last_cp` exists, but `last_info` does not, it means we have not fetched a - // block from the client yet, but we have a previous checkpoint which we can use to - // find the point of agreement with. - // - // We don't emit in this match case. Instead, we set the state to either: - // * { last_cp: Some, last_info: Some } : When we find a point of agreement. - // * { last_cp: None, last_indo: None } : When we cannot find a point of agreement. - (last_cp @ Some(_), last_info @ None) => { - for cp in last_cp.clone().iter().flat_map(CheckPoint::iter) { - let cp_block = cp.block_id(); - - let info = client.get_block_info(&cp_block.hash)?; - if info.confirmations < 0 { - // block is not in the main chain - continue; - } - // agreement found - *last_cp = Some(cp); - *last_info = Some(info); - return Ok(None); - } + /// Emits the next block (if any). + pub fn emit_block(&mut self) -> Result, bitcoincore_rpc::Error> { + enum PollResponse { + /// A new block that is in chain is found. Congratulations! + Block { + cp: CheckPoint, + info: GetBlockResult, + }, + /// This either signals that we have reached the tip, or that the blocks ahead are not + /// in the best chain. In either case, we need to find the agreement point again. + NoMoreBlocks, + /// We have exhausted the local checkpoint history and there is no agreement point. We + /// should emit from the fallback height for the next round. + AgreementPointNotFound, + /// We have found an agreement point! Do not emit this one, emit the one higher. + AgreementPointFound { + cp: CheckPoint, + info: GetBlockResult, + }, + } - // no point of agreement found, next call will emit block @ fallback height - *last_cp = None; - *last_info = None; - Ok(None) - } - // If `last_cp` and `last_info` is both `Some`, we either emit a block at - // `last_info.nextblockhash` (if it exists), or we emit a subset of the mempool. - (Some(last_cp), last_info @ Some(_)) => { - // find next block - match last_info.as_ref().unwrap().nextblockhash { - Some(next_hash) => { - let info = self.client.get_block_info(&next_hash)?; + fn poll(emitter: &mut Emitter) -> Result + where + C: RpcApi, + { + let client = emitter.client; + match (&mut emitter.last_cp, &mut emitter.last_info) { + (None, None) => { + let info = client + .get_block_info(&client.get_block_hash(emitter.fallback_height as _)?)?; + let cp = CheckPoint::new(BlockId { + height: info.height as _, + hash: info.hash, + }); + Ok(PollResponse::Block { cp, info }) + } + (Some(last_cp), None) => { + for cp in last_cp.iter() { + let cp_block = cp.block_id(); + let info = client.get_block_info(&cp_block.hash)?; if info.confirmations < 0 { - *last_info = None; - return Ok(None); + // block is not in the main chain + continue; } - - let block = self.client.get_block(&info.hash)?; - let cp = last_cp - .clone() - .push(BlockId { - height: info.height as _, - hash: info.hash, - }) - .expect("must extend from checkpoint"); - - *last_cp = cp.clone(); - *last_info = Some(info); - - Ok(Some(BitcoindRpcUpdate::Block { - cp, - block: Box::new(block), - })) + // agreement point found + return Ok(PollResponse::AgreementPointFound { cp, info }); } - None => { - let mempool_txs = client - .get_raw_mempool()? - .into_iter() - .filter(|&txid| self.seen_txids.insert(txid)) - .map( - |txid| -> Result<(Transaction, u64), bitcoincore_rpc::Error> { - let first_seen = - client.get_mempool_entry(&txid).map(|entry| entry.time)?; - let tx = client.get_raw_transaction(&txid, None)?; - Ok((tx, first_seen)) - }, - ) - .collect::, _>>()?; - - // After a mempool emission, we want to find the point of agreement in - // the next round. - *last_info = None; - - self.last_emission_was_mempool = true; - Ok(Some(BitcoindRpcUpdate::Mempool { - txs: mempool_txs, - cp: last_cp.clone(), - })) + // no agreement point found + Ok(PollResponse::AgreementPointNotFound) + } + (Some(last_cp), Some(last_info)) => { + let next_hash = match last_info.nextblockhash { + None => return Ok(PollResponse::NoMoreBlocks), + Some(next_hash) => next_hash, + }; + let info = client.get_block_info(&next_hash)?; + if info.confirmations < 0 { + return Ok(PollResponse::NoMoreBlocks); } + let cp = last_cp + .clone() + .push(BlockId { + height: info.height as _, + hash: info.hash, + }) + .expect("must extend from checkpoint"); + Ok(PollResponse::Block { cp, info }) + } + (None, Some(last_info)) => unreachable!( + "info cannot exist without checkpoint: info={:#?}", + last_info + ), + } + } + + loop { + match poll(self)? { + PollResponse::Block { cp, info } => { + let block = self.client.get_block(&info.hash)?; + self.last_cp = Some(cp.clone()); + self.last_info = Some(info); + return Ok(Some(EmittedBlock { cp, block })); + } + PollResponse::NoMoreBlocks => { + // we have reached the tip, try find agreement point in next round + self.last_info = None; + return Ok(None); + } + PollResponse::AgreementPointNotFound => { + self.last_cp = None; + self.last_info = None; + continue; + } + PollResponse::AgreementPointFound { cp, info } => { + self.last_cp = Some(cp); + self.last_info = Some(info); + continue; } } - (None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info), + } + } + + /// Continuously poll [`bitcoincore_rpc::Client`] until an update is found. + pub fn emit_update(&mut self) -> Result { + match self.emit_block()? { + Some(emitted_block) => Ok(EmittedUpdate::Block(emitted_block)), + None => self.emit_mempool().map(EmittedUpdate::Mempool), } } } @@ -317,7 +422,7 @@ impl<'a> BitcoindRpcEmitter<'a> { pub trait BitcoindRpcErrorExt { /// Returns whether the error is a "not found" error. /// - /// This is useful since [`BitcoindRpcEmitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as + /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as /// [`Iterator::Item`]. fn is_not_found_error(&self) -> bool; } @@ -332,3 +437,40 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { } } } + +/// An [`Iterator`] that wraps an [`Emitter`], and emits [`Result`]s of [`EmittedUpdate`]. +/// +/// ```rust,no_run +/// use bdk_bitcoind_rpc::{EmittedUpdate, Emitter, UpdateIter}; +/// use core::iter::{IntoIterator, Iterator}; +/// # let client: bdk_bitcoind_rpc::bitcoincore_rpc::Client = todo!(); +/// +/// let mut update_iter = Emitter::new(&client, 706_932, None).into_iter(); +/// let update = update_iter.next().expect("must get next update"); +/// println!("got update: {:?}", update); +/// ``` +/// +/// Refer to [module-level documentation] for more. +/// +/// [module-level documentation]: crate +pub struct UpdateIter<'c, C> { + emitter: Emitter<'c, C>, + last_emission_was_mempool: bool, +} + +impl<'c, C: RpcApi> Iterator for UpdateIter<'c, C> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.last_emission_was_mempool { + self.last_emission_was_mempool = false; + None + } else { + let update = self.emitter.emit_update(); + if matches!(update, Ok(EmittedUpdate::Mempool(_))) { + self.last_emission_was_mempool = true; + } + Some(update) + } + } +} diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs new file mode 100644 index 0000000000..8aed6d589b --- /dev/null +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -0,0 +1,332 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use bdk_bitcoind_rpc::Emitter; +use bdk_chain::{ + bitcoin::{Address, Amount, BlockHash, Txid}, + local_chain::LocalChain, + Append, BlockId, ConfirmationHeightAnchor, IndexedTxGraph, SpkTxOutIndex, +}; +use bitcoincore_rpc::RpcApi; + +struct TestEnv { + #[allow(dead_code)] + daemon: bitcoind::BitcoinD, + client: bitcoincore_rpc::Client, +} + +impl TestEnv { + fn new() -> anyhow::Result { + let daemon = match std::env::var_os("TEST_BITCOIND") { + Some(bitcoind_path) => bitcoind::BitcoinD::new(bitcoind_path), + None => bitcoind::BitcoinD::from_downloaded(), + }?; + let client = bitcoincore_rpc::Client::new( + &daemon.rpc_url(), + bitcoincore_rpc::Auth::CookieFile(daemon.params.cookie_file.clone()), + )?; + Ok(Self { daemon, client }) + } + + fn mine_blocks( + &self, + count: usize, + address: Option
, + ) -> anyhow::Result> { + let coinbase_address = match address { + Some(address) => address, + None => self.client.get_new_address(None, None)?.assume_checked(), + }; + let block_hashes = self + .client + .generate_to_address(count as _, &coinbase_address)?; + Ok(block_hashes) + } + + fn reorg(&self, count: usize) -> anyhow::Result> { + let start_height = self.client.get_block_count()?; + + let mut hash = self.client.get_best_block_hash()?; + for _ in 0..count { + let prev_hash = self.client.get_block_info(&hash)?.previousblockhash; + self.client.invalidate_block(&hash)?; + match prev_hash { + Some(prev_hash) => hash = prev_hash, + None => break, + } + } + + let res = self.mine_blocks(count, None); + assert_eq!( + self.client.get_block_count()?, + start_height, + "reorg should not result in height change" + ); + res + } +} + +/// Ensure that blocks are emitted in order even after reorg. +/// +/// 1. Mine 101 blocks. +/// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. +/// 3. Reorg highest 6 blocks. +/// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. +#[test] +pub fn test_sync_local_chain() -> anyhow::Result<()> { + let env = TestEnv::new()?; + let mut local_chain = LocalChain::default(); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // mine some blocks and returned the actual block hashes + let exp_hashes = { + let mut hashes = vec![env.client.get_block_hash(0)?]; // include genesis block + hashes.extend(env.mine_blocks(101, None)?); + hashes + }; + + // see if the emitter outputs the right blocks + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block hash is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + BTreeMap::from([(cp.height(), Some(cp.hash()))]), + "chain update changeset is unexpected", + ); + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected", + ); + + // create new emitter (just for testing sake) + drop(emitter); + let mut emitter = Emitter::new(&env.client, 0, local_chain.tip()); + + // perform reorg + let reorged_blocks = env.reorg(6)?; + let exp_hashes = exp_hashes + .iter() + .take(exp_hashes.len() - reorged_blocks.len()) + .chain(&reorged_blocks) + .cloned() + .collect::>(); + + // see if the emitter outputs the right blocks + let mut exp_height = exp_hashes.len() - reorged_blocks.len(); + loop { + let cp = match emitter.emit_block()? { + Some(b) => b.checkpoint(), + None => break, + }; + assert_eq!( + cp.height(), + exp_height as u32, + "emitted block has unexpected height" + ); + + assert_eq!( + cp.hash(), + exp_hashes[cp.height() as usize], + "emitted block is unexpected" + ); + + let chain_update = bdk_chain::local_chain::Update { + tip: cp.clone(), + introduce_older_blocks: false, + }; + assert_eq!( + local_chain.apply_update(chain_update)?, + if exp_height == exp_hashes.len() - reorged_blocks.len() { + core::iter::once((cp.height(), Some(cp.hash()))) + .chain((cp.height() + 1..exp_hashes.len() as u32).map(|h| (h, None))) + .collect::() + } else { + BTreeMap::from([(cp.height(), Some(cp.hash()))]) + }, + "chain update changeset is unexpected", + ); + + exp_height += 1; + } + + assert_eq!( + local_chain.blocks(), + &exp_hashes + .iter() + .enumerate() + .map(|(i, hash)| (i as u32, *hash)) + .collect(), + "final local_chain state is unexpected after reorg", + ); + + Ok(()) +} + +/// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and +/// block updates. +/// +/// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update +#[test] +fn test_into_tx_graph() -> anyhow::Result<()> { + let env = TestEnv::new()?; + + println!("getting new addresses!"); + let addr_0 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_1 = env.client.get_new_address(None, None)?.assume_checked(); + let addr_2 = env.client.get_new_address(None, None)?.assume_checked(); + println!("got new addresses!"); + + println!("mining block!"); + env.mine_blocks(101, None)?; + println!("mined blocks!"); + + let mut chain = LocalChain::default(); + let mut indexed_tx_graph = IndexedTxGraph::::new({ + let mut index = SpkTxOutIndex::::default(); + index.insert_spk(0, addr_0.script_pubkey()); + index.insert_spk(1, addr_1.script_pubkey()); + index.insert_spk(2, addr_2.script_pubkey()); + index + }); + + for r in Emitter::new(&env.client, 0, chain.tip()) { + let update = r?; + + let _ = chain.apply_update(bdk_chain::local_chain::Update { + tip: update.checkpoint(), + introduce_older_blocks: false, + })?; + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut ()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!(tx_graph_update.full_txs().count(), 0); + assert_eq!(tx_graph_update.all_txouts().count(), 0); + assert_eq!(tx_graph_update.all_anchors().len(), 0); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert!(indexed_additions.is_empty()); + } + + // send 3 txs to a tracked address, these txs will be in the mempool + let exp_txids = { + let mut txids = BTreeSet::new(); + for _ in 0..3 { + txids.insert(env.client.send_to_address( + &addr_0, + Amount::from_sat(10_000), + None, + None, + None, + None, + None, + None, + )?); + } + txids + }; + + // expect the next update to be a mempool update (with 3 relevant tx) + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_mempool()); + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut ()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!( + tx_graph_update + .full_txs() + .map(|tx| tx.txid) + .collect::>(), + exp_txids, + "the mempool update should have 3 relevant transactions", + ); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert_eq!( + indexed_additions + .graph_additions + .txs + .iter() + .map(|tx| tx.txid()) + .collect::>(), + exp_txids, + "changeset should have the 3 mempool transactions", + ); + assert!(indexed_additions.graph_additions.anchors.is_empty()); + } + + // mine a block that confirms the 3 txs + let exp_block_hash = env.mine_blocks(1, None)?[0]; + let exp_block_height = env.client.get_block_info(&exp_block_hash)?.height as u32; + let exp_anchors = exp_txids + .iter() + .map({ + let anchor = ConfirmationHeightAnchor { + anchor_block: BlockId { + height: exp_block_height, + hash: exp_block_hash, + }, + confirmation_height: exp_block_height, + }; + move |&txid| (anchor, txid) + }) + .collect::>(); + + { + let update = Emitter::new(&env.client, 0, chain.tip()).emit_update()?; + assert!(update.is_block()); + + let _ = chain.apply_update(bdk_chain::local_chain::Update { + tip: update.checkpoint(), + introduce_older_blocks: false, + })?; + + let tx_graph_update = update.into_tx_graph_update( + bdk_bitcoind_rpc::indexer_filter(&mut indexed_tx_graph.index, &mut ()), + bdk_bitcoind_rpc::confirmation_height_anchor, + ); + assert_eq!( + tx_graph_update + .full_txs() + .map(|tx| tx.txid) + .collect::>(), + exp_txids, + "block update should have 3 relevant transactions", + ); + assert_eq!( + tx_graph_update.all_anchors(), + &exp_anchors, + "the block update should introduce anchors", + ); + + let indexed_additions = indexed_tx_graph.apply_update(tx_graph_update); + assert!(indexed_additions.graph_additions.txs.is_empty()); + assert!(indexed_additions.graph_additions.txouts.is_empty()); + assert_eq!(indexed_additions.graph_additions.anchors, exp_anchors); + } + + Ok(()) +} diff --git a/example-crates/example_rpc/src/main.rs b/example-crates/example_rpc/src/main.rs index 21cca8b704..f3c093b779 100644 --- a/example-crates/example_rpc/src/main.rs +++ b/example-crates/example_rpc/src/main.rs @@ -10,13 +10,14 @@ use std::{ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, - confirmation_time_anchor, BitcoindRpcEmitter, BitcoindRpcUpdate, + EmittedUpdate, Emitter, }; use bdk_chain::{ - bitcoin::{Address, Transaction}, + bitcoin::{address, Address, Transaction}, + indexed_tx_graph::IndexedAdditions, keychain::LocalChangeSet, local_chain::{self, LocalChain}, - BlockId, ConfirmationTimeAnchor, IndexedTxGraph, + Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph, }; use example_cli::{ anyhow, @@ -79,7 +80,7 @@ enum RpcCommands { /// Create and broadcast a transaction. Tx { value: u64, - address: Address, + address: Address, #[clap(short, default_value = "bnb")] coin_select: CoinSelectionAlgo, #[clap(flatten)] @@ -150,13 +151,13 @@ fn main() -> anyhow::Result<()> { } => { graph.lock().unwrap().index.set_lookahead_for_all(lookahead); - let (chan, recv) = sync_channel::<(BitcoindRpcUpdate, u32)>(CHANNEL_BOUND); + let (chan, recv) = sync_channel::<(EmittedUpdate, u32)>(CHANNEL_BOUND); let prev_cp = chain.lock().unwrap().tip(); let join_handle = std::thread::spawn(move || -> anyhow::Result<()> { let mut tip_height = Option::::None; - for item in BitcoindRpcEmitter::new(&rpc_client, fallback_height, prev_cp) { + for item in Emitter::new(&rpc_client, fallback_height, prev_cp).into_iter() { let item = item?; let is_block = !item.is_mempool(); let is_mempool = item.is_mempool(); @@ -185,19 +186,28 @@ fn main() -> anyhow::Result<()> { for (item, tip_height) in recv { let is_mempool = item.is_mempool(); - let (tip, graph_update) = item.into_update(confirmation_time_anchor); - + let tip = item.checkpoint(); let current_height = tip.height(); let db_changeset = { + let mut indexed_additions = IndexedAdditions::default(); let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); + let graph_update = { + let tx_filter = bdk_bitcoind_rpc::indexer_filter( + &mut graph.index, + &mut indexed_additions.index_additions, + ); + let anchor_map = bdk_bitcoind_rpc::confirmation_time_anchor; + item.into_tx_graph_update(tx_filter, anchor_map) + }; + indexed_additions.append(graph.apply_update(graph_update)); + let chain_changeset = chain.apply_update(local_chain::Update { tip, introduce_older_blocks: false, })?; - let indexed_additions = graph.prune_and_apply_update(graph_update); ChangeSet { indexed_additions, @@ -261,7 +271,9 @@ fn main() -> anyhow::Result<()> { &*chain, &keymap, coin_select, - address, + address + .require_network(args.network) + .expect("address has the wrong network"), value, broadcast, )