Skip to content

Commit

Permalink
fix: infinite loop; the program now poll efficiently
Browse files Browse the repository at this point in the history
tawan
  • Loading branch information
catflyflyfly committed May 27, 2022
1 parent 36e64ed commit d314132
Showing 1 changed file with 73 additions and 29 deletions.
102 changes: 73 additions & 29 deletions src/event_watcher_http.rs
@@ -1,11 +1,8 @@
use std::str::FromStr;
use std::time;
use web3::{
api::Eth,
confirm,
futures::{self, StreamExt},
types::{BlockNumber, Filter, FilterBuilder, Log, H160, H256, U64},
Transport,
};

#[tokio::main]
Expand All @@ -18,7 +15,7 @@ async fn main() -> web3::contract::Result<()> {
//
let start_contract_block = BlockNumber::Earliest;
let confirmations = 15;
let poll_interval = time::Duration::from_millis(1000);
let poll_interval = time::Duration::from_millis(100);
//
//

Expand All @@ -30,6 +27,7 @@ async fn main() -> web3::contract::Result<()> {
futures::pin_mut!(logs_stream);

while let Some(result_log) = logs_stream.next().await {
println!("log received.");
let log: Log = match result_log {
Ok(log) => log,
Err(e) => {
Expand All @@ -38,31 +36,59 @@ async fn main() -> web3::contract::Result<()> {
}
};

let log_hash = match log.block_hash {
Some(log_hash) => log_hash,
let log_block_number = match log.block_number {
Some(log_block_number) => log_block_number,
None => {
println!("how could a log have no hash ???");
println!("log has no block number");
continue;
}
};

let eth = web3.eth();
confirm::wait_for_confirmations(
web3.eth(),
web3.eth_filter(),
poll_interval,
confirmations,
|| transaction_receipt_block_number_check(&eth, log_hash),
)
.await?;

println!("----------------------------------------");
println!("Block Number: {}", log.block_number.unwrap());
println!("Event Address: {:?}", log.topics[0]);
println!("From Address: {}", log.topics[1]);
println!("To Address: {}", log.topics[2]);
println!("Data: {}", format!("0x{}", hex::encode(log.data.0)));
println!("event block number: {}", log.block_number.unwrap());
println!("current block number: {}", web3.eth().block_number().await?);

let mut current_block_number = web3.eth().block_number().await?;

if !is_confirmed(log_block_number, current_block_number, confirmations) {
println!(
"confirmation needed: {}",
log_confirmation_needed(log_block_number, current_block_number, confirmations)
as usize
);
println!("fetching latest block number...");
let block_stream = web3
.eth_filter()
.create_blocks_filter()
.await?
.stream(poll_interval)
.skip(
log_confirmation_needed(log_block_number, current_block_number, confirmations)
as usize,
);
futures::pin_mut!(block_stream);

while !is_confirmed(log_block_number, current_block_number, confirmations) {
println!(
"log block number {} vs current block number {}; not confirmed",
log.block_number.unwrap().low_u64(),
web3.eth().block_number().await?
);
let _ = block_stream.next().await;
current_block_number = web3.eth().block_number().await?;
}
}

println!(
"log block number {} vs current block number {}; confirmed !",
log.block_number.unwrap().low_u64(),
web3.eth().block_number().await?
);
println!("Log Confirmed.");
println!("Current Block Number: {}", web3.eth().block_number().await?);

process_log(log).await;
}

Ok(())
}

Expand Down Expand Up @@ -91,10 +117,28 @@ fn filter(start_at: BlockNumber) -> Filter {
.build()
}

async fn transaction_receipt_block_number_check<T: Transport>(
eth: &Eth<T>,
hash: H256,
) -> web3::error::Result<Option<U64>> {
let receipt = eth.transaction_receipt(hash).await?;
Ok(receipt.and_then(|receipt| receipt.block_number))
fn is_confirmed(log_block_number: U64, current_block_number: U64, confirmations: u64) -> bool {
log_confirmation_needed(log_block_number, current_block_number, confirmations) == 0
}

fn log_confirmation_needed(
log_block_number: U64,
current_block_number: U64,
confirmations: u64,
) -> u64 {
(log_block_number.low_u64() + confirmations)
.checked_sub(current_block_number.low_u64())
.unwrap_or(0)
}

async fn process_log(log: Log) {
println!("----------------------------------------");
println!("Block Number: {}", log.block_number.unwrap());
println!("Event Address: {:?}", log.topics[0]);
println!("From Address: {}", log.topics[1]);
println!("To Address: {}", log.topics[2]);
println!("Data: {}", format!("0x{}", hex::encode(log.data.0)));
println!("----------------------------------------");
println!("*****");
println!("*****");
}

0 comments on commit d314132

Please sign in to comment.