From 7c3a63569397ade4f930db6391729334a7c41f0a Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 1 Apr 2026 12:14:58 +0200 Subject: [PATCH 1/3] Add test to capture wallet checkpoint push problems --- src/chain/cbf.rs | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index bd8f36691..c3048d495 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -1396,4 +1396,78 @@ mod tests { } } } + + /// Test that checkpoint building from `recent_history` handles reorgs. + /// + /// Scenario: wallet synced to height 103. A 3-block reorg replaces blocks + /// 101-103 with new ones, and `recent_history` returns {97..=106} with + /// new hashes at heights 101-103. + /// + /// The checkpoint must reflect the reorged chain: new hashes at 101-103, + /// pre-reorg blocks at ≤100 preserved, new blocks 104-106 present. + #[test] + fn checkpoint_building_handles_reorg() { + use bdk_chain::local_chain::LocalChain; + use bdk_chain::{BlockId, CheckPoint}; + use bitcoin::BlockHash; + use std::collections::BTreeMap; + + fn hash(seed: u32) -> BlockHash { + use bitcoin::hashes::{sha256d, Hash, HashEngine}; + let mut engine = sha256d::Hash::engine(); + engine.input(&seed.to_le_bytes()); + BlockHash::from_raw_hash(sha256d::Hash::from_engine(engine)) + } + + let genesis = BlockId { height: 0, hash: hash(0) }; + + // Wallet checkpoint: 0 → 100 → 101 → 102 → 103 + let wallet_cp = CheckPoint::from_block_ids([ + genesis, + BlockId { height: 100, hash: hash(100) }, + BlockId { height: 101, hash: hash(101) }, + BlockId { height: 102, hash: hash(102) }, + BlockId { height: 103, hash: hash(103) }, + ]) + .unwrap(); + + // recent_history after reorg: 97-106, heights 101-103 have NEW hashes. 
+ let recent_history: BTreeMap<u32, BlockHash> = (97..=106) + .map(|h| { + let seed = if (101..=103).contains(&h) { h + 1000 } else { h }; + (h, hash(seed)) + }) + .collect(); + + // Build checkpoint using the same logic as sync_onchain_wallet. + let mut cp = wallet_cp; + for (height, block_hash) in &recent_history { + if *height > cp.height() { + let block_id = BlockId { height: *height, hash: *block_hash }; + cp = cp.push(block_id).unwrap_or_else(|old| old); + } + } + + // Reorged blocks must have the NEW hashes. + assert_eq!(cp.height(), 106); + assert_eq!( + cp.get(101).expect("height 101 must exist").hash(), + hash(1101), + "block 101 must have the reorged hash" + ); + assert_eq!(cp.get(102).expect("height 102 must exist").hash(), hash(1102)); + assert_eq!(cp.get(103).expect("height 103 must exist").hash(), hash(1103)); + + // Pre-reorg blocks are preserved. + assert_eq!(cp.get(100).expect("height 100 must exist").hash(), hash(100)); + + // New blocks above the reorg are present. + assert!(cp.get(104).is_some()); + assert!(cp.get(105).is_some()); + assert!(cp.get(106).is_some()); + + // The checkpoint must connect cleanly to a LocalChain. + let (mut chain, _) = LocalChain::from_genesis_hash(genesis.hash); + chain.apply_update(cp).expect("checkpoint must connect to chain"); + } } From 8261e32bc162560952f882a7551c531ac468b10d Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 21 Apr 2026 00:31:45 +0200 Subject: [PATCH 2/3] fix(cbf): use CheckPoint::insert for reorg-aware wallet sync `push` only appends above the tip, so when `recent_history` contained blocks at or below the wallet's current checkpoint height after a reorg, the stale hashes on the wallet checkpoint were never replaced. Switch to `CheckPoint::insert`, which detects conflicting hashes and purges stale blocks, matching bdk-kyoto's `UpdateBuilder::apply_chain_event`. Also clear `latest_tip` on `BlockHeaderChanges::Reorganized` so cached tip state does not point at an abandoned chain. 
Update the `checkpoint_building_handles_reorg` unit test (added in c1844b3) to exercise the fixed behaviour: a reorg where the new tip is at the same height as the wallet's checkpoint must still result in the reorged hashes winning. Disclosure: drafted with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 --- src/chain/cbf.rs | 41 ++++++++++++++------------------- tests/integration_tests_rust.rs | 6 ++--- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index c3048d495..f7c1b109a 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -448,6 +448,7 @@ impl CbfChainSource { reorganized.len(), accepted.len(), ); + *state.latest_tip.lock().unwrap() = None; // No height reset needed: skip heights are derived from // BDK's checkpoint (on-chain) and LDK's best block @@ -609,19 +610,18 @@ impl CbfChainSource { }, }; - // Build chain checkpoint extending from the wallet's current tip. + // Build chain checkpoint extending from the wallet's current tip, + // using `insert` (not `push`) so that reorgs are handled correctly. + // `insert` detects conflicting hashes and purges stale blocks, + // matching bdk-kyoto's approach in `UpdateBuilder::apply_chain_event`. 
let mut cp = onchain_wallet.latest_checkpoint(); for (height, header) in sync_update.recent_history() { - if *height > cp.height() { - let block_id = BlockId { height: *height, hash: header.block_hash() }; - cp = cp.push(block_id).unwrap_or_else(|old| old); - } + let block_id = BlockId { height: *height, hash: header.block_hash() }; + cp = cp.insert(block_id); } let tip = sync_update.tip(); - if tip.height > cp.height() { - let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; - cp = cp.push(tip_block_id).unwrap_or_else(|old| old); - } + let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; + cp = cp.insert(tip_block_id); let update = Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; @@ -1400,11 +1400,11 @@ mod tests { /// Test that checkpoint building from `recent_history` handles reorgs. /// /// Scenario: wallet synced to height 103. A 3-block reorg replaces blocks - /// 101-103 with new ones, and `recent_history` returns {97..=106} with - /// new hashes at heights 101-103. + /// 101-103 with new ones (same tip height). `recent_history` returns + /// {94..=103} (last 10 blocks ending at tip) with new hashes at 101-103. /// /// The checkpoint must reflect the reorged chain: new hashes at 101-103, - /// pre-reorg blocks at ≤100 preserved, new blocks 104-106 present. + /// pre-reorg blocks at ≤100 preserved. #[test] fn checkpoint_building_handles_reorg() { use bdk_chain::local_chain::LocalChain; @@ -1431,8 +1431,8 @@ ]) .unwrap(); - // recent_history after reorg: 97-106, heights 101-103 have NEW hashes. - let recent_history: BTreeMap<u32, BlockHash> = (97..=106) + // recent_history after reorg: 94-103, heights 101-103 have NEW hashes. + let recent_history: BTreeMap<u32, BlockHash> = (94..=103) .map(|h| { let seed = if (101..=103).contains(&h) { h + 1000 } else { h }; (h, hash(seed)) }) .collect(); @@ -1442,14 +1442,12 @@ mod tests { // Build checkpoint using the same logic as sync_onchain_wallet. 
let mut cp = wallet_cp; for (height, block_hash) in &recent_history { - if *height > cp.height() { - let block_id = BlockId { height: *height, hash: *block_hash }; - cp = cp.push(block_id).unwrap_or_else(|old| old); - } + let block_id = BlockId { height: *height, hash: *block_hash }; + cp = cp.insert(block_id); } // Reorged blocks must have the NEW hashes. - assert_eq!(cp.height(), 106); + assert_eq!(cp.height(), 103); assert_eq!( cp.get(101).expect("height 101 must exist").hash(), hash(1101), @@ -1461,11 +1459,6 @@ mod tests { // Pre-reorg blocks are preserved. assert_eq!(cp.get(100).expect("height 100 must exist").hash(), hash(100)); - // New blocks above the reorg are present. - assert!(cp.get(104).is_some()); - assert!(cp.get(105).is_some()); - assert!(cp.get(106).is_some()); - // The checkpoint must connect cleanly to a LocalChain. let (mut chain, _) = LocalChain::from_genesis_hash(genesis.hash); chain.apply_update(cp).expect("checkpoint must connect to chain"); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 6a3c3c70f..6ef3a2eef 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,9 +23,9 @@ use common::{ expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, - random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, - setup_two_nodes, skip_if_cbf, splice_in_with_all, wait_for_cbf_sync, wait_for_tx, - TestChainSource, TestStoreType, TestSyncStore, + setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, skip_if_cbf, + splice_in_with_all, wait_for_cbf_sync, wait_for_tx, TestChainSource, TestStoreType, + TestSyncStore, }; use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; From 
a70d6ece18f580845bde260f27905f8bf289e3c5 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 21 Apr 2026 02:09:41 +0200 Subject: [PATCH 3/3] remove latest_tip from the cbf chain source --- src/chain/cbf.rs | 81 +++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs index f7c1b109a..8fa839cb1 100644 --- a/src/chain/cbf.rs +++ b/src/chain/cbf.rs @@ -80,8 +80,6 @@ pub(super) struct CbfChainSource { fee_source: FeeSource, /// Tracks whether the bip157 node is running and holds the command handle. cbf_runtime_status: Arc>, - /// Latest chain tip hash, updated by the background event processing task. - latest_tip: Arc>>, /// Scripts to match against compact block filters during a scan. watched_scripts: Arc>>, /// Block (height, hash) pairs where filters matched watched scripts. @@ -119,7 +117,6 @@ enum CbfRuntimeStatus { /// Shared state passed to the background event processing task. struct CbfEventState { - latest_tip: Arc>>, watched_scripts: Arc>>, matched_block_hashes: Arc>>, sync_completion_tx: Arc>>>, @@ -148,7 +145,6 @@ impl CbfChainSource { }; let cbf_runtime_status = Arc::new(Mutex::new(CbfRuntimeStatus::Stopped)); - let latest_tip = Arc::new(Mutex::new(None)); let watched_scripts = Arc::new(RwLock::new(Vec::new())); let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); let sync_completion_tx = Arc::new(Mutex::new(None)); @@ -163,7 +159,6 @@ impl CbfChainSource { sync_config, fee_source, cbf_runtime_status, - latest_tip, watched_scripts, matched_block_hashes, sync_completion_tx, @@ -288,7 +283,6 @@ impl CbfChainSource { // block is 'static (no borrows of `self`). 
let restart_status = Arc::clone(&self.cbf_runtime_status); let restart_logger = Arc::clone(&self.logger); - let restart_latest_tip = Arc::clone(&self.latest_tip); let restart_watched_scripts = Arc::clone(&self.watched_scripts); let restart_matched_block_hashes = Arc::clone(&self.matched_block_hashes); let restart_sync_completion_tx = Arc::clone(&self.sync_completion_tx); @@ -317,7 +311,6 @@ impl CbfChainSource { Arc::clone(&restart_logger), )); let event_state = CbfEventState { - latest_tip: Arc::clone(&restart_latest_tip), watched_scripts: Arc::clone(&restart_watched_scripts), matched_block_hashes: Arc::clone(&restart_matched_block_hashes), sync_completion_tx: Arc::clone(&restart_sync_completion_tx), @@ -428,7 +421,6 @@ impl CbfChainSource { match event { Event::FiltersSynced(sync_update) => { let tip = sync_update.tip(); - *state.latest_tip.lock().unwrap() = Some(tip.hash); log_info!( logger, "CBF filters synced to tip: height={}, hash={}", @@ -448,7 +440,6 @@ impl CbfChainSource { reorganized.len(), accepted.len(), ); - *state.latest_tip.lock().unwrap() = None; // No height reset needed: skip heights are derived from // BDK's checkpoint (on-chain) and LDK's best block @@ -861,34 +852,57 @@ impl CbfChainSource { /// Derive per-target fee rates from recent blocks' coinbase outputs. /// - /// Returns `Ok(None)` when no chain tip is available yet (first startup before sync). + /// Returns `Ok(None)` when the chain is too short to sample `FEE_RATE_LOOKBACK_BLOCKS` + /// blocks (e.g. kyoto has not yet synced past the genesis region). 
async fn fee_rate_cache_from_cbf( &self, ) -> Result>, Error> { let requester = self.requester()?; - let tip_hash = match *self.latest_tip.lock().unwrap() { - Some(hash) => hash, - None => { - log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping."); + let timeout = Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ); + let fetch_start = Instant::now(); + + // Ask kyoto for its current chain tip rather than maintaining a mirrored + // cache: the returned hash is always fresh (post-reorg, post-restart), + // so no defensive invalidation is needed below. + let tip = match tokio::time::timeout(timeout, requester.chain_tip()).await { + Ok(Ok(tip)) => tip, + Ok(Err(e)) => { + log_debug!( + self.logger, + "Failed to fetch CBF chain tip for fee estimation: {:?}", + e, + ); return Ok(None); }, + Err(e) => { + log_error!(self.logger, "Timed out fetching CBF chain tip: {}", e); + return Err(Error::FeerateEstimationUpdateTimeout); + }, }; + if (tip.height as usize) < FEE_RATE_LOOKBACK_BLOCKS { + log_debug!( + self.logger, + "CBF chain tip at height {} is below the {}-block lookback window, \ + skipping fee estimation.", + tip.height, + FEE_RATE_LOOKBACK_BLOCKS, + ); + return Ok(None); + } + let now = Instant::now(); // Fetch fee rates from the last N blocks for per-target estimation. // We compute fee rates ourselves rather than using Requester::average_fee_rate, // so we can sample multiple blocks and select percentiles per confirmation target. let mut block_fee_rates: Vec = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS); - let mut current_hash = tip_hash; + let mut current_hash = tip.hash; - let timeout = Duration::from_secs( - self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, - ); - let fetch_start = Instant::now(); - - for idx in 0..FEE_RATE_LOOKBACK_BLOCKS { + for _ in 0..FEE_RATE_LOOKBACK_BLOCKS { // Check if we've exceeded the overall timeout for fee estimation. 
let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed()); if remaining_timeout.is_zero() { @@ -896,25 +910,11 @@ impl CbfChainSource { return Err(Error::FeerateEstimationUpdateTimeout); } - // Fetch the block via P2P. On the first iteration, a fetch failure - // likely means the cached tip is stale (initial sync or reorg), so - // we clear the tip and skip gracefully instead of returning an error. let indexed_block = match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash)) .await { Ok(Ok(indexed_block)) => indexed_block, - Ok(Err(e)) if idx == 0 => { - log_debug!( - self.logger, - "Cached CBF tip {} was unavailable during fee estimation, \ - likely due to initial sync or a reorg: {:?}", - current_hash, - e - ); - *self.latest_tip.lock().unwrap() = None; - return Ok(None); - }, Ok(Err(e)) => { log_error!( self.logger, @@ -923,17 +923,6 @@ impl CbfChainSource { ); return Err(Error::FeerateEstimationUpdateFailed); }, - Err(e) if idx == 0 => { - log_debug!( - self.logger, - "Timed out fetching cached CBF tip {} during fee estimation, \ - likely due to initial sync or a reorg: {}", - current_hash, - e - ); - *self.latest_tip.lock().unwrap() = None; - return Ok(None); - }, Err(e) => { log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); return Err(Error::FeerateEstimationUpdateTimeout);