Skip to content

Commit

Permalink
Remove provider checks at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed May 22, 2024
1 parent 1209443 commit b97ffec
Show file tree
Hide file tree
Showing 40 changed files with 2,375 additions and 1,424 deletions.
8 changes: 5 additions & 3 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::blockchain::{
EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter,
};
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
Expand Down Expand Up @@ -41,7 +42,7 @@ use graph::blockchain::block_stream::{

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down Expand Up @@ -157,7 +158,8 @@ impl Blockchain for Chain {
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
self.client
.firehose_endpoint()?
.firehose_endpoint()
.await?
.block_ptr_for_number::<codec::Block>(logger, number)
.await
.map_err(Into::into)
Expand All @@ -171,7 +173,7 @@ impl Blockchain for Chain {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
7 changes: 4 additions & 3 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::{BlockIngestor, NoopDecoderHook};
use graph::components::adapter::ChainId;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
Expand Down Expand Up @@ -35,7 +36,7 @@ use crate::{codec, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
name: ChainId,
client: Arc<ChainClient<Self>>,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
Expand Down Expand Up @@ -149,7 +150,7 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = self.client.firehose_endpoint()?;
let firehose_endpoint = self.client.firehose_endpoint().await?;

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
Expand All @@ -165,7 +166,7 @@ impl Blockchain for Chain {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor = FirehoseBlockIngestor::<crate::Block, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down
36 changes: 25 additions & 11 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transfor
use graph::blockchain::{
BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector,
};
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -146,7 +147,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
let chain_store = chain.chain_store();
let chain_head_update_stream = chain
.chain_head_update_listener
.subscribe(chain.name.clone(), logger.clone());
.subscribe(chain.name.to_string(), logger.clone());

// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
// This is ok because Celo blocks are always final. And we _need_ to do this because
Expand All @@ -156,6 +157,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
ChainClient::Rpc(adapter) => {
adapter
.cheapest()
.await
.ok_or(anyhow!("unable to get eth adapter for chan_id call"))?
.chain_id()
.await?
Expand Down Expand Up @@ -199,7 +201,7 @@ impl BlockRefetcher<Chain> for EthereumBlockRefetcher {
logger: &Logger,
cursor: FirehoseCursor,
) -> Result<BlockFinality, Error> {
let endpoint = chain.chain_client().firehose_endpoint()?;
let endpoint = chain.chain_client().firehose_endpoint().await?;
let block = endpoint.get_block::<codec::Block>(cursor, logger).await?;
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
Ok(BlockFinality::NonFinal(ethereum_block))
Expand Down Expand Up @@ -286,7 +288,7 @@ impl RuntimeAdapterBuilder for EthereumRuntimeAdapterBuilder {

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
pub name: ChainId,
node_id: NodeId,
chain_identifier: Arc<ChainIdentifier>,
registry: Arc<MetricsRegistry>,
Expand Down Expand Up @@ -314,7 +316,7 @@ impl Chain {
/// Creates a new Ethereum [`Chain`].
pub fn new(
logger_factory: LoggerFactory,
name: String,
name: ChainId,
node_id: NodeId,
registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
Expand Down Expand Up @@ -360,12 +362,12 @@ impl Chain {
// TODO: This is only used to build the block stream which could prolly
// be moved to the chain itself and return a block stream future that the
// caller can spawn.
pub fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
pub async fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
let adapters = match self.client.as_ref() {
ChainClient::Firehose(_) => panic!("no adapter with firehose"),
ChainClient::Rpc(adapter) => adapter,
};
adapters.cheapest().unwrap()
adapters.cheapest().await.unwrap()
}
}

Expand Down Expand Up @@ -454,13 +456,15 @@ impl Blockchain for Chain {
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.endpoint()?
.endpoint()
.await?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
.await
.map_err(IngestorError::Unknown),
ChainClient::Rpc(adapters) => {
let adapter = adapters
.cheapest()
.await
.with_context(|| format!("no adapter for chain {}", self.name))?
.clone();

Expand Down Expand Up @@ -507,7 +511,7 @@ impl Blockchain for Chain {
self.client.clone()
}

fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
Expand All @@ -524,6 +528,7 @@ impl Blockchain for Chain {
ChainClient::Rpc(rpc) => {
let eth_adapter = rpc
.cheapest()
.await
.ok_or_else(|| anyhow!("unable to get adapter for ethereum block ingestor"))?;
let logger = self
.logger_factory
Expand Down Expand Up @@ -675,7 +680,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?,
self.logger.clone(),
self.chain_store.clone(),
self.ethrpc_metrics.clone(),
Expand Down Expand Up @@ -705,7 +713,11 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {

match &block {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let adapter = self
.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?;
let block_number = block.number() as BlockNumber;
let blocks = blocks_with_triggers(
adapter,
Expand Down Expand Up @@ -738,6 +750,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
self.chain_client
.rpc()?
.cheapest()
.await
.ok_or(anyhow!("unable to get adapter for is_on_main_chain"))?
.is_on_main_chain(&self.logger, ptr.clone())
.await
Expand Down Expand Up @@ -774,7 +787,8 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}),
ChainClient::Rpc(adapters) => {
let blocks = adapters
.cheapest_with(&self.capabilities)?
.cheapest_with(&self.capabilities)
.await?
.load_blocks(
self.logger.cheap_clone(),
self.chain_store.cheap_clone(),
Expand Down
11 changes: 7 additions & 4 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1041,10 +1041,13 @@ impl DecoderHook {
.map(|call| call.as_eth_call(block_ptr.clone(), self.eth_call_gas))
.unzip();

let eth_adapter = self.eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
archive: true,
traces: false,
}))?;
let eth_adapter = self
.eth_adapters
.call_or_cheapest(Some(&NodeCapabilities {
archive: true,
traces: false,
}))
.await?;

let call_refs = calls.iter().collect::<Vec<_>>();
let results = eth_adapter
Expand Down
3 changes: 2 additions & 1 deletion chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,8 @@ pub(crate) async fn get_calls(
} else {
client
.rpc()?
.cheapest_with(capabilities)?
.cheapest_with(capabilities)
.await?
.calls_in_block(
&logger,
subgraph_metrics.clone(),
Expand Down
7 changes: 4 additions & 3 deletions chain/ethereum/src/ingestor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{chain::BlockFinality, EthereumAdapter, EthereumAdapterTrait, ENV_VARS};
use graph::components::adapter::ChainId;
use graph::futures03::compat::Future01CompatExt;
use graph::{
blockchain::{BlockHash, BlockIngestor, BlockPtr, IngestorError},
Expand All @@ -16,7 +17,7 @@ pub struct PollingBlockIngestor {
eth_adapter: Arc<EthereumAdapter>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: String,
network_name: ChainId,
}

impl PollingBlockIngestor {
Expand All @@ -26,7 +27,7 @@ impl PollingBlockIngestor {
eth_adapter: Arc<EthereumAdapter>,
chain_store: Arc<dyn ChainStore>,
polling_interval: Duration,
network_name: String,
network_name: ChainId,
) -> Result<PollingBlockIngestor, Error> {
Ok(PollingBlockIngestor {
logger,
Expand Down Expand Up @@ -225,7 +226,7 @@ impl BlockIngestor for PollingBlockIngestor {
}
}

fn network_name(&self) -> String {
fn network_name(&self) -> ChainId {
self.network_name.clone()
}
}
1 change: 0 additions & 1 deletion chain/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub use crate::adapter::{
ProviderEthRpcMetrics, SubgraphEthRpcMetrics, TriggerFilter,
};
pub use crate::chain::Chain;
pub use crate::network::EthereumNetworks;
pub use graph::blockchain::BlockIngestor;

#[cfg(test)]
Expand Down

0 comments on commit b97ffec

Please sign in to comment.