diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b66eae60437..0475879f52c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -223,4 +223,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: check - args: --release \ No newline at end of file + args: --release diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 2a49859e48c..0a79a0279f3 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -192,7 +192,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -241,6 +241,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 8b151015861..383c40d4478 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -186,6 +186,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -195,7 +196,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index aeaa62b6e32..f78ff1b0bec 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1140,13 +1140,6 @@ pub trait EthereumAdapter: Send + Sync + 'static { Box> + Send + '_>, >; - /// Load block pointer for the specified `block number`. - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send>; - /// Find a block by its number, according to the Ethereum node. /// /// Careful: don't use this function without considering race conditions. @@ -1162,6 +1155,17 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_number: BlockNumber, ) -> Box, Error = Error> + Send>; + /// Finds the hash and number of the lowest non-null block with height greater than or equal to + /// the given number. + /// + /// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must + /// also be considered for the resolved block, in case it is higher than the requested number. + async fn next_existing_ptr_to_number( + &self, + logger: &Logger, + block_number: BlockNumber, + ) -> Result; + /// Call the function of a smart contract. A return of `None` indicates /// that the call reverted. The returned `CallSource` indicates where /// the result came from for accounting purposes diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 990995894b0..be7fb2b431e 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -465,9 +465,9 @@ impl Blockchain for Chain { .clone(); adapter - .block_pointer_from_number(logger, number) - .compat() + .next_existing_ptr_to_number(logger, number) .await + .map_err(From::from) } } } @@ -673,7 +673,7 @@ impl TriggersAdapterTrait for TriggersAdapter { from: BlockNumber, to: BlockNumber, filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { blocks_with_triggers( self.chain_client.rpc()?.cheapest_with(&self.capabilities)?, self.logger.clone(), @@ -707,7 +707,7 @@ impl TriggersAdapterTrait for TriggersAdapter { BlockFinality::Final(_) => { let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?; let block_number = block.number() as BlockNumber; - let blocks = blocks_with_triggers( + let (blocks, _) = blocks_with_triggers( adapter, logger.clone(), self.chain_store.clone(), @@ -747,11 +747,12 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { let block: Option = self .chain_store .cheap_clone() - .ancestor_block(ptr, offset) + .ancestor_block(ptr, offset, root) .await? .map(json::from_value) .transpose()?; diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 6b1ce416ba2..e282a776417 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -791,6 +791,7 @@ impl EthereumAdapter { stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| { let web3 = web3.clone(); retry(format!("load block ptr {}", block_num), &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -810,8 +811,16 @@ impl EthereumAdapter { .boxed() .compat() .from_err() + .then(|res| { + if detect_null_block(&res) { + Ok(None) + } else { + Some(res).transpose() + } + }) })) .buffered(ENV_VARS.block_batch_size) + .filter_map(|b| b) .map(|b| b.into()) } @@ -830,13 +839,12 @@ impl EthereumAdapter { logger: &Logger, block_ptr: BlockPtr, ) -> Result { - let block_hash = self - .block_hash_by_block_number(logger, block_ptr.number) - .compat() + // TODO: This considers null blocks, but we could instead bail if we encounter one as a + // small optimization. + let canonical_block = self + .next_existing_ptr_to_number(logger, block_ptr.number) .await?; - block_hash - .ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number)) - .map(|block_hash| block_hash == block_ptr.hash_as_h256()) + Ok(canonical_block == block_ptr) } pub(crate) fn logs_in_block_range( @@ -1079,6 +1087,16 @@ impl EthereumAdapter { } } +// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific +// error returned when requesting such a null round. Ideally there should be a defined reponse or +// message for this case, or a check that is less dependent on the Filecoin implementation. +fn detect_null_block(res: &Result) -> bool { + match res { + Ok(_) => false, + Err(e) => e.to_string().contains("requested epoch was a null round"), + } +} + #[async_trait] impl EthereumAdapterTrait for EthereumAdapter { fn provider(&self) -> &str { @@ -1363,26 +1381,6 @@ impl EthereumAdapterTrait for EthereumAdapter { Box::pin(block_future) } - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send> { - Box::new( - self.block_hash_by_block_number(logger, block_number) - .and_then(move |block_hash_opt| { - block_hash_opt.ok_or_else(|| { - anyhow!( - "Ethereum node could not find start block hash by block number {}", - &block_number - ) - }) - }) - .from_err() - .map(move |block_hash| BlockPtr::from((block_hash, block_number))), - ) - } - fn block_hash_by_block_number( &self, logger: &Logger, @@ -1448,6 +1446,54 @@ impl EthereumAdapterTrait for EthereumAdapter { Box::new(self.code(logger, address, block_ptr)) } + async fn next_existing_ptr_to_number( + &self, + logger: &Logger, + block_number: BlockNumber, + ) -> Result { + let mut next_number = block_number; + loop { + let retry_log_message = format!( + "eth_getBlockByNumber RPC call for block number {}", + next_number + ); + let web3 = self.web3.clone(); + let logger = logger.clone(); + let res = retry(retry_log_message, &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block(BlockId::Number(next_number.into())) + .await + .map(|block_opt| block_opt.and_then(|block| block.hash)) + .map_err(Error::from) + } + }) + .await + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!( + "Ethereum node took too long to return data for block #{}", + next_number + ) + }) + }); + if detect_null_block(&res) { + next_number += 1; + continue; + } + return match res { + Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)), + Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)), + Err(e) => Err(e), + }; + } + } + async fn contract_call( &self, logger: &Logger, @@ -1652,9 +1698,10 @@ impl EthereumAdapterTrait for EthereumAdapter { } } -/// Returns blocks with triggers, corresponding to the specified range and filters. +/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved +/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block. /// If a block contains no triggers, there may be no corresponding item in the stream. -/// However the `to` block will always be present, even if triggers are empty. +/// However the (resolved) `to` block will always be present, even if triggers are empty. /// /// Careful: don't use this function without considering race conditions. /// Chain reorgs could happen at any time, and could affect the answer received. @@ -1674,7 +1721,7 @@ pub(crate) async fn blocks_with_triggers( to: BlockNumber, filter: &TriggerFilter, unified_api_version: UnifiedMappingApiVersion, -) -> Result>, Error> { +) -> Result<(Vec>, BlockNumber), Error> { // Each trigger filter needs to be queried for the same block range // and the blocks yielded need to be deduped. If any error occurs // while searching for a trigger type, the entire operation fails. @@ -1685,6 +1732,13 @@ pub(crate) async fn blocks_with_triggers( let trigger_futs: FuturesUnordered, anyhow::Error>>> = FuturesUnordered::new(); + // Resolve the nearest non-null "to" block + debug!(logger, "Finding nearest valid `to` block to {}", to); + + let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?; + let to_hash = to_ptr.hash_as_h256(); + let to = to_ptr.block_number(); + // This is for `start` triggers which can be initialization handlers which needs to be run // before all other triggers if filter.block.trigger_every_block { @@ -1753,28 +1807,11 @@ pub(crate) async fn blocks_with_triggers( trigger_futs.push(block_future) } - // Get hash for "to" block - let to_hash_fut = eth - .block_hash_by_block_number(&logger, to) - .and_then(|hash| match hash { - Some(hash) => Ok(hash), - None => { - warn!(logger, - "Ethereum endpoint is behind"; - "url" => eth.provider() - ); - bail!("Block {} not found in the chain", to) - } - }) - .compat(); - - // Join on triggers and block hash resolution - let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut); - - // Unpack and handle possible errors in the previously joined futures - let triggers = - triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?; - let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?; + // Join on triggers, unpack and handle possible errors + let triggers = trigger_futs + .try_concat() + .await + .with_context(|| format!("Failed to obtain triggers for block {}", to))?; let mut block_hashes: HashSet = triggers.iter().map(EthereumTrigger::block_hash).collect(); @@ -1839,7 +1876,7 @@ pub(crate) async fn blocks_with_triggers( )); } - Ok(blocks) + Ok((blocks, to)) } pub(crate) async fn get_calls( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 1eba594d968..a5b98cfaf01 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -316,7 +316,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -390,6 +390,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index f28dfa94c1f..b83425218e3 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -358,6 +358,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } @@ -373,7 +374,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &crate::adapter::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 2360e8a711f..2b47e4e57b8 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use anyhow::Error; use graph::{ blockchain::{ @@ -18,6 +16,7 @@ use graph::{ }; use graph_runtime_wasm::module::ToAscPtr; use lazy_static::__Deref; +use std::sync::Arc; use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges}; @@ -132,6 +131,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { unimplemented!() } @@ -141,7 +141,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { unimplemented!() } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 7b19f82b069..25a923dd502 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -260,14 +260,15 @@ impl BlockWithTriggers { #[async_trait] pub trait TriggersAdapter: Send + Sync { - // Return the block that is `offset` blocks before the block pointed to - // by `ptr` from the local cache. An offset of 0 means the block itself, - // an offset of 1 means the block's parent etc. If the block is not in - // the local cache, return `None` + // Return the block that is `offset` blocks before the block pointed to by `ptr` from the local + // cache. An offset of 0 means the block itself, an offset of 1 means the block's parent etc. If + // `root` is passed, short-circuit upon finding a child of `root`. If the block is not in the + // local cache, return `None`. async fn ancestor_block( &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; // Returns a sequence of blocks in increasing order of block number. @@ -281,7 +282,7 @@ pub trait TriggersAdapter: Send + Sync { from: BlockNumber, to: BlockNumber, filter: &C::TriggerFilter, - ) -> Result>, Error>; + ) -> Result<(Vec>, BlockNumber), Error>; // Used for reprocessing blocks when creating a data source. async fn triggers_in_block( diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 6bde388b99a..87d20a236d0 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -5,7 +5,7 @@ use crate::{ subgraph::InstanceDSTemplateInfo, }, data::subgraph::UnifiedMappingApiVersion, - data_source::DataSourceTemplateInfo, + prelude::{BlockHash, DataSourceTemplateInfo}, }; use anyhow::Error; use async_trait::async_trait; @@ -218,6 +218,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { todo!() } @@ -227,7 +228,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: crate::components::store::BlockNumber, _to: crate::components::store::BlockNumber, _filter: &C::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index 85ebbf0240a..ce3fdf2a4ef 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -363,22 +363,42 @@ where // 1000 triggers found, 2 per block, range_size = 1000 / 2 = 500 let range_size_upper_limit = max_block_range_size.min(ctx.previous_block_range_size * 10); - let range_size = if ctx.previous_triggers_per_block == 0.0 { + let target_range_size = if ctx.previous_triggers_per_block == 0.0 { range_size_upper_limit } else { (self.target_triggers_per_block_range as f64 / ctx.previous_triggers_per_block) .max(1.0) .min(range_size_upper_limit as f64) as BlockNumber }; - let to = cmp::min(from + range_size - 1, to_limit); + let to = cmp::min(from + target_range_size - 1, to_limit); info!( ctx.logger, "Scanning blocks [{}, {}]", from, to; - "range_size" => range_size + "target_range_size" => target_range_size ); - let blocks = self.adapter.scan_triggers(from, to, &self.filter).await?; + // Update with actually scanned range, to account for any skipped null blocks. + let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?; + let range_size = to - from + 1; + + // If the target block (`to`) is within the reorg threshold, indicating no non-null finalized blocks are + // greater than or equal to `to`, we retry later. This deferment allows the chain head to advance, + // ensuring the target block range becomes finalized. It effectively minimizes the risk of chain reorg + // affecting the processing by waiting for a more stable set of blocks. + if to > head_ptr.number - reorg_threshold { + return Ok(ReconciliationStep::Retry); + } + + if to > head_ptr.number - reorg_threshold { + return Ok(ReconciliationStep::Retry); + } + + info!( + ctx.logger, + "Scanned blocks [{}, {}]", from, to; + "range_size" => range_size + ); Ok(ReconciliationStep::ProcessDescendantBlocks( blocks, range_size, @@ -415,7 +435,10 @@ where // In principle this block should be in the store, but we have seen this error for deep // reorgs in ropsten. - let head_ancestor_opt = self.adapter.ancestor_block(head_ptr, offset).await?; + let head_ancestor_opt = self + .adapter + .ancestor_block(head_ptr, offset, Some(subgraph_ptr.hash.clone())) + .await?; match head_ancestor_opt { None => { @@ -427,6 +450,15 @@ where Ok(ReconciliationStep::Retry) } Some(head_ancestor) => { + // Check if there was an interceding skipped (null) block. + if head_ancestor.number() != subgraph_ptr.number + 1 { + warn!( + ctx.logger, + "skipped block detected: {}", + subgraph_ptr.number + 1 + ); + } + // We stopped one block short, so we'll compare the parent hash to the // subgraph ptr. if head_ancestor.parent_hash().as_ref() == Some(&subgraph_ptr.hash) { diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index f369e2d9d0e..ae5505dd30b 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -230,6 +230,16 @@ impl From<(Vec, u64)> for BlockPtr { } } +impl From<(Vec, i64)> for BlockPtr { + fn from((bytes, number): (Vec, i64)) -> Self { + let number = i32::try_from(number).unwrap(); + BlockPtr { + hash: BlockHash::from(bytes), + number, + } + } +} + impl From<(H256, u64)> for BlockPtr { fn from((hash, number): (H256, u64)) -> BlockPtr { let number = i32::try_from(number).unwrap(); diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index b95a6e9d0ea..7c29b891fdf 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -474,14 +474,24 @@ pub trait ChainStore: Send + Sync + 'static { ) -> Result, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching - /// `block_hash` and offset=1 means its parent. Returns None if unable to complete due to - /// missing blocks in the chain store. + /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding + /// a child of `root`. Returns None if unable to complete due to missing blocks in the chain + /// store. + /// + /// The short-circuit mechanism is particularly useful in situations where blocks are skipped + /// in certain chains like Filecoin EVM. In such cases, relying solely on the numeric offset + /// might lead to inaccuracies because block numbers could be non-sequential. By allowing a + /// `root` block hash as a reference, the function can more accurately identify the desired + /// ancestor by stopping the search as soon as it discovers a block that is a direct child + /// of the `root` (i.e., when block.parent_hash equals root.hash). This approach ensures + /// the correct ancestor block is identified without solely depending on the offset. /// /// Returns an error if the offset would reach past the genesis block. async fn ancestor_block( self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; /// Remove old blocks from the cache we maintain in the database and diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 98493c29d6e..52d44f67f6b 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -109,7 +109,7 @@ pub async fn info( let ancestor = match &head_block { None => None, Some(head_block) => chain_store - .ancestor_block(head_block.clone(), offset) + .ancestor_block(head_block.clone(), offset, None) .await? .map(json::from_value::) .transpose()? diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 66e838860a3..17aa0b311c6 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -381,10 +381,10 @@ mod data { create index blocks_number ON {nsp}.blocks using btree(number); create table {nsp}.call_cache ( - id bytea not null primary key, - return_value bytea not null, - contract_address bytea not null, - block_number int4 not null + id bytea not null primary key, + return_value bytea not null, + contract_address bytea not null, + block_number int4 not null ); create index call_cache_block_number_idx ON {nsp}.call_cache(block_number); @@ -948,78 +948,124 @@ mod data { } } + fn ancestor_block_query( + &self, + short_circuit_predicate: &str, + blocks_table_name: &str, + ) -> String { + format!( + " + with recursive ancestors(block_hash, block_offset) as ( + values ($1, 0) + union all + select b.parent_hash, a.block_offset + 1 + from ancestors a, {blocks_table_name} b + where a.block_hash = b.hash + and a.block_offset < $2 + {short_circuit_predicate} + ) + select a.block_hash as hash, b.number as number + from ancestors a + inner join {blocks_table_name} b on a.block_hash = b.hash + order by a.block_offset desc limit 1 + ", + blocks_table_name = blocks_table_name, + short_circuit_predicate = short_circuit_predicate, + ) + } + + /// Returns an ancestor of a specified block at a given offset, with an option to specify a `root` hash + /// for a targeted search. If a `root` hash is provided, the search stops at the block whose parent hash + /// matches the `root`. pub(super) fn ancestor_block( &self, conn: &mut PgConnection, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { - let data_and_hash = match self { + let short_circuit_predicate = match root { + Some(_) => "and b.parent_hash <> $3", + None => "", + }; + + let data_and_ptr = match self { Storage::Shared => { - const ANCESTOR_SQL: &str = " - with recursive ancestors(block_hash, block_offset) as ( - values ($1, 0) - union all - select b.parent_hash, a.block_offset+1 - from ancestors a, ethereum_blocks b - where a.block_hash = b.hash - and a.block_offset < $2 - ) - select a.block_hash as hash - from ancestors a - where a.block_offset = $2;"; + let query = + self.ancestor_block_query(short_circuit_predicate, "ethereum_blocks"); + + // type Result = (Text, i64); + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Text"] + hash: String, + #[sql_type = "BigInt"] + number: i64, + } - let hash = sql_query(ANCESTOR_SQL) - .bind::(block_ptr.hash_hex()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; + let block = match root { + Some(root) => sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .bind::(root.hash_hex()) + .get_result::(conn), + None => sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .get_result::(conn), + } + .optional()?; use public::ethereum_blocks as b; - match hash { + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( b::table - .filter(b::hash.eq(&hash.hash)) + .filter(b::hash.eq(&block.hash)) .select(b::data) .first::(conn)?, - BlockHash::from_str(&hash.hash)?, + BlockPtr::new( + BlockHash::from_str(&block.hash)?, + i32::try_from(block.number).unwrap(), + ), )), } } Storage::Private(Schema { blocks, .. }) => { - // Same as ANCESTOR_SQL except for the table name - let query = format!( - " - with recursive ancestors(block_hash, block_offset) as ( - values ($1, 0) - union all - select b.parent_hash, a.block_offset+1 - from ancestors a, {} b - where a.block_hash = b.hash - and a.block_offset < $2 - ) - select a.block_hash as hash - from ancestors a - where a.block_offset = $2;", - blocks.qname - ); + let query = + self.ancestor_block_query(short_circuit_predicate, blocks.qname.as_str()); + + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Bytea"] + hash: Vec, + #[sql_type = "BigInt"] + number: i64, + } + + let block = match root { + Some(root) => sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .bind::(root.as_slice()) + .get_result::(conn), + None => sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .get_result::(conn), + } + .optional()?; - let hash = sql_query(query) - .bind::(block_ptr.hash_slice()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; - match hash { + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( blocks .table() - .filter(blocks.hash().eq(&hash.hash)) + .filter(blocks.hash().eq(&block.hash)) .select(blocks.data()) .first::(conn)?, - BlockHash::from(hash.hash), + BlockPtr::from((block.hash, block.number)), )), } } @@ -1034,13 +1080,13 @@ mod data { let data_and_ptr = { use graph::prelude::serde_json::json; - data_and_hash.map(|(data, hash)| { + data_and_ptr.map(|(data, ptr)| { ( match data.get("block") { Some(_) => data, None => json!({ "block": data, "transaction_receipts": [] }), }, - BlockPtr::new(hash, block_ptr.number - offset), + ptr, ) }) }; @@ -2079,6 +2125,7 @@ impl ChainStoreTrait for ChainStore { self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { ensure!( block_ptr.number >= offset, @@ -2099,7 +2146,7 @@ impl ChainStoreTrait for ChainStore { .with_conn(move |conn, _| { chain_store .storage - .ancestor_block(conn, block_ptr_clone, offset) + .ancestor_block(conn, block_ptr_clone, offset, root) .map_err(StoreError::from) .map_err(CancelableError::from) }) diff --git a/store/test-store/src/block_store.rs b/store/test-store/src/block_store.rs index b33f256c892..6f161258a0e 100644 --- a/store/test-store/src/block_store.rs +++ b/store/test-store/src/block_store.rs @@ -33,6 +33,11 @@ lazy_static! { pub static ref BLOCK_TWO: FakeBlock = BLOCK_ONE.make_child("f8ccbd3877eb98c958614f395dd351211afb9abba187bfc1fb4ac414b099c4a6", None); pub static ref BLOCK_TWO_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(2, "3b652b00bff5e168b1218ff47593d516123261c4487629c4175f642ee56113fe"); + pub static ref BLOCK_THREE_SKIPPED_2: FakeBlock = BLOCK_ONE.make_skipped_child( + "d8ccbd3877eb98c958614f395dd351211afb9abba187bfc1fb4ac414b099c4a6", + None, + 1, + ); pub static ref BLOCK_THREE: FakeBlock = BLOCK_TWO.make_child("7347afe69254df06729e123610b00b8b11f15cfae3241f9366fb113aec07489c", None); pub static ref BLOCK_THREE_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(3, "fa9ebe3f74de4c56908b49f5c4044e85825f7350f3fa08a19151de82a82a7313"); pub static ref BLOCK_THREE_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", Some(U256::from(1657712166))); @@ -41,6 +46,8 @@ lazy_static! { // what you are doing, don't use this block for other tests. pub static ref BLOCK_THREE_NO_TIMESTAMP: FakeBlock = BLOCK_TWO.make_child("6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b", None); pub static ref BLOCK_FOUR: FakeBlock = BLOCK_THREE.make_child("7cce080f5a49c2997a6cc65fc1cee9910fd8fc3721b7010c0b5d0873e2ac785e", None); + pub static ref BLOCK_FOUR_SKIPPED_2_AND_3: FakeBlock = BLOCK_ONE.make_skipped_child("9cce080f5a49c2997a6cc65fc1cee9910fd8fc3721b7010c0b5d0873e2ac785e", None, 2); + pub static ref BLOCK_FIVE_AFTER_SKIP: FakeBlock = BLOCK_FOUR_SKIPPED_2_AND_3.make_child("8b0ea919e258eb2b119eb32de56b85d12d50ac6a9f7c5909f843d6172c8ba196", None); pub static ref BLOCK_FIVE: FakeBlock = BLOCK_FOUR.make_child("7b0ea919e258eb2b119eb32de56b85d12d50ac6a9f7c5909f843d6172c8ba196", None); pub static ref BLOCK_SIX_NO_PARENT: FakeBlock = FakeBlock::make_no_parent(6, "6b834521bb753c132fdcf0e1034803ed9068e324112f8750ba93580b393a986b"); } @@ -67,6 +74,15 @@ impl FakeBlock { } } + pub fn make_skipped_child(&self, hash: &str, timestamp: Option, skip: i32) -> Self { + FakeBlock { + number: self.number + 1 + skip, + hash: hash.to_owned(), + parent_hash: self.hash.clone(), + timestamp, + } + } + pub fn make_no_parent(number: BlockNumber, hash: &str) -> Self { FakeBlock { number, diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index 89ec43c5158..9c4766302d9 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -20,9 +20,9 @@ use graph_store_postgres::Store as DieselStore; use graph_store_postgres::{layout_for_tests::FAKE_NETWORK_SHARED, ChainStore as DieselChainStore}; use test_store::block_store::{ - FakeBlock, FakeBlockList, BLOCK_FIVE, BLOCK_FOUR, BLOCK_ONE, BLOCK_ONE_NO_PARENT, - BLOCK_ONE_SIBLING, BLOCK_THREE, BLOCK_THREE_NO_PARENT, BLOCK_TWO, BLOCK_TWO_NO_PARENT, - GENESIS_BLOCK, NO_PARENT, + FakeBlock, FakeBlockList, BLOCK_FIVE, BLOCK_FIVE_AFTER_SKIP, BLOCK_FOUR, + BLOCK_FOUR_SKIPPED_2_AND_3, BLOCK_ONE, BLOCK_ONE_NO_PARENT, BLOCK_ONE_SIBLING, BLOCK_THREE, + BLOCK_THREE_NO_PARENT, BLOCK_TWO, BLOCK_TWO_NO_PARENT, GENESIS_BLOCK, NO_PARENT, }; use test_store::*; @@ -42,8 +42,12 @@ where let chain_store = store.block_store().chain_store(name).expect("chain store"); // Run test - test(chain_store.cheap_clone(), store.cheap_clone()) - .unwrap_or_else(|_| panic!("test finishes successfully on network {}", name)); + test(chain_store.cheap_clone(), store.cheap_clone()).unwrap_or_else(|err| { + panic!( + "test finishes successfully on network {} with error {}", + name, err + ) + }); } }); } @@ -294,12 +298,13 @@ fn check_ancestor( child: &FakeBlock, offset: BlockNumber, exp: &FakeBlock, + root: Option, ) -> Result<(), Error> { - let act = executor::block_on( - store - .cheap_clone() - .ancestor_block(child.block_ptr(), offset), - )? + let act = executor::block_on(store.cheap_clone().ancestor_block( + child.block_ptr(), + offset, + root, + ))? .map(json::from_value::) .transpose()? .ok_or_else(|| anyhow!("block {} has no ancestor at offset {}", child.hash, offset))?; @@ -329,24 +334,25 @@ fn ancestor_block_simple() { ]; run_test(chain, move |store, _| -> Result<(), Error> { - check_ancestor(&store, &BLOCK_FIVE, 1, &BLOCK_FOUR)?; - check_ancestor(&store, &BLOCK_FIVE, 2, &BLOCK_THREE)?; - check_ancestor(&store, &BLOCK_FIVE, 3, &BLOCK_TWO)?; - check_ancestor(&store, &BLOCK_FIVE, 4, &BLOCK_ONE)?; - check_ancestor(&store, &BLOCK_FIVE, 5, &GENESIS_BLOCK)?; - check_ancestor(&store, &BLOCK_THREE, 2, &BLOCK_ONE)?; + check_ancestor(&store, &BLOCK_FIVE, 1, &BLOCK_FOUR, None)?; + check_ancestor(&store, &BLOCK_FIVE, 2, &BLOCK_THREE, None)?; + check_ancestor(&store, &BLOCK_FIVE, 3, &BLOCK_TWO, None)?; + check_ancestor(&store, &BLOCK_FIVE, 4, &BLOCK_ONE, None)?; + check_ancestor(&store, &BLOCK_FIVE, 5, &GENESIS_BLOCK, None)?; + check_ancestor(&store, &BLOCK_THREE, 2, &BLOCK_ONE, None)?; for offset in [6, 7, 8, 50].iter() { let offset = *offset; - let res = executor::block_on( - store - .cheap_clone() - .ancestor_block(BLOCK_FIVE.block_ptr(), offset), - ); + let res = executor::block_on(store.cheap_clone().ancestor_block( + BLOCK_FIVE.block_ptr(), + offset, + None, + )); assert!(res.is_err()); } - let block = executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1))?; + let block = + executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1, None))?; assert!(block.is_none()); Ok(()) }); @@ -362,10 +368,44 @@ fn ancestor_block_ommers() { ]; run_test(chain, move |store, _| -> Result<(), Error> { - check_ancestor(&store, &BLOCK_ONE, 1, &GENESIS_BLOCK)?; - check_ancestor(&store, &BLOCK_ONE_SIBLING, 1, &GENESIS_BLOCK)?; - check_ancestor(&store, &BLOCK_TWO, 1, &BLOCK_ONE)?; - check_ancestor(&store, &BLOCK_TWO, 2, &GENESIS_BLOCK)?; + check_ancestor(&store, &BLOCK_ONE, 1, &GENESIS_BLOCK, None)?; + check_ancestor(&store, &BLOCK_ONE_SIBLING, 1, &GENESIS_BLOCK, None)?; + check_ancestor(&store, &BLOCK_TWO, 1, &BLOCK_ONE, None)?; + check_ancestor(&store, &BLOCK_TWO, 2, &GENESIS_BLOCK, None)?; + Ok(()) + }); +} + +#[test] +fn ancestor_block_skipped() { + let chain = vec![ + &*GENESIS_BLOCK, + &*BLOCK_ONE, + &*BLOCK_FOUR_SKIPPED_2_AND_3, + &BLOCK_FIVE_AFTER_SKIP, + ]; + + run_test(chain, move |store, _| -> Result<(), Error> { + check_ancestor(&store, &BLOCK_FIVE_AFTER_SKIP, 2, &BLOCK_ONE, None)?; + + check_ancestor( + &store, + &BLOCK_FIVE_AFTER_SKIP, + 2, + &BLOCK_FOUR_SKIPPED_2_AND_3, + Some(BLOCK_ONE.block_hash()), + )?; + + check_ancestor(&store, &BLOCK_FIVE_AFTER_SKIP, 5, &GENESIS_BLOCK, None)?; + + check_ancestor( + &store, + &BLOCK_FIVE_AFTER_SKIP, + 5, + &BLOCK_ONE, + Some(GENESIS_BLOCK.block_hash()), + )?; + Ok(()) }); } diff --git a/tests/src/config.rs b/tests/src/config.rs index d1940e8aa18..6762e542168 100644 --- a/tests/src/config.rs +++ b/tests/src/config.rs @@ -261,7 +261,7 @@ impl Default for Config { graph_node: GraphNodeConfig::default(), graph_cli, num_parallel_tests, - timeout: Duration::from_secs(120), + timeout: Duration::from_secs(600), } } } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index c24f688f0f7..537efa46fac 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -941,6 +941,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result::Block>, Error> { todo!() } @@ -950,7 +951,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() }