Skip to content

Commit

Permalink
Handle null blocks from Filecoin EVM (#5294)
Browse files Browse the repository at this point in the history
Accommodate Filecoin EVM null blocks

---------

Co-authored-by: David Boreham <david@bozemanpass.com>
Co-authored-by: Roy Crihfield <roy@manteia.ltd>
  • Loading branch information
3 people committed Jun 6, 2024
1 parent 1c5ded5 commit 897cfa8
Show file tree
Hide file tree
Showing 20 changed files with 373 additions and 169 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --release
args: --release
3 changes: 2 additions & 1 deletion chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down Expand Up @@ -241,6 +241,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand Down
3 changes: 2 additions & 1 deletion chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
Expand All @@ -195,7 +196,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
18 changes: 11 additions & 7 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,13 +1140,6 @@ pub trait EthereumAdapter: Send + Sync + 'static {
Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send + '_>,
>;

/// Load block pointer for the specified `block number`.
fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = bc::IngestorError> + Send>;

/// Find a block by its number, according to the Ethereum node.
///
/// Careful: don't use this function without considering race conditions.
Expand All @@ -1162,6 +1155,17 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_number: BlockNumber,
) -> Box<dyn Future<Item = Option<H256>, Error = Error> + Send>;

/// Finds the hash and number of the lowest non-null block with height greater than or equal to
/// the given number.
///
/// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must
/// also be considered for the resolved block, in case it is higher than the requested number.
async fn next_existing_ptr_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error>;

/// Call the function of a smart contract. A return of `None` indicates
/// that the call reverted. The returned `CallSource` indicates where
/// the result came from for accounting purposes
Expand Down
11 changes: 6 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,9 @@ impl Blockchain for Chain {
.clone();

adapter
.block_pointer_from_number(logger, number)
.compat()
.next_existing_ptr_to_number(logger, number)
.await
.map_err(From::from)
}
}
}
Expand Down Expand Up @@ -673,7 +673,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
from: BlockNumber,
to: BlockNumber,
filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.logger.clone(),
Expand Down Expand Up @@ -707,7 +707,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let block_number = block.number() as BlockNumber;
let blocks = blocks_with_triggers(
let (blocks, _) = blocks_with_triggers(
adapter,
logger.clone(),
self.chain_store.clone(),
Expand Down Expand Up @@ -747,11 +747,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<BlockFinality>, Error> {
let block: Option<EthereumBlock> = self
.chain_store
.cheap_clone()
.ancestor_block(ptr, offset)
.ancestor_block(ptr, offset, root)
.await?
.map(json::from_value)
.transpose()?;
Expand Down
141 changes: 89 additions & 52 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
let web3 = web3.clone();
retry(format!("load block ptr {}", block_num), &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand All @@ -810,8 +811,16 @@ impl EthereumAdapter {
.boxed()
.compat()
.from_err()
.then(|res| {
if detect_null_block(&res) {
Ok(None)
} else {
Some(res).transpose()
}
})
}))
.buffered(ENV_VARS.block_batch_size)
.filter_map(|b| b)
.map(|b| b.into())
}

Expand All @@ -830,13 +839,12 @@ impl EthereumAdapter {
logger: &Logger,
block_ptr: BlockPtr,
) -> Result<bool, Error> {
let block_hash = self
.block_hash_by_block_number(logger, block_ptr.number)
.compat()
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
// small optimization.
let canonical_block = self
.next_existing_ptr_to_number(logger, block_ptr.number)
.await?;
block_hash
.ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number))
.map(|block_hash| block_hash == block_ptr.hash_as_h256())
Ok(canonical_block == block_ptr)
}

pub(crate) fn logs_in_block_range(
Expand Down Expand Up @@ -1079,6 +1087,16 @@ impl EthereumAdapter {
}
}

// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
// error returned when requesting such a null round. Ideally there should be a defined reponse or
// message for this case, or a check that is less dependent on the Filecoin implementation.
fn detect_null_block<T>(res: &Result<T, Error>) -> bool {
match res {
Ok(_) => false,
Err(e) => e.to_string().contains("requested epoch was a null round"),
}
}

#[async_trait]
impl EthereumAdapterTrait for EthereumAdapter {
fn provider(&self) -> &str {
Expand Down Expand Up @@ -1363,26 +1381,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::pin(block_future)
}

fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = IngestorError> + Send> {
Box::new(
self.block_hash_by_block_number(logger, block_number)
.and_then(move |block_hash_opt| {
block_hash_opt.ok_or_else(|| {
anyhow!(
"Ethereum node could not find start block hash by block number {}",
&block_number
)
})
})
.from_err()
.map(move |block_hash| BlockPtr::from((block_hash, block_number))),
)
}

fn block_hash_by_block_number(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1448,6 +1446,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::new(self.code(logger, address, block_ptr))
}

async fn next_existing_ptr_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error> {
let mut next_number = block_number;
loop {
let retry_log_message = format!(
"eth_getBlockByNumber RPC call for block number {}",
next_number
);
let web3 = self.web3.clone();
let logger = logger.clone();
let res = retry(retry_log_message, &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let web3 = web3.cheap_clone();
async move {
web3.eth()
.block(BlockId::Number(next_number.into()))
.await
.map(|block_opt| block_opt.and_then(|block| block.hash))
.map_err(Error::from)
}
})
.await
.map_err(move |e| {
e.into_inner().unwrap_or_else(move || {
anyhow!(
"Ethereum node took too long to return data for block #{}",
next_number
)
})
});
if detect_null_block(&res) {
next_number += 1;
continue;
}
return match res {
Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)),
Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)),
Err(e) => Err(e),
};
}
}

async fn contract_call(
&self,
logger: &Logger,
Expand Down Expand Up @@ -1652,9 +1698,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
}
}

/// Returns blocks with triggers, corresponding to the specified range and filters.
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
/// If a block contains no triggers, there may be no corresponding item in the stream.
/// However the `to` block will always be present, even if triggers are empty.
/// However the (resolved) `to` block will always be present, even if triggers are empty.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
Expand All @@ -1674,7 +1721,7 @@ pub(crate) async fn blocks_with_triggers(
to: BlockNumber,
filter: &TriggerFilter,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Vec<BlockWithTriggers<crate::Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
// Each trigger filter needs to be queried for the same block range
// and the blocks yielded need to be deduped. If any error occurs
// while searching for a trigger type, the entire operation fails.
Expand All @@ -1685,6 +1732,13 @@ pub(crate) async fn blocks_with_triggers(
let trigger_futs: FuturesUnordered<BoxFuture<Result<Vec<EthereumTrigger>, anyhow::Error>>> =
FuturesUnordered::new();

// Resolve the nearest non-null "to" block
debug!(logger, "Finding nearest valid `to` block to {}", to);

let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
let to_hash = to_ptr.hash_as_h256();
let to = to_ptr.block_number();

// This is for `start` triggers which can be initialization handlers which needs to be run
// before all other triggers
if filter.block.trigger_every_block {
Expand Down Expand Up @@ -1753,28 +1807,11 @@ pub(crate) async fn blocks_with_triggers(
trigger_futs.push(block_future)
}

// Get hash for "to" block
let to_hash_fut = eth
.block_hash_by_block_number(&logger, to)
.and_then(|hash| match hash {
Some(hash) => Ok(hash),
None => {
warn!(logger,
"Ethereum endpoint is behind";
"url" => eth.provider()
);
bail!("Block {} not found in the chain", to)
}
})
.compat();

// Join on triggers and block hash resolution
let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut);

// Unpack and handle possible errors in the previously joined futures
let triggers =
triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?;
// Join on triggers, unpack and handle possible errors
let triggers = trigger_futs
.try_concat()
.await
.with_context(|| format!("Failed to obtain triggers for block {}", to))?;

let mut block_hashes: HashSet<H256> =
triggers.iter().map(EthereumTrigger::block_hash).collect();
Expand Down Expand Up @@ -1839,7 +1876,7 @@ pub(crate) async fn blocks_with_triggers(
));
}

Ok(blocks)
Ok((blocks, to))
}

pub(crate) async fn get_calls(
Expand Down
3 changes: 2 additions & 1 deletion chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down Expand Up @@ -390,6 +390,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand Down
3 changes: 2 additions & 1 deletion chain/starknet/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
Expand All @@ -373,7 +374,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &crate::adapter::TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
Loading

0 comments on commit 897cfa8

Please sign in to comment.