Skip to content

Commit

Permalink
feat: Update into_tx_graph_update and add syncing
Browse files Browse the repository at this point in the history
Update into_tx_graph_update to take a Vec of tuples of
blocks and transactions as input to create a TxGraph.

Add scanning with Nakamoto client. We start by creating a list of scripts
to scan and we continue to scan the chain until we have satisfied
stop_gap condition. On each scan we get a fresh list of script to watch.
We then update the IndexedGraph and return the IndexedAdditions.
  • Loading branch information
vladimirfomene committed Aug 7, 2023
1 parent a5e682f commit 5dd5d3d
Showing 1 changed file with 156 additions and 20 deletions.
176 changes: 156 additions & 20 deletions crates/bdk_cbf/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
use std::{net, thread};

use bdk_chain::keychain::DerivationAdditions;
use nakamoto::client::network::Services;
use nakamoto::client::traits::Handle as HandleTrait;
use nakamoto::client::Handle;
use nakamoto::client::traits::Handle as HandleTrait;
use nakamoto::client::{chan, Client, Config, Error, Event};
use nakamoto::common::block::Height;
use nakamoto::net::poll;

pub use nakamoto::client::network::Network;

use bdk_chain::{
bitcoin::{Script, Transaction},
BlockId, ChainOracle, TxGraph,
collections::BTreeMap,
indexed_tx_graph::{IndexedAdditions, IndexedTxGraph, Indexer},
keychain::KeychainTxOutIndex,
BlockId, ChainOracle, ConfirmationHeightAnchor, TxGraph,
};

/// The network reactor we're going to use.
use core::fmt::Debug;

type Reactor = poll::Reactor<net::TcpStream>;

impl ChainOracle for CBFClient {
Expand Down Expand Up @@ -79,10 +86,9 @@ impl Iterator for CBFUpdateIterator {
fn next(&mut self) -> Option<Self::Item> {
match self.client.watch_events() {
Ok(update) => {
if let CBFUpdate::Synced { .. } = update {
None
} else {
Some(Ok(update))
match update {
CBFUpdate::Synced { height, tip } if height == tip => None,
_ => Some(Ok(update))
}
}
Err(e) => Some(Err(e)),
Expand All @@ -101,6 +107,8 @@ impl CBFClient {
// Wait for the client to be connected to a peer.
handle.wait_for_peers(peer_count, Services::default())?;

println!("Connected to {} peers", peer_count);

Ok(Self { handle })
}

Expand All @@ -111,13 +119,15 @@ impl CBFClient {
watch: impl Iterator<Item = Script>,
) -> Result<(), Error> {
self.handle.rescan(start_height.., watch)?;
println!("About to start scanning from height {}", start_height);
Ok(())
}

// Watch for Block events that match the scripts we're interested in
pub fn watch_events(&self) -> Result<CBFUpdate, Error> {
let events_chan = self.handle.events();
loop {
print!("looping...");
chan::select! {
recv(events_chan) -> event => {
let event = event?;
Expand All @@ -131,12 +141,14 @@ impl CBFClient {
transactions,
..
} => {
println!("Block matched: {} {}", height, hash);
return Ok(CBFUpdate::BlockMatched {
transactions,
block: BlockId { height: height as u32, hash }
});
}
Event::Synced { height, tip } => {
println!("Synced: {} {}", height, tip);
return Ok(CBFUpdate::Synced { height, tip });
}
_ => {}
Expand All @@ -147,21 +159,18 @@ impl CBFClient {
}

// Turns a CBFUpdate into a TxGraph update
pub fn into_tx_graph_update<F>(
pub fn into_tx_graph_update(
&self,
txs: Vec<Transaction>,
block: BlockId,
is_relevant: F,
) -> TxGraph<BlockId>
where
F: Fn(&Transaction) -> bool,
{
block_txs: Vec<(BlockId, Vec<Transaction>)>,
) -> TxGraph<ConfirmationHeightAnchor> {
let mut tx_graph = TxGraph::default();
let filtered_txs = txs.into_iter().filter(|tx| is_relevant(tx));
for tx in filtered_txs {
let txid = tx.txid();
let _ = tx_graph.insert_anchor(txid, block);
let _ = tx_graph.insert_tx(tx);

for (blockid, txs) in block_txs.into_iter() {
for tx in txs {
let txid = tx.txid();
let _ = tx_graph.insert_anchor(txid, to_confirmation_height_anchor(blockid));
let _ = tx_graph.insert_tx(tx);
}
}
tx_graph
}
Expand All @@ -171,4 +180,131 @@ impl CBFClient {
client: self.clone(),
}
}

pub fn scan<K>(
&self,
mut watch_per_keychain: u32,
start_height: Height,
indexed_tx_graph: &mut IndexedTxGraph<ConfirmationHeightAnchor, KeychainTxOutIndex<K>>,
stop_gap: u32,
) -> Result<IndexedAdditions<ConfirmationHeightAnchor, DerivationAdditions<K>>, Error>
where
K: Ord + Clone + Debug,
{
let mut keychain_spks = indexed_tx_graph.index.spks_of_all_keychains();
let mut empty_scripts_counter = BTreeMap::<K, u32>::new();
keychain_spks.keys().for_each(|k| {
empty_scripts_counter.insert(k.clone(), 0);
});

let mut updates = Vec::new();

while let Some(keychains) = Self::check_stop_gap(stop_gap, &empty_scripts_counter) {
keychains.iter().for_each(|k| {
/*let (_, _) =*/ indexed_tx_graph.index.set_lookahead(k, watch_per_keychain);
});

let mut spk_watchlist = BTreeMap::<K, Vec<Script>>::new();
for (k, script_iter) in keychain_spks.iter_mut() {
(0..watch_per_keychain).for_each(|_| {
if let Some((_, script)) = script_iter.next() {
let spks = spk_watchlist.entry(k.clone()).or_insert(vec![]);
spks.push(script);
}
});
}

let scripts = spk_watchlist.values().flatten().cloned().collect::<Vec<_>>();
self.start_scanning(start_height, scripts.into_iter())?;

for update in self.iter() {
match update {
Ok(CBFUpdate::BlockMatched {
transactions,
block,
}) => {
let relevant_txs = transactions
.into_iter()
.filter(|tx| indexed_tx_graph.index.is_tx_relevant(tx))
.collect::<Vec<_>>();
updates.push((block, relevant_txs));
}
Ok(CBFUpdate::BlockDisconnected { .. }) => {
//TODO: Don't know how to handle re-orgs yet
//I will love to get your comments on this.
}
Ok(_) => {}
Err(e) => {
return Err(e);
}
}
}

// Determine which scripts are part of the update.
for (k, scripts) in spk_watchlist.iter() {
for script in scripts {
let counter = empty_scripts_counter.get_mut(k).unwrap();
if Self::is_script_in_udpate(script.clone(), &updates) {
*counter = 0;
} else {
*counter += 1;
}
}
}

watch_per_keychain += watch_per_keychain;
}

//apply the updates to IndexedGraph
let graph_update = self.into_tx_graph_update(updates);
let additions = indexed_tx_graph.apply_update(graph_update);

Ok(additions)
}

fn is_script_in_udpate(script: Script, updates: &Vec<(BlockId, Vec<Transaction>)>) -> bool {
for update in updates {
for tx in update.1.iter() {
for output in tx.output.iter() {
if output.script_pubkey == script {
return true;
}
}
}
}
false
}

fn check_stop_gap<K>(stop_gap: u32, empty_scripts_counter: &BTreeMap<K, u32>) -> Option<Vec<K>>
where
K: Ord + Clone + Debug,
{
let keychains = empty_scripts_counter
.iter()
.filter(|(_, counter)| **counter < stop_gap)
.map(|(k, _)| k.clone())
.collect::<Vec<_>>();
if keychains.is_empty() {
None
} else {
Some(keychains)
}
}

pub fn submit_transaction(&self, tx: Transaction) -> Result<(), Error> {
self.handle.submit_transaction(tx)?;
Ok(())
}

pub fn shutdown(self) -> Result<(), Error>{
self.handle.shutdown()?;
Ok(())
}
}

fn to_confirmation_height_anchor(blockid: BlockId) -> ConfirmationHeightAnchor {
ConfirmationHeightAnchor {
anchor_block: blockid,
confirmation_height: blockid.height,
}
}

0 comments on commit 5dd5d3d

Please sign in to comment.