Skip to content

Commit

Permalink
fix: better handling of database locks (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru authored Nov 23, 2023
1 parent 977a30e commit f820169
Show file tree
Hide file tree
Showing 17 changed files with 726 additions and 990 deletions.
518 changes: 0 additions & 518 deletions .github/workflows/ordhook-sdk-js.yml

This file was deleted.

250 changes: 177 additions & 73 deletions components/ordhook-cli/src/cli/mod.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions components/ordhook-cli/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ConfigFile {
let config_file: ConfigFile = match toml::from_slice(&file_buffer) {
Ok(s) => s,
Err(e) => {
return Err(format!("Config file malformatted {}", e.to_string()));
return Err(format!("Config file malformatted {}", e));
}
};
ConfigFile::from_config_file(config_file)
Expand Down Expand Up @@ -153,7 +153,7 @@ impl ConfigFile {
(true, false, false, _) => Config::devnet_default(),
(false, true, false, _) => Config::testnet_default(),
(false, false, true, _) => Config::mainnet_default(),
(false, false, false, Some(config_path)) => ConfigFile::from_file_path(&config_path)?,
(false, false, false, Some(config_path)) => ConfigFile::from_file_path(config_path)?,
_ => Err("Invalid combination of arguments".to_string())?,
};
Ok(config)
Expand Down
2 changes: 1 addition & 1 deletion components/ordhook-cli/src/config/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ chainhook_internals = true
"#,
network = network.to_lowercase(),
);
return conf;
conf
}
6 changes: 3 additions & 3 deletions components/ordhook-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use chainhook_sdk::{

use crate::{
config::{Config, LogConfig},
db::{find_lazy_block_at_block_height, open_readwrite_ordhook_db_conn_rocks_db},
db::{find_lazy_block_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};

use crate::db::{
Expand Down Expand Up @@ -95,7 +95,7 @@ pub fn compute_next_satpoint_data(
}

pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
Expand Down Expand Up @@ -128,7 +128,7 @@ pub fn should_sync_ordhook_db(
}
};

let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;

if start_block == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_ordhook_db_conn_rocks_db, LazyBlock},
db::{insert_entry_in_blocks, LazyBlock, open_ordhook_db_conn_rocks_db_loop},
};

pub fn start_block_archiving_processor(
Expand All @@ -26,18 +26,15 @@ pub fn start_block_archiving_processor(
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let blocks_db_rw =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut processed_blocks = 0;

loop {
debug!(ctx.expect_logger(), "Tick");
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,16 @@ pub fn start_inscription_indexing_processor(
(compacted_blocks, blocks)
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 180 {
warn!(ctx.expect_logger(), "Block processor reached expiration");
ctx.try_log(|logger| {
info!(logger, "Block processor reached expiration")
});
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
Expand All @@ -117,7 +118,7 @@ pub fn start_inscription_indexing_processor(
);
}

info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
ctx.try_log(|logger| info!(logger, "Processing {} blocks", blocks.len()));

blocks = process_blocks(
&mut blocks,
Expand All @@ -132,19 +133,19 @@ pub fn start_inscription_indexing_processor(
garbage_collect_nth_block += blocks.len();

if garbage_collect_nth_block > garbage_collect_every_n_blocks {
ctx.try_log(|logger| info!(logger, "Performing garbage collecting"));

// Clear L2 cache on a regular basis
info!(
ctx.expect_logger(),
"Clearing cache L2 ({} entries)",
cache_l2.len()
);
ctx.try_log(|logger| {
info!(logger, "Clearing cache L2 ({} entries)", cache_l2.len())
});
cache_l2.clear();

// Recreate sqlite db connection on a regular basis
inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx)
.unwrap();

inscriptions_db_conn_rw.flush_prepared_statement_cache();
garbage_collect_nth_block = 0;
}
}
Expand Down Expand Up @@ -215,7 +216,7 @@ pub fn process_blocks(

if any_existing_activity {
ctx.try_log(|logger| {
warn!(
error!(
logger,
"Dropping updates for block #{}, activities present in database",
block.block_identifier.index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ pub fn start_transfers_recomputing_processor(
blocks
}
Ok(PostProcessorCommand::Terminate) => {
debug!(ctx.expect_logger(), "Terminating block processor");
let _ = events_tx.send(PostProcessorEvent::Terminated);
break;
}
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;
if empty_cycles == 10 {
warn!(ctx.expect_logger(), "Block processor reached expiration");
ctx.try_log(|logger| {
warn!(logger, "Block processor reached expiration")
});
let _ = events_tx.send(PostProcessorEvent::Expired);
break;
}
Expand All @@ -65,7 +66,7 @@ pub fn start_transfers_recomputing_processor(
},
};

info!(ctx.expect_logger(), "Processing {} blocks", blocks.len());
ctx.try_log(|logger| info!(logger, "Processing {} blocks", blocks.len()));
let inscriptions_db_tx = inscriptions_db_conn_rw.transaction().unwrap();

for block in blocks.iter_mut() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use std::{
collections::{BTreeMap, HashMap, VecDeque},
hash::BuildHasherDefault,
sync::Arc,
thread::sleep,
time::Duration,
};

use chainhook_sdk::{
Expand Down Expand Up @@ -784,8 +782,6 @@ pub fn consolidate_block_with_pre_computed_ordinals_data(
if results.len() == expected_inscriptions_count {
break results;
}
// Handle race conditions: if the db is being updated, the number of expected entries could be un-met.
sleep(Duration::from_secs(3));
ctx.try_log(|logger| {
warn!(
logger,
Expand Down
31 changes: 17 additions & 14 deletions components/ordhook-core/src/core/protocol/inscription_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use chainhook_sdk::{
bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script},
types::{
BitcoinBlockData, BitcoinNetwork, BitcoinTransactionData, BlockIdentifier,
OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, OrdinalInscriptionTransferDestination,
OrdinalInscriptionTransferData, OrdinalInscriptionTransferDestination, OrdinalOperation,
TransactionIdentifier,
},
utils::Context,
};
Expand Down Expand Up @@ -84,14 +85,7 @@ pub fn augment_transaction_with_ordinals_transfers_data(
);

let entries =
match find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx)
{
Ok(entries) => entries,
Err(e) => {
ctx.try_log(|logger| warn!(logger, "unable query inscriptions: {e}"));
continue;
}
};
find_inscriptions_at_wached_outpoint(&outpoint_pre_transfer, &inscriptions_db_tx, ctx);
// For each satpoint inscribed retrieved, we need to compute the next
// outpoint to watch
for watched_satpoint in entries.into_iter() {
Expand Down Expand Up @@ -124,10 +118,12 @@ pub fn augment_transaction_with_ordinals_transfers_data(
tx.metadata.outputs[output_index].get_script_pubkey_hex();
let updated_address = match Script::from_hex(&script_pub_key_hex) {
Ok(script) => match Address::from_script(&script, network.clone()) {
Ok(address) => OrdinalInscriptionTransferDestination::Transferred(address.to_string()),
Ok(address) => OrdinalInscriptionTransferDestination::Transferred(
address.to_string(),
),
Err(e) => {
ctx.try_log(|logger| {
warn!(
info!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
Expand All @@ -138,13 +134,15 @@ pub fn augment_transaction_with_ordinals_transfers_data(
},
Err(e) => {
ctx.try_log(|logger| {
warn!(
info!(
logger,
"unable to retrieve address from {script_pub_key_hex}: {}",
e.to_string()
)
});
OrdinalInscriptionTransferDestination::Burnt(script_pub_key_hex.to_string())
OrdinalInscriptionTransferDestination::Burnt(
script_pub_key_hex.to_string(),
)
}
};

Expand Down Expand Up @@ -181,7 +179,12 @@ pub fn augment_transaction_with_ordinals_transfers_data(
offset
)
});
(outpoint, total_offset, OrdinalInscriptionTransferDestination::SpentInFees, None)
(
outpoint,
total_offset,
OrdinalInscriptionTransferDestination::SpentInFees,
None,
)
}
};

Expand Down
Loading

0 comments on commit f820169

Please sign in to comment.