Skip to content

Commit

Permalink
fix: gap in stacks scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Apr 12, 2023
1 parent 938c6df commit 8c8c5c8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 164 deletions.
30 changes: 26 additions & 4 deletions components/chainhook-cli/src/cli/mod.rs
@@ -1,8 +1,8 @@
use crate::block::DigestingCommand;
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http;
use crate::scan::stacks::scan_stacks_chain_with_predicate;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;
use crate::service::Service;

use chainhook_event_observer::bitcoincore_rpc::{Auth, Client, RpcApi};
Expand Down Expand Up @@ -535,10 +535,32 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
};

scan_bitcoin_chain_with_predicate_via_http(predicate_spec, &config, &ctx).await?;
scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&config,
&ctx,
)
.await?;
}
ChainhookFullSpecification::Stacks(predicate) => {
scan_stacks_chain_with_predicate(predicate, &mut config, &ctx).await?;
let predicate_spec = match predicate
.into_selected_network_specification(&config.network.stacks_network)
{
Ok(predicate) => predicate,
Err(e) => {
return Err(format!(
"Specification missing for network {:?}: {e}",
config.network.bitcoin_network
));
}
};

scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut config,
&ctx,
)
.await?;
}
}
}
Expand Down
58 changes: 32 additions & 26 deletions components/chainhook-cli/src/scan/stacks.rs
Expand Up @@ -9,33 +9,31 @@ use crate::{
config::Config,
};
use chainhook_event_observer::{
chainhooks::stacks::{
handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook,
chainhooks::{
stacks::evaluate_stacks_chainhook_on_blocks,
},
utils::{file_append, send_request, AbstractStacksBlock},
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::Context,
};
use chainhook_event_observer::{
chainhooks::{
stacks::evaluate_stacks_chainhook_on_blocks, types::StacksChainhookFullSpecification,
stacks::{handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook},
types::StacksChainhookSpecification,
},
indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer},
utils::Context,
utils::{file_append, send_request, AbstractStacksBlock},
};
use chainhook_types::BlockIdentifier;

pub async fn scan_stacks_chain_with_predicate(
predicate: StacksChainhookFullSpecification,
pub async fn scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec: StacksChainhookSpecification,
config: &mut Config,
ctx: &Context,
) -> Result<(), String> {
let selected_predicate =
predicate.into_selected_network_specification(&config.network.stacks_network)?;

let start_block = match selected_predicate.start_block {
) -> Result<BlockIdentifier, String> {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
None => {
return Err(
"Chainhook specification must include fields 'start_block' and 'end_block' when using the scan command"
"Chainhook specification must include fields 'start_block' when using the scan command"
.into(),
);
}
Expand Down Expand Up @@ -105,7 +103,7 @@ pub async fn scan_stacks_chain_with_predicate(
continue;
}

if let Some(end_block) = selected_predicate.end_block {
if let Some(end_block) = predicate_spec.end_block {
if block_identifier.index > end_block {
break;
}
Expand Down Expand Up @@ -137,7 +135,10 @@ pub async fn scan_stacks_chain_with_predicate(
ctx.expect_logger(),
"Starting predicate evaluation on Stacks blocks"
);
for (_block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
let mut last_block_scanned = BlockIdentifier::default();
let mut err_count = 0;
for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
last_block_scanned = block_identifier;
blocks_scanned += 1;
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
Expand All @@ -154,13 +155,13 @@ pub async fn scan_stacks_chain_with_predicate(

let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data];

let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &selected_predicate, ctx);
let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &predicate_spec, ctx);
if hits_per_blocks.is_empty() {
continue;
}

let trigger = StacksTriggerChainhook {
chainhook: &selected_predicate,
chainhook: &predicate_spec,
apply: hits_per_blocks,
rollback: vec![],
};
Expand All @@ -170,24 +171,29 @@ pub async fn scan_stacks_chain_with_predicate(
}
Ok(action) => {
actions_triggered += 1;
match action {
StacksChainhookOccurrence::Http(request) => {
send_request(request, &ctx).await;
}
StacksChainhookOccurrence::File(path, bytes) => {
file_append(path, bytes, &ctx);
}
let res = match action {
StacksChainhookOccurrence::Http(request) => send_request(request, &ctx).await,
StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx),
StacksChainhookOccurrence::Data(_payload) => unreachable!(),
};
if res.is_err() {
err_count += 1;
} else {
err_count = 0;
}
}
}
// We abort after 3 consecutive errors
if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

Ok(())
Ok(last_block_scanned)
}

async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
Expand Down
160 changes: 26 additions & 134 deletions components/chainhook-cli/src/service/mod.rs
@@ -1,30 +1,21 @@
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http;


use chainhook_event_observer::chainhooks::types::{
ChainhookConfig, ChainhookFullSpecification,
};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate;
use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate;

use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};

use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverEvent};
use chainhook_event_observer::utils::{Context};
use chainhook_event_observer::utils::Context;
use chainhook_event_observer::{
chainhooks::stacks::{
evaluate_stacks_predicate_on_transaction, handle_stacks_hook_action,
StacksChainhookOccurrence, StacksTriggerChainhook,
},
chainhooks::types::ChainhookSpecification,
};
use chainhook_types::{
BitcoinBlockSignaling, BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent,
StacksTransactionData,
BitcoinBlockSignaling, StacksBlockData, StacksChainEvent,
};
use redis::{Commands, Connection};

use std::collections::{HashMap};
use std::sync::mpsc::channel;

use std::sync::mpsc::channel;

pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
Expand Down Expand Up @@ -191,129 +182,30 @@ impl Service {
);
}
match chainhook {
ChainhookSpecification::Stacks(stacks_hook) => {
// Retrieve highest block height stored
let tip_height: u64 = redis_con.get(&format!("stx:tip")).unwrap_or(1);

let start_block = stacks_hook.start_block.unwrap_or(1); // TODO(lgalabru): handle STX hooks and genesis block :s
let end_block = stacks_hook.end_block.unwrap_or(tip_height); // TODO(lgalabru): handle STX hooks and genesis block :s

ChainhookSpecification::Stacks(predicate_spec) => {
let end_block = match scan_stacks_chainstate_via_csv_using_predicate(
predicate_spec,
&mut self.config,
&self.ctx,
)
.await
{
Ok(end_block) => end_block,
Err(e) => {
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
continue;
}
};
info!(
self.ctx.expect_logger(),
"Processing Stacks chainhook {}, will scan blocks [{}; {}]",
stacks_hook.uuid,
start_block,
end_block
"Stacks chainstate scan completed up to block: {}", end_block.index
);
let mut total_hits = 0;
for cursor in start_block..=end_block {
debug!(
self.ctx.expect_logger(),
"Evaluating predicate #{} on block #{}",
stacks_hook.uuid,
cursor
);
let (
block_identifier,
parent_block_identifier,
timestamp,
transactions,
metadata,
) = {
let payload: Vec<String> = redis_con
.hget(
&format!("stx:{}", cursor),
&[
"block_identifier",
"parent_block_identifier",
"timestamp",
"transactions",
"metadata",
],
)
.expect("unable to retrieve tip height");
if payload.len() != 5 {
warn!(self.ctx.expect_logger(), "Chain still being processed, please retry in a few minutes");
continue;
}
(
serde_json::from_str::<BlockIdentifier>(&payload[0])
.unwrap(),
serde_json::from_str::<BlockIdentifier>(&payload[1])
.unwrap(),
serde_json::from_str::<i64>(&payload[2]).unwrap(),
serde_json::from_str::<Vec<StacksTransactionData>>(
&payload[3],
)
.unwrap(),
serde_json::from_str::<StacksBlockMetadata>(&payload[4])
.unwrap(),
)
};
let mut hits = vec![];
for tx in transactions.iter() {
if evaluate_stacks_predicate_on_transaction(
&tx,
&stacks_hook,
&self.ctx,
) {
debug!(
self.ctx.expect_logger(),
"Action #{} triggered by transaction {} (block #{})",
stacks_hook.uuid,
tx.transaction_identifier.hash,
cursor
);
hits.push(tx);
total_hits += 1;
}
}

if hits.len() > 0 {
let block = StacksBlockData {
block_identifier,
parent_block_identifier,
timestamp,
transactions: vec![],
metadata,
};
let trigger = StacksTriggerChainhook {
chainhook: &stacks_hook,
apply: vec![(hits, &block)],
rollback: vec![],
};

let proofs = HashMap::new();
match handle_stacks_hook_action(trigger, &proofs, &self.ctx) {
Err(e) => {
info!(
self.ctx.expect_logger(),
"unable to handle action {}", e
);
}
Ok(StacksChainhookOccurrence::Http(request)) => {
if let Err(e) =
hiro_system_kit::nestable_block_on(request.send())
{
error!(
self.ctx.expect_logger(),
"unable to perform action {}", e
);
}
}
Ok(_) => {
error!(
self.ctx.expect_logger(),
"action not supported"
);
}
}
}
}
info!(self.ctx.expect_logger(), "Stacks chainhook {} scan completed: action triggered by {} transactions", stacks_hook.uuid, total_hits);
}
ChainhookSpecification::Bitcoin(predicate_spec) => {
match scan_bitcoin_chain_with_predicate_via_http(
match scan_bitcoin_chainstate_via_http_using_predicate(
predicate_spec,
&self.config,
&self.ctx,
Expand All @@ -322,7 +214,7 @@ impl Service {
{
Ok(_) => {}
Err(e) => {
info!(
error!(
self.ctx.expect_logger(),
"Unable to evaluate predicate on Bitcoin chainstate: {e}",
);
Expand Down

0 comments on commit 8c8c5c8

Please sign in to comment.