diff --git a/Cargo.lock b/Cargo.lock index 4461dcf9..b7438eac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,8 +452,6 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chainhook-sdk" version = "0.7.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "462ce230bfc1cd6570fa3717d6ef44a0f79729db542dc4f86efd7ad386aa159c" dependencies = [ "base58 0.2.0", "base64", @@ -472,7 +470,7 @@ dependencies = [ "rand 0.8.5", "reqwest", "rocket", - "schemars 0.8.11", + "schemars 0.8.12", "serde", "serde-hex", "serde_derive", @@ -486,11 +484,9 @@ dependencies = [ [[package]] name = "chainhook-types" version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e205711f1d01227cd9239754309c9ff2ef6a637faaf737b4642570753980ee5" dependencies = [ "hex", - "schemars 0.8.11", + "schemars 0.8.12", "serde", "serde_derive", "serde_json", @@ -1387,9 +1383,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", @@ -1402,9 +1398,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", "futures-sink", @@ -1412,15 +1408,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ "futures-core", "futures-task", @@ -1429,38 +1425,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 1.0.105", + "syn 2.0.18", ] [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-channel", "futures-core", @@ -1715,6 +1711,7 @@ dependencies = [ "dashmap 5.4.0", "flate2", "flume", + "futures", "futures-util", "fxhash", "hex", diff --git a/components/hord-cli/Cargo.toml b/components/hord-cli/Cargo.toml index 0970cdae..8bcb652b 100644 --- a/components/hord-cli/Cargo.toml +++ b/components/hord-cli/Cargo.toml @@ -14,7 +14,8 @@ redis = "0.21.5" serde-redis = "0.12.0" hex = "0.4.3" rand = "0.8.5" -chainhook-sdk = { version = "=0.7.7", default-features = false, features = ["zeromq"] } +# chainhook-sdk = { version = "=0.7.7", default-features = false, features = ["zeromq"] } +chainhook-sdk = { version = "=0.7.7", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] } hiro-system-kit = "0.1.0" clap = { version = "3.2.23", features = ["derive"], optional = true } clap_generate = { version = "3.0.3", optional = true } @@ -40,6 +41,7 @@ anyhow = { version = "1.0.56", features = ["backtrace"] } schemars = { version = "0.8.10", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" } pprof = { version = "0.12", features = ["flamegraph"] } progressing = '3' +futures = "0.3.28" [dependencies.rocksdb] version = "0.20.1" diff --git a/components/hord-cli/src/cli/mod.rs b/components/hord-cli/src/cli/mod.rs index 73c29785..4d7eeb58 100644 --- a/components/hord-cli/src/cli/mod.rs +++ b/components/hord-cli/src/cli/mod.rs @@ -728,22 +728,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let mut hord_config = config.get_hord_config(); hord_config.network_thread_max = cmd.network_threads; - let bitcoin_config = BitcoinConfig { - username: config.network.bitcoind_rpc_username.clone(), - password: config.network.bitcoind_rpc_password.clone(), - rpc_url: config.network.bitcoind_rpc_url.clone(), - network: config.network.bitcoin_network.clone(), - bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(), - }; - let blocks_db = - open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?; - rebuild_rocks_db( - &bitcoin_config, - &blocks_db, + &config, cmd.start_block, cmd.end_block, - &config.get_hord_config(), &ctx, ) .await? diff --git a/components/hord-cli/src/db/mod.rs b/components/hord-cli/src/db/mod.rs index 8dc6ca4b..291b1677 100644 --- a/components/hord-cli/src/db/mod.rs +++ b/components/hord-cli/src/db/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, VecDeque}, hash::BuildHasherDefault, path::PathBuf, sync::{mpsc::Sender, Arc}, @@ -7,7 +7,7 @@ use std::{ use chainhook_sdk::{ indexer::bitcoin::{ - build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, + build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, try_fetch_block_bytes_with_retry, parse_downloaded_block, }, types::{ BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, @@ -21,6 +21,7 @@ use rand::{thread_rng, Rng}; use rocksdb::DB; use rusqlite::{Connection, OpenFlags, ToSql, Transaction}; +use tokio::task::JoinSet; use std::io::Cursor; use std::io::{Read, Write}; use threadpool::ThreadPool; @@ -29,7 +30,7 @@ use chainhook_sdk::{ indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context, }; -use crate::hord::{self, HordConfig}; +use crate::{hord::{self, HordConfig}, config::Config}; use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block}; use crate::ord::{height::Height, sat::Sat}; @@ -1842,98 +1843,69 @@ impl<'a> Iterator for LazyBlockTransactionIterator<'a> { } pub async fn rebuild_rocks_db( - bitcoin_config: &BitcoinConfig, - blocks_db_rw: &DB, + config: &Config, start_block: u64, end_block: u64, - hord_config: &HordConfig, ctx: &Context, ) -> Result<(), String> { - let guard = pprof::ProfilerGuardBuilder::default() - .frequency(20) - .blocklist(&["libc", "libgcc", "pthread", "vdso"]) - .build() - .unwrap(); + // let guard = pprof::ProfilerGuardBuilder::default() + // .frequency(20) + // .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + // .build() + // .unwrap(); + + let bitcoin_config = BitcoinConfig { + username: config.network.bitcoind_rpc_username.clone(), + password: config.network.bitcoind_rpc_password.clone(), + rpc_url: config.network.bitcoind_rpc_url.clone(), + network: config.network.bitcoin_network.clone(), + bitcoin_block_signaling: config.network.bitcoin_block_signaling.clone(), + }; + + let hord_config = config.get_hord_config(); ctx.try_log(|logger| { slog::info!(logger, "Generating report"); }); let number_of_blocks_to_process = end_block - start_block + 1; - let (block_hash_req_lim, block_req_lim, block_process_lim) = (256, 128, 128); + let (block_req_lim, block_process_lim) = (128, 128); + - let retrieve_block_hash_pool = ThreadPool::new(hord_config.network_thread_max); - let (block_hash_tx, block_hash_rx) = crossbeam_channel::bounded(block_hash_req_lim); - let retrieve_block_data_pool = ThreadPool::new(hord_config.network_thread_max); let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(block_req_lim); let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max); let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(block_process_lim); let http_client = build_http_client(); // Thread pool #1: given a block height, retrieve the block hash - for block_cursor in start_block..=end_block { - let block_height = block_cursor.clone(); - let block_hash_tx = block_hash_tx.clone(); - let config = bitcoin_config.clone(); - let moved_ctx = ctx.clone(); - let moved_http_client = http_client.clone(); - - retrieve_block_hash_pool.execute(move || { - let future = retrieve_block_hash_with_retry( - &moved_http_client, - &block_height, - &config, - &moved_ctx, - ); - let block_hash = hiro_system_kit::nestable_block_on(future).unwrap(); - block_hash_tx - .send(Some((block_height, block_hash))) - .expect("unable to channel block_hash"); - }) - } - - // Thread pool #2: given a block hash, retrieve the full block (verbosity max, including prevout) - let bitcoin_config = bitcoin_config.clone(); + let moved_config = bitcoin_config.clone(); let moved_ctx = ctx.clone(); - let block_data_tx_moved = block_data_tx.clone(); - let _ = hiro_system_kit::thread_named("Block data retrieval") - .spawn(move || { - while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() { - let moved_bitcoin_config = bitcoin_config.clone(); - let block_data_tx = block_data_tx_moved.clone(); - let moved_ctx = moved_ctx.clone(); - let moved_http_client = http_client.clone(); - retrieve_block_data_pool.execute(move || { - moved_ctx - .try_log(|logger| slog::debug!(logger, "Fetching block #{block_height}")); - let future = download_block_with_retry( - &moved_http_client, - &block_hash, - &moved_bitcoin_config, - &moved_ctx, - ); - let res = match hiro_system_kit::nestable_block_on(future) { - Ok(block_data) => Some(block_data), - Err(e) => { - moved_ctx.try_log(|logger| { - slog::error!(logger, "unable to fetch block #{block_height}: {e}") - }); - None - } - }; - let _ = block_data_tx.send(res); - }); - } - let res = retrieve_block_data_pool.join(); - res - }) - .expect("unable to spawn thread"); + let moved_http_client = http_client.clone(); + + let mut set = JoinSet::new(); + + let mut block_heights = VecDeque::from((start_block..=end_block).collect::>()); + + for _ in 0..hord_config.network_thread_max { + if let Some(block_height) = block_heights.pop_front() { + let config = moved_config.clone(); + let ctx = moved_ctx.clone(); + let http_client = moved_http_client.clone(); + set.spawn(try_fetch_block_bytes_with_retry( + http_client, + block_height, + config, + ctx, + )); + } + } let _ = hiro_system_kit::thread_named("Block data compression") .spawn(move || { - while let Ok(Some(block_data)) = block_data_rx.recv() { + while let Ok(Some(block_bytes)) = block_data_rx.recv() { let block_compressed_tx_moved = block_compressed_tx.clone(); compress_block_data_pool.execute(move || { + let block_data = parse_downloaded_block(block_bytes).unwrap(); let compressed_block = LazyBlock::from_full_block(&block_data).expect("unable to serialize block"); let block_index = block_data.height as u32; @@ -1947,72 +1919,93 @@ pub async fn rebuild_rocks_db( let res = compress_block_data_pool.join(); res }) - .expect("unable to spawn thread"); - - let mut blocks_stored = 0; - let mut num_writes = 0; - - while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() { - insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx); - blocks_stored += 1; - num_writes += 1; - - // In the context of ordinals, we're constrained to process blocks sequentially - // Blocks are processed by a threadpool and could be coming out of order. - // Inbox block for later if the current block is not the one we should be - // processing. - - // Should we start look for inscriptions data in blocks? - ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",)); - - if blocks_stored == number_of_blocks_to_process { - let _ = block_data_tx.send(None); - let _ = block_hash_tx.send(None); - ctx.try_log(|logger| { - slog::info!( - logger, - "Local block storage successfully seeded with #{blocks_stored} blocks" - ) - }); + .expect("unable to spawn thread"); - match guard.report().build() { - Ok(report) => { - ctx.try_log(|logger| { - slog::info!(logger, "Generating report"); + let blocks_db_rw = + open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?; + let cloned_ctx = ctx.clone(); + let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion") + .spawn(move || { + let mut blocks_stored = 0; + let mut num_writes = 0; + + while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() { + insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx); + blocks_stored += 1; + num_writes += 1; + + // In the context of ordinals, we're constrained to process blocks sequentially + // Blocks are processed by a threadpool and could be coming out of order. + // Inbox block for later if the current block is not the one we should be + // processing. + + // Should we start look for inscriptions data in blocks? + cloned_ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",)); + + if blocks_stored == number_of_blocks_to_process { + cloned_ctx.try_log(|logger| { + slog::info!( + logger, + "Local block storage successfully seeded with #{blocks_stored} blocks" + ) }); - - let file = std::fs::File::create("hord-perf.svg").unwrap(); - report.flamegraph(file).unwrap(); + + // match guard.report().build() { + // Ok(report) => { + // ctx.try_log(|logger| { + // slog::info!(logger, "Generating report"); + // }); + + // let file = std::fs::File::create("hord-perf.svg").unwrap(); + // report.flamegraph(file).unwrap(); + // } + // Err(e) => { + // ctx.try_log(|logger| { + // slog::error!(logger, "Reporting failed: {}", e.to_string()); + // }); + // } + // } } - Err(e) => { - ctx.try_log(|logger| { - slog::error!(logger, "Reporting failed: {}", e.to_string()); + + if num_writes % 128 == 0 { + cloned_ctx.try_log(|logger| { + slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)"); }); + if let Err(e) = blocks_db_rw.flush() { + cloned_ctx.try_log(|logger| { + slog::error!(logger, "{}", e.to_string()); + }); + } + num_writes = 0; } } - return Ok(()); - } - - if num_writes % 128 == 0 { - ctx.try_log(|logger| { - slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)"); - }); + if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { + cloned_ctx.try_log(|logger| { slog::error!(logger, "{}", e.to_string()); }); } - num_writes = 0; + () + }).expect("unable to spawn thread"); + + while let Some(res) = set.join_next().await { + let block = res.unwrap().unwrap(); + + let _ = block_data_tx + .send(Some(block)); + + if let Some(block_height) = block_heights.pop_front() { + let config = moved_config.clone(); + let ctx = ctx.clone(); + let http_client = moved_http_client.clone(); + set.spawn(try_fetch_block_bytes_with_retry( + http_client, + block_height, + config, + ctx, + )); } } - if let Err(e) = blocks_db_rw.flush() { - ctx.try_log(|logger| { - slog::error!(logger, "{}", e.to_string()); - }); - } - - retrieve_block_hash_pool.join(); - Ok(()) }