Skip to content

Commit

Permalink
fix: tweak sqlite connections (#217)
Browse files Browse the repository at this point in the history
* fix: attempt to optimize sqlite connections

* chore: add logs

* feat: introduce sequence metadata table
  • Loading branch information
lgalabru committed Nov 27, 2023
1 parent a84c951 commit 334565c
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 100 deletions.
36 changes: 16 additions & 20 deletions components/ordhook-cli/src/cli/mod.rs
Expand Up @@ -26,9 +26,9 @@ use ordhook::db::{
delete_data_in_ordhook_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_readonly_ordhook_db_conn,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn, open_ordhook_db_conn_rocks_db_loop,

get_default_ordhook_db_file_path, initialize_ordhook_db, open_ordhook_db_conn_rocks_db_loop,
open_readonly_ordhook_db_conn, open_readonly_ordhook_db_conn_rocks_db,
open_readwrite_ordhook_db_conn,
};
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
Expand Down Expand Up @@ -790,25 +790,21 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
)
.await?;
if let Some(true) = cmd.debug {
let blocks_db = open_ordhook_db_conn_rocks_db_loop(false, &config.get_ordhook_config().db_path, ctx);
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
false,
&config.get_ordhook_config().db_path,
ctx,
);
for i in cmd.get_blocks().into_iter() {
let block = find_lazy_block_at_block_height(i as u32, 10, false, &blocks_db, ctx).expect("unable to retrieve block {i}");
info!(
ctx.expect_logger(),
"--------------------"
);
info!(
ctx.expect_logger(),
"Block: {i}"
);
let block =
find_lazy_block_at_block_height(i as u32, 10, false, &blocks_db, ctx)
.expect("unable to retrieve block {i}");
info!(ctx.expect_logger(), "--------------------");
info!(ctx.expect_logger(), "Block: {i}");
for tx in block.iter_tx() {
info!(
ctx.expect_logger(),
"Tx: {}",
ordhook::hex::encode(tx.txid)
);
info!(ctx.expect_logger(), "Tx: {}", ordhook::hex::encode(tx.txid));
}
}
}
}
}
RepairCommand::Inscriptions(cmd) => {
Expand Down Expand Up @@ -886,7 +882,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;

Expand Down
Expand Up @@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, LazyBlock, open_ordhook_db_conn_rocks_db_loop},
db::{insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop, LazyBlock},
};

pub fn start_block_archiving_processor(
Expand Down
73 changes: 50 additions & 23 deletions components/ordhook-core/src/core/protocol/inscription_sequencing.rs
Expand Up @@ -12,6 +12,7 @@ use chainhook_sdk::{
},
utils::Context,
};
use crossbeam_channel::bounded;
use dashmap::DashMap;
use fxhash::FxHasher;
use rusqlite::{Connection, Transaction};
Expand Down Expand Up @@ -71,6 +72,13 @@ pub fn parallelize_inscription_data_computations(
ordhook_config: &OrdhookConfig,
ctx: &Context,
) -> Result<bool, String> {
ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} started", block.block_identifier.index
)
});

let (mut transactions_ids, l1_cache_hits) =
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);

Expand All @@ -90,7 +98,7 @@ pub fn parallelize_inscription_data_computations(
}

let expected_traversals = transactions_ids.len() + l1_cache_hits.len();
let (traversal_tx, traversal_rx) = channel();
let (traversal_tx, traversal_rx) = bounded(64);

let mut tx_thread_pool = vec![];
let mut thread_pool_handles = vec![];
Expand Down Expand Up @@ -207,6 +215,7 @@ pub fn parallelize_inscription_data_computations(
});
}
}

if traversals_received == expected_traversals {
break;
}
Expand Down Expand Up @@ -244,6 +253,12 @@ pub fn parallelize_inscription_data_computations(
}
}
}
ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} collected", block.block_identifier.index
)
});

// Collect eventual results for incoming blocks
for tx in tx_thread_pool.iter() {
Expand Down Expand Up @@ -273,10 +288,21 @@ pub fn parallelize_inscription_data_computations(
let _ = tx.send(None);
}

let ctx_moved = ctx.clone();
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation started",));

for handle in thread_pool_handles.into_iter() {
let _ = handle.join();
}
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation ended",));
});

ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} ended", block.block_identifier.index
)
});

Ok(has_transactions_to_process)
Expand Down Expand Up @@ -373,28 +399,27 @@ impl<'a> SequenceCursor<'a> {
self.current_block_height = 0;
}

pub fn pick_next(&mut self, cursed: bool, block_height: u64) -> i64 {
pub fn pick_next(&mut self, cursed: bool, block_height: u64, ctx: &Context) -> i64 {
if block_height < self.current_block_height {
self.reset();
}
self.current_block_height = block_height;

match cursed {
true => self.pick_next_cursed(),
false => self.pick_next_blessed(),
true => self.pick_next_cursed(ctx),
false => self.pick_next_blessed(ctx),
}
}

fn pick_next_blessed(&mut self) -> i64 {
fn pick_next_blessed(&mut self, ctx: &Context) -> i64 {
match self.blessed {
None => {
match find_latest_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
&ctx,
) {
Ok(Some(inscription_number)) => {
Some(inscription_number) => {
self.blessed = Some(inscription_number);
inscription_number + 1
}
Expand All @@ -405,16 +430,15 @@ impl<'a> SequenceCursor<'a> {
}
}

fn pick_next_cursed(&mut self) -> i64 {
fn pick_next_cursed(&mut self, ctx: &Context) -> i64 {
match self.cursed {
None => {
match find_latest_cursed_inscription_number_at_block_height(
&self.current_block_height,
&None,
&self.inscriptions_db_conn,
&Context::empty(),
&ctx,
) {
Ok(Some(inscription_number)) => {
Some(inscription_number) => {
self.cursed = Some(inscription_number);
inscription_number - 1
}
Expand All @@ -425,12 +449,12 @@ impl<'a> SequenceCursor<'a> {
}
}

pub fn increment_cursed(&mut self) {
self.cursed = Some(self.pick_next_cursed());
pub fn increment_cursed(&mut self, ctx: &Context) {
self.cursed = Some(self.pick_next_cursed(ctx));
}

pub fn increment_blessed(&mut self) {
self.blessed = Some(self.pick_next_blessed())
pub fn increment_blessed(&mut self, ctx: &Context) {
self.blessed = Some(self.pick_next_blessed(ctx))
}
}

Expand Down Expand Up @@ -523,13 +547,14 @@ pub fn augment_block_with_ordinals_inscriptions_data(
continue;
};
let is_curse = inscription_data.curse_type.is_some();
let inscription_number = sequence_cursor.pick_next(is_curse, block.block_identifier.index);
let inscription_number =
sequence_cursor.pick_next(is_curse, block.block_identifier.index, &ctx);
inscription_data.inscription_number = inscription_number;

if is_curse {
sequence_cursor.increment_cursed();
sequence_cursor.increment_cursed(ctx);
} else {
sequence_cursor.increment_blessed();
sequence_cursor.increment_blessed(ctx);
};

ctx.try_log(|logger| {
Expand Down Expand Up @@ -594,7 +619,8 @@ fn augment_transaction_with_ordinals_inscriptions_data(
};

// Do we need to curse the inscription?
let mut inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
let mut inscription_number =
sequence_cursor.pick_next(is_cursed, block_identifier.index, ctx);
let mut curse_type_override = None;
if !is_cursed {
// Is this inscription re-inscribing an existing blessed inscription?
Expand All @@ -612,7 +638,8 @@ fn augment_transaction_with_ordinals_inscriptions_data(
});

is_cursed = true;
inscription_number = sequence_cursor.pick_next(is_cursed, block_identifier.index);
inscription_number =
sequence_cursor.pick_next(is_cursed, block_identifier.index, ctx);
curse_type_override = Some(OrdinalInscriptionCurseType::Reinscription)
}
};
Expand Down Expand Up @@ -683,9 +710,9 @@ fn augment_transaction_with_ordinals_inscriptions_data(
});

if is_cursed {
sequence_cursor.increment_cursed();
sequence_cursor.increment_cursed(ctx);
} else {
sequence_cursor.increment_blessed();
sequence_cursor.increment_blessed(ctx);
}
}
any_event
Expand Down
Expand Up @@ -244,7 +244,7 @@ pub fn compute_satoshi_number(
)
});
std::process::exit(1);
},
}
};

let mut sats_out = 0;
Expand Down

0 comments on commit 334565c

Please sign in to comment.