Skip to content

Commit

Permalink
feat(esplora)!: simplify chain update logic
Browse files Browse the repository at this point in the history
Co-authored-by: LLFourn <lloyd.fourn@gmail.com>
  • Loading branch information
evanlinjin and LLFourn committed Apr 16, 2024
1 parent 954b6b5 commit 64bc6ca
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 211 deletions.
3 changes: 1 addition & 2 deletions crates/chain/src/local_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ impl CheckPoint {
cp = cp.prev().expect("will break before genesis block");
};

base
.extend(core::iter::once(block_id).chain(tail.into_iter().rev()))
base.extend(core::iter::once(block_id).chain(tail.into_iter().rev()))
.expect("tail is in order")
}
}
Expand Down
188 changes: 93 additions & 95 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::BTreeSet;

use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::Anchor;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid},
Expand Down Expand Up @@ -97,11 +96,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
stop_gap: usize,
parallel_requests: usize,
) -> Result<FullScanUpdate<K>, Error> {
let update_blocks = init_chain_update(self, &local_tip).await?;
let latest_blocks = fetch_latest_blocks(self).await?;
let (tx_graph, last_active_indices) =
full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?;
let local_chain =
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?;
Ok(FullScanUpdate {
local_chain,
tx_graph,
Expand All @@ -117,124 +116,116 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize,
) -> Result<SyncUpdate, Error> {
let update_blocks = init_chain_update(self, &local_tip).await?;
let latest_blocks = fetch_latest_blocks(self).await?;
let tx_graph =
sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?;
let local_chain =
finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?;
chain_update(self, &latest_blocks, &local_tip, tx_graph.all_anchors()).await?;
Ok(SyncUpdate {
tx_graph,
local_chain,
})
}
}

/// Create the initial chain update.
/// Fetch latest blocks from Esplora in an atomic call.
///
/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the
/// update can connect to the `start_tip`.
///
/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and
/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks AND
/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for
/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use
/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when
/// alternating between chain-sources.
async fn init_chain_update(
async fn fetch_latest_blocks(
client: &esplora_client::AsyncClient,
local_tip: &CheckPoint,
) -> Result<BTreeMap<u32, BlockHash>, Error> {
// Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are
// consistent.
let mut fetched_blocks = client
Ok(client
.get_blocks(None)
.await?
.into_iter()
.map(|b| (b.time.height, b.id))
.collect::<BTreeMap<u32, BlockHash>>();
let new_tip_height = fetched_blocks
.keys()
.last()
.copied()
.expect("must atleast have one block");

// Ensure `fetched_blocks` can create an update that connects with the original chain by
// finding a "Point of Agreement".
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
if height > new_tip_height {
continue;
}
.collect())
}

let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?),
};
/// Used instead of [`esplora_client::BlockingClient::get_block_hash`].
///
/// This first checks the previously fetched `latest_blocks` before fetching from Esplora again.
async fn fetch_block(
client: &esplora_client::AsyncClient,
latest_blocks: &BTreeMap<u32, BlockHash>,
height: u32,
) -> Result<Option<BlockHash>, Error> {
if let Some(&hash) = latest_blocks.get(&height) {
return Ok(Some(hash));
}

// We have found point of agreement so the update will connect!
if fetched_hash == local_hash {
break;
}
// We avoid fetching blocks higher than previously fetched `latest_blocks` as the local chain
// tip is used to signal for the last-synced-up-to-height.
let (&tip_height, _) = latest_blocks
.last_key_value()
.expect("must have atleast one entry");
if height > tip_height {
return Ok(None);
}

Ok(fetched_blocks)
Ok(Some(client.get_block_hash(height).await?))
}

/// Fetches missing checkpoints and finalizes the [`local_chain::Update`].
/// Create the [`local_chain::Update`].
///
/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an
/// existing checkpoint/block under `local_tip` or `update_blocks`.
async fn finalize_chain_update<A: Anchor>(
/// We want to have a corresponding checkpoint per anchor height. However, checkpoints fetched
/// should not surpass `latest_blocks`.
async fn chain_update<A: Anchor>(
client: &esplora_client::AsyncClient,
latest_blocks: &BTreeMap<u32, BlockHash>,
local_tip: &CheckPoint,
anchors: &BTreeSet<(A, Txid)>,
mut update_blocks: BTreeMap<u32, BlockHash>,
) -> Result<local_chain::Update, Error> {
let update_tip_height = update_blocks
.keys()
.last()
.copied()
.expect("must atleast have one block");

// We want to have a corresponding checkpoint per height. We iterate the heights of anchors
// backwards, comparing it against our `local_tip`'s chain and our current set of
// `update_blocks` to see if a corresponding checkpoint already exists.
let anchor_heights = anchors
.iter()
.rev()
.map(|(a, _)| a.anchor_block().height)
// filter out heights that surpass the update tip
.filter(|h| *h <= update_tip_height)
// filter out duplicate heights
.filter({
let mut prev_height = Option::<u32>::None;
move |h| match prev_height.replace(*h) {
None => true,
Some(prev_h) => prev_h != *h,
}
});
let mut point_of_agreement = None;
let mut conflicts = vec![];
for local_cp in local_tip.iter() {
let remote_hash = match fetch_block(client, latest_blocks, local_cp.height()).await? {
Some(hash) => hash,
None => continue,
};
if remote_hash == local_cp.hash() {
point_of_agreement = Some(local_cp.clone());
break;
} else {
// it is not strictly necessary to include all the conflicted heights (we do need the
// first one) but it seems prudent to make sure the updated chain's heights are a
// superset of the existing chain after update.
conflicts.push(BlockId {
height: local_cp.height(),
hash: remote_hash,
});
}
}

// We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of
// checkpoints more efficient.
let mut curr_cp = local_tip.clone();
let mut tip = point_of_agreement.expect("remote esplora should have same genesis block");

for h in anchor_heights {
if let Some(cp) = curr_cp.range(h..).last() {
curr_cp = cp.clone();
if cp.height() == h {
continue;
}
}
if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) {
entry.insert(client.get_block_hash(h).await?);
tip = tip
.extend(conflicts.into_iter().rev())
.expect("evicted are in order");

for anchor in anchors {
let height = anchor.0.anchor_block().height;
if tip.get(height).is_none() {
let hash = match fetch_block(client, latest_blocks, height).await? {
Some(hash) => hash,
None => continue,
};
tip = tip.insert(BlockId { height, hash });
}
}

// insert the most recent blocks at the tip to make sure we update the tip and make the update
// robust.
for (&height, &hash) in latest_blocks.iter() {
tip = tip.insert(BlockId { height, hash });
}

Ok(local_chain::Update {
tip: CheckPoint::from_block_ids(
update_blocks
.into_iter()
.map(|(height, hash)| BlockId { height, hash }),
)
.expect("must be in order"),
tip,
introduce_older_blocks: true,
})
}
Expand Down Expand Up @@ -424,7 +415,7 @@ mod test {
use electrsd::bitcoind::bitcoincore_rpc::RpcApi;
use esplora_client::Builder;

use crate::async_ext::{finalize_chain_update, init_chain_update};
use crate::async_ext::{chain_update, fetch_latest_blocks};

macro_rules! h {
($index:literal) => {{
Expand Down Expand Up @@ -493,9 +484,8 @@ mod test {
// craft initial `local_chain`
let local_chain = {
let (mut chain, _) = LocalChain::from_genesis_hash(env.genesis_hash()?);
let chain_tip = chain.tip();
let update_blocks = init_chain_update(&client, &chain_tip).await?;
let update_anchors = t
// force `chain_update` to add all checkpoints in `t.initial_cps`
let anchors = t
.initial_cps
.iter()
.map(|&height| -> anyhow::Result<_> {
Expand All @@ -508,10 +498,14 @@ mod test {
))
})
.collect::<anyhow::Result<BTreeSet<_>>>()?;
let chain_update =
finalize_chain_update(&client, &chain_tip, &update_anchors, update_blocks)
.await?;
chain.apply_update(chain_update)?;
let update = chain_update(
&client,
&fetch_latest_blocks(&client).await?,
&chain.tip(),
&anchors,
)
.await?;
chain.apply_update(update)?;
chain
};
println!("local chain height: {}", local_chain.tip().height());
Expand All @@ -529,9 +523,7 @@ mod test {

// craft update
let update = {
let local_tip = local_chain.tip();
let update_blocks = init_chain_update(&client, &local_tip).await?;
let update_anchors = t
let anchors = t
.anchors
.iter()
.map(|&(height, txid)| -> anyhow::Result<_> {
Expand All @@ -544,7 +536,13 @@ mod test {
))
})
.collect::<anyhow::Result<_>>()?;
finalize_chain_update(&client, &local_tip, &update_anchors, update_blocks).await?
chain_update(
&client,
&fetch_latest_blocks(&client).await?,
&local_chain.tip(),
&anchors,
)
.await?
};

// apply update
Expand Down

0 comments on commit 64bc6ca

Please sign in to comment.