Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ impl DB {
self.db.put_opt(key, value, &opts).unwrap();
}

/// Atomically write a batch of operations to the underlying RocksDB.
///
/// The write is durable: it is fsynced on commit (`set_sync(true)`) and
/// goes through the write-ahead log (`disable_wal(false)`), so a committed
/// batch survives a process crash.
pub fn write_batch(&self, batch: rocksdb::WriteBatch) {
    let mut write_opts = rocksdb::WriteOptions::new();
    // Durable write: fsync on commit, keep the write-ahead log enabled.
    write_opts.set_sync(true);
    write_opts.disable_wal(false);
    self.db.write_opt(batch, &write_opts).unwrap();
}

pub fn get(&self, key: &[u8]) -> Option<Bytes> {
self.db.get(key).unwrap().map(|v| v.to_vec())
}
Expand Down
263 changes: 238 additions & 25 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use bitcoin::merkle_tree::MerkleBlock;

use crypto::digest::Digest;
use crypto::sha2::Sha256;
use itertools::Itertools;
use rayon::prelude::*;

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -270,6 +269,149 @@ impl Indexer {
Ok(result)
}

/// Clean up orphaned data using the specific list of removed headers.
/// Note: the helpers below still perform full scans of the affected
/// indexes; only the match set (orphaned heights/hashes) is narrowed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it does scan the entire database?

The approach in this PR - iterating the entire H, I, C and A indexes to look for entries with matching heights - seems inefficient to the point of being unfeasible.

The approach I had in mind was to reuse the existing code to 'index' the orphaned blocks, but turn the put operations into deletes. That way we can delete the relevant entries directly by their key, without a full db scan.

fn cleanup_orphaned_data(&self, orphaned_headers: &[HeaderEntry]) -> Result<()> {
if orphaned_headers.is_empty() {
return Ok(());
}

let min_height = orphaned_headers.first().unwrap().height();
let max_height = orphaned_headers.last().unwrap().height();

info!(
"Cleaning up orphaned data for {} blocks (heights {} to {})",
orphaned_headers.len(),
min_height,
max_height
);

// Build HashSet of orphaned blockhashes and heights for O(1) lookup
let orphaned_hashes: HashSet<BlockHash> = orphaned_headers
.iter()
.map(|h| *h.hash())
.collect();

let orphaned_heights: HashSet<usize> = orphaned_headers
.iter()
.map(|h| h.height())
.collect();

self.cleanup_history(&orphaned_heights)?;
self.cleanup_confirmations(&orphaned_hashes)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The confirmations index could be removed entirely, see this comment

self.cleanup_cache(&orphaned_heights)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache already handles reorgs internally by invalidating the cache and recomputing the stats/utxos, there's no need to cleanup anything here.

We could, however, make this more efficient by explicitly undoing the effects of reorged blocks over the stats/utxo cache*, rather than recomputing it from scratch. This could be done separately in a followup PR.

* It will probably no longer be technically accurate to call it a 'cache' once we implement this.


Ok(())
}

/// Clean up history entries for specific orphaned heights
/// Delete scripthash ('H') — and, on Elements/Liquid, asset ('I') —
/// history rows whose confirmed height is in the set of orphaned heights.
///
/// Performs a full scan of each history index and batches all deletions
/// into a single durable write.
fn cleanup_history(&self, orphaned_heights: &HashSet<usize>) -> Result<()> {
    let _timer = self.start_timer("reorg_cleanup_history");

    let mut batch = rocksdb::WriteBatch::default();
    let mut deleted = 0;

    // Scan scripthash history entries (prefix 'H'), dropping any row that
    // was confirmed at a height that has been reorged away.
    for row in self.store.history_db.iter_scan(&[b'H']) {
        let entry = TxHistoryRow::from_row(row);
        if orphaned_heights.contains(&(entry.key.confirmed_height as usize)) {
            batch.delete(&entry.into_row().key);
            deleted += 1;
        }
    }

    // Same scan for asset history entries (prefix 'I') on Elements/Liquid.
    #[cfg(feature = "liquid")]
    for row in self.store.history_db.iter_scan(&[b'I']) {
        let entry = TxHistoryRow::from_row(row);
        if orphaned_heights.contains(&(entry.key.confirmed_height as usize)) {
            batch.delete(&entry.into_row().key);
            deleted += 1;
        }
    }

    info!("Deleted {} orphaned history entries", deleted);
    self.store.history_db.write_batch(batch);
    Ok(())
}

/// Clean up confirmation entries for specific orphaned blockhashes
/// Delete tx-confirmation ('C') rows that reference an orphaned block hash.
///
/// Performs a full scan of the confirmation index and batches all
/// deletions into a single durable write.
fn cleanup_confirmations(&self, orphaned_hashes: &HashSet<BlockHash>) -> Result<()> {
    let _timer = self.start_timer("reorg_cleanup_confirmations");

    let mut batch = rocksdb::WriteBatch::default();
    let mut deleted = 0;

    // Scan confirmation entries (prefix 'C'); delete any entry whose
    // blockhash belongs to a block that was reorged away.
    for row in self.store.txstore_db.iter_scan(&[b'C']) {
        let entry = TxConfRow::from_row(row);
        let blockhash: BlockHash = deserialize(&entry.key.blockhash).unwrap();
        if orphaned_hashes.contains(&blockhash) {
            batch.delete(&entry.into_row().key);
            deleted += 1;
        }
    }

    info!("Deleted {} orphaned confirmation entries", deleted);
    self.store.txstore_db.write_batch(batch);
    Ok(())
}

/// Clean up cached data for specific orphaned heights
fn cleanup_cache(&self, orphaned_heights: &HashSet<usize>) -> Result<()> {
let _timer = self.start_timer("reorg_cleanup_cache");
let mut batch = rocksdb::WriteBatch::default();
let mut count = 0;

// Clean up aggregated stats (prefix 'A')
for row in self.store.cache_db.iter_scan(&[b'A']) {
let key = row.key;
// AggStats keys contain height
// The key format is: b'A' + scripthash + height (big-endian u32)
if key.len() >= 37 {
let height_bytes = &key[33..37];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the code dependent on the exact byte encoding structure for keys, which would break if we ever changed the keys. I would instead deserialize into the db *Key structs and get the height from there.

let height = u32::from_be_bytes([
height_bytes[0],
height_bytes[1],
height_bytes[2],
height_bytes[3]
]) as usize;

if orphaned_heights.contains(&height) {
batch.delete(&key);
count += 1;
}
}
}

// Clean up UTXO sets (prefix 'U')
for row in self.store.cache_db.iter_scan(&[b'U']) {
let key = row.key;
// UTXO keys contain height similarly
if key.len() >= 37 {
let height_bytes = &key[33..37];
let height = u32::from_be_bytes([
height_bytes[0],
height_bytes[1],
height_bytes[2],
height_bytes[3]
]) as usize;

if orphaned_heights.contains(&height) {
batch.delete(&key);
count += 1;
}
}
}

info!("Deleted {} orphaned cache entries", count);
self.store.cache_db.write_batch(batch);
Ok(())
}

pub fn update(&mut self, daemon: &Daemon) -> Result<BlockHash> {
let daemon = daemon.reconnect()?;
let tip = daemon.getbestblockhash()?;
Expand Down Expand Up @@ -304,14 +446,30 @@ impl Indexer {
debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip));

let mut headers = self.store.indexed_headers.write().unwrap();
headers.apply(new_headers);
assert_eq!(tip, *headers.tip());
// Apply headers and get any orphaned headers from reorg
let orphaned_headers = {
let mut headers = self.store.indexed_headers.write().unwrap();
let orphaned = headers.apply(new_headers);
assert_eq!(tip, *headers.tip());
orphaned
};

// Cleanup orphaned data AFTER applying headers - no race condition
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition here. Between updating the in-memory HeaderList and removing orphaned data from the db, there could be entries read from the db that point to a block height that doesn't actually confirm the entry's txid. The cleanup should happen BEFORE the new headers are applied to avoid that.

Also, cleaning up orphaned data should happen BEFORE the entries from the new blocks are written. We have to first undo the reorged blocks and only then apply the new ones, otherwise the cleanup could remove entries that were just added by the new blocks (i.e., if the same tx re-confirmed under a different block at the same height).

I believe the correct order would be:

  1. Remove reorged headers from the in-memory HeaderList
  2. Cleanup reorged history entries from the database
  3. Index new history entries to the database
  4. Apply new headers to the in-memory HeaderList

This ordering also makes the API more consistent - it will never return blocks (e.g. in /blocks/tip or /block/:hash) that aren't fully processed and populated in the history db (both for new blocks and reorged blocks).

But it also means that the tip will momentarily drop back to the common ancestor before advancing up to the new tip. Is that acceptable, or is the tip height expected to increase monotonically in the public APIs? (/cc @philippem @RCasatta)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it do today?

// Orphaned data is now unreachable via the new chain state
if !orphaned_headers.is_empty() {
warn!(
"Blockchain reorganization detected, cleaning up {} orphaned blocks",
orphaned_headers.len()
);
self.cleanup_orphaned_data(&orphaned_headers)?;
info!("Reorg cleanup complete");
}

if let FetchFrom::BlkFiles = self.from {
self.from = FetchFrom::Bitcoind;
}

let headers = self.store.indexed_headers.read().unwrap();
self.tip_metric.set(headers.len() as i64 - 1);

Ok(tip)
Expand Down Expand Up @@ -494,23 +652,43 @@ impl ChainQuery {
limit: usize,
) -> Vec<(Transaction, BlockId)> {
let _timer_scan = self.start_timer("history");
let txs_conf = self

// Acquire header lock once upfront instead of per-txid
let headers = self.store.indexed_headers.read().unwrap();

// Group by txid and use the confirmed_height from the row itself
let mut seen = std::collections::HashSet::new();
let mut found_last_seen = last_seen_txid.is_none();

let txs_conf: Vec<(Txid, BlockId)> = self
.history_iter_scan_reverse(code, hash)
.map(|row| TxHistoryRow::from_row(row).get_txid())
// XXX: unique() requires keeping an in-memory list of all txids, can we avoid that?
.unique()
// TODO seek directly to last seen tx without reading earlier rows
.skip_while(|txid| {
// skip until we reach the last_seen_txid
last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
})
.skip(match last_seen_txid {
Some(_) => 1, // skip the last_seen_txid itself
None => 0,
.map(|row| TxHistoryRow::from_row(row))
.filter_map(|row| {
let txid = row.get_txid();
// Only process each txid once
if !seen.insert(txid) {
return None;
}

// Skip until we reach the last_seen_txid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the existing skip_while() implementation?

if !found_last_seen {
if Some(&txid) == last_seen_txid {
found_last_seen = true;
}
return None;
}

// Fast path: Use the height from the row (no DB lookup needed)
let height = row.key.confirmed_height as usize;
if let Some(header) = headers.header_by_height(height) {
return Some((txid, BlockId::from(header)));
}

// Slow path fallback: Header not yet indexed or reorged
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get what the "Slow Path" is supposed to do here?

Header not yet indexed or reorged

If that is the case, tx_confirming_block() wouldn't be able to get it either, since it also uses the same in-memory indexed_headers: HeaderList that the "Fast Path" uses (and that only includes headers that are part of the best chain, so reorged blocks are never available regardless).

But more importantly - if we don't have a corresponding header because new blocks are still being processed or due to a reorg (possible with the ordering proposed here), those db entries should be skipped.

With reorg handling implemented, the correct approach would be to use the "Fast Path" only (skipping over entries without a corresponding header), remove tx_confirming_block() entirely, and drop the C index (txid->blockhash confirmations map) which becomes unnecessary.

self.tx_confirming_block(&txid).map(|b| (txid, b))
})
.filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
.take(limit)
.collect::<Vec<(Txid, BlockId)>>();
.collect();

self.lookup_txns(&txs_conf)
.expect("failed looking up txs in history index")
Expand All @@ -527,12 +705,36 @@ impl ChainQuery {

fn _history_txids(&self, code: u8, hash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> {
let _timer = self.start_timer("history_txids");
self.history_iter_scan(code, hash, 0)
.map(|row| TxHistoryRow::from_row(row).get_txid())
.unique()
.filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))

// Acquire header lock once upfront instead of per-txid
let headers = self.store.indexed_headers.read().unwrap();

// Group by txid and use the confirmed_height from the row itself
let mut seen = std::collections::HashSet::new();
let result: Vec<(Txid, BlockId)> = self
.history_iter_scan(code, hash, 0)
.map(|row| TxHistoryRow::from_row(row))
.filter_map(|row| {
let txid = row.get_txid();
// Only process each txid once
if !seen.insert(txid) {
return None;
}

// Fast path: Use the height from the row (no DB lookup needed)
let height = row.key.confirmed_height as usize;
if let Some(header) = headers.header_by_height(height) {
return Some((txid, BlockId::from(header)));
}

// Slow path fallback: Header not yet indexed or reorged
// Fall back to old method: lookup by txid in txstore_db
self.tx_confirming_block(&txid).map(|b| (txid, b))
})
.take(limit)
.collect()
.collect();

result
}

// TODO: avoid duplication with stats/stats_delta?
Expand Down Expand Up @@ -604,12 +806,23 @@ impl ChainQuery {
limit: usize,
) -> Result<(UtxoMap, Option<BlockHash>, usize)> {
let _timer = self.start_timer("utxo_delta");

// Acquire header lock once upfront instead of per-transaction
let headers = self.store.indexed_headers.read().unwrap();

let history_iter = self
.history_iter_scan(b'H', scripthash, start_height)
.map(TxHistoryRow::from_row)
.filter_map(|history| {
self.tx_confirming_block(&history.get_txid())
.map(|b| (history, b))
// Fast path: Use the height from the history row (no DB lookup needed)
let height = history.key.confirmed_height as usize;
if let Some(header) = headers.header_by_height(height) {
return Some((history, BlockId::from(header)));
}

// Slow path fallback: Header not yet indexed or reorged
let txid = history.get_txid();
self.tx_confirming_block(&txid).map(|blockid| (history, blockid))
});

let mut utxos = init_utxos;
Expand Down
7 changes: 4 additions & 3 deletions src/util/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl HeaderList {
}

#[trace]
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
Expand All @@ -193,21 +193,22 @@ impl HeaderList {
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height
}
None => return,
None => return vec![],
};
debug!(
"applying {} new headers from height {}",
new_headers.len(),
new_height
);
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
let orphaned = self.headers.split_off(new_height); // Keep [0..new_height), return orphaned
for new_header in new_headers {
let height = new_header.height();
assert_eq!(height, self.headers.len());
self.tip = *new_header.hash();
self.headers.push(new_header);
self.heights.insert(self.tip, height);
}
orphaned
}

#[trace]
Expand Down