From d57ac758e143ae902bb7d92827c3a3a41bcea1da Mon Sep 17 00:00:00 2001 From: Ludo Galabru Date: Fri, 12 Jan 2024 10:24:04 -0500 Subject: [PATCH] feat: improve queue management --- components/ordhook-cli/src/config/file.rs | 11 +++++++--- .../ordhook-cli/src/config/generator.rs | 3 ++- components/ordhook-core/src/config/mod.rs | 13 ++++++++---- .../ordhook-core/src/core/pipeline/mod.rs | 21 +++++++++++++------ 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/components/ordhook-cli/src/config/file.rs b/components/ordhook-cli/src/config/file.rs index 5cb432b4..574fdac8 100644 --- a/components/ordhook-cli/src/config/file.rs +++ b/components/ordhook-cli/src/config/file.rs @@ -5,8 +5,8 @@ use ordhook::chainhook_sdk::types::{ }; use ordhook::config::{ Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig, - StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT, - DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT, + StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, + DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT, }; use std::fs::File; use std::io::{BufReader, Read}; @@ -85,7 +85,11 @@ impl ConfigFile { bitcoind_rpc_threads: config_file .resources .bitcoind_rpc_threads - .unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE), + .unwrap_or(DEFAULT_BITCOIND_RPC_THREADS), + bitcoind_rpc_timeout: config_file + .resources + .bitcoind_rpc_timeout + .unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT), expected_observers_count: config_file .resources .expected_observers_count @@ -170,6 +174,7 @@ pub struct ResourcesConfigFile { pub cpu_core_available: Option, pub memory_available: Option, pub bitcoind_rpc_threads: Option, + pub bitcoind_rpc_timeout: Option, pub expected_observers_count: Option, } diff --git a/components/ordhook-cli/src/config/generator.rs b/components/ordhook-cli/src/config/generator.rs index eabd1a51..3db85656 100644 --- a/components/ordhook-cli/src/config/generator.rs +++ b/components/ordhook-cli/src/config/generator.rs @@ -30,7 +30,8 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543" ulimit = 2048 cpu_core_available = 16 memory_available = 32 -bitcoind_rpc_threads = 8 +bitcoind_rpc_threads = 4 +bitcoind_rpc_timeout = 15 expected_observers_count = 1 # Disable the following section if the state diff --git a/components/ordhook-core/src/config/mod.rs b/components/ordhook-core/src/config/mod.rs index 8506ef24..fb782f69 100644 --- a/components/ordhook-core/src/config/mod.rs +++ b/components/ordhook-core/src/config/mod.rs @@ -13,7 +13,8 @@ pub const DEFAULT_INGESTION_PORT: u16 = 20455; pub const DEFAULT_CONTROL_PORT: u16 = 20456; pub const DEFAULT_ULIMIT: usize = 2048; pub const DEFAULT_MEMORY_AVAILABLE: usize = 8; -pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4; +pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4; +pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15; #[derive(Clone, Debug)] pub struct Config { @@ -70,6 +71,7 @@ pub struct ResourcesConfig { pub cpu_core_available: usize, pub memory_available: usize, pub bitcoind_rpc_threads: usize, + pub bitcoind_rpc_timeout: u32, pub expected_observers_count: usize, } @@ -167,7 +169,8 @@ impl Config { cpu_core_available: num_cpus::get(), memory_available: DEFAULT_MEMORY_AVAILABLE, ulimit: DEFAULT_ULIMIT, - bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, expected_observers_count: 1, }, network: IndexerConfig { @@ -198,7 +201,8 @@ impl Config { cpu_core_available: num_cpus::get(), memory_available: DEFAULT_MEMORY_AVAILABLE, ulimit: DEFAULT_ULIMIT, - bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, expected_observers_count: 1, }, network: IndexerConfig { @@ -229,7 +233,8 @@ impl Config { cpu_core_available: num_cpus::get(), memory_available: DEFAULT_MEMORY_AVAILABLE, ulimit: DEFAULT_ULIMIT, - bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, + bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS, + bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT, expected_observers_count: 1, }, network: IndexerConfig { diff --git a/components/ordhook-core/src/core/pipeline/mod.rs b/components/ordhook-core/src/core/pipeline/mod.rs index 4cb67b9f..cf41c1eb 100644 --- a/components/ordhook-core/src/core/pipeline/mod.rs +++ b/components/ordhook-core/src/core/pipeline/mod.rs @@ -86,9 +86,9 @@ pub async fn download_and_pipeline_blocks( // For each worker in that pool, we want to bound the size of the queue to avoid OOM // Blocks size can range from 1 to 4Mb (when packed with witness data). // Start blocking networking when each worker has a backlog of 8 blocks seems reasonable. - let worker_queue_size = 8; + let worker_queue_size = 2; - for _ in 0..thread_pool_network_response_processing_capacity { + for _ in 0..ordhook_config.resources.bitcoind_rpc_threads { if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = moved_ctx.clone(); @@ -262,9 +262,20 @@ pub async fn download_and_pipeline_blocks( let mut round_robin_worker_thread_index = 0; while let Some(res) = set.join_next().await { - let block = res.unwrap().unwrap(); + let block = res + .expect("unable to retrieve block") + .expect("unable to deserialize block"); + + loop { + let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone())); + round_robin_worker_thread_index = (round_robin_worker_thread_index + 1) + % thread_pool_network_response_processing_capacity; + if res.is_ok() { + break; + } + sleep(Duration::from_millis(500)); + } - let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block)); if let Some(block_height) = block_heights.pop_front() { let config = moved_config.clone(); let ctx = ctx.clone(); @@ -276,8 +287,6 @@ pub async fn download_and_pipeline_blocks( ctx, )); } - round_robin_worker_thread_index = (round_robin_worker_thread_index + 1) - % thread_pool_network_response_processing_capacity; } ctx.try_log(|logger| {