Skip to content

Commit

Permalink
feat: add shared cache
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed May 8, 2023
1 parent ce439ae commit 07523ae
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 90 deletions.
49 changes: 25 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 6 additions & 13 deletions components/chainhook-cli/src/service/mod.rs
Expand Up @@ -17,7 +17,7 @@ use std::sync::mpsc::channel;

pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 12;
pub const STACKS_SCAN_THREAD_POOL_SIZE: usize = 1;
pub const BITCOIN_SCAN_THREAD_POOL_SIZE: usize = 12;

pub struct Service {
Expand Down Expand Up @@ -147,8 +147,8 @@ impl Service {
&mut moved_config,
&moved_ctx,
);
let end_block = match hiro_system_kit::nestable_block_on(op) {
Ok(end_block) => end_block,
let last_block_in_csv = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_in_csv) => last_block_in_csv,
Err(e) => {
error!(
moved_ctx.expect_logger(),
Expand All @@ -159,7 +159,8 @@ impl Service {
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", end_block.index
"Stacks chainstate scan completed up to block: {}",
last_block_in_csv.index
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
Expand Down Expand Up @@ -259,15 +260,7 @@ impl Service {
}
match chainhook {
ChainhookSpecification::Stacks(predicate_spec) => {
// let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
info!(
self.ctx.expect_logger(),
"Enabling stacks predicate {}", predicate_spec.uuid
);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
api_key,
));
let _ = stacks_scan_op_tx.send((predicate_spec, api_key));
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
let _ = bitcoin_scan_op_tx.send((predicate_spec, api_key));
Expand Down
4 changes: 2 additions & 2 deletions components/chainhook-event-observer/Cargo.toml
Expand Up @@ -50,8 +50,8 @@ rand = "0.8.5"
hex-simd = "0.8.0"
serde_cbor = "0.11.2"
zeromq = { version = "*", default-features = false, features = ["tokio-runtime", "tcp-transport"] }
zerocopy = "0.6.1"
zerocopy-derive = "0.3.2"
dashmap = "5.4.0"
fxhash = "0.2.1"

[dependencies.rocksdb]
version = "0.20.1"
Expand Down
115 changes: 101 additions & 14 deletions components/chainhook-event-observer/src/hord/db/mod.rs
@@ -1,11 +1,15 @@
use std::{
collections::{BTreeMap, HashMap},
hash::BuildHasherDefault,
path::PathBuf,
sync::Arc,
};

use chainhook_types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData, TransactionIdentifier,
};
use dashmap::DashMap;
use fxhash::FxHasher;
use hiro_system_kit::slog;

use rocksdb::DB;
Expand Down Expand Up @@ -418,6 +422,7 @@ pub fn open_readonly_hord_db_conn_rocks_db(
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_max_open_files(5000);
opts.set_disable_auto_compactions(true);
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
Expand All @@ -431,11 +436,22 @@ pub fn open_readwrite_hord_db_conn_rocks_db(
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_max_open_files(5000);
opts.set_disable_auto_compactions(true);
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
}

pub fn archive_hord_db_conn_rocks_db(base_dir: &PathBuf, _ctx: &Context) {
let from = get_default_hord_db_file_path_rocks_db(&base_dir);
let to = {
let mut destination_path = base_dir.clone();
destination_path.push("hord.rocksdb_archive");
destination_path
};
let _ = std::fs::rename(from, to);
}

// Legacy - to remove after migrations
pub fn find_block_at_block_height_sqlite(
block_height: u32,
Expand Down Expand Up @@ -1058,11 +1074,24 @@ impl TraversalResult {
}
}

// May 05 21:48:55.191 INFO Computing ordinal number for Satoshi point 0x5489d47538302148cd524f0ab1cc13223f3dc089cd5267d4cd45ccf1d532b743:0:0 (block #788201)
// May 05 21:50:36.807 INFO Satoshi #147405521136231 was minted in block #29481 at offset 521136231 and was transferred 2858 times (progress: 2379/2379).

// May 05 22:04:23.767 INFO Computing ordinal number for Satoshi point 0x57baf2e41fe5ffc70fc63129a1208c77606dd94d9ec05097a47ee557d8653c74:0:0 (block #788201)
// May 05 22:04:56.009 INFO Satoshi #1122896574049767 was minted in block #239158 at offset 1574049767 and was transferred 10634 times (progress: 2379/2379).

pub fn retrieve_satoshi_point_using_local_storage(
blocks_db: &DB,
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
inscription_number: u64,
cache: Arc<
DashMap<
(u32, [u8; 8]),
(Vec<([u8; 8], u32, u16, u64)>, Vec<u64>),
BuildHasherDefault<FxHasher>,
>,
>,
ctx: &Context,
) -> Result<TraversalResult, String> {
ctx.try_log(|logger| {
Expand All @@ -1084,10 +1113,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
};
let mut tx_cursor = (txid, 0);
let mut hops: u32 = 0;
let mut local_block_cache = HashMap::new();
loop {
local_block_cache.clear();

hops += 1;
if hops as u64 > block_identifier.index {
return Err(format!(
Expand All @@ -1096,17 +1122,77 @@ pub fn retrieve_satoshi_point_using_local_storage(
));
}

let block = match local_block_cache.get(&ordinal_block_number) {
Some(block) => block,
None => match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
Some(block) => {
local_block_cache.insert(ordinal_block_number, block);
local_block_cache.get(&ordinal_block_number).unwrap()
if let Some(cached_tx) = cache.get(&(ordinal_block_number, tx_cursor.0)) {
let (inputs, outputs) = cached_tx.value();
let mut next_found_in_cache = false;

let mut sats_out = 0;
for (index, output_value) in outputs.iter().enumerate() {
if index == tx_cursor.1 {
break;
}
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
// ctx.try_log(|logger| {
// slog::info!(logger, "Adding {} from output #{}", output_value, index)
// });
sats_out += output_value;
}
sats_out += ordinal_offset;
// ctx.try_log(|logger| {
// slog::info!(
// logger,
// "Adding offset {ordinal_offset} to sats_out {sats_out}"
// )
// });

let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
// ctx.try_log(|logger| {
// slog::info!(
// logger,
// "Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
// hex::encode(&txin)
// )
// });

if sats_out < sats_in {
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = *block_height;

// ctx.try_log(|logger| slog::info!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
// hex::encode(&txid_n),
// hex::encode(&txin)));
tx_cursor = (txin.clone(), *vout as usize);
next_found_in_cache = true;
break;
}
},
}

if next_found_in_cache {
continue;
}

if sats_in == 0 {
ctx.try_log(|logger| {
slog::error!(
logger,
"Transaction {} is originating from a non spending transaction",
transaction_identifier.hash
)
});
return Ok(TraversalResult {
inscription_number: 0,
ordinal_number: 0,
transfers: 0,
});
}
}

let block = match find_block_at_block_height(ordinal_block_number, 3, &blocks_db) {
Some(block) => block,
None => {
return Err(format!("block #{ordinal_block_number} not in database"));
}
};

let coinbase_txid = &block.0 .0 .0;
Expand Down Expand Up @@ -1162,7 +1248,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
}
} else {
// isolate the target transaction
for (txid_n, inputs, outputs) in block.0 .1.iter() {
for (txid_n, inputs, outputs) in block.0 .1.into_iter() {
// we iterate over the transactions, looking for the transaction target
if !txid_n.eq(&txid) {
continue;
Expand Down Expand Up @@ -1191,7 +1277,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });

let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
for (txin, block_height, vout, txin_value) in inputs.iter() {
sats_in += txin_value;
// ctx.try_log(|logger| {
// slog::info!(
Expand All @@ -1202,6 +1288,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
// });

if sats_out < sats_in {
cache.insert((ordinal_block_number, txid_n), (inputs.clone(), outputs));
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = *block_height;

Expand Down

0 comments on commit 07523ae

Please sign in to comment.