From 74b2fa9411e3761441d12c262fcefdf24aa06713 Mon Sep 17 00:00:00 2001 From: Ludo Galabru Date: Fri, 4 Aug 2023 22:30:58 +0200 Subject: [PATCH] fix: address remaining issues --- components/hord-cli/Cargo.toml | 4 +- components/hord-cli/src/cli/mod.rs | 6 +- components/hord-cli/src/core/mod.rs | 64 +- components/hord-cli/src/core/pipeline/mod.rs | 2 +- .../pipeline/processors/block_ingestion.rs | 61 +- .../processors/inscription_indexing.rs | 268 +++--- .../processors/transfers_recomputing.rs | 142 +-- .../{inscribing.rs => inscription_parsing.rs} | 53 +- .../core/protocol/inscription_sequencing.rs | 689 +++++++++++++++ .../src/core/protocol/inscription_tracking.rs | 212 +++++ components/hord-cli/src/core/protocol/mod.rs | 7 +- .../{numbering.rs => satoshi_numbering.rs} | 2 +- .../hord-cli/src/core/protocol/sequencing.rs | 836 ------------------ components/hord-cli/src/db/mod.rs | 89 +- components/hord-cli/src/scan/bitcoin.rs | 21 +- components/hord-cli/src/service/mod.rs | 17 +- 16 files changed, 1206 insertions(+), 1267 deletions(-) rename components/hord-cli/src/core/protocol/{inscribing.rs => inscription_parsing.rs} (87%) create mode 100644 components/hord-cli/src/core/protocol/inscription_sequencing.rs create mode 100644 components/hord-cli/src/core/protocol/inscription_tracking.rs rename components/hord-cli/src/core/protocol/{numbering.rs => satoshi_numbering.rs} (99%) delete mode 100644 components/hord-cli/src/core/protocol/sequencing.rs diff --git a/components/hord-cli/Cargo.toml b/components/hord-cli/Cargo.toml index 4d9d364d..db4257f3 100644 --- a/components/hord-cli/Cargo.toml +++ b/components/hord-cli/Cargo.toml @@ -14,8 +14,8 @@ redis = "0.21.5" serde-redis = "0.12.0" hex = "0.4.3" rand = "0.8.5" -chainhook-sdk = { version = "=0.8.0", default-features = false, features = ["zeromq"] } -# chainhook-sdk = { version = "=0.7.12", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] } +chainhook-sdk = { version = "=0.8.2", default-features = false, features = ["zeromq"] } +# chainhook-sdk = { version = "=0.8.2", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] } hiro-system-kit = "0.1.0" clap = { version = "3.2.23", features = ["derive"], optional = true } clap_generate = { version = "3.0.3", optional = true } diff --git a/components/hord-cli/src/cli/mod.rs b/components/hord-cli/src/cli/mod.rs index 854b5c1f..7e4ab08a 100644 --- a/components/hord-cli/src/cli/mod.rs +++ b/components/hord-cli/src/cli/mod.rs @@ -3,7 +3,7 @@ use crate::config::Config; use crate::core::pipeline::download_and_pipeline_blocks; use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor; use crate::core::pipeline::processors::start_inscription_indexing_processor; -use crate::core::{self}; +use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block; use crate::download::download_ordinals_dataset_if_required; use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate; use crate::service::Service; @@ -11,7 +11,7 @@ use crate::service::Service; use crate::db::{ delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block, find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted, - find_latest_inscription_block_height, find_lazy_block_at_block_height, initialize_hord_db, + find_latest_inscription_block_height, find_lazy_block_at_block_height, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db, }; @@ -742,7 +742,7 @@ pub async fn fetch_and_standardize_block( download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx) .await?; - core::parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx) + parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx) .map_err(|(e, _)| e) } diff --git a/components/hord-cli/src/core/mod.rs b/components/hord-cli/src/core/mod.rs index 7dfad6db..60f7a493 100644 --- a/components/hord-cli/src/core/mod.rs +++ b/components/hord-cli/src/core/mod.rs @@ -1,14 +1,11 @@ pub mod pipeline; pub mod protocol; -use chainhook_sdk::types::{ - BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionRevealData, OrdinalOperation, -}; +use chainhook_sdk::types::{BitcoinBlockData, OrdinalOperation}; use dashmap::DashMap; use fxhash::{FxBuildHasher, FxHasher}; use rocksdb::DB; use rusqlite::Connection; -use std::collections::BTreeMap; use std::hash::BuildHasherDefault; use std::ops::Div; use std::path::PathBuf; @@ -20,8 +17,6 @@ use chainhook_sdk::{ use crate::config::{Config, LogConfig}; -use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown}; - use crate::db::{ find_last_block_inserted, find_latest_inscription_block_height, initialize_hord_db, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, @@ -32,10 +27,6 @@ use crate::db::{ LazyBlockTransaction, }; -use self::protocol::inscribing::{ - get_inscriptions_from_full_tx, get_inscriptions_from_standardized_tx, -}; - #[derive(Clone, Debug)] pub struct HordConfig { pub network_thread_max: usize, @@ -47,52 +38,6 @@ pub struct HordConfig { pub logs: LogConfig, } -pub fn parse_ordinals_and_standardize_block( - raw_block: BitcoinBlockFullBreakdown, - network: &BitcoinNetwork, - ctx: &Context, -) -> Result { - let mut ordinal_operations = BTreeMap::new(); - - for tx in raw_block.tx.iter() { - ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx)); - } - - let mut block = standardize_bitcoin_block(raw_block, network, ctx)?; - - for tx in block.transactions.iter_mut() { - if let Some(ordinal_operations) = - ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str()) - { - tx.metadata.ordinal_operations = ordinal_operations; - } - } - Ok(block) -} - -pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) { - for tx in block.transactions.iter_mut() { - tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx); - } -} - -pub fn get_inscriptions_revealed_in_block( - block: &BitcoinBlockData, -) -> Vec<&OrdinalInscriptionRevealData> { - let mut ops = vec![]; - for tx in block.transactions.iter() { - for op in tx.metadata.ordinal_operations.iter() { - if let OrdinalOperation::InscriptionRevealed(op) = op { - ops.push(op); - } - if let OrdinalOperation::CursedInscriptionRevealed(op) = op { - ops.push(op); - } - } - } - ops -} - pub fn revert_hord_db_with_augmented_bitcoin_block( block: &BitcoinBlockData, blocks_db_rw: &DB, @@ -106,8 +51,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block( let tx = &block.transactions[block.transactions.len() - tx_index]; for ordinal_event in tx.metadata.ordinal_operations.iter() { match ordinal_event { - OrdinalOperation::InscriptionRevealed(data) - | OrdinalOperation::CursedInscriptionRevealed(data) => { + OrdinalOperation::InscriptionRevealed(data) => { // We remove any new inscription created remove_entry_from_inscriptions( &data.inscription_id, @@ -248,13 +192,13 @@ pub fn should_sync_hord_db( (end_block.min(200_000), 10_000) } else if start_block < 550_000 { (end_block.min(550_000), 1_000) - } else { + } else { (end_block, 100) }; if start_block < 767430 && end_block > 767430 { end_block = 767430; - } + } if start_block <= end_block { Ok(Some((start_block, end_block, speed))) diff --git a/components/hord-cli/src/core/pipeline/mod.rs b/components/hord-cli/src/core/pipeline/mod.rs index adf50ac8..e90101a1 100644 --- a/components/hord-cli/src/core/pipeline/mod.rs +++ b/components/hord-cli/src/core/pipeline/mod.rs @@ -16,7 +16,7 @@ use chainhook_sdk::indexer::bitcoin::{ build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry, }; -use super::parse_ordinals_and_standardize_block; +use super::protocol::inscription_parsing::parse_ordinals_and_standardize_block; pub enum PostProcessorCommand { Start, diff --git a/components/hord-cli/src/core/pipeline/processors/block_ingestion.rs b/components/hord-cli/src/core/pipeline/processors/block_ingestion.rs index 9d2acb66..05d6345a 100644 --- a/components/hord-cli/src/core/pipeline/processors/block_ingestion.rs +++ b/components/hord-cli/src/core/pipeline/processors/block_ingestion.rs @@ -6,11 +6,12 @@ use std::{ use chainhook_sdk::{types::BitcoinBlockData, utils::Context}; use crossbeam_channel::TryRecvError; +use rocksdb::DB; use crate::{ config::Config, core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, - db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db}, + db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock}, }; pub fn start_block_ingestion_processor( @@ -25,18 +26,14 @@ pub fn start_block_ingestion_processor( let ctx = ctx.clone(); let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop") .spawn(move || { - let mut num_writes = 0; let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap(); - let mut empty_cycles = 0; - let mut tip: u64 = 0; if let Ok(PostProcessorCommand::Start) = commands_rx.recv() { info!(ctx.expect_logger(), "Start block indexing runloop"); } - loop { let (compacted_blocks, _) = match commands_rx.try_recv() { Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => { @@ -62,40 +59,7 @@ pub fn start_block_ingestion_processor( } }, }; - - let batch_size = compacted_blocks.len(); - num_writes += batch_size; - for (block_height, compacted_block) in compacted_blocks.into_iter() { - tip = tip.max(block_height); - insert_entry_in_blocks( - block_height as u32, - &compacted_block, - &blocks_db_rw, - &ctx, - ); - } - info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})"); - - // Early return - if num_writes >= 512 { - ctx.try_log(|logger| { - info!(logger, "Flushing DB to disk ({num_writes} inserts)"); - }); - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - error!(logger, "{}", e.to_string()); - }); - } - num_writes = 0; - continue; - } - - // Write blocks to disk, before traversals - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - error!(logger, "{}", e.to_string()); - }); - } + store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx); } if let Err(e) = blocks_db_rw.flush() { @@ -112,3 +76,22 @@ pub fn start_block_ingestion_processor( thread_handle: handle, } } + +pub fn store_compacted_blocks( + mut compacted_blocks: Vec<(u64, LazyBlock)>, + blocks_db_rw: &DB, + ctx: &Context, +) { + compacted_blocks.sort_by(|(a, _), (b, _)| a.cmp(b)); + + for (block_height, compacted_block) in compacted_blocks.into_iter() { + insert_entry_in_blocks(block_height as u32, &compacted_block, &blocks_db_rw, &ctx); + info!(ctx.expect_logger(), "Block #{block_height} saved to disk"); + } + + if let Err(e) = blocks_db_rw.flush() { + ctx.try_log(|logger| { + error!(logger, "{}", e.to_string()); + }); + } +} diff --git a/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs b/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs index e08d3754..dafd571d 100644 --- a/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs +++ b/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs @@ -1,28 +1,36 @@ use std::{ + collections::BTreeMap, sync::Arc, thread::{sleep, JoinHandle}, time::Duration, }; use chainhook_sdk::{ - bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, - types::{ - BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, - OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, - }, + types::{BitcoinBlockData, TransactionIdentifier}, utils::Context, }; use crossbeam_channel::{Sender, TryRecvError}; +use rusqlite::Transaction; use dashmap::DashMap; use fxhash::FxHasher; use rusqlite::Connection; -use std::collections::HashMap; use std::hash::BuildHasherDefault; use crate::{ - core::{protocol::sequencing::update_hord_db_and_augment_bitcoin_block_v3, HordConfig}, - db::{find_all_inscriptions_in_block, find_all_transfers_in_block, format_satpoint_to_watch}, + core::{ + pipeline::processors::block_ingestion::store_compacted_blocks, + protocol::{ + inscription_parsing::get_inscriptions_revealed_in_block, + inscription_sequencing::{ + augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx, + retrieve_inscribed_satoshi_points_from_block_v3, SequenceCursor, + }, + inscription_tracking::augment_block_with_ordinals_transfer_data, + }, + HordConfig, + }, + db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn}, }; use crate::db::{LazyBlockTransaction, TraversalResult}; @@ -33,10 +41,7 @@ use crate::{ new_traversals_lazy_cache, pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, }, - db::{ - insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db, - InscriptionHeigthHint, - }, + db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db}, }; pub fn start_inscription_indexing_processor( @@ -58,13 +63,13 @@ pub fn start_inscription_indexing_processor( let mut inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap(); let hord_config = config.get_hord_config(); - let mut num_writes = 0; - let mut tip: u64 = 0; let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap(); - - let mut inscription_height_hint = InscriptionHeigthHint::new(); let mut empty_cycles = 0; + + let inscriptions_db_conn = + open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap(); + let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn); if let Ok(PostProcessorCommand::Start) = commands_rx.recv() { info!(ctx.expect_logger(), "Start inscription indexing runloop"); @@ -94,55 +99,22 @@ pub fn start_inscription_indexing_processor( }, }; - let batch_size = compacted_blocks.len(); - num_writes += batch_size; - for (block_height, compacted_block) in compacted_blocks.into_iter() { - tip = tip.max(block_height); - insert_entry_in_blocks( - block_height as u32, - &compacted_block, - &blocks_db_rw, - &ctx, - ); - } - info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})"); - - // Early return - if blocks.is_empty() { - if num_writes >= 512 { - ctx.try_log(|logger| { - info!(logger, "Flushing DB to disk ({num_writes} inserts)"); - }); - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - error!(logger, "{}", e.to_string()); - }); - } - num_writes = 0; - } - continue; - } + store_compacted_blocks(compacted_blocks, &blocks_db_rw, &Context::empty()); info!(ctx.expect_logger(), "Processing {} blocks", blocks.len()); - // Write blocks to disk, before traversals - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - error!(logger, "{}", e.to_string()); - }); - } - garbage_collect_nth_block += blocks.len(); - - process_blocks( + blocks = process_blocks( &mut blocks, + &mut sequence_cursor, &cache_l2, - &mut inscription_height_hint, &mut inscriptions_db_conn_rw, &hord_config, &post_processor, &ctx, ); + garbage_collect_nth_block += blocks.len(); + // Clear L2 cache on a regular basis if garbage_collect_nth_block > garbage_collect_every_n_blocks { info!( @@ -154,12 +126,6 @@ pub fn start_inscription_indexing_processor( garbage_collect_nth_block = 0; } } - - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - error!(logger, "{}", e.to_string()); - }); - } }) .expect("unable to spawn thread"); @@ -172,29 +138,88 @@ pub fn start_inscription_indexing_processor( pub fn process_blocks( next_blocks: &mut Vec, + sequence_cursor: &mut SequenceCursor, cache_l2: &Arc>>, - inscription_height_hint: &mut InscriptionHeigthHint, inscriptions_db_conn_rw: &mut Connection, hord_config: &HordConfig, post_processor: &Option>, ctx: &Context, ) -> Vec { - let mut cache_l1 = HashMap::new(); + let mut cache_l1 = BTreeMap::new(); + let mut updated_blocks = vec![]; + for _cursor in 0..next_blocks.len() { + let inscriptions_db_tx: rusqlite::Transaction<'_> = + inscriptions_db_conn_rw.transaction().unwrap(); + let mut block = next_blocks.remove(0); + // We check before hand if some data were pre-existing, before processing + let any_existing_activity = get_any_entry_in_ordinal_activities( + &block.block_identifier.index, + &inscriptions_db_tx, + ctx, + ); + let _ = process_block( &mut block, &next_blocks, + sequence_cursor, &mut cache_l1, cache_l2, - inscription_height_hint, - inscriptions_db_conn_rw, + &inscriptions_db_tx, hord_config, ctx, ); + let inscriptions_revealed = get_inscriptions_revealed_in_block(&block) + .iter() + .map(|d| d.inscription_number.to_string()) + .collect::>(); + + ctx.try_log(|logger| { + info!( + logger, + "Block #{} revealed {} inscriptions [{}]", + block.block_identifier.index, + inscriptions_revealed.len(), + inscriptions_revealed.join(", ") + ) + }); + + if any_existing_activity { + ctx.try_log(|logger| { + warn!( + logger, + "Dropping updates for block #{}, activities present in database", + block.block_identifier.index, + ) + }); + let _ = inscriptions_db_tx.rollback(); + } else { + match inscriptions_db_tx.commit() { + Ok(_) => { + ctx.try_log(|logger| { + info!( + logger, + "Updates saved for block {}", block.block_identifier.index, + ) + }); + } + Err(e) => { + ctx.try_log(|logger| { + error!( + logger, + "Unable to update changes in block #{}: {}", + block.block_identifier.index, + e.to_string() + ) + }); + } + } + } + if let Some(post_processor_tx) = post_processor { let _ = post_processor_tx.send(block.clone()); } @@ -206,104 +231,45 @@ pub fn process_blocks( pub fn process_block( block: &mut BitcoinBlockData, next_blocks: &Vec, - cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, + sequence_cursor: &mut SequenceCursor, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, cache_l2: &Arc>>, - inscription_height_hint: &mut InscriptionHeigthHint, - inscriptions_db_conn_rw: &mut Connection, + inscriptions_db_tx: &Transaction, hord_config: &HordConfig, ctx: &Context, ) -> Result<(), String> { - update_hord_db_and_augment_bitcoin_block_v3( - block, - next_blocks, + let any_processable_transactions = retrieve_inscribed_satoshi_points_from_block_v3( + &block, + &next_blocks, cache_l1, cache_l2, - inscription_height_hint, - inscriptions_db_conn_rw, - hord_config, + inscriptions_db_tx, + &hord_config, ctx, - ) -} - -pub fn re_augment_block_with_ordinals_operations( - block: &mut BitcoinBlockData, - inscriptions_db_conn: &Connection, - ctx: &Context, -) { - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - }; - - // Restore inscriptions data - let mut inscriptions = - find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx); + )?; - let mut should_become_cursed = vec![]; - for (tx_index, tx) in block.transactions.iter_mut().enumerate() { - for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() { - let (inscription, is_cursed) = match operation { - OrdinalOperation::CursedInscriptionRevealed(ref mut inscription) => { - (inscription, true) - } - OrdinalOperation::InscriptionRevealed(ref mut inscription) => (inscription, false), - OrdinalOperation::InscriptionTransferred(_) => continue, - }; - - let Some(traversal) = inscriptions.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else { - continue; - }; - - inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset(); - inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height(); - inscription.ordinal_number = traversal.ordinal_number; - inscription.inscription_number = traversal.inscription_number; - inscription.transfers_pre_inscription = traversal.transfers; - inscription.inscription_fee = tx.metadata.fee; - inscription.tx_index = tx_index; - inscription.satpoint_post_inscription = format_satpoint_to_watch( - &traversal.transfer_data.transaction_identifier_location, - traversal.transfer_data.output_index, - traversal.transfer_data.inscription_offset_intra_output, - ); - - let Some(output) = tx.metadata.outputs.get(traversal.transfer_data.output_index) else { - continue; - }; - inscription.inscription_output_value = output.value; - inscription.inscriber_address = { - let script_pub_key = output.get_script_pubkey_hex(); - match Script::from_hex(&script_pub_key) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(a) => Some(a.to_string()), - _ => None, - }, - _ => None, - } - }; - - if !is_cursed && inscription.inscription_number < 0 { - inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription); - should_become_cursed.push((tx_index, op_index)); - } - } + if !any_processable_transactions { + return Ok(()); } - for (tx_index, op_index) in should_become_cursed.into_iter() { - let Some(tx) = block.transactions.get_mut(tx_index) else { - continue; - }; - let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else { - continue; - }; - tx.metadata.ordinal_operations.insert( - op_index, - OrdinalOperation::CursedInscriptionRevealed(inscription), - ); - } + // Always discard if we have some existing content at this block height (inscription or transfers) + let inner_ctx = if hord_config.logs.ordinals_internals { + ctx.clone() + } else { + Context::empty() + }; - // TODO: Handle transfers + // Handle inscriptions + let _ = augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx( + block, + sequence_cursor, + cache_l1, + &inscriptions_db_tx, + &inner_ctx, + ); + // Handle transfers + let _ = augment_block_with_ordinals_transfer_data(block, inscriptions_db_tx, true, &inner_ctx); + Ok(()) } diff --git a/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs b/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs index 07bdd513..c1625699 100644 --- a/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs +++ b/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs @@ -1,30 +1,26 @@ use std::{ - collections::BTreeMap, thread::{sleep, JoinHandle}, time::Duration, }; -use chainhook_sdk::{ - bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, - types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation}, - utils::Context, -}; +use chainhook_sdk::{types::BitcoinBlockData, utils::Context}; use crossbeam_channel::{Sender, TryRecvError}; use crate::{ - core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx, + config::Config, + core::{ + pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, + protocol::{ + inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data, + inscription_tracking::augment_block_with_ordinals_transfer_data, + }, + }, db::{ - find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations, - parse_satpoint_to_watch, remove_entries_from_locations_at_block_height, + insert_new_inscriptions_from_block_in_locations, open_readwrite_hord_db_conn, + remove_entries_from_locations_at_block_height, }, }; -use crate::{ - config::Config, - core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, - db::open_readwrite_hord_db_conn, -}; - pub fn start_transfers_recomputing_processor( config: &Config, ctx: &Context, @@ -70,123 +66,33 @@ pub fn start_transfers_recomputing_processor( }; info!(ctx.expect_logger(), "Processing {} blocks", blocks.len()); + let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap(); for block in blocks.iter_mut() { - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - }; - - info!( - ctx.expect_logger(), - "Cleaning transfers from block {}", block.block_identifier.index - ); - let inscriptions = find_all_inscriptions_in_block( - &block.block_identifier.index, - &inscriptions_db_conn_rw, + consolidate_block_with_pre_computed_ordinals_data( + block, + &inscriptions_db_tx, + false, &ctx, ); - info!( - ctx.expect_logger(), - "{} inscriptions retrieved at block {}", - inscriptions.len(), - block.block_identifier.index - ); - let mut operations = BTreeMap::new(); - - let transaction = inscriptions_db_conn_rw.transaction().unwrap(); remove_entries_from_locations_at_block_height( &block.block_identifier.index, - &transaction, + &inscriptions_db_tx, &ctx, ); - for (_, entry) in inscriptions.iter() { - let inscription_id = entry.get_inscription_id(); - info!( - ctx.expect_logger(), - "Processing inscription {}", inscription_id - ); - insert_entry_in_locations( - &inscription_id, - block.block_identifier.index, - &entry.transfer_data, - &transaction, - &ctx, - ); - - operations.insert( - entry.transaction_identifier_inscription.clone(), - OrdinalInscriptionTransferData { - inscription_id: entry.get_inscription_id(), - updated_address: None, - satpoint_pre_transfer: format_satpoint_to_watch( - &entry.transaction_identifier_inscription, - entry.inscription_input_index, - 0, - ), - satpoint_post_transfer: format_satpoint_to_watch( - &entry.transfer_data.transaction_identifier_location, - entry.transfer_data.output_index, - entry.transfer_data.inscription_offset_intra_output, - ), - post_transfer_output_value: None, - tx_index: 0, - }, - ); - } - - info!( - ctx.expect_logger(), - "Rewriting transfers for block {}", block.block_identifier.index - ); - - for (tx_index, tx) in block.transactions.iter_mut().enumerate() { - tx.metadata.ordinal_operations.clear(); - if let Some(mut entry) = operations.remove(&tx.transaction_identifier) { - let (_, output_index, _) = - parse_satpoint_to_watch(&entry.satpoint_post_transfer); - - let script_pub_key_hex = - tx.metadata.outputs[output_index].get_script_pubkey_hex(); - let updated_address = match Script::from_hex(&script_pub_key_hex) { - Ok(script) => { - match Address::from_script(&script, network.clone()) { - Ok(address) => Some(address.to_string()), - Err(_e) => None, - } - } - Err(_e) => None, - }; - - entry.updated_address = updated_address; - entry.post_transfer_output_value = - Some(tx.metadata.outputs[output_index].value); - entry.tx_index = tx_index; - tx.metadata - .ordinal_operations - .push(OrdinalOperation::InscriptionTransferred(entry)); - } - } - - update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx( + insert_new_inscriptions_from_block_in_locations( block, - &transaction, + &inscriptions_db_tx, &ctx, - ) - .unwrap(); - - info!( - ctx.expect_logger(), - "Saving supdates for block {}", block.block_identifier.index ); - transaction.commit().unwrap(); - info!( - ctx.expect_logger(), - "Transfers in block {} repaired", block.block_identifier.index + augment_block_with_ordinals_transfer_data( + block, + &inscriptions_db_tx, + true, + &ctx, ); if let Some(ref post_processor) = post_processor { diff --git a/components/hord-cli/src/core/protocol/inscribing.rs b/components/hord-cli/src/core/protocol/inscription_parsing.rs similarity index 87% rename from components/hord-cli/src/core/protocol/inscribing.rs rename to components/hord-cli/src/core/protocol/inscription_parsing.rs index 23735709..06f2a201 100644 --- a/components/hord-cli/src/core/protocol/inscribing.rs +++ b/components/hord-cli/src/core/protocol/inscription_parsing.rs @@ -3,9 +3,10 @@ use std::str::FromStr; use chainhook_sdk::bitcoincore_rpc_json::bitcoin::hashes::hex::FromHex; use chainhook_sdk::bitcoincore_rpc_json::bitcoin::Txid; +use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown}; use chainhook_sdk::types::{ - BitcoinTransactionData, OrdinalInscriptionCurseType, OrdinalInscriptionRevealData, - OrdinalOperation, + BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, OrdinalInscriptionCurseType, + OrdinalInscriptionRevealData, OrdinalOperation, }; use chainhook_sdk::utils::Context; use chainhook_sdk::{ @@ -310,10 +311,7 @@ pub fn get_inscriptions_from_witness( curse_type: inscription.curse.take(), }; - match &payload.curse_type { - Some(_) => Some(OrdinalOperation::CursedInscriptionRevealed(payload)), - None => Some(OrdinalOperation::InscriptionRevealed(payload)), - } + Some(OrdinalOperation::InscriptionRevealed(payload)) } pub fn get_inscriptions_from_standardized_tx( @@ -377,3 +375,46 @@ fn test_ordinal_inscription_parsing() { println!("{:?}", inscription); } + +pub fn parse_ordinals_and_standardize_block( + raw_block: BitcoinBlockFullBreakdown, + network: &BitcoinNetwork, + ctx: &Context, +) -> Result { + let mut ordinal_operations = BTreeMap::new(); + + for tx in raw_block.tx.iter() { + ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx)); + } + + let mut block = standardize_bitcoin_block(raw_block, network, ctx)?; + + for tx in block.transactions.iter_mut() { + if let Some(ordinal_operations) = + ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str()) + { + tx.metadata.ordinal_operations = ordinal_operations; + } + } + Ok(block) +} + +pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) { + for tx in block.transactions.iter_mut() { + tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx); + } +} + +pub fn get_inscriptions_revealed_in_block( + block: &BitcoinBlockData, +) -> Vec<&OrdinalInscriptionRevealData> { + let mut ops = vec![]; + for tx in block.transactions.iter() { + for op in tx.metadata.ordinal_operations.iter() { + if let OrdinalOperation::InscriptionRevealed(op) = op { + ops.push(op); + } + } + } + ops +} diff --git a/components/hord-cli/src/core/protocol/inscription_sequencing.rs b/components/hord-cli/src/core/protocol/inscription_sequencing.rs new file mode 100644 index 00000000..012562b6 --- /dev/null +++ b/components/hord-cli/src/core/protocol/inscription_sequencing.rs @@ -0,0 +1,689 @@ +use std::{ + collections::{BTreeMap, HashMap, VecDeque}, + hash::BuildHasherDefault, + sync::Arc, +}; + +use chainhook_sdk::{ + bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, + types::{ + BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier, + OrdinalInscriptionCurseType, OrdinalOperation, TransactionIdentifier, + }, + utils::Context, +}; +use dashmap::DashMap; +use fxhash::FxHasher; +use rusqlite::{Connection, Transaction}; + +use crate::{ + core::HordConfig, + db::{ + find_blessed_inscription_with_ordinal_number, + find_latest_cursed_inscription_number_at_block_height, + find_latest_inscription_number_at_block_height, format_satpoint_to_watch, + insert_new_inscriptions_from_block_in_inscriptions_and_locations, LazyBlockTransaction, + TraversalResult, + }, + ord::height::Height, +}; + +use rand::seq::SliceRandom; +use rand::thread_rng; +use std::sync::mpsc::channel; + +use crate::db::find_all_inscriptions_in_block; + +use super::{ + inscription_tracking::augment_transaction_with_ordinals_transfers_data, + satoshi_numbering::compute_satoshi_number, +}; + +pub fn retrieve_inscribed_satoshi_points_from_block_v3( + block: &BitcoinBlockData, + next_blocks: &Vec, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + cache_l2: &Arc>>, + inscriptions_db_tx: &Transaction, + hord_config: &HordConfig, + ctx: &Context, +) -> Result { + let (mut transactions_ids, l1_cache_hits) = + get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx); + + let inner_ctx = if hord_config.logs.ordinals_internals { + ctx.clone() + } else { + Context::empty() + }; + + let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty(); + + let thread_max = hord_config.ingestion_thread_max * 2; + + if has_transactions_to_process { + let expected_traversals = transactions_ids.len() + l1_cache_hits.len(); + let (traversal_tx, traversal_rx) = channel(); + + let mut tx_thread_pool = vec![]; + let mut thread_pool_handles = vec![]; + + for thread_index in 0..thread_max { + let (tx, rx) = channel(); + tx_thread_pool.push(tx); + + let moved_traversal_tx = traversal_tx.clone(); + let moved_ctx = inner_ctx.clone(); + let moved_hord_db_path = hord_config.db_path.clone(); + let local_cache = cache_l2.clone(); + + let handle = hiro_system_kit::thread_named("Worker") + .spawn(move || { + while let Ok(Some(( + transaction_id, + block_identifier, + input_index, + prioritary, + ))) = rx.recv() + { + let traversal: Result = compute_satoshi_number( + &moved_hord_db_path, + &block_identifier, + &transaction_id, + input_index, + 0, + &local_cache, + &moved_ctx, + ); + let _ = moved_traversal_tx.send((traversal, prioritary, thread_index)); + } + }) + .expect("unable to spawn thread"); + thread_pool_handles.push(handle); + } + + // Empty cache + let mut thread_index = 0; + for key in l1_cache_hits.iter() { + if let Some(entry) = cache_l1.remove(key) { + let _ = traversal_tx.send((Ok(entry), true, thread_index)); + thread_index = (thread_index + 1) % thread_max; + } + } + + ctx.try_log(|logger| { + info!( + logger, + "Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})", + block.block_identifier.index, + transactions_ids.len(), + l1_cache_hits.len(), + next_blocks.len(), + cache_l1.len(), + cache_l2.len(), + ) + }); + + let mut rng = thread_rng(); + transactions_ids.shuffle(&mut rng); + let mut priority_queue = VecDeque::new(); + let mut warmup_queue = VecDeque::new(); + + for (transaction_id, input_index) in transactions_ids.into_iter() { + priority_queue.push_back(( + transaction_id, + block.block_identifier.clone(), + input_index, + true, + )); + } + + // Feed each workers with 2 workitems each + for thread_index in 0..thread_max { + let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); + } + for thread_index in 0..thread_max { + let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); + } + + let mut next_block_iter = next_blocks.iter(); + let mut traversals_received = 0; + while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() { + if prioritary { + traversals_received += 1; + } + match traversal_result { + Ok(traversal) => { + inner_ctx.try_log(|logger| { + info!( + logger, + "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).", + traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers + ) + }); + cache_l1.insert( + ( + traversal.transaction_identifier_inscription.clone(), + traversal.inscription_input_index, + ), + traversal, + ); + } + Err(e) => { + ctx.try_log(|logger| { + error!(logger, "Unable to compute inscription's Satoshi: {e}",) + }); + } + } + if traversals_received == expected_traversals { + break; + } + + if let Some(w) = priority_queue.pop_front() { + let _ = tx_thread_pool[thread_index].send(Some(w)); + } else { + if let Some(w) = warmup_queue.pop_front() { + let _ = tx_thread_pool[thread_index].send(Some(w)); + } else { + if let Some(next_block) = next_block_iter.next() { + let (mut transactions_ids, _) = get_transactions_to_process( + next_block, + cache_l1, + inscriptions_db_tx, + ctx, + ); + + ctx.try_log(|logger| { + info!( + logger, + "Number of inscriptions in block #{} to pre-process: {}", + block.block_identifier.index, + transactions_ids.len() + ) + }); + + transactions_ids.shuffle(&mut rng); + for (transaction_id, input_index) in transactions_ids.into_iter() { + warmup_queue.push_back(( + transaction_id, + next_block.block_identifier.clone(), + input_index, + false, + )); + } + let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front()); + } + } + } + } + + for tx in tx_thread_pool.iter() { + // Empty the queue + if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() { + if let Ok(traversal) = traversal_result { + inner_ctx.try_log(|logger| { + info!( + logger, + "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).", + traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers + ) + }); + cache_l1.insert( + ( + traversal.transaction_identifier_inscription.clone(), + traversal.inscription_input_index, + ), + traversal, + ); + } + } + let _ = tx.send(None); + } + + let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || { + for handle in thread_pool_handles.into_iter() { + let _ = handle.join(); + } + }); + } else { + ctx.try_log(|logger| { + info!( + logger, + "No inscriptions to index in block #{}", block.block_identifier.index + ) + }); + } + Ok(has_transactions_to_process) +} + +fn get_transactions_to_process( + block: &BitcoinBlockData, + cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + inscriptions_db_tx: &Transaction, + ctx: &Context, +) -> ( + Vec<(TransactionIdentifier, usize)>, + Vec<(TransactionIdentifier, usize)>, +) { + let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![]; + let mut l1_cache_hits = vec![]; + + let mut known_transactions = + find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx); + + for tx in block.transactions.iter().skip(1) { + // Have a new inscription been revealed, if so, are looking at a re-inscription + for ordinal_event in tx.metadata.ordinal_operations.iter() { + let inscription_data = match ordinal_event { + OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data, + OrdinalOperation::InscriptionTransferred(_) => { + continue; + } + }; + let key = ( + tx.transaction_identifier.clone(), + inscription_data.inscription_input_index, + ); + if cache_l1.contains_key(&key) { + l1_cache_hits.push(key); + continue; + } + + if let Some(entry) = known_transactions.remove(&key) { + continue; + } + + // Enqueue for traversals + transactions_ids.push(( + tx.transaction_identifier.clone(), + inscription_data.inscription_input_index, + )); + } + } + (transactions_ids, l1_cache_hits) +} + +/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer) +/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions +/// for this entry. +/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to +/// compute the output index (if any) `post_transfer_output` that will now include the inscription. +/// When identifying the output index, we will also need to provide an updated offset for pin pointing +/// the satoshi location. + +pub struct SequenceCursor { + blessed: Option, + cursed: Option, + inscriptions_db_conn: Connection, + current_block_height: u64, +} + +impl SequenceCursor { + pub fn new(inscriptions_db_conn: Connection) -> SequenceCursor { + SequenceCursor { + blessed: None, + cursed: None, + inscriptions_db_conn, + current_block_height: 0, + } + } + + pub fn reset(&mut self) { + self.blessed = None; + self.cursed = None; + self.current_block_height = 0; + } + + pub fn pick_next(&mut self, cursed: bool, block_height: u64) -> i64 { + if block_height < self.current_block_height { + self.reset(); + } + self.current_block_height = block_height; + + match cursed { + true => self.pick_next_cursed(), + false => self.pick_next_blessed(), + } + } + + fn pick_next_blessed(&mut self) -> i64 { + match self.blessed { + None => { + match find_latest_inscription_number_at_block_height( + &self.current_block_height, + &None, + &self.inscriptions_db_conn, + &Context::empty(), + ) { + Ok(Some(inscription_number)) => { + self.blessed = Some(inscription_number); + inscription_number + 1 + } + _ => { + self.blessed = Some(0); + 0 + } + } + } + Some(value) => value + 1, + } + } + + fn pick_next_cursed(&mut self) -> i64 { + match self.cursed { + None => { + match find_latest_cursed_inscription_number_at_block_height( + &self.current_block_height, + &None, + &self.inscriptions_db_conn, + &Context::empty(), + ) { + Ok(Some(inscription_number)) => { + self.cursed = Some(inscription_number); + inscription_number - 1 + } + _ => { + self.cursed = Some(-1); + -1 + } + } + } + Some(value) => value - 1, + } + } + + pub fn increment_cursed(&mut self) { + self.cursed = Some(self.pick_next_cursed()); + } + + pub fn increment_blessed(&mut self) { + self.blessed = Some(self.pick_next_blessed()) + } +} + +pub fn augment_block_with_ordinals_inscriptions_data_and_write_to_db_tx( + block: &mut BitcoinBlockData, + sequence_cursor: &mut SequenceCursor, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + inscriptions_db_tx: &Transaction, + ctx: &Context, +) -> bool { + // Handle re-inscriptions + let mut reinscriptions_data = HashMap::new(); + for (_, inscription_data) in inscriptions_data.iter() { + if let Some(inscription_id) = find_blessed_inscription_with_ordinal_number( + &inscription_data.ordinal_number, + inscriptions_db_tx, + ctx, + ) { + reinscriptions_data.insert(inscription_data.ordinal_number, inscription_id); + } + } + + let any_events = augment_block_with_ordinals_inscriptions_data( + block, + sequence_cursor, + inscriptions_data, + &reinscriptions_data, + &ctx, + ); + + // Store inscriptions + insert_new_inscriptions_from_block_in_inscriptions_and_locations( + block, + inscriptions_db_tx, + ctx, + ); + + any_events +} + +pub fn augment_block_with_ordinals_inscriptions_data( + block: &mut BitcoinBlockData, + sequence_cursor: &mut SequenceCursor, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + reinscriptions_data: &HashMap, + ctx: &Context, +) -> bool { + // Handle sat oveflows + let mut sats_overflows = VecDeque::new(); + let mut any_event = false; + + let network = match block.metadata.network { + BitcoinNetwork::Mainnet => Network::Bitcoin, + BitcoinNetwork::Regtest => Network::Regtest, + BitcoinNetwork::Testnet => Network::Testnet, + }; + + for (tx_index, tx) in block.transactions.iter_mut().enumerate() { + any_event |= augment_transaction_with_ordinals_inscriptions_data( + tx, + tx_index, + &block.block_identifier, + sequence_cursor, + &network, + inscriptions_data, + &mut sats_overflows, + &reinscriptions_data, + ctx, + ); + } + + // Handle sats overflow + while let Some(sats_overlow) = sats_overflows.pop_front() { + // TODO + } + any_event +} + +pub fn augment_transaction_with_ordinals_inscriptions_data( + tx: &mut BitcoinTransactionData, + tx_index: usize, + block_identifier: &BlockIdentifier, + sequence_cursor: &mut SequenceCursor, + network: &Network, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + sats_overflows: &mut VecDeque<(usize, usize)>, + reinscriptions_data: &HashMap, + ctx: &Context, +) -> bool { + let any_event = tx.metadata.ordinal_operations.is_empty() == false; + let mut ordinals_ops_indexes_to_discard = VecDeque::new(); + + for (op_index, op) in tx.metadata.ordinal_operations.iter_mut().enumerate() { + let (mut is_cursed, inscription) = match op { + OrdinalOperation::InscriptionRevealed(inscription) => { + (inscription.curse_type.as_ref().is_some(), inscription) + } + OrdinalOperation::InscriptionTransferred(_) => continue, + }; + + let mut inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index); + + let transaction_identifier = tx.transaction_identifier.clone(); + let traversal = match inscriptions_data + .remove(&(transaction_identifier, inscription.inscription_input_index)) + { + Some(traversal) => traversal, + None => { + ctx.try_log(|logger| { + error!( + logger, + "Unable to retrieve cached inscription data for inscription {}", + tx.transaction_identifier.hash + ); + }); + ordinals_ops_indexes_to_discard.push_front(op_index); + continue; + } + }; + + let mut curse_type_override = None; + + // Do we need to curse the inscription? + if !is_cursed { + // Is this inscription re-inscribing an existing blessed inscription? + if let Some(exisiting_inscription_id) = + reinscriptions_data.get(&traversal.ordinal_number) + { + ctx.try_log(|logger| { + info!( + logger, + "Satoshi #{} was inscribed with blessed inscription {}, cursing inscription {}", + traversal.ordinal_number, + exisiting_inscription_id, + traversal.get_inscription_id(), + ); + }); + + is_cursed = true; + inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index); + curse_type_override = Some(OrdinalInscriptionCurseType::Reinscription) + } + }; + + let outputs = &tx.metadata.outputs; + inscription.inscription_number = inscription_number; + inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset(); + inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height(); + inscription.ordinal_number = traversal.ordinal_number; + inscription.transfers_pre_inscription = traversal.transfers; + inscription.inscription_fee = tx.metadata.fee; + inscription.tx_index = tx_index; + inscription.curse_type = match curse_type_override { + Some(curse_type) => Some(curse_type), + None => inscription.curse_type.take(), + }; + inscription.satpoint_post_inscription = format_satpoint_to_watch( + &traversal.transfer_data.transaction_identifier_location, + traversal.transfer_data.output_index, + traversal.transfer_data.inscription_offset_intra_output, + ); + if let Some(output) = outputs.get(traversal.transfer_data.output_index) { + inscription.inscription_output_value = output.value; + inscription.inscriber_address = { + let script_pub_key = output.get_script_pubkey_hex(); + match Script::from_hex(&script_pub_key) { + Ok(script) => match Address::from_script(&script, network.clone()) { + Ok(a) => Some(a.to_string()), + _ => None, + }, + _ => None, + } + }; + } else { + ctx.try_log(|logger| { + warn!( + logger, + "Database corrupted, skipping cursed inscription => {:?} / {:?}", + traversal, + outputs + ); + }); + } + + if traversal.ordinal_number == 0 { + // If the satoshi inscribed correspond to a sat overflow, we will store the inscription + // and assign an inscription number after the other inscriptions, to mimick the + // bug in ord. + sats_overflows.push_back((tx_index, op_index)); + continue; + } + + ctx.try_log(|logger| { + info!( + logger, + "Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)", + inscription.inscription_id, + inscription.inscription_number, + inscription.ordinal_number, + block_identifier.index, + inscription.transfers_pre_inscription, + ); + }); + + if is_cursed { + sequence_cursor.increment_cursed(); + } else { + sequence_cursor.increment_blessed(); + } + } + any_event +} + +pub fn consolidate_transaction_with_pre_computed_inscription_data( + tx: &mut BitcoinTransactionData, + tx_index: usize, + inscriptions_data: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>, + _ctx: &Context, +) { + for operation in tx.metadata.ordinal_operations.iter_mut() { + let inscription = match operation { + OrdinalOperation::InscriptionRevealed(ref mut inscription) => inscription, + OrdinalOperation::InscriptionTransferred(_) => continue, + }; + + let Some(traversal) = inscriptions_data.remove(&(tx.transaction_identifier.clone(), inscription.inscription_input_index)) else { + continue; + }; + + inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset(); + inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height(); + inscription.ordinal_number = traversal.ordinal_number; + inscription.inscription_number = traversal.inscription_number; + inscription.transfers_pre_inscription = traversal.transfers; + inscription.inscription_fee = tx.metadata.fee; + inscription.tx_index = tx_index; + inscription.satpoint_post_inscription = format_satpoint_to_watch( + &traversal.transfer_data.transaction_identifier_location, + traversal.transfer_data.output_index, + traversal.transfer_data.inscription_offset_intra_output, + ); + if inscription.inscription_number < 0 { + inscription.curse_type = Some(OrdinalInscriptionCurseType::Unknown); + } + } +} + +pub fn consolidate_block_with_pre_computed_ordinals_data( + block: &mut BitcoinBlockData, + inscriptions_db_tx: &Transaction, + include_transfers: bool, + ctx: &Context, +) { + let network = match block.metadata.network { + BitcoinNetwork::Mainnet => Network::Bitcoin, + BitcoinNetwork::Regtest => Network::Regtest, + BitcoinNetwork::Testnet => Network::Testnet, + }; + + let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); + let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); + let mut cumulated_fees = 0; + let mut inscriptions_data = + find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_tx, ctx); + for (tx_index, tx) in block.transactions.iter_mut().enumerate() { + // Add inscriptions data + consolidate_transaction_with_pre_computed_inscription_data( + tx, + tx_index, + &mut inscriptions_data, + ctx, + ); + + // Add transfers data + if include_transfers { + let _ = augment_transaction_with_ordinals_transfers_data( + tx, + tx_index, + &block.block_identifier, + &network, + &coinbase_txid, + coinbase_subsidy, + &mut cumulated_fees, + inscriptions_db_tx, + ctx, + ); + } + } +} diff --git a/components/hord-cli/src/core/protocol/inscription_tracking.rs b/components/hord-cli/src/core/protocol/inscription_tracking.rs new file mode 100644 index 00000000..7cd22c07 --- /dev/null +++ b/components/hord-cli/src/core/protocol/inscription_tracking.rs @@ -0,0 +1,212 @@ +use chainhook_sdk::{ + bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, + types::{ + BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier, + OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, + }, + utils::Context, +}; + +use crate::{ + core::{compute_next_satpoint_data, SatPosition}, + db::{ + find_inscriptions_at_wached_outpoint, format_outpoint_to_watch, + insert_transfer_in_locations_tx, + }, + ord::height::Height, +}; +use rusqlite::Transaction; + +pub fn augment_block_with_ordinals_transfer_data( + block: &mut BitcoinBlockData, + inscriptions_db_tx: &Transaction, + update_db_tx: bool, + ctx: &Context, +) -> bool { + let mut any_event = false; + + let network = match block.metadata.network { + BitcoinNetwork::Mainnet => Network::Bitcoin, + BitcoinNetwork::Regtest => Network::Regtest, + BitcoinNetwork::Testnet => Network::Testnet, + }; + + let coinbase_subsidy = Height(block.block_identifier.index).subsidy(); + let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); + let mut cumulated_fees = 0; + for (tx_index, tx) in block.transactions.iter_mut().enumerate() { + let transfers = augment_transaction_with_ordinals_transfers_data( + tx, + tx_index, + &block.block_identifier, + &network, + &coinbase_txid, + coinbase_subsidy, + &mut cumulated_fees, + inscriptions_db_tx, + ctx, + ); + + any_event |= !transfers.is_empty(); + + if update_db_tx { + // Store transfers between each iteration + for transfer_data in transfers.into_iter() { + insert_transfer_in_locations_tx( + &transfer_data, + &block.block_identifier, + &inscriptions_db_tx, + &ctx, + ); + } + } + } + + any_event +} + +pub fn augment_transaction_with_ordinals_transfers_data( + tx: &mut BitcoinTransactionData, + tx_index: usize, + block_identifier: &BlockIdentifier, + network: &Network, + coinbase_txid: &TransactionIdentifier, + coinbase_subsidy: u64, + cumulated_fees: &mut u64, + inscriptions_db_tx: &Transaction, + ctx: &Context, +) -> Vec { + let mut transfers = vec![]; + + for (input_index, input) in tx.metadata.inputs.iter().enumerate() { + let outpoint_pre_transfer = format_outpoint_to_watch( + &input.previous_output.txid, + input.previous_output.vout as usize, + ); + + let entries = + match find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx) + { + Ok(entries) => entries, + Err(e) => { + ctx.try_log(|logger| warn!(logger, "unable query inscriptions: {e}")); + continue; + } + }; + // For each satpoint inscribed retrieved, we need to compute the next + // outpoint to watch + for watched_satpoint in entries.into_iter() { + let satpoint_pre_transfer = + format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset); + + // Question is: are inscriptions moving to a new output, + // burnt or lost in fees and transfered to the miner? + + let inputs = tx + .metadata + .inputs + .iter() + .map(|o| o.previous_output.value) + .collect::<_>(); + let outputs = tx.metadata.outputs.iter().map(|o| o.value).collect::<_>(); + let post_transfer_data = + compute_next_satpoint_data(input_index, watched_satpoint.offset, &inputs, &outputs); + + let ( + outpoint_post_transfer, + offset_post_transfer, + updated_address, + post_transfer_output_value, + ) = match post_transfer_data { + SatPosition::Output((output_index, offset)) => { + let outpoint = + format_outpoint_to_watch(&tx.transaction_identifier, output_index); + let script_pub_key_hex = + tx.metadata.outputs[output_index].get_script_pubkey_hex(); + let updated_address = match Script::from_hex(&script_pub_key_hex) { + Ok(script) => match Address::from_script(&script, network.clone()) { + Ok(address) => Some(address.to_string()), + Err(e) => { + ctx.try_log(|logger| { + warn!( + logger, + "unable to retrieve address from {script_pub_key_hex}: {}", + e.to_string() + ) + }); + None + } + }, + Err(e) => { + ctx.try_log(|logger| { + warn!( + logger, + "unable to retrieve address from {script_pub_key_hex}: {}", + e.to_string() + ) + }); + None + } + }; + + // At this point we know that inscriptions are being moved. + ctx.try_log(|logger| { + info!( + logger, + "Inscription {} moved from {} to {} (block: {})", + watched_satpoint.inscription_id, + satpoint_pre_transfer, + outpoint, + block_identifier.index, + ) + }); + + ( + outpoint, + offset, + updated_address, + Some(tx.metadata.outputs[output_index].value), + ) + } + SatPosition::Fee(offset) => { + // Get Coinbase TX + let total_offset = coinbase_subsidy + *cumulated_fees + offset; + let outpoint = format_outpoint_to_watch(&coinbase_txid, 0); + ctx.try_log(|logger| { + info!( + logger, + "Inscription {} spent in fees ({}+{}+{})", + watched_satpoint.inscription_id, + coinbase_subsidy, + cumulated_fees, + offset + ) + }); + (outpoint, total_offset, None, None) + } + }; + + let satpoint_post_transfer = + format!("{}:{}", outpoint_post_transfer, offset_post_transfer); + + let transfer_data = OrdinalInscriptionTransferData { + inscription_id: watched_satpoint.inscription_id.clone(), + updated_address, + tx_index, + satpoint_pre_transfer, + satpoint_post_transfer, + post_transfer_output_value, + }; + + transfers.push(transfer_data.clone()); + + // Attach transfer event + tx.metadata + .ordinal_operations + .push(OrdinalOperation::InscriptionTransferred(transfer_data)); + } + } + *cumulated_fees += tx.metadata.fee; + + transfers +} diff --git a/components/hord-cli/src/core/protocol/mod.rs b/components/hord-cli/src/core/protocol/mod.rs index df32ee39..48a2322b 100644 --- a/components/hord-cli/src/core/protocol/mod.rs +++ b/components/hord-cli/src/core/protocol/mod.rs @@ -1,3 +1,4 @@ -pub mod inscribing; -pub mod numbering; -pub mod sequencing; +pub mod inscription_parsing; +pub mod inscription_sequencing; +pub mod inscription_tracking; +pub mod satoshi_numbering; diff --git a/components/hord-cli/src/core/protocol/numbering.rs b/components/hord-cli/src/core/protocol/satoshi_numbering.rs similarity index 99% rename from components/hord-cli/src/core/protocol/numbering.rs rename to components/hord-cli/src/core/protocol/satoshi_numbering.rs index 97ba78dd..5f835ed1 100644 --- a/components/hord-cli/src/core/protocol/numbering.rs +++ b/components/hord-cli/src/core/protocol/satoshi_numbering.rs @@ -13,7 +13,7 @@ use crate::db::{ use crate::db::{LazyBlockTransaction, TraversalResult}; use crate::ord::height::Height; -pub fn retrieve_satoshi_point_using_lazy_storage_v3( +pub fn compute_satoshi_number( blocks_db_dir: &PathBuf, block_identifier: &BlockIdentifier, transaction_identifier: &TransactionIdentifier, diff --git a/components/hord-cli/src/core/protocol/sequencing.rs b/components/hord-cli/src/core/protocol/sequencing.rs deleted file mode 100644 index e48eaed5..00000000 --- a/components/hord-cli/src/core/protocol/sequencing.rs +++ /dev/null @@ -1,836 +0,0 @@ -use std::{ - collections::{HashMap, VecDeque}, - hash::BuildHasherDefault, - sync::Arc, -}; - -use chainhook_sdk::{ - bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, - types::{ - BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, - OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, - }, - utils::Context, -}; -use dashmap::DashMap; -use fxhash::FxHasher; -use rusqlite::{Connection, Transaction}; - -use crate::{ - core::{ - compute_next_satpoint_data, get_inscriptions_revealed_in_block, HordConfig, SatPosition, - }, - db::{ - find_inscription_with_ordinal_number, find_inscriptions_at_wached_outpoint, - find_latest_cursed_inscription_number_at_block_height, - find_latest_inscription_number_at_block_height, format_outpoint_to_watch, - format_satpoint_to_watch, get_any_entry_in_ordinal_activities, - insert_entry_in_inscriptions, insert_transfer_in_locations_tx, InscriptionHeigthHint, - LazyBlockTransaction, TraversalResult, - }, - ord::height::Height, -}; - -use rand::seq::SliceRandom; -use rand::thread_rng; -use std::sync::mpsc::channel; - -use crate::db::find_all_inscriptions_in_block; - -use super::numbering::retrieve_satoshi_point_using_lazy_storage_v3; - -pub fn retrieve_inscribed_satoshi_points_from_block_v3( - block: &BitcoinBlockData, - next_blocks: &Vec, - cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, - cache_l2: &Arc>>, - existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, - inscriptions_db_conn: &mut Connection, - hord_config: &HordConfig, - ctx: &Context, -) -> Result { - let (mut transactions_ids, l1_cache_hits) = get_transactions_to_process( - block, - cache_l1, - existing_inscriptions, - inscriptions_db_conn, - ctx, - ); - - let inner_ctx = if hord_config.logs.ordinals_internals { - ctx.clone() - } else { - Context::empty() - }; - - let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty(); - - let thread_max = hord_config.ingestion_thread_max * 2; - - if has_transactions_to_process { - let expected_traversals = transactions_ids.len() + l1_cache_hits.len(); - let (traversal_tx, traversal_rx) = channel(); - - let mut tx_thread_pool = vec![]; - let mut thread_pool_handles = vec![]; - - for thread_index in 0..thread_max { - let (tx, rx) = channel(); - tx_thread_pool.push(tx); - - let moved_traversal_tx = traversal_tx.clone(); - let moved_ctx = inner_ctx.clone(); - let moved_hord_db_path = hord_config.db_path.clone(); - let local_cache = cache_l2.clone(); - - let handle = hiro_system_kit::thread_named("Worker") - .spawn(move || { - while let Ok(Some(( - transaction_id, - block_identifier, - input_index, - prioritary, - ))) = rx.recv() - { - let traversal: Result = - retrieve_satoshi_point_using_lazy_storage_v3( - &moved_hord_db_path, - &block_identifier, - &transaction_id, - input_index, - 0, - &local_cache, - &moved_ctx, - ); - let _ = moved_traversal_tx.send((traversal, prioritary, thread_index)); - } - }) - .expect("unable to spawn thread"); - thread_pool_handles.push(handle); - } - - // Empty cache - let mut thread_index = 0; - for key in l1_cache_hits.iter() { - if let Some(entry) = cache_l1.remove(key) { - let _ = traversal_tx.send((Ok(entry), true, thread_index)); - thread_index = (thread_index + 1) % thread_max; - } - } - - ctx.try_log(|logger| { - info!( - logger, - "Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue len: {}, L1 cache len: {}, L2 cache len: {})", - block.block_identifier.index, - transactions_ids.len(), - l1_cache_hits.len(), - next_blocks.len(), - cache_l1.len(), - cache_l2.len(), - ) - }); - - let mut rng = thread_rng(); - transactions_ids.shuffle(&mut rng); - let mut priority_queue = VecDeque::new(); - let mut warmup_queue = VecDeque::new(); - - for (transaction_id, input_index) in transactions_ids.into_iter() { - priority_queue.push_back(( - transaction_id, - block.block_identifier.clone(), - input_index, - true, - )); - } - - // Feed each workers with 2 workitems each - for thread_index in 0..thread_max { - let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); - } - for thread_index in 0..thread_max { - let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); - } - - let mut next_block_iter = next_blocks.iter(); - let mut traversals_received = 0; - while let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.recv() { - if prioritary { - traversals_received += 1; - } - match traversal_result { - Ok(traversal) => { - inner_ctx.try_log(|logger| { - info!( - logger, - "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).", - traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers - ) - }); - cache_l1.insert( - ( - traversal.transaction_identifier_inscription.clone(), - traversal.inscription_input_index, - ), - traversal, - ); - } - Err(e) => { - ctx.try_log(|logger| { - error!(logger, "Unable to compute inscription's Satoshi: {e}",) - }); - } - } - if traversals_received == expected_traversals { - break; - } - - if let Some(w) = priority_queue.pop_front() { - let _ = tx_thread_pool[thread_index].send(Some(w)); - } else { - if let Some(w) = warmup_queue.pop_front() { - let _ = tx_thread_pool[thread_index].send(Some(w)); - } else { - if let Some(next_block) = next_block_iter.next() { - let (mut transactions_ids, _) = get_transactions_to_process( - next_block, - cache_l1, - existing_inscriptions, - inscriptions_db_conn, - ctx, - ); - - ctx.try_log(|logger| { - info!( - logger, - "Number of inscriptions in block #{} to pre-process: {}", - block.block_identifier.index, - transactions_ids.len() - ) - }); - - transactions_ids.shuffle(&mut rng); - for (transaction_id, input_index) in transactions_ids.into_iter() { - warmup_queue.push_back(( - transaction_id, - next_block.block_identifier.clone(), - input_index, - false, - )); - } - let _ = tx_thread_pool[thread_index].send(warmup_queue.pop_front()); - } - } - } - } - - for tx in tx_thread_pool.iter() { - // Empty the queue - if let Ok((traversal_result, prioritary, thread_index)) = traversal_rx.try_recv() { - if let Ok(traversal) = traversal_result { - inner_ctx.try_log(|logger| { - info!( - logger, - "Satoshi #{} was minted in block #{} at offset {} and was transferred {} times (progress: {traversals_received}/{expected_traversals}) (priority queue: {prioritary}, thread: {thread_index}).", - traversal.ordinal_number, traversal.get_ordinal_coinbase_height(), traversal.get_ordinal_coinbase_offset(), traversal.transfers - ) - }); - cache_l1.insert( - ( - traversal.transaction_identifier_inscription.clone(), - traversal.inscription_input_index, - ), - traversal, - ); - } - } - let _ = tx.send(None); - } - - let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || { - for handle in thread_pool_handles.into_iter() { - let _ = handle.join(); - } - }); - } else { - ctx.try_log(|logger| { - info!( - logger, - "No inscriptions to index in block #{}", block.block_identifier.index - ) - }); - } - Ok(has_transactions_to_process) -} - -fn get_transactions_to_process( - block: &BitcoinBlockData, - cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, - existing_inscriptions: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, - inscriptions_db_conn: &mut Connection, - ctx: &Context, -) -> ( - Vec<(TransactionIdentifier, usize)>, - Vec<(TransactionIdentifier, usize)>, -) { - let mut transactions_ids: Vec<(TransactionIdentifier, usize)> = vec![]; - let mut l1_cache_hits = vec![]; - - let mut known_transactions = - find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx); - - for tx in block.transactions.iter().skip(1) { - // Have a new inscription been revealed, if so, are looking at a re-inscription - for ordinal_event in tx.metadata.ordinal_operations.iter() { - let inscription_data = match ordinal_event { - OrdinalOperation::InscriptionRevealed(inscription_data) => inscription_data, - OrdinalOperation::CursedInscriptionRevealed(inscription_data) => inscription_data, - OrdinalOperation::InscriptionTransferred(_) => { - continue; - } - }; - let key = ( - tx.transaction_identifier.clone(), - inscription_data.inscription_input_index, - ); - if cache_l1.contains_key(&key) { - l1_cache_hits.push(key); - continue; - } - - if let Some(entry) = known_transactions.remove(&key) { - existing_inscriptions.insert(key, entry); - continue; - } - - // Enqueue for traversals - transactions_ids.push(( - tx.transaction_identifier.clone(), - inscription_data.inscription_input_index, - )); - } - } - (transactions_ids, l1_cache_hits) -} - -pub fn update_hord_db_and_augment_bitcoin_block_v3( - new_block: &mut BitcoinBlockData, - next_blocks: &Vec, - cache_l1: &mut HashMap<(TransactionIdentifier, usize), TraversalResult>, - cache_l2: &Arc>>, - inscription_height_hint: &mut InscriptionHeigthHint, - inscriptions_db_conn_rw: &mut Connection, - hord_config: &HordConfig, - ctx: &Context, -) -> Result<(), String> { - let mut existing_inscriptions = HashMap::new(); - - let transactions_processed = retrieve_inscribed_satoshi_points_from_block_v3( - &new_block, - &next_blocks, - cache_l1, - cache_l2, - &mut existing_inscriptions, - inscriptions_db_conn_rw, - &hord_config, - ctx, - )?; - - if !transactions_processed { - return Ok(()); - } - - let discard_changes: bool = get_any_entry_in_ordinal_activities( - &new_block.block_identifier.index, - inscriptions_db_conn_rw, - ctx, - ); - - let inner_ctx = if discard_changes { - Context::empty() - } else { - if hord_config.logs.ordinals_internals { - ctx.clone() - } else { - Context::empty() - } - }; - - let transaction = inscriptions_db_conn_rw.transaction().unwrap(); - let any_inscription_revealed = - update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx( - new_block, - &transaction, - &cache_l1, - inscription_height_hint, - &inner_ctx, - )?; - - // Have inscriptions been transfered? - let any_inscription_transferred = - update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx( - new_block, - &transaction, - &inner_ctx, - )?; - - if !any_inscription_revealed && !any_inscription_transferred { - return Ok(()); - } - - if discard_changes { - ctx.try_log(|logger| { - info!( - logger, - "Ignoring updates for block #{}, activities present in database", - new_block.block_identifier.index, - ) - }); - } else { - ctx.try_log(|logger| { - info!( - logger, - "Saving updates for block {}", new_block.block_identifier.index, - ) - }); - transaction.commit().unwrap(); - ctx.try_log(|logger| { - info!( - logger, - "Updates saved for block {}", new_block.block_identifier.index, - ) - }); - } - - let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block) - .iter() - .map(|d| d.inscription_number.to_string()) - .collect::>(); - - ctx.try_log(|logger| { - info!( - logger, - "Block #{} revealed {} inscriptions [{}]", - new_block.block_identifier.index, - inscriptions_revealed.len(), - inscriptions_revealed.join(", ") - ) - }); - Ok(()) -} - -/// For each input of each transaction in the block, we retrieve the UTXO spent (outpoint_pre_transfer) -/// and we check using a `storage` (in-memory or sqlite) absctraction if we have some existing inscriptions -/// for this entry. -/// When this is the case, it means that an inscription_transfer event needs to be produced. We need to -/// compute the output index (if any) `post_transfer_output` that will now include the inscription. -/// When identifying the output index, we will also need to provide an updated offset for pin pointing -/// the satoshi location. -pub fn update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx( - block: &mut BitcoinBlockData, - hord_db_tx: &Transaction, - ctx: &Context, -) -> Result { - let mut storage_updated = false; - let mut cumulated_fees = 0; - let subsidy = Height(block.block_identifier.index).subsidy(); - let coinbase_txid = &block.transactions[0].transaction_identifier.clone(); - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - }; - for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() { - for (input_index, input) in new_tx.metadata.inputs.iter().enumerate() { - let outpoint_pre_transfer = format_outpoint_to_watch( - &input.previous_output.txid, - input.previous_output.vout as usize, - ); - - let entries = - find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &hord_db_tx)?; - - // For each satpoint inscribed retrieved, we need to compute the next - // outpoint to watch - for watched_satpoint in entries.into_iter() { - let satpoint_pre_transfer = - format!("{}:{}", outpoint_pre_transfer, watched_satpoint.offset); - - // Question is: are inscriptions moving to a new output, - // burnt or lost in fees and transfered to the miner? - - let inputs = new_tx - .metadata - .inputs - .iter() - .map(|o| o.previous_output.value) - .collect::<_>(); - let outputs = new_tx - .metadata - .outputs - .iter() - .map(|o| o.value) - .collect::<_>(); - let post_transfer_data = compute_next_satpoint_data( - input_index, - watched_satpoint.offset, - &inputs, - &outputs, - ); - - let ( - outpoint_post_transfer, - offset_post_transfer, - updated_address, - post_transfer_output_value, - ) = match post_transfer_data { - SatPosition::Output((output_index, offset)) => { - let outpoint = - format_outpoint_to_watch(&new_tx.transaction_identifier, output_index); - let script_pub_key_hex = - new_tx.metadata.outputs[output_index].get_script_pubkey_hex(); - let updated_address = match Script::from_hex(&script_pub_key_hex) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(address) => Some(address.to_string()), - Err(e) => { - ctx.try_log(|logger| { - warn!( - logger, - "unable to retrieve address from {script_pub_key_hex}: {}", e.to_string() - ) - }); - None - } - }, - Err(e) => { - ctx.try_log(|logger| { - warn!( - logger, - "unable to retrieve address from {script_pub_key_hex}: {}", - e.to_string() - ) - }); - None - } - }; - - // At this point we know that inscriptions are being moved. - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} moved from {} to {} (block: {})", - watched_satpoint.inscription_id, - satpoint_pre_transfer, - outpoint, - block.block_identifier.index, - ) - }); - - ( - outpoint, - offset, - updated_address, - Some(new_tx.metadata.outputs[output_index].value), - ) - } - SatPosition::Fee(offset) => { - // Get Coinbase TX - let total_offset = subsidy + cumulated_fees + offset; - let outpoint = format_outpoint_to_watch(&coinbase_txid, 0); - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} spent in fees ({}+{}+{})", - watched_satpoint.inscription_id, - subsidy, - cumulated_fees, - offset - ) - }); - (outpoint, total_offset, None, None) - } - }; - - let satpoint_post_transfer = - format!("{}:{}", outpoint_post_transfer, offset_post_transfer); - - let transfer_data = OrdinalInscriptionTransferData { - inscription_id: watched_satpoint.inscription_id.clone(), - updated_address, - tx_index, - satpoint_pre_transfer, - satpoint_post_transfer, - post_transfer_output_value, - }; - - // Update watched outpoint - - insert_transfer_in_locations_tx( - &transfer_data, - &block.block_identifier, - &hord_db_tx, - &ctx, - ); - storage_updated = true; - - // Attach transfer event - new_tx - .metadata - .ordinal_operations - .push(OrdinalOperation::InscriptionTransferred(transfer_data)); - } - } - cumulated_fees += new_tx.metadata.fee; - } - Ok(storage_updated) -} - -pub fn update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx( - block: &mut BitcoinBlockData, - transaction: &Transaction, - cache_l1: &HashMap<(TransactionIdentifier, usize), TraversalResult>, - inscription_height_hint: &mut InscriptionHeigthHint, - ctx: &Context, -) -> Result { - let mut storage_updated = false; - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - }; - - let mut latest_cursed_inscription_loaded = false; - let mut latest_cursed_inscription_number = 0; - let mut cursed_inscription_sequence_updated = false; - - let mut latest_blessed_inscription_loaded = false; - let mut latest_blessed_inscription_number = 0; - let mut blessed_inscription_sequence_updated = false; - - let mut sats_overflow = vec![]; - - for (tx_index, new_tx) in block.transactions.iter_mut().skip(1).enumerate() { - let mut ordinals_events_indexes_to_discard = VecDeque::new(); - let mut ordinals_events_indexes_to_curse = VecDeque::new(); - - // Have a new inscription been revealed, if so, are looking at a re-inscription - for (ordinal_event_index, ordinal_event) in - new_tx.metadata.ordinal_operations.iter_mut().enumerate() - { - let (inscription, is_cursed) = match ordinal_event { - OrdinalOperation::InscriptionRevealed(inscription) => (inscription, false), - OrdinalOperation::CursedInscriptionRevealed(inscription) => (inscription, true), - OrdinalOperation::InscriptionTransferred(_) => continue, - }; - - let mut inscription_number = if is_cursed { - latest_cursed_inscription_number = if !latest_cursed_inscription_loaded { - latest_cursed_inscription_loaded = true; - match find_latest_cursed_inscription_number_at_block_height( - &block.block_identifier.index, - &inscription_height_hint.cursed, - &transaction, - &ctx, - )? { - None => -1, - Some(inscription_number) => inscription_number - 1, - } - } else { - latest_cursed_inscription_number - 1 - }; - latest_cursed_inscription_number - } else { - latest_blessed_inscription_number = if !latest_blessed_inscription_loaded { - latest_blessed_inscription_loaded = true; - match find_latest_inscription_number_at_block_height( - &block.block_identifier.index, - &inscription_height_hint.blessed, - &transaction, - &ctx, - )? { - None => 0, - Some(inscription_number) => inscription_number + 1, - } - } else { - latest_blessed_inscription_number + 1 - }; - latest_blessed_inscription_number - }; - - let transaction_identifier = new_tx.transaction_identifier.clone(); - let traversal = match cache_l1 - .get(&(transaction_identifier, inscription.inscription_input_index)) - { - Some(traversal) => traversal, - None => { - ctx.try_log(|logger| { - info!( - logger, - "Unable to retrieve cached inscription data for inscription {}", - new_tx.transaction_identifier.hash - ); - }); - ordinals_events_indexes_to_discard.push_front(ordinal_event_index); - continue; - } - }; - - let outputs = &new_tx.metadata.outputs; - inscription.ordinal_offset = traversal.get_ordinal_coinbase_offset(); - inscription.ordinal_block_height = traversal.get_ordinal_coinbase_height(); - inscription.ordinal_number = traversal.ordinal_number; - inscription.inscription_number = traversal.inscription_number; - inscription.transfers_pre_inscription = traversal.transfers; - inscription.inscription_fee = new_tx.metadata.fee; - inscription.tx_index = tx_index; - inscription.satpoint_post_inscription = format_satpoint_to_watch( - &traversal.transfer_data.transaction_identifier_location, - traversal.transfer_data.output_index, - traversal.transfer_data.inscription_offset_intra_output, - ); - if let Some(output) = outputs.get(traversal.transfer_data.output_index) { - inscription.inscription_output_value = output.value; - inscription.inscriber_address = { - let script_pub_key = output.get_script_pubkey_hex(); - match Script::from_hex(&script_pub_key) { - Ok(script) => match Address::from_script(&script, network) { - Ok(a) => Some(a.to_string()), - _ => None, - }, - _ => None, - } - }; - } else { - ctx.try_log(|logger| { - warn!( - logger, - "Database corrupted, skipping cursed inscription => {:?} / {:?}", - traversal, - outputs - ); - }); - } - - if traversal.ordinal_number == 0 { - // If the satoshi inscribed correspond to a sat overflow, we will store the inscription - // and assign an inscription number after the other inscriptions, to mimick the - // bug in ord. - sats_overflow.push(inscription.clone()); - continue; - } - - if let Some(_entry) = - find_inscription_with_ordinal_number(&traversal.ordinal_number, &transaction, &ctx) - { - ctx.try_log(|logger| { - info!( - logger, - "Transaction {} in block {} is overriding an existing inscription {}", - new_tx.transaction_identifier.hash, - block.block_identifier.index, - traversal.ordinal_number - ); - }); - - inscription_number = if !latest_cursed_inscription_loaded { - latest_cursed_inscription_loaded = true; - match find_latest_cursed_inscription_number_at_block_height( - &block.block_identifier.index, - &inscription_height_hint.cursed, - &transaction, - &ctx, - )? { - None => -1, - Some(inscription_number) => inscription_number - 1, - } - } else { - latest_cursed_inscription_number - 1 - }; - inscription.curse_type = Some(OrdinalInscriptionCurseType::Reinscription); - - if !is_cursed { - ordinals_events_indexes_to_curse.push_front(ordinal_event_index); - latest_blessed_inscription_number -= 1; - } - } - - inscription.inscription_number = inscription_number; - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} (#{}) detected on Satoshi {} (block {}, {} transfers)", - inscription.inscription_id, - inscription.inscription_number, - inscription.ordinal_number, - block.block_identifier.index, - inscription.transfers_pre_inscription, - ); - }); - insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx); - if inscription.curse_type.is_some() { - cursed_inscription_sequence_updated = true; - } else { - blessed_inscription_sequence_updated = true; - } - storage_updated = true; - } - - for index in ordinals_events_indexes_to_curse.into_iter() { - match new_tx.metadata.ordinal_operations.remove(index) { - OrdinalOperation::InscriptionRevealed(inscription_data) - | OrdinalOperation::CursedInscriptionRevealed(inscription_data) => { - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} (#{}) transitioned from blessed to cursed", - inscription_data.inscription_id, - inscription_data.inscription_number, - ); - }); - new_tx.metadata.ordinal_operations.insert( - index, - OrdinalOperation::CursedInscriptionRevealed(inscription_data), - ); - } - _ => unreachable!(), - } - } - - for index in ordinals_events_indexes_to_discard.into_iter() { - new_tx.metadata.ordinal_operations.remove(index); - } - } - - for inscription in sats_overflow.iter_mut() { - inscription.inscription_number = latest_blessed_inscription_number; - ctx.try_log(|logger| { - info!( - logger, - "Inscription {} (#{}) detected on Satoshi overflow {} (block {}, {} transfers)", - inscription.inscription_id, - inscription.inscription_number, - inscription.ordinal_number, - block.block_identifier.index, - inscription.transfers_pre_inscription, - ); - }); - insert_entry_in_inscriptions(&inscription, &block.block_identifier, &transaction, &ctx); - latest_blessed_inscription_number += 1; - storage_updated = true; - if inscription.curse_type.is_some() { - cursed_inscription_sequence_updated = true; - } else { - blessed_inscription_sequence_updated = true; - } - } - - if cursed_inscription_sequence_updated { - inscription_height_hint.cursed = Some(block.block_identifier.index); - } - if blessed_inscription_sequence_updated { - inscription_height_hint.blessed = Some(block.block_identifier.index); - } - - Ok(storage_updated) -} diff --git a/components/hord-cli/src/db/mod.rs b/components/hord-cli/src/db/mod.rs index 4768be95..bc7c44e7 100644 --- a/components/hord-cli/src/db/mod.rs +++ b/components/hord-cli/src/db/mod.rs @@ -19,22 +19,9 @@ use chainhook_sdk::{ utils::Context, }; -use crate::ord::sat::Sat; - -#[derive(Clone, Debug)] -pub struct InscriptionHeigthHint { - pub cursed: Option, - pub blessed: Option, -} - -impl InscriptionHeigthHint { - pub fn new() -> InscriptionHeigthHint { - InscriptionHeigthHint { - cursed: None, - blessed: None, - } - } -} +use crate::{ + core::protocol::inscription_parsing::get_inscriptions_revealed_in_block, ord::sat::Sat, +}; fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf { let mut destination_path = base_dir.clone(); @@ -94,7 +81,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { [], ) { ctx.try_log(|logger| warn!(logger, "{}", e.to_string())); - } + } } if let Err(e) = conn.execute( "CREATE TABLE IF NOT EXISTS locations ( @@ -107,7 +94,11 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { [], ) { ctx.try_log(|logger| { - warn!(logger, "Unable to create table locations: {}", e.to_string()) + warn!( + logger, + "Unable to create table locations: {}", + e.to_string() + ) }); } else { if let Err(e) = conn.execute( @@ -127,7 +118,7 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { [], ) { ctx.try_log(|logger| warn!(logger, "{}", e.to_string())); - } + } } conn @@ -379,12 +370,42 @@ pub fn insert_entry_in_inscriptions( ) { ctx.try_log(|logger| error!(logger, "{}", e.to_string())); } - insert_inscription_in_locations( - &inscription_data, - &block_identifier, - &inscriptions_db_conn_rw, - ctx, - ); +} + +pub fn insert_new_inscriptions_from_block_in_inscriptions_and_locations( + block: &BitcoinBlockData, + inscriptions_db_conn_rw: &Connection, + ctx: &Context, +) { + for inscription_data in get_inscriptions_revealed_in_block(&block).iter() { + insert_entry_in_inscriptions( + inscription_data, + &block.block_identifier, + inscriptions_db_conn_rw, + &ctx, + ); + insert_inscription_in_locations( + &inscription_data, + &block.block_identifier, + &inscriptions_db_conn_rw, + ctx, + ); + } +} + +pub fn insert_new_inscriptions_from_block_in_locations( + block: &BitcoinBlockData, + inscriptions_db_conn_rw: &Connection, + ctx: &Context, +) { + for inscription_data in get_inscriptions_revealed_in_block(&block).iter() { + insert_inscription_in_locations( + inscription_data, + &block.block_identifier, + inscriptions_db_conn_rw, + &ctx, + ); + } } pub fn insert_inscription_in_locations( @@ -438,18 +459,18 @@ pub fn insert_transfer_in_locations( pub fn get_any_entry_in_ordinal_activities( block_height: &u64, - inscriptions_db_conn: &Connection, + inscriptions_db_tx: &Connection, _ctx: &Context, ) -> bool { let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()]; - let mut stmt = inscriptions_db_conn + let mut stmt = inscriptions_db_tx .prepare("SELECT DISTINCT block_height FROM inscriptions WHERE block_height = ?") .unwrap(); let mut rows = stmt.query(args).unwrap(); while let Ok(Some(_)) = rows.next() { return true; } - let mut stmt = inscriptions_db_conn + let mut stmt = inscriptions_db_tx .prepare("SELECT DISTINCT block_height FROM locations WHERE block_height = ?") .unwrap(); let mut rows = stmt.query(args).unwrap(); @@ -539,7 +560,7 @@ pub fn find_latest_transfers_block_height( let mut rows = stmt.query(args).unwrap(); while let Ok(Some(row)) = rows.next() { let block_height: u64 = row.get(0).unwrap(); - return Some(block_height) + return Some(block_height); } None } @@ -682,14 +703,14 @@ pub fn find_latest_cursed_inscription_number_at_block_height( Ok(None) } -pub fn find_inscription_with_ordinal_number( +pub fn find_blessed_inscription_with_ordinal_number( ordinal_number: &u64, inscriptions_db_conn: &Connection, _ctx: &Context, ) -> Option { let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()]; let mut stmt = inscriptions_db_conn - .prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number > 0") + .prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ? AND inscription_number >= 0") .unwrap(); let mut rows = stmt.query(args).unwrap(); while let Ok(Some(row)) = rows.next() { @@ -750,17 +771,17 @@ pub fn find_inscription_with_id( pub fn find_all_inscriptions_in_block( block_height: &u64, - inscriptions_db_conn: &Connection, + inscriptions_db_tx: &Connection, ctx: &Context, ) -> BTreeMap<(TransactionIdentifier, usize), TraversalResult> { let args: &[&dyn ToSql] = &[&block_height.to_sql().unwrap()]; - let mut stmt = inscriptions_db_conn + let mut stmt = inscriptions_db_tx .prepare("SELECT inscription_number, ordinal_number, inscription_id FROM inscriptions where block_height = ? ORDER BY inscription_number ASC") .unwrap(); let mut results = BTreeMap::new(); let mut rows = stmt.query(args).unwrap(); - let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_conn, ctx); + let transfers_data = find_all_transfers_in_block(block_height, inscriptions_db_tx, ctx); while let Ok(Some(row)) = rows.next() { let inscription_number: i64 = row.get(0).unwrap(); let ordinal_number: u64 = row.get(1).unwrap(); diff --git a/components/hord-cli/src/scan/bitcoin.rs b/components/hord-cli/src/scan/bitcoin.rs index 3301e719..cc844099 100644 --- a/components/hord-cli/src/scan/bitcoin.rs +++ b/components/hord-cli/src/scan/bitcoin.rs @@ -1,6 +1,8 @@ use crate::config::{Config, PredicatesApi}; -use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations; -use crate::core::{self, get_inscriptions_revealed_in_block}; +use crate::core::protocol::inscription_parsing::{ + get_inscriptions_revealed_in_block, parse_ordinals_and_standardize_block, +}; +use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_computed_ordinals_data; use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn}; use crate::download::download_ordinals_dataset_if_required; use crate::service::{ @@ -75,7 +77,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( // Are we dealing with an ordinals-based predicate? // If so, we could use the ordinal storage to provide a set of hints. - let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?; + let mut inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?; info!( ctx.expect_logger(), @@ -109,7 +111,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let block_breakdown = download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx) .await?; - let mut block = match core::parse_ordinals_and_standardize_block( + let mut block = match parse_ordinals_and_standardize_block( block_breakdown, &event_observer_config.bitcoin_network, ctx, @@ -124,8 +126,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( } }; - let empty_ctx = Context::empty(); - re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx); + { + let inscriptions_db_tx = inscriptions_db_conn.transaction().unwrap(); + consolidate_block_with_pre_computed_ordinals_data( + &mut block, + &inscriptions_db_tx, + true, + &ctx, + ); + } let inscriptions_revealed = get_inscriptions_revealed_in_block(&block) .iter() diff --git a/components/hord-cli/src/service/mod.rs b/components/hord-cli/src/service/mod.rs index 36df1779..3020a135 100644 --- a/components/hord-cli/src/service/mod.rs +++ b/components/hord-cli/src/service/mod.rs @@ -6,13 +6,13 @@ use crate::core::pipeline::processors::inscription_indexing::process_blocks; use crate::core::pipeline::processors::start_inscription_indexing_processor; use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor; use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand}; +use crate::core::protocol::inscription_parsing::parse_inscriptions_in_standardized_block; +use crate::core::protocol::inscription_sequencing::SequenceCursor; use crate::core::{ - new_traversals_lazy_cache, parse_inscriptions_in_standardized_block, - revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db, + new_traversals_lazy_cache, revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db, }; use crate::db::{ - find_latest_inscription_block_height, initialize_hord_db, insert_entry_in_blocks, - open_readonly_hord_db_conn, open_readwrite_hord_dbs, InscriptionHeigthHint, LazyBlock, find_latest_transfers_block_height, + insert_entry_in_blocks, open_readonly_hord_db_conn, open_readwrite_hord_dbs, LazyBlock, }; use crate::scan::bitcoin::process_block_with_predicates; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; @@ -105,7 +105,6 @@ impl Service { }) .expect("unable to spawn thread"); - // let (cursor, tip) = { // let inscriptions_db_conn = // open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?; @@ -117,6 +116,7 @@ impl Service { // }; // self.replay_transfers(cursor, tip, Some(tx_replayer.clone())) // .await?; + self.update_state(Some(tx_replayer.clone())).await?; // Catch-up with chain tip @@ -259,12 +259,15 @@ impl Service { parse_inscriptions_in_standardized_block(block, &ctx); } + let inscriptions_db_conn = + open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx) + .expect("unable to open inscriptions db"); + let mut sequence_cursor = SequenceCursor::new(inscriptions_db_conn); - let mut hint = InscriptionHeigthHint::new(); let updated_blocks = process_blocks( &mut blocks, + &mut sequence_cursor, &moved_traversals_cache, - &mut hint, &mut inscriptions_db_conn_rw, &config.get_hord_config(), &None,