Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 110 additions & 54 deletions src/chain/cbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ pub(super) struct CbfChainSource {
fee_source: FeeSource,
/// Tracks whether the bip157 node is running and holds the command handle.
cbf_runtime_status: Arc<Mutex<CbfRuntimeStatus>>,
/// Latest chain tip hash, updated by the background event processing task.
latest_tip: Arc<Mutex<Option<BlockHash>>>,
/// Scripts to match against compact block filters during a scan.
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
/// Block (height, hash) pairs where filters matched watched scripts.
Expand Down Expand Up @@ -119,7 +117,6 @@ enum CbfRuntimeStatus {

/// Shared state passed to the background event processing task.
struct CbfEventState {
latest_tip: Arc<Mutex<Option<BlockHash>>>,
watched_scripts: Arc<RwLock<Vec<ScriptBuf>>>,
matched_block_hashes: Arc<Mutex<Vec<(u32, BlockHash)>>>,
sync_completion_tx: Arc<Mutex<Option<oneshot::Sender<SyncUpdate>>>>,
Expand Down Expand Up @@ -148,7 +145,6 @@ impl CbfChainSource {
};

let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped));
let latest_tip = Arc::new(Mutex::new(None));
let watched_scripts = Arc::new(RwLock::new(Vec::new()));
let matched_block_hashes = Arc::new(Mutex::new(Vec::new()));
let sync_completion_tx = Arc::new(Mutex::new(None));
Expand All @@ -163,7 +159,6 @@ impl CbfChainSource {
sync_config,
fee_source,
cbf_runtime_status,
latest_tip,
watched_scripts,
matched_block_hashes,
sync_completion_tx,
Expand Down Expand Up @@ -288,7 +283,6 @@ impl CbfChainSource {
// block is 'static (no borrows of `self`).
let restart_status = Arc::clone(&self.cbf_runtime_status);
let restart_logger = Arc::clone(&self.logger);
let restart_latest_tip = Arc::clone(&self.latest_tip);
let restart_watched_scripts = Arc::clone(&self.watched_scripts);
let restart_matched_block_hashes = Arc::clone(&self.matched_block_hashes);
let restart_sync_completion_tx = Arc::clone(&self.sync_completion_tx);
Expand Down Expand Up @@ -317,7 +311,6 @@ impl CbfChainSource {
Arc::clone(&restart_logger),
));
let event_state = CbfEventState {
latest_tip: Arc::clone(&restart_latest_tip),
watched_scripts: Arc::clone(&restart_watched_scripts),
matched_block_hashes: Arc::clone(&restart_matched_block_hashes),
sync_completion_tx: Arc::clone(&restart_sync_completion_tx),
Expand Down Expand Up @@ -428,7 +421,6 @@ impl CbfChainSource {
match event {
Event::FiltersSynced(sync_update) => {
let tip = sync_update.tip();
*state.latest_tip.lock().unwrap() = Some(tip.hash);
log_info!(
logger,
"CBF filters synced to tip: height={}, hash={}",
Expand Down Expand Up @@ -609,19 +601,18 @@ impl CbfChainSource {
},
};

// Build chain checkpoint extending from the wallet's current tip.
// Build chain checkpoint extending from the wallet's current tip,
// using `insert` (not `push`) so that reorgs are handled correctly.
// `insert` detects conflicting hashes and purges stale blocks,
// matching bdk-kyoto's approach in `UpdateBuilder::apply_chain_event`.
let mut cp = onchain_wallet.latest_checkpoint();
for (height, header) in sync_update.recent_history() {
if *height > cp.height() {
let block_id = BlockId { height: *height, hash: header.block_hash() };
cp = cp.push(block_id).unwrap_or_else(|old| old);
}
let block_id = BlockId { height: *height, hash: header.block_hash() };
cp = cp.insert(block_id);
}
let tip = sync_update.tip();
if tip.height > cp.height() {
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
cp = cp.push(tip_block_id).unwrap_or_else(|old| old);
}
let tip_block_id = BlockId { height: tip.height, hash: tip.hash };
cp = cp.insert(tip_block_id);

let update =
Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) };
Expand Down Expand Up @@ -861,60 +852,69 @@ impl CbfChainSource {

/// Derive per-target fee rates from recent blocks' coinbase outputs.
///
/// Returns `Ok(None)` when no chain tip is available yet (first startup before sync).
/// Returns `Ok(None)` when the chain is too short to sample `FEE_RATE_LOOKBACK_BLOCKS`
/// blocks (e.g. kyoto has not yet synced past the genesis region).
async fn fee_rate_cache_from_cbf(
&self,
) -> Result<Option<HashMap<crate::fee_estimator::ConfirmationTarget, FeeRate>>, Error> {
let requester = self.requester()?;

let tip_hash = match *self.latest_tip.lock().unwrap() {
Some(hash) => hash,
None => {
log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping.");
let timeout = Duration::from_secs(
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
);
let fetch_start = Instant::now();

// Ask kyoto for its current chain tip rather than maintaining a mirrored
// cache: the returned hash is always fresh (post-reorg, post-restart),
// so no defensive invalidation is needed below.
let tip = match tokio::time::timeout(timeout, requester.chain_tip()).await {
Ok(Ok(tip)) => tip,
Ok(Err(e)) => {
log_debug!(
self.logger,
"Failed to fetch CBF chain tip for fee estimation: {:?}",
e,
);
return Ok(None);
},
Err(e) => {
log_error!(self.logger, "Timed out fetching CBF chain tip: {}", e);
return Err(Error::FeerateEstimationUpdateTimeout);
},
};

if (tip.height as usize) < FEE_RATE_LOOKBACK_BLOCKS {
log_debug!(
self.logger,
"CBF chain tip at height {} is below the {}-block lookback window, \
skipping fee estimation.",
tip.height,
FEE_RATE_LOOKBACK_BLOCKS,
);
return Ok(None);
}

let now = Instant::now();

// Fetch fee rates from the last N blocks for per-target estimation.
// We compute fee rates ourselves rather than using Requester::average_fee_rate,
// so we can sample multiple blocks and select percentiles per confirmation target.
let mut block_fee_rates: Vec<u64> = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS);
let mut current_hash = tip_hash;
let mut current_hash = tip.hash;

let timeout = Duration::from_secs(
self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs,
);
let fetch_start = Instant::now();

for idx in 0..FEE_RATE_LOOKBACK_BLOCKS {
for _ in 0..FEE_RATE_LOOKBACK_BLOCKS {
// Check if we've exceeded the overall timeout for fee estimation.
let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed());
if remaining_timeout.is_zero() {
log_error!(self.logger, "Updating fee rate estimates timed out.");
return Err(Error::FeerateEstimationUpdateTimeout);
}

// Fetch the block via P2P. On the first iteration, a fetch failure
// likely means the cached tip is stale (initial sync or reorg), so
// we clear the tip and skip gracefully instead of returning an error.
let indexed_block =
match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash))
.await
{
Ok(Ok(indexed_block)) => indexed_block,
Ok(Err(e)) if idx == 0 => {
log_debug!(
self.logger,
"Cached CBF tip {} was unavailable during fee estimation, \
likely due to initial sync or a reorg: {:?}",
current_hash,
e
);
*self.latest_tip.lock().unwrap() = None;
return Ok(None);
},
Ok(Err(e)) => {
log_error!(
self.logger,
Expand All @@ -923,17 +923,6 @@ impl CbfChainSource {
);
return Err(Error::FeerateEstimationUpdateFailed);
},
Err(e) if idx == 0 => {
log_debug!(
self.logger,
"Timed out fetching cached CBF tip {} during fee estimation, \
likely due to initial sync or a reorg: {}",
current_hash,
e
);
*self.latest_tip.lock().unwrap() = None;
return Ok(None);
},
Err(e) => {
log_error!(self.logger, "Updating fee rate estimates timed out: {}", e);
return Err(Error::FeerateEstimationUpdateTimeout);
Expand Down Expand Up @@ -1396,4 +1385,71 @@ mod tests {
}
}
}

/// Test that checkpoint building from `recent_history` handles reorgs.
///
/// Scenario: wallet synced to height 103. A 3-block reorg replaces blocks
/// 101-103 with new ones (same tip height). `recent_history` returns
/// {94..=103} (last 10 blocks ending at tip) with new hashes at 101-103.
///
/// The checkpoint must reflect the reorged chain: new hashes at 101-103,
/// pre-reorg blocks at ≤100 preserved.
#[test]
fn checkpoint_building_handles_reorg() {
    use bdk_chain::local_chain::LocalChain;
    use bdk_chain::{BlockId, CheckPoint};
    use bitcoin::BlockHash;
    use std::collections::BTreeMap;

    // Deterministic dummy block hash derived from an integer seed, so
    // pre- and post-reorg chains can be distinguished by seed choice.
    fn hash(seed: u32) -> BlockHash {
        use bitcoin::hashes::{sha256d, Hash, HashEngine};
        let mut engine = sha256d::Hash::engine();
        engine.input(&seed.to_le_bytes());
        BlockHash::from_raw_hash(sha256d::Hash::from_engine(engine))
    }

    let genesis = BlockId { height: 0, hash: hash(0) };

    // Wallet checkpoint: 0 → 100 → 101 → 102 → 103 (all pre-reorg hashes).
    let wallet_heights: [u32; 5] = [0, 100, 101, 102, 103];
    let wallet_cp = CheckPoint::from_block_ids(
        wallet_heights.iter().map(|&h| BlockId { height: h, hash: hash(h) }),
    )
    .unwrap();

    // recent_history after the reorg: heights 94-103, where 101-103 carry
    // NEW hashes (seed offset by 1000 marks the post-reorg chain).
    let mut recent_history: BTreeMap<u32, BlockHash> = BTreeMap::new();
    for height in 94u32..=103 {
        let seed = if height >= 101 { height + 1000 } else { height };
        recent_history.insert(height, hash(seed));
    }

    // Build the checkpoint exactly as sync_onchain_wallet does: fold every
    // recent-history block into the wallet tip via `insert`.
    let cp = recent_history.iter().fold(wallet_cp, |acc, (&height, &block_hash)| {
        acc.insert(BlockId { height, hash: block_hash })
    });

    // Reorged blocks must have the NEW hashes.
    assert_eq!(cp.height(), 103);
    assert_eq!(
        cp.get(101).expect("height 101 must exist").hash(),
        hash(1101),
        "block 101 must have the reorged hash"
    );
    assert_eq!(cp.get(102).expect("height 102 must exist").hash(), hash(1102));
    assert_eq!(cp.get(103).expect("height 103 must exist").hash(), hash(1103));

    // Pre-reorg blocks are preserved.
    assert_eq!(cp.get(100).expect("height 100 must exist").hash(), hash(100));

    // The checkpoint must connect cleanly to a LocalChain.
    let (mut chain, _) = LocalChain::from_genesis_hash(genesis.hash);
    chain.apply_update(cp).expect("checkpoint must connect to chain");
}
}
6 changes: 3 additions & 3 deletions tests/integration_tests_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use common::{
expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait,
generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all,
premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config,
random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node,
setup_two_nodes, skip_if_cbf, splice_in_with_all, wait_for_cbf_sync, wait_for_tx,
TestChainSource, TestStoreType, TestSyncStore,
setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, skip_if_cbf,
splice_in_with_all, wait_for_cbf_sync, wait_for_tx, TestChainSource, TestStoreType,
TestSyncStore,
};
use electrsd::corepc_node::Node as BitcoinD;
use electrsd::ElectrsD;
Expand Down
Loading