Skip to content

Commit

Permalink
fix: stateful observers
Browse files Browse the repository at this point in the history
# Conflicts:
#	components/ordhook-core/src/core/protocol/inscription_sequencing.rs
  • Loading branch information
lgalabru committed Dec 1, 2023
1 parent fe8715b commit fa7cc42
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 47 deletions.
2 changes: 1 addition & 1 deletion components/ordhook-core/Cargo.toml
Expand Up @@ -33,7 +33,7 @@ anyhow = { version = "1.0.56", features = ["backtrace"] }
schemars = { version = "0.8.10", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" }
progressing = '3'
futures = "0.3.28"
rocksdb = { version = "0.21.0", default-features = false }
rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] }
pprof = { version = "0.13.0", features = ["flamegraph"], optional = true }

# [profile.release]
Expand Down
Expand Up @@ -23,8 +23,8 @@ use crate::{
find_blessed_inscription_with_ordinal_number,
find_latest_cursed_inscription_number_at_block_height,
find_latest_inscription_number_at_block_height, format_satpoint_to_watch,
update_inscriptions_with_block, update_sequence_metadata_with_block, LazyBlockTransaction,
TraversalResult,
update_inscriptions_with_block, update_sequence_metadata_with_block,
LazyBlockTransaction, TraversalResult,
},
ord::height::Height,
};
Expand Down
10 changes: 4 additions & 6 deletions components/ordhook-core/src/scan/bitcoin.rs
Expand Up @@ -7,7 +7,7 @@ use crate::core::protocol::inscription_sequencing::consolidate_block_with_pre_co
use crate::db::{get_any_entry_in_ordinal_activities, open_readonly_ordhook_db_conn};
use crate::download::download_ordinals_dataset_if_required;
use crate::service::observers::{
open_readwrite_observers_db_conn_or_panic, update_observer_progress, ObserverReport,
open_readwrite_observers_db_conn_or_panic, update_observer_progress,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
Expand Down Expand Up @@ -77,7 +77,8 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
"Starting predicate evaluation on {} Bitcoin blocks",
block_heights_to_scan.len()
);
let mut actions_triggered = 0;
let mut err_count = 0;
Expand All @@ -88,12 +89,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
};
let bitcoin_config = event_observer_config.get_bitcoin_config();
let mut number_of_blocks_scanned = 0;
let mut last_block_height = 0;
let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
last_block_height = current_block_height;

let mut inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;

Expand Down Expand Up @@ -174,7 +172,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
open_readwrite_observers_db_conn_or_panic(&config.expected_cache_path(), &ctx);
update_observer_progress(
&predicate_spec.uuid,
last_block_height,
current_block_height,
&observers_db_conn,
&ctx,
)
Expand Down
58 changes: 24 additions & 34 deletions components/ordhook-core/src/service/mod.rs
Expand Up @@ -119,19 +119,11 @@ impl Service {
// If HTTP Predicates API is on, we start:
// - Thread pool in charge of performing replays
// - API server
if self.config.is_http_api_enabled() {
self.start_main_runloop_with_dynamic_predicates(
&observer_command_tx,
observer_event_rx,
predicate_activity_relayer,
)?;
} else {
self.start_main_runloop(
&observer_command_tx,
observer_event_rx,
predicate_activity_relayer,
)?;
}
self.start_main_runloop_with_dynamic_predicates(
&observer_command_tx,
observer_event_rx,
predicate_activity_relayer,
)?;
Ok(())
}

Expand Down Expand Up @@ -224,10 +216,6 @@ impl Service {
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
>,
) -> Result<(), String> {
let PredicatesApi::On(ref api_config) = self.config.http_api else {
return Ok(());
};

let (bitcoin_scan_op_tx, bitcoin_scan_op_rx) = crossbeam_channel::unbounded();
let ctx = self.ctx.clone();
let config = self.config.clone();
Expand All @@ -243,24 +231,26 @@ impl Service {
})
.expect("unable to spawn thread");

info!(
self.ctx.expect_logger(),
"Listening on port {} for chainhook predicate registrations", api_config.http_port
);
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let db_dir_path = self.config.expected_cache_path();
// Test and initialize a database connection
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future = start_predicate_api_server(
api_config.http_port,
db_dir_path,
moved_observer_command_tx,
ctx,
if let PredicatesApi::On(ref api_config) = self.config.http_api {
info!(
self.ctx.expect_logger(),
"Listening on port {} for chainhook predicate registrations", api_config.http_port
);
let _ = hiro_system_kit::nestable_block_on(future);
});
let ctx = self.ctx.clone();
let api_config = api_config.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let db_dir_path = self.config.expected_cache_path();
// Test and initialize a database connection
let _ = hiro_system_kit::thread_named("HTTP Predicate API").spawn(move || {
let future = start_predicate_api_server(
api_config.http_port,
db_dir_path,
moved_observer_command_tx,
ctx,
);
let _ = hiro_system_kit::nestable_block_on(future);
});
}

loop {
let event = match observer_event_rx.recv() {
Expand Down
11 changes: 7 additions & 4 deletions components/ordhook-core/src/service/observers.rs
Expand Up @@ -61,6 +61,7 @@ pub fn insert_entry_in_observers(
observers_db_conn: &Connection,
ctx: &Context,
) {
remove_entry_from_observers(&spec.uuid(), observers_db_conn, ctx);
while let Err(e) = observers_db_conn.execute(
"INSERT INTO observers (uuid, spec, streaming_enabled, last_block_height_update) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![&spec.uuid(), json!(spec).to_string(), report.streaming_enabled, report.last_block_height_update],
Expand Down Expand Up @@ -238,7 +239,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
BitcoinChainhookSpecification {
uuid: format!("ordhook-internal-trigger"),
owner_uuid: None,
name: format!("ordhook"),
name: format!("ordhook-internal-trigger"),
network: config.network.bitcoin_network.clone(),
version: 1,
blocks: None,
Expand All @@ -259,7 +260,7 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
));
}

let observers_db_conn = open_readwrite_observers_db_conn(&config.expected_cache_path(), ctx)?;
let observers_db_conn = initialize_observers_db(&config.expected_cache_path(), ctx);

let mut observers_to_catchup = vec![];
let mut observers_to_clean_up = vec![];
Expand Down Expand Up @@ -340,13 +341,15 @@ pub fn create_and_consolidate_chainhook_config_with_predicates(
action: observer.action,
},
);
full_specs.push(BitcoinChainhookFullSpecification {
let full_spec = BitcoinChainhookFullSpecification {
uuid: observer.uuid,
owner_uuid: observer.owner_uuid,
name: observer.name,
version: observer.version,
networks,
});
};
info!(ctx.expect_logger(), "Observer '{}' to be caught-up (last block sent: {}, tip: {})", full_spec.name, report.last_block_height_update, chain_tip_height);
full_specs.push(full_spec);
}

Ok((chainhook_config, full_specs))
Expand Down

0 comments on commit fa7cc42

Please sign in to comment.