Skip to content

Commit

Permalink
feat: implement zmq runloop
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Apr 4, 2023
1 parent d2e328a commit c6c1c0e
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 34 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-event-observer/src/hord/db/mod.rs
Expand Up @@ -702,7 +702,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
let res = match find_compacted_block_at_block_height(ordinal_block_number, &hord_db_conn) {
Some(res) => res,
None => {
return Err(format!("unable to retrieve block #{ordinal_block_number}"));
return Err(format!("block #{ordinal_block_number} not in database"));
}
};

Expand Down
8 changes: 4 additions & 4 deletions components/chainhook-event-observer/src/hord/mod.rs
Expand Up @@ -107,7 +107,7 @@ pub fn update_hord_db_and_augment_bitcoin_block(
let mut traversals = HashMap::new();
if !transactions_ids.is_empty() {
let expected_traversals = transactions_ids.len();
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, TraversalResult)>();
let (traversal_tx, traversal_rx) = channel::<(TransactionIdentifier, _)>();
let traversal_data_pool = ThreadPool::new(10);

for transaction_id in transactions_ids.into_iter() {
Expand All @@ -123,16 +123,16 @@ pub fn update_hord_db_and_augment_bitcoin_block(
&transaction_id,
0,
&moved_ctx,
)
.unwrap();
);
let _ = moved_traversal_tx.send((transaction_id, traversal));
});
}

let mut traversals_received = 0;
while let Ok((transaction_identifier, traversal_result)) = traversal_rx.recv() {
traversals_received += 1;
traversals.insert(transaction_identifier, traversal_result);
let traversal = traversal_result?;
traversals.insert(transaction_identifier, traversal);
if traversals_received == expected_traversals {
break;
}
Expand Down
100 changes: 71 additions & 29 deletions components/chainhook-event-observer/src/observer/mod.rs
Expand Up @@ -18,6 +18,7 @@ use crate::indexer::bitcoin::{
retrieve_full_block_breakdown_with_retry, standardize_bitcoin_block, BitcoinBlockFullBreakdown,
NewBitcoinBlock,
};
use crate::indexer::fork_scratch_pad::ForkScratchPad;
use crate::indexer::{self, Indexer, IndexerConfig};
use crate::utils::{send_request, Context};

Expand Down Expand Up @@ -408,6 +409,7 @@ pub async fn start_event_observer(
if let BitcoinBlockSignaling::ZeroMQ(ref bitcoind_zmq_url) = config.bitcoin_block_signaling {
let bitcoind_zmq_url = bitcoind_zmq_url.clone();
let ctx_moved = ctx.clone();
let bitcoin_config = config.get_bitcoin_config();
hiro_system_kit::thread_named("Bitcoind zmq listener")
.spawn(move || {
ctx_moved.try_log(|logger| {
Expand All @@ -417,38 +419,78 @@ pub async fn start_event_observer(
)
});

let _ = hiro_system_kit::nestable_block_on(async move {
let mut socket = zeromq::SubSocket::new();
socket
.connect(&bitcoind_zmq_url)
.await
.expect("Failed to connect");
let _: Result<(), Box<dyn Error>> =
hiro_system_kit::nestable_block_on(async move {
let mut socket = zeromq::SubSocket::new();

socket.subscribe("hashblock").await?;
ctx_moved.try_log(|logger| {
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
});
socket
.connect(&bitcoind_zmq_url)
.await
.expect("Failed to connect");

loop {
let message = match socket.recv().await {
Ok(message) => message,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Unable to receive ZMQ message: {}",
e.to_string()
)
});
continue;
socket.subscribe("").await?;
ctx_moved.try_log(|logger| {
slog::info!(logger, "Waiting for ZMQ messages from bitcoind")
});

let mut bitcoin_blocks_pool = ForkScratchPad::new();

loop {
let message = match socket.recv().await {
Ok(message) => message,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::error!(
logger,
"Unable to receive ZMQ message: {}",
e.to_string()
)
});
continue;
}
};
let block_hash = hex::encode(message.get(1).unwrap().to_vec());

let block = match retrieve_full_block_breakdown_with_retry(
&block_hash,
&bitcoin_config,
&ctx_moved,
)
.await
{
Ok(block) => block,
Err(e) => {
ctx_moved.try_log(|logger| {
slog::warn!(
logger,
"unable to retrieve_full_block_breakdown: {}",
e.to_string()
)
});
continue;
}
};

ctx_moved.try_log(|logger| {
slog::info!(
logger,
"Bitcoin block #{} dispatched for processing",
block.height
)
});

let header = block.get_block_header();
let _ = observer_commands_tx
.send(ObserverCommand::ProcessBitcoinBlock(block));

if let Ok(Some(event)) =
bitcoin_blocks_pool.process_header(header, &ctx_moved)
{
let _ = observer_commands_tx
.send(ObserverCommand::PropagateBitcoinChainEvent(event));
}
};
let block_hash: String =
String::from_utf8(message.get(0).unwrap().to_vec())?;
println!("Received {}", block_hash);
}
Ok::<(), Box<dyn Error>>(())
});
}
});
})
.expect("unable to spawn thread");
}
Expand Down

0 comments on commit c6c1c0e

Please sign in to comment.