Skip to content

Commit

Permalink
feat: share traversals_cache over 10 blocks spans
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed May 8, 2023
1 parent 569d22f commit b0378c3
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 38 deletions.
12 changes: 7 additions & 5 deletions components/chainhook-cli/src/cli/mod.rs
Expand Up @@ -23,7 +23,7 @@ use chainhook_event_observer::hord::db::{
open_readwrite_hord_db_conn_rocks_db, retrieve_satoshi_point_using_local_storage,
};
use chainhook_event_observer::hord::{
retrieve_inscribed_satoshi_points_from_block,
new_traversals_cache, retrieve_inscribed_satoshi_points_from_block,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data, Storage,
};
use chainhook_event_observer::indexer;
Expand Down Expand Up @@ -633,14 +633,13 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
};

let transaction_identifier = TransactionIdentifier { hash: txid.clone() };
let hasher = FxBuildHasher::default();
let cache = Arc::new(DashMap::with_hasher(hasher));
let traversals_cache = new_traversals_cache();
let traversal = retrieve_satoshi_point_using_local_storage(
&hord_db_conn,
&block_identifier,
&transaction_identifier,
0,
cache,
Arc::new(traversals_cache),
&ctx,
)?;
info!(
Expand All @@ -655,10 +654,13 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
let block =
fetch_and_standardize_block(cmd.block_height, &bitcoin_config, &ctx)
.await?;
let traversals = retrieve_inscribed_satoshi_points_from_block(
let traversals_cache = Arc::new(new_traversals_cache());

let _traversals = retrieve_inscribed_satoshi_points_from_block(
&block,
None,
&config.expected_cache_path(),
&traversals_cache,
&ctx,
);
// info!(
Expand Down
14 changes: 11 additions & 3 deletions components/chainhook-event-observer/src/hord/db/mod.rs
Expand Up @@ -27,6 +27,7 @@ use crate::{
};

use super::{
new_traversals_cache,
ord::{height::Height, sat::Sat},
update_hord_db_and_augment_bitcoin_block,
};
Expand Down Expand Up @@ -955,6 +956,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let mut cursor = start_block as usize;
let mut inbox = HashMap::new();
let mut num_writes = 0;
let traversals_cache = Arc::new(new_traversals_cache());

while let Ok(Some((block_height, compacted_block, raw_block))) = block_compressed_rx.recv() {
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
Expand Down Expand Up @@ -1002,6 +1004,7 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
&inscriptions_db_conn_rw,
false,
&hord_db_path,
&traversals_cache,
&ctx,
) {
ctx.try_log(|logger| {
Expand Down Expand Up @@ -1031,6 +1034,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
return Ok(());
}

if num_writes % 10 == 0 {
traversals_cache.clear();
}

if num_writes % 5000 == 0 {
ctx.try_log(|logger| {
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
Expand Down Expand Up @@ -1079,7 +1086,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_number: u64,
cache: Arc<
traversals_cache: Arc<
DashMap<
(u32, [u8; 8]),
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
Expand Down Expand Up @@ -1116,7 +1123,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
));
}

if let Some(cached_tx) = cache.get(&(ordinal_block_number, tx_cursor.0)) {
if let Some(cached_tx) = traversals_cache.get(&(ordinal_block_number, tx_cursor.0)) {
let (inputs, outputs) = cached_tx.value();
let mut next_found_in_cache = false;

Expand Down Expand Up @@ -1282,7 +1289,8 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });

if sats_out < sats_in {
cache.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
traversals_cache
.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = *block_height;

Expand Down
31 changes: 26 additions & 5 deletions components/chainhook-event-observer/src/hord/mod.rs
Expand Up @@ -9,12 +9,14 @@ use chainhook_types::{
OrdinalOperation, TransactionIdentifier,
};
use dashmap::DashMap;
use fxhash::{FxBuildHasher, FxHasher};
use hiro_system_kit::slog;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rocksdb::DB;
use rusqlite::Connection;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::hash::BuildHasherDefault;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::Arc;
Expand Down Expand Up @@ -162,10 +164,24 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
Ok(())
}

pub fn new_traversals_cache(
) -> DashMap<(u32, [u8; 8]), (Vec<([u8; 8], u32, u16, u64)>, Vec<u64>), BuildHasherDefault<FxHasher>>
{
let hasher = FxBuildHasher::default();
DashMap::with_hasher(hasher)
}

pub fn retrieve_inscribed_satoshi_points_from_block(
block: &BitcoinBlockData,
inscriptions_db_conn: Option<&Connection>,
hord_db_path: &PathBuf,
traversals_cache: &Arc<
DashMap<
(u32, [u8; 8]),
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
BuildHasherDefault<FxHasher>,
>,
>,
ctx: &Context,
) -> HashMap<TransactionIdentifier, TraversalResult> {
let mut transactions_ids = vec![];
Expand Down Expand Up @@ -202,14 +218,12 @@ pub fn retrieve_inscribed_satoshi_points_from_block(

let mut rng = thread_rng();
transactions_ids.shuffle(&mut rng);
let hasher = fxhash::FxBuildHasher::default();
let shared_cache = Arc::new(DashMap::with_hasher(hasher));
for transaction_id in transactions_ids.into_iter() {
let moved_traversal_tx = traversal_tx.clone();
let moved_ctx = ctx.clone();
let block_identifier = block.block_identifier.clone();
let moved_hord_db_path = hord_db_path.clone();
let cache = shared_cache.clone();
let local_cache = traversals_cache.clone();
traversal_data_pool.execute(move || loop {
match open_readonly_hord_db_conn_rocks_db(&moved_hord_db_path, &moved_ctx) {
Ok(blocks_db) => {
Expand All @@ -218,7 +232,7 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
&block_identifier,
&transaction_id,
0,
cache,
local_cache,
&moved_ctx,
);
let _ = moved_traversal_tx.send((transaction_id, traversal));
Expand Down Expand Up @@ -262,7 +276,6 @@ pub fn retrieve_inscribed_satoshi_points_from_block(
}
}
let _ = traversal_data_pool.join();
std::thread::spawn(move || drop(shared_cache));
}

traversals
Expand All @@ -274,6 +287,13 @@ pub fn update_hord_db_and_augment_bitcoin_block(
inscriptions_db_conn_rw: &Connection,
write_block: bool,
hord_db_path: &PathBuf,
traversals_cache: &Arc<
DashMap<
(u32, [u8; 8]),
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
BuildHasherDefault<FxHasher>,
>,
>,
ctx: &Context,
) -> Result<(), String> {
if write_block {
Expand All @@ -299,6 +319,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
&new_block,
Some(inscriptions_db_conn_rw),
hord_db_path,
traversals_cache,
ctx,
);

Expand Down
59 changes: 34 additions & 25 deletions components/chainhook-event-observer/src/observer/mod.rs
Expand Up @@ -10,6 +10,7 @@ use crate::chainhooks::types::{
ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification,
};

use crate::hord::new_traversals_cache;
#[cfg(feature = "ordinals")]
use crate::hord::{
db::{open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db},
Expand Down Expand Up @@ -695,21 +696,25 @@ pub async fn start_observer_commands_handler(
match bitcoin_block_store.get_mut(&header.block_identifier) {
Some(block) => {
#[cfg(feature = "ordinals")]
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
{
let traversals_cache = Arc::new(new_traversals_cache());
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&traversals_cache,
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to insert bitcoin block {} in hord_db: {e}",
block.block_identifier.index
)
});
});
}
}
new_blocks.push(block.clone());
}
Expand Down Expand Up @@ -833,20 +838,24 @@ pub async fn start_observer_commands_handler(
match bitcoin_block_store.get_mut(&header.block_identifier) {
Some(block) => {
#[cfg(feature = "ordinals")]
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to apply bitcoin block {} with hord_db: {e}", block.block_identifier.index
)
});
{
let traversals_cache = Arc::new(new_traversals_cache());
if let Err(e) = update_hord_db_and_augment_bitcoin_block(
block,
&blocks_db,
&inscriptions_db_conn_rw,
true,
&config.get_cache_path_buf(),
&traversals_cache,
&ctx,
) {
ctx.try_log(|logger| {
slog::error!(
logger,
"Unable to apply bitcoin block {} with hord_db: {e}", block.block_identifier.index
)
});
}
}
blocks_to_apply.push(block.clone());
}
Expand Down

0 comments on commit b0378c3

Please sign in to comment.