Skip to content

Commit

Permalink
feat: support for latest archives, add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Mar 23, 2023
1 parent 9fda9d0 commit 494cf3c
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 51 deletions.
14 changes: 0 additions & 14 deletions components/chainhook-cli/Chainhook.toml

This file was deleted.

31 changes: 13 additions & 18 deletions components/chainhook-cli/src/archive/mod.rs
Expand Up @@ -3,39 +3,40 @@ use chainhook_types::StacksNetwork;
use flate2::read::GzDecoder;
use futures_util::StreamExt;
use std::fs;
use std::io::Read;
use std::io::{self, Cursor};
use tar::Archive;
use std::io::{Read, Write};

pub fn default_tsv_file_path(network: &StacksNetwork) -> String {
format!("stacks-node-events-{:?}.tsv", network).to_lowercase()
format!("{:?}-stacks-events.tsv", network).to_lowercase()
}

pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
let destination_path = config.expected_cache_path();
let mut destination_path = config.expected_cache_path();
let url = config.expected_remote_tsv_url();
let res = reqwest::get(url)
.await
.or(Err(format!("Failed to GET from '{}'", &url)))?;

// Download chunks
let (tx, rx) = flume::bounded(0);
destination_path.push(default_tsv_file_path(&config.network.stacks_network));

let mut from = destination_path.clone();
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let gz = GzDecoder::new(input);
let mut archive = Archive::new(gz);
archive.unpack(&destination_path).unwrap();
let mut decoder = GzDecoder::new(input);
let mut content = Vec::new();
let _ = decoder.read_to_end(&mut content);
let mut file = fs::File::create(&destination_path).unwrap();
file.write_all(&content[..]).unwrap();
});

if res.status() == reqwest::StatusCode::OK {
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
tx.send_async(chunk.to_vec()).await.unwrap();
let chunk = item.or(Err(format!("Error while downloading file")))?;
tx.send_async(chunk.to_vec())
.await
.map_err(|e| format!("unable to download stacks event: {}", e.to_string()))?;
}
drop(tx);
}
Expand All @@ -45,12 +46,6 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
.unwrap()
.unwrap();

from.push("stacks-node-events.tsv");
let mut to = from.clone();
to.pop();
to.push(default_tsv_file_path(&config.network.stacks_network));
let _ = fs::rename(from, to);

Ok(())
}

Expand Down
9 changes: 4 additions & 5 deletions components/chainhook-cli/src/cli/mod.rs
@@ -1,15 +1,14 @@
use crate::block::DigestingCommand;
use crate::config::Config;
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::node::Node;
use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate;
use crate::scan::stacks::scan_stacks_chain_with_predicate;

use chainhook_event_observer::chainhooks::types::ChainhookFullSpecification;
use chainhook_event_observer::indexer::ordinals::db::{
build_bitcoin_traversal_local_storage,
find_inscriptions_at_wached_outpoint, initialize_ordinal_state_storage,
open_readonly_ordinals_db_conn,
build_bitcoin_traversal_local_storage, find_inscriptions_at_wached_outpoint,
initialize_ordinal_state_storage, open_readonly_ordinals_db_conn,
retrieve_satoshi_point_using_local_storage,
};
use chainhook_event_observer::observer::BitcoinConfig;
Expand Down Expand Up @@ -318,7 +317,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
.map_err(|e| format!("unable to write file {}\n{}", file_path.display(), e))?;
println!("Created file Chainhook.toml");
}
}
},
Command::Predicates(subcmd) => match subcmd {
PredicatesCommand::New(_cmd) => {
// let manifest = clarinet_files::get_manifest_location(None);
Expand Down
5 changes: 4 additions & 1 deletion components/chainhook-cli/src/config/generator.rs
Expand Up @@ -3,7 +3,7 @@ pub fn generate_config() -> String {
r#"[storage]
driver = "redis"
redis_uri = "redis://localhost:6379/"
cache_path = "chainhook_cache"
cache_path = "cache"
[chainhooks]
max_stacks_registrations = 500
Expand All @@ -15,6 +15,9 @@ bitcoin_node_rpc_url = "http://localhost:8332"
bitcoin_node_rpc_username = "devnet"
bitcoin_node_rpc_password = "devnet"
stacks_node_rpc_url = "http://localhost:20443"
[[event_source]]
tsv_file_url = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz"
"#
);
return conf;
Expand Down
26 changes: 19 additions & 7 deletions components/chainhook-cli/src/config/mod.rs
Expand Up @@ -8,10 +8,10 @@ use std::fs::File;
use std::io::{BufReader, Read};
use std::path::PathBuf;

const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/mainnet/api/mainnet-blockchain-api-latest.tar.gz";
const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://storage.googleapis.com/hirosystems-archive/testnet/api/testnet-blockchain-api-latest.tar.gz";
// const DEFAULT_MAINNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz";
// const DEFAULT_TESTNET_TSV_ARCHIVE: &str = "https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest.gz";
const DEFAULT_MAINNET_TSV_ARCHIVE: &str =
"https://archive.hiro.so/mainnet/stacks-blockchain-api/mainnet-stacks-blockchain-api-latest.gz";
const DEFAULT_TESTNET_TSV_ARCHIVE: &str =
"https://archive.hiro.so/testnet/stacks-blockchain-api/testnet-stacks-blockchain-api-latest.gz";

#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -99,16 +99,28 @@ impl Config {
_ => return Err("network.mode not supported".to_string()),
};

let mut event_sources = vec![];
for source in config_file.event_source.unwrap_or(vec![]).iter_mut() {
if let Some(dst) = source.tsv_file_path.take() {
let mut file_path = PathBuf::new();
file_path.push(dst);
event_sources.push(EventSourceConfig::TsvPath(TsvPathConfig { file_path }));
continue;
}
if let Some(file_url) = source.tsv_file_url.take() {
event_sources.push(EventSourceConfig::TsvUrl(TsvUrlConfig { file_url }));
continue;
}
}

let config = Config {
storage: StorageConfig {
driver: StorageDriver::Redis(RedisConfig {
uri: config_file.storage.redis_uri.to_string(),
}),
cache_path: config_file.storage.cache_path.unwrap_or("cache".into()),
},
event_sources: vec![EventSourceConfig::StacksNode(StacksNodeConfig {
host: config_file.network.stacks_node_rpc_url.to_string(),
})],
event_sources,
chainhooks: ChainhooksConfig {
max_stacks_registrations: config_file
.chainhooks
Expand Down
34 changes: 28 additions & 6 deletions components/chainhook-cli/src/scan/stacks.rs
Expand Up @@ -153,7 +153,14 @@ pub async fn scan_stacks_chain_with_predicate(
};
let proofs = HashMap::new();

let mut actions_triggered = 0;
let mut blocks_scanned = 0;
info!(
ctx.expect_logger(),
"Starting predicate evaluation on blocks"
);
for (_block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) {
blocks_scanned += 1;
let block_data = match indexer::stacks::standardize_stacks_serialized_block(
&indexer.config,
&blob,
Expand Down Expand Up @@ -183,15 +190,24 @@ pub async fn scan_stacks_chain_with_predicate(
Err(e) => {
error!(ctx.expect_logger(), "unable to handle action {}", e);
}
Ok(StacksChainhookOccurrence::Http(request)) => {
send_request(request, &ctx).await;
}
Ok(StacksChainhookOccurrence::File(path, bytes)) => {
file_append(path, bytes, &ctx);
Ok(action) => {
actions_triggered += 1;
match action {
StacksChainhookOccurrence::Http(request) => {
send_request(request, &ctx).await;
}
StacksChainhookOccurrence::File(path, bytes) => {
file_append(path, bytes, &ctx);
}
StacksChainhookOccurrence::Data(_payload) => unreachable!(),
}
}
Ok(StacksChainhookOccurrence::Data(_payload)) => unreachable!(),
}
}
info!(
ctx.expect_logger(),
"{blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

Ok(())
}
Expand All @@ -215,6 +231,12 @@ async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> boo
process::exit(1);
}
}
} else {
info!(
ctx.expect_logger(),
"Building in-memory chainstate from file {}",
destination_path.display()
);
}
config.add_local_tsv_source(&destination_path);
}
Expand Down

0 comments on commit 494cf3c

Please sign in to comment.