Skip to content
This repository has been archived by the owner on Mar 23, 2021. It is now read-only.

Commit

Permalink
Re-work Ethereum matching transaction logic
Browse files Browse the repository at this point in the history
Currently the logic for matching transactions is super generic.  We
would like an API that accepts only what is needed to match a
transaction and returns what we need from the matched transaction.  From
the COMIT protocol level i.e., rfc003 this is a 'per action' API i.e.,
funded/deployed/redeem/refunded.

The API is now

    matching_create_contract(...) -> Result<Transaction, Address>
    matching_events(...) -> Result<Transaction, Log>

Re-write ethereum integration tests. Use
`matching_transaction_and_receipt` to test the new matching logic.
  • Loading branch information
Tobin C. Harding committed Feb 24, 2020
1 parent 6699180 commit e35a6a9
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 1,422 deletions.
263 changes: 126 additions & 137 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions cnd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,11 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
void = "1.0.2"
warp = { version = "0.2", default-features = false }

# These versions need to be "in sync".
# web3 0.8 gives us primitive-types 0.3.0
# primitive-types 0.3.0 with the "rlp" feature gives us "rlp" version 0.4.2
[dependencies.web3]
default-features = false
features = ["http"]
version = "0.8"

[dependencies.primitive-types]
features = ["rlp"]
version = "0.3.0"

[dependencies.rlp]
version = "0.4.2"

[dev-dependencies]
base64 = "0.11"
bitcoincore-rpc = "0.9.0"
Expand Down
318 changes: 244 additions & 74 deletions cnd/src/btsieve/ethereum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
mod cache;
mod transaction_pattern;
mod web3_connector;

pub use self::{
cache::Cache,
transaction_pattern::{Event, Topic, TransactionPattern, TRANSACTION_STATUS_OK},
web3_connector::Web3Connector,
};
pub use self::{cache::Cache, web3_connector::Web3Connector};
use crate::{
btsieve::{BlockByHash, LatestBlock, Predates, ReceiptByHash},
ethereum::{Transaction, TransactionAndReceipt, TransactionReceipt, H256, U256},
ethereum::{Address, Bytes, Log, Transaction, TransactionReceipt, H256, U256},
Never,
};
use anyhow;
use chrono::NaiveDateTime;
use ethbloom::Input;
use futures_core::compat::Future01CompatExt;
use genawaiter::{
sync::{Co, Gen},
Expand All @@ -24,16 +20,119 @@ use std::collections::HashSet;
type Hash = H256;
type Block = crate::ethereum::Block<Transaction>;

pub async fn matching_transaction<C>(
pub const TRANSACTION_STATUS_OK: u32 = 1;

pub async fn matching_create_contract<C>(
blockchain_connector: C,
start_of_swap: NaiveDateTime,
bytecode: Bytes,
) -> anyhow::Result<(Transaction, Address)>
where
C: LatestBlock<Block = Option<Block>>
+ BlockByHash<Block = Option<Block>, BlockHash = Hash>
+ ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>
+ Clone,
{
let (transaction, receipt) =
matching_transaction_and_receipt(blockchain_connector, start_of_swap, |transaction| {
// transaction.to address is None if, and only if, the transaction
// creates a contract.
transaction.to.is_none() && transaction.input == bytecode
})
.await?;

match receipt.contract_address {
Some(location) => Ok((transaction, location)),
None => Err(anyhow::anyhow!("contract address missing from receipt")),
}
}

pub async fn matching_event<C>(
blockchain_connector: C,
start_of_swap: NaiveDateTime,
event: Event,
action: &'static str,
) -> anyhow::Result<(Transaction, Log)>
where
C: LatestBlock<Block = Option<Block>>
+ BlockByHash<Block = Option<Block>, BlockHash = Hash>
+ ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>
+ Clone,
{
matching_transaction_and_log(
blockchain_connector.clone(),
start_of_swap,
event.topics.clone(),
action,
|receipt| {
if event_exists_in_receipt(&event, &receipt) {
let log_msg = &event.topics[0].unwrap().0;
let log = receipt
.logs
.into_iter()
.find(|log| log.topics.contains(log_msg))
.ok_or_else(|| {
anyhow::anyhow!("Fund transaction receipt must contain transfer event")
})?;

return Ok(Some(log));
}

Ok(None)
},
)
.await
}

/// Fetch receipt from connector using transaction hash.
async fn fetch_receipt<C>(blockchain_connector: C, hash: Hash) -> anyhow::Result<TransactionReceipt>
where
C: ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>,
{
let receipt = blockchain_connector
.receipt_by_hash(hash)
.compat()
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"Could not get transaction receipt for transaction {:x}",
hash
)
})?;
Ok(receipt)
}

fn event_exists_in_receipt(event: &Event, receipt: &TransactionReceipt) -> bool {
match event {
Event { topics, .. } if topics.is_empty() => false,
Event { address, topics } => receipt.logs.iter().any(|tx_log| {
if address != &tx_log.address {
return false;
}

if tx_log.topics.len() == topics.len() {
tx_log.topics.iter().enumerate().all(|(index, tx_topic)| {
let topic = &topics[index];
topic.as_ref().map_or(true, |topic| tx_topic == &topic.0)
})
} else {
false
}
}),
}
}

pub async fn matching_transaction_and_receipt<C, F>(
connector: C,
pattern: TransactionPattern,
start_of_swap: NaiveDateTime,
) -> anyhow::Result<TransactionAndReceipt>
matcher: F,
) -> anyhow::Result<(Transaction, TransactionReceipt)>
where
C: LatestBlock<Block = Option<Block>>
+ BlockByHash<Block = Option<Block>, BlockHash = Hash>
+ ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>
+ Clone,
F: Fn(Transaction) -> bool,
{
let mut block_generator = Gen::new({
let connector = connector.clone();
Expand All @@ -43,13 +142,98 @@ where
loop {
match block_generator.async_resume().await {
GeneratorState::Yielded(block) => {
if let Some(transaction_and_receipt) =
check_block_against_pattern(connector.clone(), block, pattern.clone()).await?
{
return Ok(transaction_and_receipt);
} else {
for transaction in block.transactions.into_iter() {
if matcher(transaction.clone()) {
let receipt = fetch_receipt(connector.clone(), transaction.hash).await?;
if !receipt.transaction_status_ok() {
// This can be caused by a failed attempt to complete an action,
// for example, sending a transaction with low gas.
tracing::warn!(
"transaction matched {:x} but status was NOT OK",
transaction.hash,
);
continue;
}
tracing::trace!("transaction matched {:x}", transaction.hash,);
return Ok((transaction, receipt));
}
}
}
GeneratorState::Complete(Err(e)) => return Err(e),
// By matching against the never type explicitly, we assert that the `Ok` value of the
// result is actually the never type and has not been changed since this
// line was written. The never type can never be constructed, so we cannot
// reach this line never anyway.
GeneratorState::Complete(Ok(never)) => match never {},
}
}
}

async fn matching_transaction_and_log<C, F>(
connector: C,
start_of_swap: NaiveDateTime,
topics: Vec<Option<Topic>>,
action: &str,
matcher: F,
) -> anyhow::Result<(Transaction, Log)>
where
C: LatestBlock<Block = Option<Block>>
+ BlockByHash<Block = Option<Block>, BlockHash = Hash>
+ ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>
+ Clone,
F: Fn(TransactionReceipt) -> anyhow::Result<Option<Log>>,
{
let mut block_generator = Gen::new({
let connector = connector.clone();
|co| async move { find_relevant_blocks(connector, &co, start_of_swap).await }
});

loop {
match block_generator.async_resume().await {
GeneratorState::Yielded(block) => {
let block_hash = block
.hash
.ok_or_else(|| anyhow::anyhow!("block without hash"))?;

let maybe_contains_transaction = topics.iter().all(|topic| {
topic.as_ref().map_or(true, |topic| {
block
.logs_bloom
.contains_input(Input::Raw(topic.0.as_ref()))
})
});
if !maybe_contains_transaction {
tracing::trace!(
"bloom filter indicates that block does not contain {} transaction:
{:x}",
action,
block_hash,
);
continue;
}

tracing::trace!(
"bloom filter indicates that we should check the block for {} transactions: {:x}",
action,
block_hash,
);
for transaction in block.transactions.into_iter() {
let receipt = fetch_receipt(connector.clone(), transaction.hash).await?;
if let Some(log) = matcher(receipt.clone())? {
if !receipt.transaction_status_ok() {
// This can be caused by a failed attempt to complete an action,
// for example, sending a transaction with low gas.
tracing::warn!(
"{} transaction matched {:x} but status was NOT OK",
action,
transaction.hash,
);
continue;
}
tracing::trace!("{} transaction matched {:x}", action, transaction.hash,);
return Ok((transaction.clone(), log));
}
}
}
GeneratorState::Complete(Err(e)) => return Err(e),
// By matching against the never type explicitly, we assert that the `Ok` value of the
Expand Down Expand Up @@ -192,71 +376,57 @@ fn seen_block_or_predates_start_of_swap(
}
}

async fn check_block_against_pattern<C>(
connector: C,
block: Block,
pattern: TransactionPattern,
) -> anyhow::Result<Option<TransactionAndReceipt>>
where
C: ReceiptByHash<Receipt = Option<TransactionReceipt>, TransactionHash = Hash>,
{
let needs_receipt = pattern.needs_receipts(&block);
let block_hash = block
.hash
.ok_or_else(|| anyhow::anyhow!("block without hash"))?;
impl Predates for Block {
fn predates(&self, timestamp: NaiveDateTime) -> bool {
let unix_timestamp = timestamp.timestamp();

if needs_receipt {
tracing::debug!(
"bloom-filter of block {:x} suggests to fetch receipts for {:?}",
block_hash,
pattern
);
} else {
tracing::debug!(
"bloom-filter of block {:x} suggests to not fetch receipts for {:?}",
block_hash,
pattern
);
self.timestamp < U256::from(unix_timestamp)
}
}

for transaction in block.transactions.into_iter() {
let tx_hash = transaction.hash;

let receipt = connector
.receipt_by_hash(tx_hash)
.compat()
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"Could not get transaction receipt for transaction {:x}",
tx_hash
)
})?;

let result = pattern.matches(&transaction, &receipt);

tracing::debug!(
"matching {:?} against transaction {:x} yielded {}",
pattern,
tx_hash,
result
);

if result {
return Ok(Some(TransactionAndReceipt {
transaction,
receipt,
}));
}
}
#[derive(Clone, Copy, Default, Eq, PartialEq, serde::Serialize, serdebug::SerDebug)]
#[serde(transparent)]
pub struct Topic(pub H256);

Ok(None)
/// Event works similar to web3 filters:
/// https://web3js.readthedocs.io/en/1.0/web3-eth-subscribe.html?highlight=filter#subscribe-logs
/// E.g. this `Event` would match this `Log`:
/// ```rust, ignore
/// Event {
/// address: "0xe46FB33e4DB653De84cB0E0E8b810A6c4cD39d59",
/// topics: [
/// None,
/// 0x000000000000000000000000e46fb33e4db653de84cb0e0e8b810a6c4cd39d59,
/// None,
/// ],
/// ```
/// ```rust, ignore
/// Log:
/// [ { address: "0xe46FB33e4DB653De84cB0E0E8b810A6c4cD39d59",
/// data: "0x123",
/// ..
/// topics:
/// [ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
/// "0x000000000000000000000000e46fb33e4db653de84cb0e0e8b810a6c4cd39d59",
/// "0x000000000000000000000000d51ecee7414c4445534f74208538683702cbb3e4" ],
/// },
/// .. ] //Other data omitted
/// }
/// ```
#[derive(Clone, Default, Eq, PartialEq, serde::Serialize, serdebug::SerDebug)]
pub struct Event {
pub address: Address,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub topics: Vec<Option<Topic>>,
}

impl Predates for Block {
fn predates(&self, timestamp: NaiveDateTime) -> bool {
let unix_timestamp = timestamp.timestamp();
trait TransactionStatusOk {
fn transaction_status_ok(&self) -> bool;
}

self.timestamp < U256::from(unix_timestamp)
impl TransactionStatusOk for TransactionReceipt {
fn transaction_status_ok(&self) -> bool {
const TRANSACTION_STATUS_OK: u32 = 1;
self.status == Some(TRANSACTION_STATUS_OK.into())
}
}
Loading

0 comments on commit e35a6a9

Please sign in to comment.