From 1c80b67658a27057b12874e1d7ebfbc0d84dd287 Mon Sep 17 00:00:00 2001 From: Ludovic Galabru Date: Thu, 11 Jan 2024 14:45:43 -0500 Subject: [PATCH] feat: revisit config files --- components/ordhook-cli/src/cli/mod.rs | 49 +++++--- components/ordhook-cli/src/config/file.rs | 90 ++++++--------- .../ordhook-cli/src/config/generator.rs | 14 +-- components/ordhook-core/src/config/mod.rs | 105 +++++++++--------- components/ordhook-core/src/core/mod.rs | 23 ++-- .../ordhook-core/src/core/pipeline/mod.rs | 29 ++++- .../pipeline/processors/block_archiving.rs | 9 +- .../processors/inscription_indexing.rs | 2 + .../core/protocol/inscription_sequencing.rs | 24 ++-- .../src/core/protocol/satoshi_numbering.rs | 5 +- components/ordhook-core/src/db/mod.rs | 24 ++-- components/ordhook-core/src/service/mod.rs | 50 +++++---- .../ordhook-core/src/service/runloops.rs | 2 +- 13 files changed, 243 insertions(+), 183 deletions(-) diff --git a/components/ordhook-cli/src/cli/mod.rs b/components/ordhook-cli/src/cli/mod.rs index 93ffbbbc..d62fbefd 100644 --- a/components/ordhook-cli/src/cli/mod.rs +++ b/components/ordhook-cli/src/cli/mod.rs @@ -575,15 +575,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { while let Some(block_height) = block_range.pop_front() { let inscriptions = find_all_inscriptions_in_block(&block_height, &inscriptions_db_conn, ctx); - let mut locations = + let locations = find_all_transfers_in_block(&block_height, &inscriptions_db_conn, ctx); let mut total_transfers_in_block = 0; for (_, inscription) in inscriptions.iter() { println!("Inscription {} revealed at block #{} (inscription_number {}, ordinal_number {})", inscription.get_inscription_id(), block_height, inscription.inscription_number.jubilee, inscription.ordinal_number); - if let Some(transfers) = locations.remove(&inscription.get_inscription_id()) - { + if let Some(transfers) = locations.get(&inscription.ordinal_number) { for t in transfers.iter().skip(1) { total_transfers_in_block 
+= 1; println!( @@ -678,6 +677,8 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { &transaction_identifier, 0, &Arc::new(cache), + config.resources.ulimit, + config.resources.memory_available, true, ctx, )?; @@ -707,7 +708,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let last_known_block = find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?; if last_known_block.is_none() { - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); } let ordhook_config = config.get_ordhook_config(); @@ -766,7 +773,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Command::Db(OrdhookDbCommand::New(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; initialize_ordhook_db(&config.expected_cache_path(), ctx); - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); } Command::Db(OrdhookDbCommand::Sync(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; @@ -779,10 +792,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; let mut ordhook_config = config.get_ordhook_config(); if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; - } - if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; + ordhook_config.resources.bitcoind_rpc_threads = network_threads; } let blocks = cmd.get_blocks(); let block_ingestion_processor = @@ -800,6 +810,8 @@ async fn handle_command(opts: 
Opts, ctx: &Context) -> Result<(), String> { let blocks_db = open_ordhook_db_conn_rocks_db_loop( false, &config.get_ordhook_config().db_path, + config.resources.ulimit, + config.resources.memory_available, ctx, ); for i in cmd.get_blocks().into_iter() { @@ -819,7 +831,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; let mut ordhook_config = config.get_ordhook_config(); if let Some(network_threads) = cmd.network_threads { - ordhook_config.network_thread_max = network_threads; + ordhook_config.resources.bitcoind_rpc_threads = network_threads; } let block_post_processor = match cmd.repair_observers { Some(true) => { @@ -870,8 +882,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { Command::Db(OrdhookDbCommand::Check(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; { - let blocks_db = - open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?; + let blocks_db = open_readonly_ordhook_db_conn_rocks_db( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + )?; let tip = find_last_block_inserted(&blocks_db); println!("Tip: {}", tip); let missing_blocks = find_missing_blocks(&blocks_db, 1, tip, ctx); @@ -880,8 +896,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { } Command::Db(OrdhookDbCommand::Drop(cmd)) => { let config = ConfigFile::default(false, false, false, &cmd.config_path)?; - let blocks_db = - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + ctx, + ); let inscriptions_db_conn_rw = open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?; diff --git a/components/ordhook-cli/src/config/file.rs 
b/components/ordhook-cli/src/config/file.rs index 1234c725..5cb432b4 100644 --- a/components/ordhook-cli/src/config/file.rs +++ b/components/ordhook-cli/src/config/file.rs @@ -1,29 +1,24 @@ use ordhook::chainhook_sdk::indexer::IndexerConfig; +use ordhook::chainhook_sdk::observer::DEFAULT_INGESTION_PORT; use ordhook::chainhook_sdk::types::{ BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig, }; use ordhook::config::{ - BootstrapConfig, Config, LimitsConfig, LogConfig, PredicatesApi, PredicatesApiConfig, - StorageConfig, + Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig, + StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT, + DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT, }; use std::fs::File; use std::io::{BufReader, Read}; -pub const DEFAULT_INGESTION_PORT: u16 = 20455; -pub const DEFAULT_CONTROL_PORT: u16 = 20456; -pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const STACKS_MAX_PREDICATE_REGISTRATION: usize = 50; -pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50; - #[derive(Deserialize, Debug, Clone)] pub struct ConfigFile { pub storage: StorageConfigFile, pub http_api: Option, - pub limits: LimitsConfigFile, + pub resources: ResourcesConfigFile, pub network: NetworkConfigFile, pub logs: Option, - pub bootstrap: Option, + pub snapshot: Option, } impl ConfigFile { @@ -54,12 +49,12 @@ impl ConfigFile { _ => return Err("network.mode not supported".to_string()), }; - let bootstrap = match config_file.bootstrap { + let snapshot = match config_file.snapshot { Some(bootstrap) => match bootstrap.download_url { - Some(ref url) => BootstrapConfig::Download(url.to_string()), - None => BootstrapConfig::Build, + Some(ref url) => SnapshotConfig::Download(url.to_string()), + None => SnapshotConfig::Build, }, - None => BootstrapConfig::Build, + None => SnapshotConfig::Build, }; let config = Config { @@ -76,36 +71,25 @@ impl 
ConfigFile { }), }, }, - bootstrap, - limits: LimitsConfig { - max_number_of_stacks_predicates: config_file - .limits - .max_number_of_stacks_predicates - .unwrap_or(STACKS_MAX_PREDICATE_REGISTRATION), - max_number_of_bitcoin_predicates: config_file - .limits - .max_number_of_bitcoin_predicates - .unwrap_or(BITCOIN_MAX_PREDICATE_REGISTRATION), - max_number_of_concurrent_stacks_scans: config_file - .limits - .max_number_of_concurrent_stacks_scans - .unwrap_or(STACKS_SCAN_THREAD_POOL_SIZE), - max_number_of_concurrent_bitcoin_scans: config_file - .limits - .max_number_of_concurrent_bitcoin_scans - .unwrap_or(BITCOIN_SCAN_THREAD_POOL_SIZE), - max_number_of_processing_threads: config_file - .limits - .max_number_of_processing_threads - .unwrap_or(1.max(num_cpus::get().saturating_sub(1))), - bitcoin_concurrent_http_requests_max: config_file - .limits - .bitcoin_concurrent_http_requests_max - .unwrap_or(1.max(num_cpus::get().saturating_sub(1))), - max_caching_memory_size_mb: config_file - .limits - .max_caching_memory_size_mb - .unwrap_or(2048), + snapshot, + resources: ResourcesConfig { + ulimit: config_file.resources.ulimit.unwrap_or(DEFAULT_ULIMIT), + cpu_core_available: config_file + .resources + .cpu_core_available + .unwrap_or(num_cpus::get()), + memory_available: config_file + .resources + .memory_available + .unwrap_or(DEFAULT_MEMORY_AVAILABLE), + bitcoind_rpc_threads: config_file + .resources + .bitcoind_rpc_threads + .unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE), + expected_observers_count: config_file + .resources + .expected_observers_count + .unwrap_or(1), }, network: IndexerConfig { bitcoind_rpc_url: config_file.network.bitcoind_rpc_url.to_string(), @@ -176,19 +160,17 @@ pub struct PredicatesApiConfigFile { } #[derive(Deserialize, Debug, Clone)] -pub struct BootstrapConfigFile { +pub struct SnapshotConfigFile { pub download_url: Option, } #[derive(Deserialize, Debug, Clone)] -pub struct LimitsConfigFile { - pub max_number_of_bitcoin_predicates: Option, 
- pub max_number_of_concurrent_bitcoin_scans: Option, - pub max_number_of_stacks_predicates: Option, - pub max_number_of_concurrent_stacks_scans: Option, - pub max_number_of_processing_threads: Option, - pub max_caching_memory_size_mb: Option, - pub bitcoin_concurrent_http_requests_max: Option, +pub struct ResourcesConfigFile { + pub ulimit: Option, + pub cpu_core_available: Option, + pub memory_available: Option, + pub bitcoind_rpc_threads: Option, + pub expected_observers_count: Option, } #[derive(Deserialize, Debug, Clone)] diff --git a/components/ordhook-cli/src/config/generator.rs b/components/ordhook-cli/src/config/generator.rs index 53b6cce5..eabd1a51 100644 --- a/components/ordhook-cli/src/config/generator.rs +++ b/components/ordhook-cli/src/config/generator.rs @@ -26,16 +26,16 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543" # but stacks can also be used: # stacks_node_rpc_url = "http://0.0.0.0:20443" -[limits] -max_number_of_bitcoin_predicates = 100 -max_number_of_concurrent_bitcoin_scans = 100 -max_number_of_processing_threads = 16 -bitcoin_concurrent_http_requests_max = 16 -max_caching_memory_size_mb = 32000 +[resources] +ulimit = 2048 +cpu_core_available = 16 +memory_available = 32 +bitcoind_rpc_threads = 8 +expected_observers_count = 1 # Disable the following section if the state # must be built locally -[bootstrap] +[snapshot] download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest" [logs] diff --git a/components/ordhook-core/src/config/mod.rs b/components/ordhook-core/src/config/mod.rs index 950e090e..8506ef24 100644 --- a/components/ordhook-core/src/config/mod.rs +++ b/components/ordhook-core/src/config/mod.rs @@ -11,18 +11,17 @@ const DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE: &str = pub const DEFAULT_INGESTION_PORT: u16 = 20455; pub const DEFAULT_CONTROL_PORT: u16 = 20456; -pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 10; -pub const STACKS_MAX_PREDICATE_REGISTRATION: 
usize = 50; -pub const BITCOIN_MAX_PREDICATE_REGISTRATION: usize = 50; +pub const DEFAULT_ULIMIT: usize = 2048; +pub const DEFAULT_MEMORY_AVAILABLE: usize = 8; +pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4; #[derive(Clone, Debug)] pub struct Config { pub storage: StorageConfig, pub http_api: PredicatesApi, - pub limits: LimitsConfig, + pub resources: ResourcesConfig, pub network: IndexerConfig, - pub bootstrap: BootstrapConfig, + pub snapshot: SnapshotConfig, pub logs: LogConfig, } @@ -50,7 +49,7 @@ pub struct PredicatesApiConfig { } #[derive(Clone, Debug)] -pub enum BootstrapConfig { +pub enum SnapshotConfig { Build, Download(String), } @@ -65,15 +64,22 @@ pub struct UrlConfig { pub file_url: String, } -#[derive(Clone, Debug)] -pub struct LimitsConfig { - pub max_number_of_bitcoin_predicates: usize, - pub max_number_of_concurrent_bitcoin_scans: usize, - pub max_number_of_stacks_predicates: usize, - pub max_number_of_concurrent_stacks_scans: usize, - pub max_number_of_processing_threads: usize, - pub bitcoin_concurrent_http_requests_max: usize, - pub max_caching_memory_size_mb: usize, +#[derive(Deserialize, Debug, Clone)] +pub struct ResourcesConfig { + pub ulimit: usize, + pub cpu_core_available: usize, + pub memory_available: usize, + pub bitcoind_rpc_threads: usize, + pub expected_observers_count: usize, +} + +impl ResourcesConfig { + pub fn get_optimal_thread_pool_capacity(&self) -> usize { + // Generally speaking when dealing a pool, we need one thread for + // feeding the thread pool and eventually another thread for + // handling the "reduce" step. 
+ self.cpu_core_available.saturating_sub(2).max(1) + } } impl Config { @@ -86,10 +92,7 @@ impl Config { pub fn get_ordhook_config(&self) -> OrdhookConfig { OrdhookConfig { - network_thread_max: self.limits.bitcoin_concurrent_http_requests_max, - ingestion_thread_max: self.limits.max_number_of_processing_threads, - ingestion_thread_queue_size: 4, - cache_size: self.limits.max_caching_memory_size_mb, + resources: self.resources.clone(), db_path: self.expected_cache_path(), first_inscription_height: match self.network.bitcoin_network { BitcoinNetwork::Mainnet => 767430, @@ -119,9 +122,9 @@ impl Config { } pub fn should_bootstrap_through_download(&self) -> bool { - match &self.bootstrap { - BootstrapConfig::Build => false, - BootstrapConfig::Download(_) => true, + match &self.snapshot { + SnapshotConfig::Build => false, + SnapshotConfig::Download(_) => true, } } @@ -139,9 +142,9 @@ impl Config { } fn expected_remote_ordinals_sqlite_base_url(&self) -> &str { - match &self.bootstrap { - BootstrapConfig::Build => unreachable!(), - BootstrapConfig::Download(url) => &url, + match &self.snapshot { + SnapshotConfig::Build => unreachable!(), + SnapshotConfig::Download(url) => &url, } } @@ -159,15 +162,13 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Build, - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Build, + resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + 
ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:18443".into(), @@ -192,15 +193,13 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Build, - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Build, + resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:18332".into(), @@ -225,17 +224,13 @@ impl Config { working_dir: default_cache_path(), }, http_api: PredicatesApi::Off, - bootstrap: BootstrapConfig::Download( - DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string(), - ), - limits: LimitsConfig { - max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_bitcoin_scans: BITCOIN_SCAN_THREAD_POOL_SIZE, - max_number_of_stacks_predicates: STACKS_MAX_PREDICATE_REGISTRATION, - max_number_of_concurrent_stacks_scans: STACKS_SCAN_THREAD_POOL_SIZE, - max_number_of_processing_threads: 1.max(num_cpus::get().saturating_sub(1)), - bitcoin_concurrent_http_requests_max: 1.max(num_cpus::get().saturating_sub(1)), - max_caching_memory_size_mb: 2048, + snapshot: SnapshotConfig::Download(DEFAULT_MAINNET_ORDINALS_SQLITE_ARCHIVE.to_string()), + 
resources: ResourcesConfig { + cpu_core_available: num_cpus::get(), + memory_available: DEFAULT_MEMORY_AVAILABLE, + ulimit: DEFAULT_ULIMIT, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + expected_observers_count: 1, }, network: IndexerConfig { bitcoind_rpc_url: "http://0.0.0.0:8332".into(), diff --git a/components/ordhook-core/src/core/mod.rs b/components/ordhook-core/src/core/mod.rs index 8c510f43..d9d78a3d 100644 --- a/components/ordhook-core/src/core/mod.rs +++ b/components/ordhook-core/src/core/mod.rs @@ -13,7 +13,7 @@ use chainhook_sdk::{ }; use crate::{ - config::{Config, LogConfig}, + config::{Config, LogConfig, ResourcesConfig}, db::{find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop}, }; @@ -26,10 +26,7 @@ use crate::db::TransactionBytesCursor; #[derive(Clone, Debug)] pub struct OrdhookConfig { - pub network_thread_max: usize, - pub ingestion_thread_max: usize, - pub ingestion_thread_queue_size: usize, - pub cache_size: usize, + pub resources: ResourcesConfig, pub db_path: PathBuf, pub first_inscription_height: u64, pub logs: LogConfig, @@ -95,7 +92,13 @@ pub fn compute_next_satpoint_data( } pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result, String> { - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?; let last_compressed_block = find_last_block_inserted(&blocks_db) as u64; let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? 
@@ -128,7 +131,13 @@ pub fn should_sync_ordhook_db( } }; - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let mut start_block = find_last_block_inserted(&blocks_db) as u64; if start_block == 0 { diff --git a/components/ordhook-core/src/core/pipeline/mod.rs b/components/ordhook-core/src/core/pipeline/mod.rs index 855ea6db..4cb67b9f 100644 --- a/components/ordhook-core/src/core/pipeline/mod.rs +++ b/components/ordhook-core/src/core/pipeline/mod.rs @@ -73,11 +73,27 @@ pub async fn download_and_pipeline_blocks( let end_block = *blocks.last().expect("no blocks to pipeline"); let mut block_heights = VecDeque::from(blocks); - for _ in 0..ordhook_config.ingestion_thread_queue_size { + // All the requests are being processed on the same thread. + // As soon as we are getting the bytes back from wire, the + // processing is moved to a thread pool, to defer the parsing, quite expensive. + // We are initially seeding the networking thread with N requests, + // with N being the number of threads in the pool handling the response. + // We need: + // - 1 thread for the thread handling networking + // - 1 thread for the thread handling disk serialization + let thread_pool_network_response_processing_capacity = + ordhook_config.resources.get_optimal_thread_pool_capacity(); + // For each worker in that pool, we want to bound the size of the queue to avoid OOM + // Blocks size can range from 1 to 4Mb (when packed with witness data). + // Start blocking networking when each worker has a backlog of 8 blocks seems reasonable. 
+ let worker_queue_size = 8; + + for _ in 0..thread_pool_network_response_processing_capacity { if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = moved_ctx.clone(); let http_client = moved_http_client.clone(); + // We interleave the initial requests to avoid DDOSing bitcoind from the get go. sleep(Duration::from_millis(500)); set.spawn(try_download_block_bytes_with_retry( http_client, @@ -95,8 +111,8 @@ pub async fn download_and_pipeline_blocks( let mut rx_thread_pool = vec![]; let mut thread_pool_handles = vec![]; - for _ in 0..ordhook_config.ingestion_thread_max { - let (tx, rx) = bounded::>>(ordhook_config.ingestion_thread_queue_size); + for _ in 0..thread_pool_network_response_processing_capacity { + let (tx, rx) = bounded::>>(worker_queue_size); tx_thread_pool.push(tx); rx_thread_pool.push(rx); } @@ -244,11 +260,11 @@ pub async fn download_and_pipeline_blocks( }) .expect("unable to spawn thread"); - let mut thread_index = 0; + let mut round_robin_worker_thread_index = 0; while let Some(res) = set.join_next().await { let block = res.unwrap().unwrap(); - let _ = tx_thread_pool[thread_index].send(Some(block)); + let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block)); if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = ctx.clone(); @@ -260,7 +276,8 @@ pub async fn download_and_pipeline_blocks( ctx, )); } - thread_index = (thread_index + 1) % ordhook_config.ingestion_thread_max; + round_robin_worker_thread_index = (round_robin_worker_thread_index + 1) + % thread_pool_network_response_processing_capacity; } ctx.try_log(|logger| { diff --git a/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs b/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs index 38453e52..557ca396 100644 --- a/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs +++ 
b/components/ordhook-core/src/core/pipeline/processors/block_archiving.rs @@ -25,8 +25,13 @@ pub fn start_block_archiving_processor( let ctx = ctx.clone(); let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop") .spawn(move || { - let blocks_db_rw = - open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx); + let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( + true, + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ); let mut processed_blocks = 0; loop { diff --git a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs index 9a95b9a8..9d0c9c05 100644 --- a/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs +++ b/components/ordhook-core/src/core/pipeline/processors/inscription_indexing.rs @@ -107,6 +107,8 @@ pub fn start_inscription_indexing_processor( let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( true, &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, &ctx, ); store_compacted_blocks( diff --git a/components/ordhook-core/src/core/protocol/inscription_sequencing.rs b/components/ordhook-core/src/core/protocol/inscription_sequencing.rs index 911c6226..d585ce45 100644 --- a/components/ordhook-core/src/core/protocol/inscription_sequencing.rs +++ b/components/ordhook-core/src/core/protocol/inscription_sequencing.rs @@ -91,7 +91,7 @@ pub fn parallelize_inscription_data_computations( let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty(); - let thread_max = ordhook_config.ingestion_thread_max; + let thread_pool_capacity = ordhook_config.resources.get_optimal_thread_pool_capacity(); // Nothing to do? 
early return if !has_transactions_to_process { @@ -104,13 +104,16 @@ pub fn parallelize_inscription_data_computations( let mut tx_thread_pool = vec![]; let mut thread_pool_handles = vec![]; - for thread_index in 0..thread_max { + for thread_index in 0..thread_pool_capacity { let (tx, rx) = channel(); tx_thread_pool.push(tx); let moved_traversal_tx = traversal_tx.clone(); let moved_ctx = inner_ctx.clone(); let moved_ordhook_db_path = ordhook_config.db_path.clone(); + let ulimit = ordhook_config.resources.ulimit; + let memory_available = ordhook_config.resources.memory_available; + let local_cache = cache_l2.clone(); let handle = hiro_system_kit::thread_named("Worker") @@ -124,6 +127,8 @@ pub fn parallelize_inscription_data_computations( &transaction_id, input_index, &local_cache, + ulimit, + memory_available, false, &moved_ctx, ); @@ -135,12 +140,13 @@ pub fn parallelize_inscription_data_computations( } // Consume L1 cache: if the traversal was performed in a previous round - // retrieve it and use it. 
- let mut thread_index = 0; + // retrieve it and inject it to the "reduce" worker (by-passing the "map" thread pool) + let mut round_robin_thread_index = 0; for key in l1_cache_hits.iter() { if let Some(entry) = cache_l1.get(key) { - let _ = traversal_tx.send((Ok((entry.clone(), vec![])), true, thread_index)); - thread_index = (thread_index + 1) % thread_max; + let _ = + traversal_tx.send((Ok((entry.clone(), vec![])), true, round_robin_thread_index)); + round_robin_thread_index = (round_robin_thread_index + 1) % thread_pool_capacity; } } @@ -176,11 +182,11 @@ pub fn parallelize_inscription_data_computations( )); } - // Feed each workers with 2 workitems each - for thread_index in 0..thread_max { + // Feed each worker from the thread pool with 2 workitems each + for thread_index in 0..thread_pool_capacity { let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); } - for thread_index in 0..thread_max { + for thread_index in 0..thread_pool_capacity { let _ = tx_thread_pool[thread_index].send(priority_queue.pop_front()); } diff --git a/components/ordhook-core/src/core/protocol/satoshi_numbering.rs b/components/ordhook-core/src/core/protocol/satoshi_numbering.rs index 33ce0593..f6fde9e4 100644 --- a/components/ordhook-core/src/core/protocol/satoshi_numbering.rs +++ b/components/ordhook-core/src/core/protocol/satoshi_numbering.rs @@ -22,6 +22,8 @@ pub fn compute_satoshi_number( traversals_cache: &Arc< DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault>, >, + ulimit: usize, + memory_available: usize, _back_tracking: bool, ctx: &Context, ) -> Result<(TraversalResult, Vec<(u32, [u8; 8])>), String> { @@ -31,7 +33,8 @@ pub fn compute_satoshi_number( let mut ordinal_block_number = block_identifier.index as u32; let txid = transaction_identifier.get_8_hash_bytes(); let mut back_track = vec![]; - let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &blocks_db_dir, &ctx); + let blocks_db = + open_ordhook_db_conn_rocks_db_loop(false, 
&blocks_db_dir, ulimit, memory_available, &ctx); let (sats_ranges, inscription_offset_cross_outputs) = match traversals_cache .get(&(block_identifier.index as u32, txid.clone())) diff --git a/components/ordhook-core/src/db/mod.rs b/components/ordhook-core/src/db/mod.rs index bf7bbef4..fe3ca68b 100644 --- a/components/ordhook-core/src/db/mod.rs +++ b/components/ordhook-core/src/db/mod.rs @@ -248,7 +248,7 @@ fn get_default_ordhook_db_file_path_rocks_db(base_dir: &PathBuf) -> PathBuf { destination_path } -fn rocks_db_default_options() -> rocksdb::Options { +fn rocks_db_default_options(ulimit: usize, memory_available: usize) -> rocksdb::Options { let mut opts = rocksdb::Options::default(); // Per rocksdb's documentation: // If cache_index_and_filter_blocks is false (which is default), @@ -262,7 +262,7 @@ fn rocks_db_default_options() -> rocksdb::Options { // opts.set_write_buffer_size(64 * 1024 * 1024); // opts.set_blob_file_size(1 * 1024 * 1024 * 1024); // opts.set_target_file_size_base(64 * 1024 * 1024); - opts.set_max_open_files(2048); + opts.set_max_open_files(ulimit as i32); opts.create_if_missing(true); // opts.optimize_for_point_lookup(1 * 1024 * 1024 * 1024); // opts.set_level_zero_stop_writes_trigger(64); @@ -279,10 +279,12 @@ fn rocks_db_default_options() -> rocksdb::Options { pub fn open_readonly_ordhook_db_conn_rocks_db( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, _ctx: &Context, ) -> Result { let path = get_default_ordhook_db_file_path_rocks_db(&base_dir); - let mut opts = rocks_db_default_options(); + let mut opts = rocks_db_default_options(ulimit, memory_available); opts.set_disable_auto_compactions(true); opts.set_max_background_jobs(0); let db = DB::open_for_read_only(&opts, path, false) @@ -293,14 +295,16 @@ pub fn open_readonly_ordhook_db_conn_rocks_db( pub fn open_ordhook_db_conn_rocks_db_loop( readwrite: bool, base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, ctx: &Context, ) -> DB { let mut retries = 0; let 
blocks_db = loop { let res = if readwrite { - open_readwrite_ordhook_db_conn_rocks_db(&base_dir, &ctx) + open_readwrite_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx) } else { - open_readonly_ordhook_db_conn_rocks_db(&base_dir, &ctx) + open_readonly_ordhook_db_conn_rocks_db(&base_dir, ulimit, memory_available, &ctx) }; match res { Ok(db) => break db, @@ -323,19 +327,24 @@ pub fn open_ordhook_db_conn_rocks_db_loop( pub fn open_readwrite_ordhook_dbs( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, ctx: &Context, ) -> Result<(DB, Connection), String> { - let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &base_dir, &ctx); + let blocks_db = + open_ordhook_db_conn_rocks_db_loop(true, &base_dir, ulimit, memory_available, &ctx); let inscriptions_db = open_readwrite_ordhook_db_conn(&base_dir, &ctx)?; Ok((blocks_db, inscriptions_db)) } fn open_readwrite_ordhook_db_conn_rocks_db( base_dir: &PathBuf, + ulimit: usize, + memory_available: usize, _ctx: &Context, ) -> Result { let path = get_default_ordhook_db_file_path_rocks_db(&base_dir); - let opts = rocks_db_default_options(); + let opts = rocks_db_default_options(ulimit, memory_available); let db = DB::open(&opts, path) .map_err(|e| format!("unable to read-write hord.rocksdb: {}", e.to_string()))?; Ok(db) @@ -396,7 +405,6 @@ pub fn find_pinned_block_bytes_at_block_height<'a>( // read_options.set_verify_checksums(false); let mut backoff: f64 = 1.0; let mut rng = thread_rng(); - loop { match blocks_db.get_pinned(block_height.to_be_bytes()) { Ok(Some(res)) => return Some(res), diff --git a/components/ordhook-core/src/service/mod.rs b/components/ordhook-core/src/service/mod.rs index b8621867..99327f62 100644 --- a/components/ordhook-core/src/service/mod.rs +++ b/components/ordhook-core/src/service/mod.rs @@ -413,9 +413,7 @@ impl Service { bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)), bitcoin_chain_event_notifier: Some(chain_event_notifier_tx), }; - let 
cache_l2 = Arc::new(new_traversals_lazy_cache( - self.config.limits.max_caching_memory_size_mb, - )); + let cache_l2 = Arc::new(new_traversals_lazy_cache(100_000)); let ctx = self.ctx.clone(); let config = self.config.clone(); @@ -455,6 +453,8 @@ impl Service { let blocks_db = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); let tip = find_last_block_inserted(&blocks_db); @@ -486,6 +486,8 @@ impl Service { let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); info!(self.ctx.expect_logger(), "Running database compaction",); @@ -496,6 +498,8 @@ impl Service { let blocks_db_rw = open_ordhook_db_conn_rocks_db_loop( false, &self.config.expected_cache_path(), + self.config.resources.ulimit, + self.config.resources.memory_available, &self.ctx, ); @@ -616,14 +620,18 @@ impl Service { } fn chainhook_sidecar_mutate_ordhook_db(command: HandleBlock, config: &Config, ctx: &Context) { - let (blocks_db_rw, inscriptions_db_conn_rw) = - match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) { - Ok(dbs) => dbs, - Err(e) => { - ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); - return; - } - }; + let (blocks_db_rw, inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ) { + Ok(dbs) => dbs, + Err(e) => { + ctx.try_log(|logger| error!(logger, "Unable to open read-write connection: {e}",)); + return; + } + }; match command { HandleBlock::UndoBlock(block) => { @@ -725,14 +733,18 @@ pub fn chainhook_sidecar_mutate_blocks( ) { let mut updated_blocks_ids = vec![]; - let (blocks_db_rw, mut inscriptions_db_conn_rw) = - match open_readwrite_ordhook_dbs(&config.expected_cache_path(), &ctx) 
{ - Ok(dbs) => dbs, - Err(e) => { - ctx.try_log(|logger| error!(logger, "Unable to open readwtite connection: {e}",)); - return; - } - }; + let (blocks_db_rw, mut inscriptions_db_conn_rw) = match open_readwrite_ordhook_dbs( + &config.expected_cache_path(), + config.resources.ulimit, + config.resources.memory_available, + &ctx, + ) { + Ok(dbs) => dbs, + Err(e) => { + ctx.try_log(|logger| error!(logger, "Unable to open read-write connection: {e}",)); + return; + } + }; let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap(); diff --git a/components/ordhook-core/src/service/runloops.rs b/components/ordhook-core/src/service/runloops.rs index 2bea99a7..3ca9537c 100644 --- a/components/ordhook-core/src/service/runloops.rs +++ b/components/ordhook-core/src/service/runloops.rs @@ -21,7 +21,7 @@ pub fn start_bitcoin_scan_runloop( observer_command_tx: Sender, ctx: &Context, ) { - let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans); + let bitcoin_scan_pool = ThreadPool::new(config.resources.expected_observers_count); while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() { let moved_ctx = ctx.clone();