Skip to content

Commit

Permalink
fix: address remaining issues
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Aug 4, 2023
1 parent ddf790f commit 74b2fa9
Show file tree
Hide file tree
Showing 16 changed files with 1,206 additions and 1,267 deletions.
4 changes: 2 additions & 2 deletions components/hord-cli/Cargo.toml
Expand Up @@ -14,8 +14,8 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.8.0", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.7.12", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
chainhook-sdk = { version = "=0.8.2", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.2", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
hiro-system-kit = "0.1.0"
clap = { version = "3.2.23", features = ["derive"], optional = true }
clap_generate = { version = "3.0.3", optional = true }
Expand Down
6 changes: 3 additions & 3 deletions components/hord-cli/src/cli/mod.rs
Expand Up @@ -3,15 +3,15 @@ use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::{self};
use crate::core::protocol::inscription_parsing::parse_ordinals_and_standardize_block;
use crate::download::download_ordinals_dataset_if_required;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;

use crate::db::{
delete_data_in_hord_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height, initialize_hord_db,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db, open_readwrite_hord_db_conn,
open_readwrite_hord_db_conn_rocks_db,
};
Expand Down Expand Up @@ -742,7 +742,7 @@ pub async fn fetch_and_standardize_block(
download_and_parse_block_with_retry(http_client, &block_hash, &bitcoin_config, &ctx)
.await?;

core::parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
parse_ordinals_and_standardize_block(block_breakdown, &bitcoin_config.network, &ctx)
.map_err(|(e, _)| e)
}

Expand Down
64 changes: 4 additions & 60 deletions components/hord-cli/src/core/mod.rs
@@ -1,14 +1,11 @@
pub mod pipeline;
pub mod protocol;

use chainhook_sdk::types::{
BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionRevealData, OrdinalOperation,
};
use chainhook_sdk::types::{BitcoinBlockData, OrdinalOperation};
use dashmap::DashMap;
use fxhash::{FxBuildHasher, FxHasher};
use rocksdb::DB;
use rusqlite::Connection;
use std::collections::BTreeMap;
use std::hash::BuildHasherDefault;
use std::ops::Div;
use std::path::PathBuf;
Expand All @@ -20,8 +17,6 @@ use chainhook_sdk::{

use crate::config::{Config, LogConfig};

use chainhook_sdk::indexer::bitcoin::{standardize_bitcoin_block, BitcoinBlockFullBreakdown};

use crate::db::{
find_last_block_inserted, find_latest_inscription_block_height, initialize_hord_db,
open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
Expand All @@ -32,10 +27,6 @@ use crate::db::{
LazyBlockTransaction,
};

use self::protocol::inscribing::{
get_inscriptions_from_full_tx, get_inscriptions_from_standardized_tx,
};

#[derive(Clone, Debug)]
pub struct HordConfig {
pub network_thread_max: usize,
Expand All @@ -47,52 +38,6 @@ pub struct HordConfig {
pub logs: LogConfig,
}

pub fn parse_ordinals_and_standardize_block(
raw_block: BitcoinBlockFullBreakdown,
network: &BitcoinNetwork,
ctx: &Context,
) -> Result<BitcoinBlockData, (String, bool)> {
let mut ordinal_operations = BTreeMap::new();

for tx in raw_block.tx.iter() {
ordinal_operations.insert(tx.txid.to_string(), get_inscriptions_from_full_tx(&tx, ctx));
}

let mut block = standardize_bitcoin_block(raw_block, network, ctx)?;

for tx in block.transactions.iter_mut() {
if let Some(ordinal_operations) =
ordinal_operations.remove(tx.transaction_identifier.get_hash_bytes_str())
{
tx.metadata.ordinal_operations = ordinal_operations;
}
}
Ok(block)
}

pub fn parse_inscriptions_in_standardized_block(block: &mut BitcoinBlockData, ctx: &Context) {
for tx in block.transactions.iter_mut() {
tx.metadata.ordinal_operations = get_inscriptions_from_standardized_tx(tx, ctx);
}
}

pub fn get_inscriptions_revealed_in_block(
block: &BitcoinBlockData,
) -> Vec<&OrdinalInscriptionRevealData> {
let mut ops = vec![];
for tx in block.transactions.iter() {
for op in tx.metadata.ordinal_operations.iter() {
if let OrdinalOperation::InscriptionRevealed(op) = op {
ops.push(op);
}
if let OrdinalOperation::CursedInscriptionRevealed(op) = op {
ops.push(op);
}
}
}
ops
}

pub fn revert_hord_db_with_augmented_bitcoin_block(
block: &BitcoinBlockData,
blocks_db_rw: &DB,
Expand All @@ -106,8 +51,7 @@ pub fn revert_hord_db_with_augmented_bitcoin_block(
let tx = &block.transactions[block.transactions.len() - tx_index];
for ordinal_event in tx.metadata.ordinal_operations.iter() {
match ordinal_event {
OrdinalOperation::InscriptionRevealed(data)
| OrdinalOperation::CursedInscriptionRevealed(data) => {
OrdinalOperation::InscriptionRevealed(data) => {
// We remove any new inscription created
remove_entry_from_inscriptions(
&data.inscription_id,
Expand Down Expand Up @@ -248,13 +192,13 @@ pub fn should_sync_hord_db(
(end_block.min(200_000), 10_000)
} else if start_block < 550_000 {
(end_block.min(550_000), 1_000)
} else {
} else {
(end_block, 100)
};

if start_block < 767430 && end_block > 767430 {
end_block = 767430;
}
}

if start_block <= end_block {
Ok(Some((start_block, end_block, speed)))
Expand Down
2 changes: 1 addition & 1 deletion components/hord-cli/src/core/pipeline/mod.rs
Expand Up @@ -16,7 +16,7 @@ use chainhook_sdk::indexer::bitcoin::{
build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry,
};

use super::parse_ordinals_and_standardize_block;
use super::protocol::inscription_parsing::parse_ordinals_and_standardize_block;

pub enum PostProcessorCommand {
Start,
Expand Down
61 changes: 22 additions & 39 deletions components/hord-cli/src/core/pipeline/processors/block_ingestion.rs
Expand Up @@ -6,11 +6,12 @@ use std::{

use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;
use rocksdb::DB;

use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db, LazyBlock},
};

pub fn start_block_ingestion_processor(
Expand All @@ -25,18 +26,14 @@ pub fn start_block_ingestion_processor(
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let mut num_writes = 0;
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();

let mut empty_cycles = 0;
let mut tip: u64 = 0;

if let Ok(PostProcessorCommand::Start) = commands_rx.recv() {
info!(ctx.expect_logger(), "Start block indexing runloop");
}


loop {
let (compacted_blocks, _) = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(compacted_blocks, blocks)) => {
Expand All @@ -62,40 +59,7 @@ pub fn start_block_ingestion_processor(
}
},
};

let batch_size = compacted_blocks.len();
num_writes += batch_size;
for (block_height, compacted_block) in compacted_blocks.into_iter() {
tip = tip.max(block_height);
insert_entry_in_blocks(
block_height as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
}
info!(ctx.expect_logger(), "{batch_size} blocks saved to disk (total: {tip})");

// Early return
if num_writes >= 512 {
ctx.try_log(|logger| {
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
continue;
}

// Write blocks to disk, before traversals
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
store_compacted_blocks(compacted_blocks, &blocks_db_rw, &ctx);
}

if let Err(e) = blocks_db_rw.flush() {
Expand All @@ -112,3 +76,22 @@ pub fn start_block_ingestion_processor(
thread_handle: handle,
}
}

pub fn store_compacted_blocks(
mut compacted_blocks: Vec<(u64, LazyBlock)>,
blocks_db_rw: &DB,
ctx: &Context,
) {
compacted_blocks.sort_by(|(a, _), (b, _)| a.cmp(b));

for (block_height, compacted_block) in compacted_blocks.into_iter() {
insert_entry_in_blocks(block_height as u32, &compacted_block, &blocks_db_rw, &ctx);
info!(ctx.expect_logger(), "Block #{block_height} saved to disk");
}

if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
}

0 comments on commit 74b2fa9

Please sign in to comment.