Skip to content

Commit

Permalink
fix: patch boot latency
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Aug 3, 2023
1 parent 108117b commit 0e3faf9
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
12 changes: 12 additions & 0 deletions components/hord-cli/src/core/pipeline/mod.rs
Expand Up @@ -19,6 +19,7 @@ use chainhook_sdk::indexer::bitcoin::{
use super::parse_ordinals_and_standardize_block;

pub enum PostProcessorCommand {
Start,
ProcessBlocks(Vec<(u64, LazyBlock)>, Vec<BitcoinBlockData>),
Terminate,
}
Expand Down Expand Up @@ -150,6 +151,8 @@ pub async fn download_and_pipeline_blocks(
let mut inbox = HashMap::new();
let mut inbox_cursor = start_sequencing_blocks_at_height.max(start_block);
let mut blocks_processed = 0;
let mut pre_seq_processor_started = false;
let mut post_seq_processor_started = false;

loop {
// Dequeue all the blocks available
Expand All @@ -173,6 +176,11 @@ pub async fn download_and_pipeline_blocks(

if !ooo_compacted_blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_pre_sequence_commands_tx {
if !pre_seq_processor_started {
pre_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}

let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(ooo_compacted_blocks, vec![]));
}
}
Expand All @@ -194,6 +202,10 @@ pub async fn download_and_pipeline_blocks(
}
if !blocks.is_empty() {
if let Some(ref blocks_tx) = blocks_post_processor_post_sequence_commands_tx {
if !post_seq_processor_started {
post_seq_processor_started = true;
let _ = blocks_tx.send(PostProcessorCommand::Start);
}
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks));
}
} else {
Expand Down
Expand Up @@ -32,12 +32,17 @@ pub fn start_block_ingestion_processor(

let mut empty_cycles = 0;

if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start block indexing runloop");
}

loop {
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
Expand Down
Expand Up @@ -49,7 +49,7 @@ pub fn start_inscription_indexing_processor(

let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Batch receiver")
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let cache_l2 = Arc::new(new_traversals_lazy_cache(1024));
let garbage_collect_every_n_blocks = 100;
Expand All @@ -66,23 +66,26 @@ pub fn start_inscription_indexing_processor(
let mut inscription_height_hint = InscriptionHeigthHint::new();
let mut empty_cycles = 0;

if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start inscription indexing runloop");
}

loop {
let (compacted_blocks, mut blocks) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
empty_cycles = 0;
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => break,
Ok(PostProcessorCommand::Start) => continue,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;

if empty_cycles == 30 {
if empty_cycles == 10 {
empty_cycles = 0;
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {
Expand Down
2 changes: 1 addition & 1 deletion components/hord-cli/src/service/mod.rs
Expand Up @@ -139,7 +139,7 @@ impl Service {
start_block,
end_block,
hord_config.first_inscription_height,
Some(&blocks_post_processor),
None,
Some(&blocks_post_processor),
speed,
&self.ctx,
Expand Down

0 comments on commit 0e3faf9

Please sign in to comment.