fix: compatibility with clarinet
lgalabru committed Mar 22, 2023
1 parent f55a5ee commit a282655
Showing 7 changed files with 181 additions and 61 deletions.
10 changes: 10 additions & 0 deletions README.md
@@ -372,6 +372,16 @@ The current `stacks` predicates support the following `if_this` constructs:
}
}

+ // Get any stacks block matching constraints
+ // `block_height` mandatory argument admits:
+ // - `equals`, `higher_than`, `lower_than`, `between`: integer type.
+ {
+   "if_this": {
+     "scope": "block_height",
+     "higher_than": 10000
+   }
+ }
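// Hypothetical illustration, not documented by this commit: the `between`
// variant presumably carries both bounds; the two-element array shape
// shown here is an assumption.
{
  "if_this": {
    "scope": "block_height",
    "between": [10000, 20000]
  }
}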

// Get any transaction related to a given fungible token asset identifier
// `asset-identifier` mandatory argument admits:
// - string type, fully qualifying the asset identifier to observe. example: `ST1PQHQKV0RJXZFY1DGX8MNSNYVE3VGZJSRTPGZGM.cbtc-sip10::cbtc`
11 changes: 7 additions & 4 deletions components/chainhook-cli/src/cli/mod.rs
@@ -7,7 +7,7 @@ use crate::scan::stacks::scan_stacks_chain_with_predicate;
use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
use chainhook_event_observer::indexer::ordinals::db::{
build_bitcoin_traversal_local_storage, open_readonly_ordinals_db_conn,
- retrieve_satoshi_point_using_local_storage,
+ retrieve_satoshi_point_using_local_storage, open_readwrite_ordinals_db_conn, initialize_ordinal_state_storage,
};
use chainhook_event_observer::indexer::ordinals::ord::height::Height;
use chainhook_event_observer::observer::BitcoinConfig;
@@ -294,8 +294,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
index: cmd.block_height,
hash: "".into(),
};
- let storage_conn =
-     open_readonly_ordinals_db_conn(&config.expected_cache_path()).unwrap();
+ let storage_conn = open_readonly_ordinals_db_conn(&config.expected_cache_path(), &ctx).unwrap();

let (block_height, offset) = retrieve_satoshi_point_using_local_storage(
&storage_conn,
&block_identifier,
@@ -318,9 +319,11 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
rpc_url: config.network.bitcoin_node_rpc_url.clone(),
};

+ let storage_conn = initialize_ordinal_state_storage(&config.expected_cache_path(), &ctx);

let _ = build_bitcoin_traversal_local_storage(
&bitcoin_config,
- &config.expected_cache_path(),
+ &storage_conn,
cmd.start_block,
cmd.end_block,
&ctx,
11 changes: 5 additions & 6 deletions components/chainhook-cli/src/scan/bitcoin.rs
@@ -13,9 +13,8 @@ use chainhook_event_observer::indexer::bitcoin::{
retrieve_block_hash, retrieve_full_block_breakdown_with_retry,
};
use chainhook_event_observer::indexer::ordinals::db::{
-     get_default_ordinals_db_file_path, initialize_ordinal_state_storage,
-     open_readonly_ordinals_db_conn, retrieve_satoshi_point_using_local_storage,
-     write_compacted_block_to_index, CompactedBlock,
+     initialize_ordinal_state_storage, open_readonly_ordinals_db_conn,
+     retrieve_satoshi_point_using_local_storage, write_compacted_block_to_index, CompactedBlock,
};
use chainhook_event_observer::indexer::ordinals::ord::indexing::entry::Entry;
use chainhook_event_observer::indexer::ordinals::ord::indexing::{
@@ -193,7 +192,8 @@ pub async fn scan_bitcoin_chain_with_predicate(
ctx_.expect_logger(),
"Retrieving satoshi point for {}", transaction.transaction_identifier.hash
);
- let storage_conn = open_readonly_ordinals_db_conn(&cache_path).unwrap();
+ let storage_conn = open_readonly_ordinals_db_conn(&cache_path, &ctx_).unwrap();
let res = retrieve_satoshi_point_using_local_storage(
&storage_conn,
&block_identifier,
@@ -237,10 +237,9 @@ pub async fn scan_bitcoin_chain_with_predicate(
.expect("unable to detach thread");

let ctx_ = ctx.clone();
- let db_file = get_default_ordinals_db_file_path(&config.expected_cache_path());
+ let conn = initialize_ordinal_state_storage(&config.expected_cache_path(), &ctx_);
let handle_3 = hiro_system_kit::thread_named("Ordinal ingestion")
.spawn(move || {
- let conn = initialize_ordinal_state_storage(&db_file, &ctx_);
while let Ok(Some((height, compacted_block))) = cache_block_rx.recv() {
info!(ctx_.expect_logger(), "Caching block #{height}");
write_compacted_block_to_index(height, &compacted_block, &conn, &ctx_);
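The ingestion thread above drains a channel of `(height, CompactedBlock)` pairs until the channel closes. A reduced, self-contained sketch of that drain pattern using `std::sync::mpsc` (types and names simplified; the crate's actual channel setup is not shown in this diff):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // The sender feeds (height, payload) pairs; `None` acts as the sentinel
    // that ends ingestion, mirroring `while let Ok(Some(..)) = rx.recv()`.
    let (tx, rx) = mpsc::channel::<Option<(u64, Vec<u8>)>>();

    let writer = thread::spawn(move || {
        while let Ok(Some((height, block))) = rx.recv() {
            println!("Caching block #{height} ({} bytes)", block.len());
        }
    });

    tx.send(Some((1, vec![0u8; 32]))).unwrap();
    tx.send(Some((2, vec![0u8; 48]))).unwrap();
    tx.send(None).unwrap(); // sentinel: end of stream
    writer.join().unwrap();
}
```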
10 changes: 10 additions & 0 deletions components/chainhook-event-observer/src/chainhooks/bitcoin/mod.rs
@@ -406,6 +406,16 @@ impl BitcoinPredicateType {
}
false
}
+ BitcoinPredicateType::Protocol(Protocols::Ordinal(
+     OrdinalOperations::InscriptionTransfered,
+ )) => {
+     for op in tx.metadata.ordinal_operations.iter() {
+         if let OrdinalOperation::InscriptionTransfered(_) = op {
+             return true;
+         }
+     }
+     false
+ }
}
}
}
@@ -516,6 +516,7 @@ pub enum StacksOperations {
#[serde(rename_all = "snake_case")]
pub enum OrdinalOperations {
InscriptionRevealed,
+ InscriptionTransfered,
}

pub fn get_stacks_canonical_magic_bytes(network: &BitcoinNetwork) -> [u8; 2] {
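Because `OrdinalOperations` derives `Serialize`/`Deserialize` with `rename_all = "snake_case"`, the new variant travels as the string `"inscription_transfered"` (the wire spelling follows the Rust identifier). A minimal round-trip sketch, assuming only `serde` with the derive feature and `serde_json`:

```rust
use serde::{Deserialize, Serialize};

// Reduced mirror of the enum above, for illustration only.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum OrdinalOperations {
    InscriptionRevealed,
    InscriptionTransfered,
}

fn main() {
    // Unit variants serialize as bare strings under snake_case renaming.
    let json = serde_json::to_string(&OrdinalOperations::InscriptionTransfered).unwrap();
    assert_eq!(json, r#""inscription_transfered""#);

    // The same string parses back into the variant.
    let op: OrdinalOperations = serde_json::from_str(&json).unwrap();
    assert_eq!(op, OrdinalOperations::InscriptionTransfered);
}
```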
152 changes: 109 additions & 43 deletions components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs
@@ -1,6 +1,8 @@
use std::{path::PathBuf, time::Duration};

- use chainhook_types::{BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier};
+ use chainhook_types::{
+     BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
+ };
use hiro_system_kit::slog;
use rand::RngCore;
use rusqlite::{Connection, OpenFlags, ToSql};
@@ -14,15 +16,26 @@ use crate::{
utils::Context,
};

- pub fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf {
+ fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
-     destination_path.push("bitcoin_block_traversal-readonly.sqlite");
+     destination_path.push("bitcoin_block_traversal.sqlite");
destination_path
}

- pub fn open_readonly_ordinals_db_conn(base_dir: &PathBuf) -> Result<Connection, String> {
+ pub fn open_readonly_ordinals_db_conn(
+     base_dir: &PathBuf,
+     ctx: &Context,
+ ) -> Result<Connection, String> {
let path = get_default_ordinals_db_file_path(&base_dir);
-     let conn = open_existing_readonly_db(&path);
+     let conn = open_existing_readonly_db(&path, ctx);
Ok(conn)
}

+ pub fn open_readwrite_ordinals_db_conn(
+     base_dir: &PathBuf,
+     ctx: &Context,
+ ) -> Result<Connection, String> {
+     let conn = create_or_open_readwrite_db(&base_dir, ctx);
+     Ok(conn)
+ }

@@ -65,12 +78,13 @@ pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connection {
conn
}

- fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection {
-     let open_flags = match std::fs::metadata(path) {
+ fn create_or_open_readwrite_db(cache_path: &PathBuf, ctx: &Context) -> Connection {
+     let path = get_default_ordinals_db_file_path(&cache_path);
+     let open_flags = match std::fs::metadata(&path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// need to create
-             if let Some(dirp) = PathBuf::from(path).parent() {
+             if let Some(dirp) = PathBuf::from(&path).parent() {
std::fs::create_dir_all(dirp).unwrap_or_else(|e| {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
});
@@ -86,15 +100,23 @@ fn create_or_open_readwrite_db(path: &PathBuf, ctx: &Context) -> Connection {
}
};

- let conn = Connection::open_with_flags(path, open_flags).unwrap();
+ let conn = loop {
+     match Connection::open_with_flags(&path, open_flags) {
+         Ok(conn) => break conn,
+         Err(e) => {
+             ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
+         }
+     };
+     std::thread::sleep(std::time::Duration::from_secs(1));
+ };
// db.profile(Some(trace_profile));
// db.busy_handler(Some(tx_busy_handler))?;
conn.pragma_update(None, "journal_mode", &"WAL").unwrap();
conn.pragma_update(None, "synchronous", &"NORMAL").unwrap();
conn
}
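// Design note (an assumption, not stated in the commit): pairing this
// retry-open loop with `journal_mode=WAL` is presumably what makes concurrent
// access from clarinet workable, since WAL lets SQLite serve multiple readers
// alongside a single writer, and a second process opening the same database
// can wait and retry instead of failing fast.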

- fn open_existing_readonly_db(path: &PathBuf) -> Connection {
+ fn open_existing_readonly_db(path: &PathBuf, ctx: &Context) -> Connection {
let open_flags = match std::fs::metadata(path) {
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
@@ -109,8 +131,16 @@ fn open_existing_readonly_db(path: &PathBuf) -> Connection {
}
};

- let conn = Connection::open_with_flags(path, open_flags).unwrap();
- conn
+ let conn = loop {
+     match Connection::open_with_flags(path, open_flags) {
+         Ok(conn) => break conn,
+         Err(e) => {
+             ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
+         }
+     };
+     std::thread::sleep(std::time::Duration::from_secs(1));
+ };
+ return conn;
}

#[derive(Debug, Serialize, Deserialize)]
@@ -155,6 +185,39 @@ impl CompactedBlock {
CompactedBlock(((coinbase_txid, coinbase_value), txs))
}

+ pub fn from_standardized_block(block: &BitcoinBlockData) -> CompactedBlock {
+     let mut txs = vec![];
+     let mut coinbase_value = 0;
+     let coinbase_txid = {
+         let txid =
+             hex::decode(&block.transactions[0].transaction_identifier.hash[2..]).unwrap();
+         [txid[0], txid[1], txid[2], txid[3]]
+     };
+     for coinbase_output in block.transactions[0].metadata.outputs.iter() {
+         coinbase_value += coinbase_output.value;
+     }
+     for tx in block.transactions.iter().skip(1) {
+         let mut inputs = vec![];
+         for input in tx.metadata.inputs.iter() {
+             let txin = hex::decode(&input.previous_output.txid[2..]).unwrap();
+
+             inputs.push((
+                 [txin[0], txin[1], txin[2], txin[3]],
+                 input.previous_output.block_height as u32,
+                 input.previous_output.vout as u16,
+                 input.previous_output.value,
+             ));
+         }
+         let mut outputs = vec![];
+         for output in tx.metadata.outputs.iter() {
+             outputs.push(output.value);
+         }
+         let txid = hex::decode(&tx.transaction_identifier.hash[2..]).unwrap();
+         txs.push(([txid[0], txid[1], txid[2], txid[3]], inputs, outputs));
+     }
+     CompactedBlock(((coinbase_txid, coinbase_value), txs))
+ }
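// Hypothetical illustration (helper not in this commit): the compaction above
// keeps only the first four bytes of each transaction id, decoded from the
// "0x"-prefixed hex hashes used by chainhook-types.
fn txid_prefix(hash: &str) -> [u8; 4] {
    // Skip "0x", decode the hex, and keep the leading four bytes, exactly as
    // from_standardized_block does for txids and previous outputs.
    let bytes = hex::decode(&hash[2..]).expect("valid hex hash");
    [bytes[0], bytes[1], bytes[2], bytes[3]]
}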

pub fn from_hex_bytes(bytes: &str) -> CompactedBlock {
let bytes = hex::decode(&bytes).unwrap();
let value = ciborium::de::from_reader(&bytes[..]).unwrap();
@@ -327,7 +390,7 @@ pub fn write_compacted_block_to_index(

pub async fn build_bitcoin_traversal_local_storage(
bitcoin_config: &BitcoinConfig,
-     cache_path: &PathBuf,
+     storage_conn: &Connection,
start_block: u64,
end_block: u64,
ctx: &Context,
@@ -351,62 +414,68 @@ pub async fn build_bitcoin_traversal_local_storage(
let future = retrieve_block_hash(&config, &block_height);
match hiro_system_kit::nestable_block_on(future) {
Ok(block_hash) => {
err_count = 0;
- block_hash_tx.send(Some((block_cursor, block_hash)));
+ let _ = block_hash_tx.send(Some((block_cursor, block_hash)));
break;
}
Err(e) => {
err_count += 1;
let delay = (err_count + (rng.next_u64() % 3)) * 1000;
println!("retry hash:fetch in {delay}");
std::thread::sleep(std::time::Duration::from_millis(delay));
}
}
}
});
}

- let db_file = get_default_ordinals_db_file_path(&cache_path);
let bitcoin_config = bitcoin_config.clone();
let moved_ctx = ctx.clone();
- let handle = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
+ let block_data_tx_moved = block_data_tx.clone();
+ let handle_1 = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
println!("fetch {block_height}:{block_hash}");
let moved_bitcoin_config = bitcoin_config.clone();
- let block_data_tx = block_data_tx.clone();
+ let block_data_tx = block_data_tx_moved.clone();
let moved_ctx = moved_ctx.clone();
retrieve_block_data_pool.execute(move || {
moved_ctx.try_log(|logger| slog::info!(logger, "Fetching block #{block_height}"));
let future = retrieve_full_block_breakdown_with_retry(
&moved_bitcoin_config,
&block_hash,
&moved_ctx,
);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
- block_data_tx.send(Some(block_data));
+ let _ = block_data_tx.send(Some(block_data));
});
- retrieve_block_data_pool.join()
+ let res = retrieve_block_data_pool.join();
+ res
}
- });
+ }).expect("unable to spawn thread");

- let handle = hiro_system_kit::thread_named("Block data compression").spawn(move || {
+ let handle_2 = hiro_system_kit::thread_named("Block data compression").spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
println!("store {}:{}", block_data.height, block_data.hash);
- let block_compressed_tx = block_compressed_tx.clone();
+ let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
- block_compressed_tx.send(Some((block_data.height as u32, compressed_block)));
+ let _ = block_compressed_tx_moved
+     .send(Some((block_data.height as u32, compressed_block)));
});

- compress_block_data_pool.join()
+ let res = compress_block_data_pool.join();
+ // let _ = block_compressed_tx.send(None);
+ res
}
- });
+ }).expect("unable to spawn thread");

- let conn = initialize_ordinal_state_storage(&db_file, &ctx);

let mut blocks_stored = 0;
while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() {
- ctx.try_log(|logger| slog::debug!(logger, "Storing block #{block_height}"));
- write_compacted_block_to_index(block_height, &compacted_block, &conn, &ctx);
+ ctx.try_log(|logger| slog::info!(logger, "Storing block #{block_height}"));
+ write_compacted_block_to_index(block_height, &compacted_block, &storage_conn, &ctx);
+ blocks_stored += 1;
+ if blocks_stored == end_block - start_block {
+     let _ = block_data_tx.send(None);
+     let _ = block_hash_tx.send(None);
+     ctx.try_log(|logger| slog::info!(logger, "Local ordinals storage successfully seeded with #{blocks_stored} blocks"));
+     return Ok(())
+ }
}

retrieve_block_hash_pool.join();
Expand All @@ -429,7 +498,12 @@ pub fn retrieve_satoshi_point_using_local_storage(
let mut tx_cursor = (txid, 0);

loop {
- let res = retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn).unwrap();
+ let res = match retrieve_compacted_block_from_index(ordinal_block_number, &storage_conn) {
+     Some(res) => res,
+     None => {
+         return Err(format!("unable to retrieve block ##{ordinal_block_number}"));
+     }
+ };

ctx.try_log(|logger| {
slog::debug!(
@@ -527,11 +601,3 @@ pub fn retrieve_satoshi_point_using_local_storage(
Ok((ordinal_block_number.into(), ordinal_offset))
}
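// Hypothetical aside: the satoshi numbering that this traversal walks
// backwards is anchored in Bitcoin's deterministic subsidy schedule, which
// halves every 210_000 blocks.
fn subsidy_at_height(height: u64) -> u64 {
    // 50 BTC in satoshis, halved per epoch; the guard avoids shifting a u64
    // by 64 or more once the subsidy has fully decayed.
    let halvings = height / 210_000;
    if halvings >= 64 {
        return 0;
    }
    5_000_000_000 >> halvings
}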

- // pub async fn scan_bitcoin_chain_for_ordinal_inscriptions(
- //     subscribers: Vec<HookAction>,
- //     first_inscription_height: u64,
- //     config: &Config,
- //     ctx: &Context,
- // ) -> Result<(), String> {
- //     Ok(())
- // }
