Skip to content

Commit

Permalink
feat: ability to target blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Aug 3, 2023
1 parent 8464b82 commit f6be49e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 108 deletions.
180 changes: 76 additions & 104 deletions components/hord-cli/src/scan/bitcoin.rs
@@ -1,13 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::core::protocol::sequencing::{
update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
};
use crate::core::pipeline::processors::inscription_indexing::re_augment_block_with_ordinals_operations;
use crate::core::{self, get_inscriptions_revealed_in_block};
use crate::db::{
find_all_inscriptions_in_block, get_any_entry_in_ordinal_activities,
open_readonly_hord_db_conn, InscriptionHeigthHint,
};
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_hord_db_conn};
use crate::download::download_ordinals_dataset_if_required;
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
Expand All @@ -19,15 +13,15 @@ use chainhook_sdk::chainhooks::bitcoin::{
evaluate_bitcoin_chainhooks_on_chain_event, handle_bitcoin_hook_action,
BitcoinChainhookOccurrence, BitcoinTriggerChainhook,
};
use chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, BitcoinPredicateType};
use chainhook_sdk::chainhooks::types::BitcoinChainhookSpecification;
use chainhook_sdk::indexer::bitcoin::{
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
};
use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
};
use chainhook_sdk::utils::{file_append, send_request, Context};
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, Context};
use std::collections::HashMap;

// TODO(lgalabru): Re-introduce support for blocks[] !!! gracefully handle hints for non consecutive blocks
Expand All @@ -49,72 +43,69 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};
let mut floating_end_block = false;

let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
};

let (mut end_block, floating_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks - 1, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
let mut block_heights_to_scan = if let Some(ref blocks) = predicate_spec.blocks {
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
},
};
let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks - 1, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
},
};
floating_end_block = update_end_block;
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
};

// Are we dealing with an ordinals-based predicate?
// If so, we could use the ordinal storage to provide a set of hints.
let mut inscriptions_db_conn = None;

if let BitcoinPredicateType::OrdinalsProtocol(_) = &predicate_spec.predicate {
inscriptions_db_conn = Some(open_readonly_hord_db_conn(
&config.expected_cache_path(),
ctx,
)?);
}
let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;

info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
);

let mut blocks_scanned = 0;
let mut actions_triggered = 0;
let occurrences_found = 0u64;
let mut err_count = 0;

let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let mut traversals = HashMap::new();
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_blocks_sent = 0u64;
let http_client = build_http_client();

let mut cursor = start_block.saturating_sub(1);
while let Some(current_block_height) = block_heights_to_scan.pop_front() {
number_of_blocks_scanned += 1;

let mut inscription_height_hint = InscriptionHeigthHint::new();

while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;

if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
if !get_any_entry_in_ordinal_activities(&cursor, &inscriptions_db_conn, &ctx) {
continue;
}
if !get_any_entry_in_ordinal_activities(&current_block_height, &inscriptions_db_conn, &ctx)
{
continue;
}

let block_hash =
retrieve_block_hash_with_retry(&http_client, &cursor, &bitcoin_config, ctx).await?;
let block_hash = retrieve_block_hash_with_retry(
&http_client,
&current_block_height,
&bitcoin_config,
ctx,
)
.await?;
let block_breakdown =
download_and_parse_block_with_retry(&http_client, &block_hash, &bitcoin_config, ctx)
.await?;
Expand All @@ -127,49 +118,26 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
Err((e, _)) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
"Unable to standardize block#{} {}: {}", current_block_height, block_hash, e
);
continue;
}
};

if let Some(ref mut inscriptions_db_conn) = inscriptions_db_conn {
// Evaluating every single block is required for also keeping track of transfers.
let local_traverals =
find_all_inscriptions_in_block(&cursor, &inscriptions_db_conn, &ctx);
for (key, traversal_result) in local_traverals.into_iter() {
traversals.insert(key, traversal_result);
}

let transaction = inscriptions_db_conn.transaction().unwrap();
let empty_ctx = Context::empty();
let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data_tx(
&mut block,
&transaction,
&traversals,
&mut inscription_height_hint,
&empty_ctx,
)?;

let _ = update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx(
&mut block,
&transaction,
&empty_ctx,
)?;
let empty_ctx = Context::empty();
re_augment_block_with_ordinals_operations(&mut block, &inscriptions_db_conn, &empty_ctx);

let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();
let inscriptions_revealed = get_inscriptions_revealed_in_block(&block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();

info!(
ctx.expect_logger(),
"Processing block #{} through {} predicate (inscriptions revealed: [{}])",
cursor,
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);
}
info!(
ctx.expect_logger(),
"Processing block #{current_block_height} through {} predicate (inscriptions revealed: [{}])",
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);

match process_block_with_predicates(
block,
Expand All @@ -188,12 +156,12 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}

if let PredicatesApi::On(ref api_config) = config.http_api {
if blocks_scanned % 50 == 0 {
if number_of_blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
Expand All @@ -206,9 +174,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
}

if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..result.blocks {
block_heights_to_scan.push_back(entry);
}
}
Err(_e) => {
continue;
}
Expand All @@ -217,15 +189,15 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height: 0,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
Expand Down
8 changes: 4 additions & 4 deletions components/hord-cli/src/service/mod.rs
Expand Up @@ -595,10 +595,10 @@ pub enum PredicateStatus {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanningData {
pub start_block: u64,
pub cursor: u64,
pub end_block: u64,
pub occurrences_found: u64,
pub number_of_blocks_to_scan: u64,
pub number_of_blocks_scanned: u64,
pub number_of_blocks_sent: u64,
pub current_block_height: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down

0 comments on commit f6be49e

Please sign in to comment.