Skip to content

Commit

Permalink
feat: ability to track updates when scanning bitcoin (+refactor)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Jun 12, 2023
1 parent 7d9e179 commit 9e54bff
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 177 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
},
Command::Stacks(subcmd) => match subcmd {
StacksCommand::Db(StacksDbCommand::GetBlock(cmd)) => {
let mut config = Config::default(false, false, false, &cmd.config_path)?;
let config = Config::default(false, false, false, &cmd.config_path)?;
let stacks_db = open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx)
.expect("unable to read stacks_db");
match get_stacks_block_at_block_height(cmd.block_height, true, 10, &stacks_db) {
Expand Down
252 changes: 103 additions & 149 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::archive::download_ordinals_dataset_if_required;
use crate::config::Config;
use crate::config::{Config, PredicatesApi};
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
use chainhook_sdk::chainhooks::bitcoin::{
Expand All @@ -8,10 +12,9 @@ use chainhook_sdk::chainhooks::bitcoin::{
};
use chainhook_sdk::chainhooks::types::{BitcoinChainhookSpecification, BitcoinPredicateType};
use chainhook_sdk::hord::db::{
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions, find_last_block_inserted,
find_lazy_block_at_block_height, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
fetch_and_cache_blocks_in_hord_db, find_all_inscriptions_in_block, find_last_block_inserted,
find_lazy_block_at_block_height, get_any_entry_in_ordinal_activities,
open_readonly_hord_db_conn, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
};
use chainhook_sdk::hord::{
get_inscriptions_revealed_in_block,
Expand Down Expand Up @@ -71,48 +74,27 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

// Are we dealing with an ordinals-based predicate?
// If so, we could use the ordinal storage to provide a set of hints.
let mut inscriptions_cache = BTreeMap::new();
let mut is_predicate_evaluating_ordinals = false;
let mut hord_blocks_requires_update = false;
let mut inscriptions_db_conn = None;

if let BitcoinPredicateType::OrdinalsProtocol(_) = &predicate_spec.predicate {
is_predicate_evaluating_ordinals = true;
if let Ok(inscriptions_db_conn) =
open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)
{
inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn);
// Will we have to update the blocks table?
if let Ok(blocks_db) =
open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db).is_none() {
hord_blocks_requires_update = true;
}
}
}
}
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;

// Do we need a seeded hord db?
if is_predicate_evaluating_ordinals && inscriptions_cache.is_empty() {
// Do we need to update the blocks table first?
if hord_blocks_requires_update {
if find_lazy_block_at_block_height(end_block as u32, 3, &blocks_db_rw).is_none() {
// Count how many entries in the table
// Compute the right interval
// Start the build local storage routine

// TODO: make sure that we have a contiguous chain
// check_compacted_blocks_chain_integrity(&hord_db_conn);

let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;

let start_block = find_last_block_inserted(&blocks_db_rw) as u64;
if start_block < end_block {
warn!(
ctx.expect_logger(),
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);
ctx.expect_logger(),
"Database hord.sqlite appears to be outdated regarding the window of blocks provided. Syncing {} missing blocks",
(end_block - start_block)
);

let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), ctx)?;
Expand All @@ -126,10 +108,13 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
&ctx,
)
.await?;

inscriptions_cache = find_all_inscriptions(&inscriptions_db_conn_rw);
}
}

inscriptions_db_conn = Some(open_readonly_hord_db_conn(
&config.expected_cache_path(),
ctx,
)?);
}

info!(
Expand All @@ -139,53 +124,60 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

let mut blocks_scanned = 0;
let mut actions_triggered = 0;
let mut occurrences_found = 0u64;
let mut err_count = 0;

let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let mut traversals = HashMap::new();
if is_predicate_evaluating_ordinals {
let hord_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), ctx)?;

let mut storage = Storage::Memory(BTreeMap::new());
let mut cursor = start_block.saturating_sub(1);
while cursor <= end_block {
cursor += 1;
let mut storage = Storage::Memory(BTreeMap::new());
let mut cursor = start_block.saturating_sub(1);

while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;

let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let mut block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
continue;
}
};

if let Some(ref inscriptions_db_conn) = inscriptions_db_conn {
if !get_any_entry_in_ordinal_activities(&cursor, &inscriptions_db_conn, &ctx) {}

// Evaluating every single block is required for also keeping track of transfers.
let local_traverals = match inscriptions_cache.remove(&cursor) {
let local_traverals = match find_all_inscriptions_in_block(
&cursor,
&inscriptions_db_conn,
)
.remove(&cursor)
{
Some(entry) => entry,
None => vec![],
};
for (transaction_identifier, traversal_result) in local_traverals.into_iter() {
traversals.insert(transaction_identifier, traversal_result);
}

blocks_scanned += 1;

let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;
let mut block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
continue;
}
};

let _ = update_storage_and_augment_bitcoin_block_with_inscription_reveal_data(
&mut block,
&mut storage,
&traversals,
&hord_db_conn,
&inscriptions_db_conn,
&ctx,
)?;

Expand All @@ -200,116 +192,78 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();

let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});

let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);

info!(
ctx.expect_logger(),
"Processing block #{} through {} predicate (inscriptions revealed: [{}])",
cursor,
predicate_spec.uuid,
inscriptions_revealed.join(", ")
);
}

match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});

if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, vec![&predicate_spec], ctx);
occurrences_found += predicates_triggered.len() as u64;

if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};
if let PredicatesApi::On(ref api_config) = config.http_api {
if blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&ctx,
)
}
}
} else {
let use_scan_to_seed_hord_db = true;

if use_scan_to_seed_hord_db {
// Start ingestion pipeline
match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await {
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}

let mut cursor = start_block.saturating_sub(1);
while cursor <= end_block {
cursor += 1;
blocks_scanned += 1;
let block_hash = retrieve_block_hash_with_retry(&cursor, &bitcoin_config, ctx).await?;
let block_breakdown =
download_and_parse_block_with_retry(&block_hash, &bitcoin_config, ctx).await?;

let block = match indexer::bitcoin::standardize_bitcoin_block(
block_breakdown,
&event_observer_config.bitcoin_network,
ctx,
) {
Ok(data) => data,
Err(e) => {
warn!(
ctx.expect_logger(),
"Unable to standardize block#{} {}: {}", cursor, block_hash, e
);
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}

if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};

let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});

let (predicates_triggered, _predicates_evaluated) =
evaluate_bitcoin_chainhooks_on_chain_event(
&chain_event,
vec![&predicate_spec],
ctx,
);

match execute_predicates_action(predicates_triggered, &event_observer_config, &ctx)
.await
{
Ok(actions) => actions_triggered += actions,
Err(_) => err_count += 1,
}

if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}

if cursor == end_block && floating_end_block {
end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks - 1,
Err(_e) => {
continue;
}
};
}
}
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
}

Ok(())
}

Expand Down
13 changes: 7 additions & 6 deletions components/chainhook-cli/src/scan/stacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {occurrences_found} occurrences found"
);
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});

if let Some(ref mut predicates_db_conn) = predicates_db_conn {
let status = PredicateStatus::Scanning(ScanningData {
start_block,
end_block,
cursor,
occurrences_found,
});
update_predicate_status(&predicate_spec.key(), status, predicates_db_conn, &ctx)
}
Ok(last_block_scanned)
Expand Down
Loading

0 comments on commit 9e54bff

Please sign in to comment.