Skip to content

Commit

Permalink
fix: serialize handlers in one thread
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Aug 23, 2023
1 parent 3443915 commit cdfc264
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 45 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions components/ordhook-core/Cargo.toml
Expand Up @@ -12,8 +12,8 @@ redis = "0.21.5"
serde-redis = "0.12.0"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "=0.8.3", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.3", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
chainhook-sdk = { version = "=0.8.4", default-features = false, features = ["zeromq"] }
# chainhook-sdk = { version = "=0.8.4", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq"] }
hiro-system-kit = "0.1.0"
reqwest = { version = "0.11", features = ["stream", "json"] }
tokio = { version = "=1.24", features = ["full"] }
Expand Down
64 changes: 23 additions & 41 deletions components/ordhook-core/src/service/mod.rs
Expand Up @@ -33,6 +33,7 @@ use chainhook_sdk::observer::{
};
use chainhook_sdk::types::{BitcoinBlockData, BlockIdentifier};
use chainhook_sdk::utils::Context;
use crossbeam_channel::select;
use crossbeam_channel::unbounded;
use dashmap::DashMap;
use fxhash::FxHasher;
Expand Down Expand Up @@ -132,9 +133,9 @@ impl Service {

// Sidecar channels setup
let (observer_command_tx, observer_command_rx) = channel();
let (block_mutator_in_tx, block_mutator_in_rx) = channel();
let (block_mutator_out_tx, block_mutator_out_rx) = channel();
let (chain_event_notifier_tx, chain_event_notifier_rx) = channel();
let (block_mutator_in_tx, block_mutator_in_rx) = crossbeam_channel::unbounded();
let (block_mutator_out_tx, block_mutator_out_rx) = crossbeam_channel::unbounded();
let (chain_event_notifier_tx, chain_event_notifier_rx) = crossbeam_channel::unbounded();
let observer_sidecar = ObserverSidecar {
bitcoin_blocks_mutator: Some((block_mutator_in_tx, block_mutator_out_rx)),
bitcoin_chain_event_notifier: Some(chain_event_notifier_tx),
Expand Down Expand Up @@ -198,46 +199,27 @@ impl Service {
let config = self.config.clone();
let cache_l2 = traversals_cache.clone();

let _ = hiro_system_kit::thread_named("Sidecar block mutator").spawn(move || loop {
let (mut blocks_to_mutate, blocks_ids_to_rollback) = match block_mutator_in_rx.recv() {
Ok(block) => block,
Err(e) => {
error!(
ctx.expect_logger(),
"Error: broken channel {}",
e.to_string()
);
break;
}
};
chainhook_sidecar_mutate_blocks(
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
&cache_l2,
&config,
&ctx,
);
let _ = block_mutator_out_tx.send(blocks_to_mutate);
});

let ctx = self.ctx.clone();
let config = self.config.clone();
let _ =
hiro_system_kit::thread_named("Chain event notification handler").spawn(move || loop {
let command = match chain_event_notifier_rx.recv() {
Ok(cmd) => cmd,
Err(e) => {
error!(
ctx.expect_logger(),
"Error: broken channel {}",
e.to_string()
let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || loop {
select! {
recv(block_mutator_in_rx) -> msg => {
if let Ok((mut blocks_to_mutate, blocks_ids_to_rollback)) = msg {
chainhook_sidecar_mutate_blocks(
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
&cache_l2,
&config,
&ctx,
);
break;
let _ = block_mutator_out_tx.send(blocks_to_mutate);
}
};

chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
});
}
recv(chain_event_notifier_rx) -> msg => {
if let Ok(command) = msg {
chainhook_sidecar_mutate_ordhook_db(command, &config, &ctx)
}
}
}
});

loop {
let event = match observer_event_rx.recv() {
Expand Down

0 comments on commit cdfc264

Please sign in to comment.