Skip to content

Commit

Permalink
feat: handle stacks unconfirmed state scans
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Jun 6, 2023
1 parent 158633c commit f6d050f
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 95 deletions.
8 changes: 6 additions & 2 deletions components/chainhook-cli/src/scan/stacks.rs
Expand Up @@ -147,9 +147,13 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate(
let mut last_block_scanned = BlockIdentifier::default();
let mut err_count = 0;
for cursor in start_block..=end_block {
let block_data = match get_stacks_block_at_block_height(cursor, 3, stacks_db_conn) {
let block_data = match get_stacks_block_at_block_height(cursor, true, 3, stacks_db_conn) {
Ok(Some(block)) => block,
Ok(None) => unimplemented!(),
Ok(None) => match get_stacks_block_at_block_height(cursor, false, 3, stacks_db_conn) {
Ok(Some(block)) => block,
Ok(None) => unimplemented!(),
Err(_) => unimplemented!(),
},
Err(_) => unimplemented!(),
};
last_block_scanned = block_data.block_identifier.clone();
Expand Down
117 changes: 31 additions & 86 deletions components/chainhook-cli/src/service/mod.rs
@@ -1,14 +1,18 @@
mod http_api;
mod runloops;

use crate::config::{Config, PredicatesApi};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv,
scan_stacks_chainstate_via_rocksdb_using_predicate,
};
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::{
insert_entries_in_stacks_blocks, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn,
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks,
insert_unconfirmed_entry_in_stacks_blocks, open_readonly_stacks_db_conn,
open_readwrite_stacks_db_conn,
};

use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
Expand Down Expand Up @@ -135,102 +139,33 @@ impl Service {

// Stacks scan operation threadpool
let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded();
let stacks_scan_pool =
ThreadPool::new(self.config.limits.max_number_of_concurrent_stacks_scans);
let ctx = self.ctx.clone();
let config = self.config.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Stacks scan runloop")
.spawn(move || {
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx_moved.clone();
stacks_scan_pool.execute(move || {
let stacks_db_conn = match open_readonly_stacks_db_conn(
&moved_config.expected_cache_path(),
&moved_ctx,
) {
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};

let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}",
last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
});
}
let res = stacks_scan_pool.join();
res
start_stacks_scan_runloop(
&config,
stacks_scan_op_rx,
observer_command_tx_moved,
&ctx,
);
})
.expect("unable to spawn thread");

// Bitcoin scan operation threadpool
let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
let bitcoin_scan_pool =
ThreadPool::new(self.config.limits.max_number_of_concurrent_bitcoin_scans);
let ctx = self.ctx.clone();
let config = self.config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let observer_command_tx_moved = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Bitcoin scan runloop")
.spawn(move || {
while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = moved_observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_http_using_predicate(
&predicate_spec,
&moved_config,
&moved_ctx,
);

match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
return;
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
));
});
}
let res = bitcoin_scan_pool.join();
res
start_bitcoin_scan_runloop(
&config,
bitcoin_scan_op_rx,
observer_command_tx_moved,
&ctx,
);
})
.expect("unable to spawn thread");

Expand Down Expand Up @@ -358,18 +293,28 @@ impl Service {
match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
insert_entries_in_stacks_blocks(
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.new_blocks,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
insert_entries_in_stacks_blocks(
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.blocks_to_apply,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
Expand Down
112 changes: 112 additions & 0 deletions components/chainhook-cli/src/service/runloops.rs
@@ -0,0 +1,112 @@
use std::sync::mpsc::Sender;

use chainhook_event_observer::{
chainhooks::types::{
BitcoinChainhookSpecification, ChainhookSpecification, StacksChainhookSpecification,
},
observer::ObserverCommand,
utils::Context,
};
use threadpool::ThreadPool;

use crate::{
config::{Config, PredicatesApiConfig},
scan::{
bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate,
stacks::scan_stacks_chainstate_via_rocksdb_using_predicate,
},
storage::open_readonly_stacks_db_conn,
};

pub fn start_stacks_scan_runloop(
config: &Config,
stacks_scan_op_rx: crossbeam_channel::Receiver<StacksChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let stacks_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_stacks_scans);
while let Ok(mut predicate_spec) = stacks_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
stacks_scan_pool.execute(move || {
let stacks_db_conn =
match open_readonly_stacks_db_conn(&moved_config.expected_cache_path(), &moved_ctx)
{
Ok(db_conn) => db_conn,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"unable to store stacks block: {}",
e.to_string()
);
unimplemented!()
}
};

let op = scan_stacks_chainstate_via_rocksdb_using_predicate(
&predicate_spec,
&stacks_db_conn,
&moved_config,
&moved_ctx,
);
let last_block_scanned = match hiro_system_kit::nestable_block_on(op) {
Ok(last_block_scanned) => last_block_scanned,
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Stacks chainstate: {e}",
);
return;
}
};
info!(
moved_ctx.expect_logger(),
"Stacks chainstate scan completed up to block: {}", last_block_scanned.index
);
predicate_spec.end_block = Some(last_block_scanned.index);
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Stacks(predicate_spec),
));
});
}
let res = stacks_scan_pool.join();
res
}

pub fn start_bitcoin_scan_runloop(
config: &Config,
bitcoin_scan_op_rx: crossbeam_channel::Receiver<BitcoinChainhookSpecification>,
observer_command_tx: Sender<ObserverCommand>,
ctx: &Context,
) {
let bitcoin_scan_pool = ThreadPool::new(config.limits.max_number_of_concurrent_bitcoin_scans);

while let Ok(predicate_spec) = bitcoin_scan_op_rx.recv() {
let moved_ctx = ctx.clone();
let moved_config = config.clone();
let observer_command_tx = observer_command_tx.clone();
bitcoin_scan_pool.execute(move || {
let op = scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
&moved_config,
&moved_ctx,
);

match hiro_system_kit::nestable_block_on(op) {
Ok(_) => {}
Err(e) => {
error!(
moved_ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
return;
}
};
let _ = observer_command_tx.send(ObserverCommand::EnablePredicate(
ChainhookSpecification::Bitcoin(predicate_spec),
));
});
}
let res = bitcoin_scan_pool.join();
}

0 comments on commit f6d050f

Please sign in to comment.