Skip to content

Commit

Permalink
feat: logic to start ingestion during indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Jun 30, 2023
1 parent 135297e commit 3c1c99d
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 156 deletions.
139 changes: 17 additions & 122 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ use crate::storage::{
open_readonly_stacks_db_conn,
};

use chainhook_sdk::bitcoincore_rpc::{Auth, Client, RpcApi};

use chainhook_sdk::chainhooks::types::{
BitcoinChainhookFullSpecification, BitcoinChainhookNetworkSpecification, BitcoinPredicateType, ChainhookFullSpecification, FileHook,
HookAction, OrdinalOperations, StacksChainhookFullSpecification,
StacksChainhookNetworkSpecification, StacksPredicate, StacksPrintEventBasedPredicate,
};
use chainhook_sdk::hord::db::{
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_last_block_inserted,
find_lazy_block_at_block_height, find_watched_satpoint_for_inscription, initialize_hord_db,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
delete_data_in_hord_db, find_last_block_inserted, find_lazy_block_at_block_height,
find_watched_satpoint_for_inscription, initialize_hord_db, open_readonly_hord_db_conn,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_lazy_storage,
};
use chainhook_sdk::hord::{
new_traversals_lazy_cache, retrieve_inscribed_satoshi_points_from_block,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, HordConfig, Storage,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, Storage,
};
use chainhook_sdk::indexer;
use chainhook_sdk::indexer::bitcoin::{
Expand Down Expand Up @@ -474,35 +472,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {

info!(ctx.expect_logger(), "Starting service...",);

if !cmd.hord_disabled {
info!(
ctx.expect_logger(),
"Ordinal indexing is enabled by default hord, checking index... (use --no-hord to disable ordinals)"
);

if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(
start_block,
end_block,
&config.get_hord_config(),
&config,
&ctx,
)
.await?;
}
}

let mut service = Service::new(config, ctx);
return service.run(predicates, cmd.hord_disabled).await;
}
Expand Down Expand Up @@ -673,7 +642,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
));
}
};

// TODO: if a stacks.rocksdb is present, use it.
// TODO: update Stacks archive file if required.
scan_stacks_chainstate_via_csv_using_predicate(
&predicate_spec,
&mut config,
Expand Down Expand Up @@ -731,11 +701,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
find_last_block_inserted(&hord_db_conn) as u64
};
if cmd.block_height > tip_height {
perform_hord_db_update(
crate::hord::perform_hord_db_update(
tip_height,
cmd.block_height,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
Expand Down Expand Up @@ -802,11 +773,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let tip_height = find_last_block_inserted(&blocks_db_conn) as u64;
let _end_at = match cmd.block_height {
Some(block_height) if block_height > tip_height => {
perform_hord_db_update(
crate::hord::perform_hord_db_update(
tip_height,
block_height,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
Expand Down Expand Up @@ -860,7 +832,9 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
HordDbCommand::Sync(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if let Some((start_block, end_block)) =
crate::hord::should_sync_hord_db(&config, &ctx)?
{
if start_block == 0 {
info!(
ctx.expect_logger(),
Expand All @@ -872,11 +846,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(
crate::hord::perform_hord_db_update(
start_block,
end_block,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
Expand All @@ -902,11 +877,12 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
)?;
}
// Update data
perform_hord_db_update(
crate::hord::perform_hord_db_update(
cmd.start_block,
cmd.end_block,
&config.get_hord_config(),
&config,
None,
&ctx,
)
.await?;
Expand Down Expand Up @@ -1023,87 +999,6 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
Ok(())
}

pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);

let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};

let start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
Err(err) => {
warn!(ctx.expect_logger(), "{}", err);
0
}
};

if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
}

let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};

if start_block < end_block {
Ok(Some((start_block, end_block)))
} else {
Ok(None)
}
}

pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
hord_config: &HordConfig,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
info!(
ctx.expect_logger(),
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
end_block - start_block + 1
);

let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};

let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;

let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&blocks_db,
&inscriptions_db_conn_rw,
start_block,
end_block,
hord_config,
&ctx,
)
.await?;

Ok(())
}

#[allow(dead_code)]
pub fn install_ctrlc_handler(terminate_tx: Sender<DigestingCommand>, ctx: Context) {
ctrlc::set_handler(move || {
Expand Down
113 changes: 113 additions & 0 deletions components/chainhook-cli/src/hord/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::sync::mpsc::Sender;

use chainhook_sdk::{
bitcoincore_rpc::{Auth, Client, RpcApi},
hord::{
db::{
fetch_and_cache_blocks_in_hord_db, find_last_block_inserted, initialize_hord_db,
open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db, find_latest_inscription_block_height, open_readonly_hord_db_conn,
},
HordConfig,
},
observer::BitcoinConfig,
utils::Context,
};
use chainhook_types::BitcoinBlockData;

use crate::config::Config;

pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
);

let bitcoin_rpc = match Client::new(&config.network.bitcoind_rpc_url, auth) {
Ok(con) => con,
Err(message) => {
return Err(format!("Bitcoin RPC error: {}", message.to_string()));
}
};

let mut start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
Err(err) => {
warn!(ctx.expect_logger(), "{}", err);
0
}
};

if start_block == 0 {
let _ = initialize_hord_db(&config.expected_cache_path(), &ctx);
}

let inscriptions_db_conn = open_readonly_hord_db_conn(&config.expected_cache_path(), &ctx)?;

match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
Some(height) => {
start_block = start_block.min(height);
}
None => {
start_block = start_block.min(config.get_hord_config().first_inscription_height);
}
};


let end_block = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => result.blocks,
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};

if start_block < end_block {
Ok(Some((start_block, end_block)))
} else {
Ok(None)
}
}

pub async fn perform_hord_db_update(
start_block: u64,
end_block: u64,
hord_config: &HordConfig,
config: &Config,
block_post_processor: Option<Sender<BitcoinBlockData>>,
ctx: &Context,
) -> Result<(), String> {
info!(
ctx.expect_logger(),
"Syncing hord_db: {} blocks to download ({start_block}: {end_block})",
end_block - start_block + 1
);

let bitcoin_config = BitcoinConfig {
username: config.network.bitcoind_rpc_username.clone(),
password: config.network.bitcoind_rpc_password.clone(),
rpc_url: config.network.bitcoind_rpc_url.clone(),
network: config.network.bitcoin_network.clone(),
bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(),
};

let blocks_db = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw = open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;

let _ = fetch_and_cache_blocks_in_hord_db(
&bitcoin_config,
&blocks_db,
&inscriptions_db_conn_rw,
start_block,
end_block,
hord_config,
block_post_processor,
&ctx,
)
.await?;

Ok(())
}
1 change: 1 addition & 0 deletions components/chainhook-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod archive;
pub mod block;
pub mod cli;
pub mod config;
pub mod hord;
pub mod scan;
pub mod service;
pub mod storage;
Expand Down

0 comments on commit 3c1c99d

Please sign in to comment.