diff --git a/NEWS.md b/NEWS.md index 71318656522..497c56de70c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,7 +2,13 @@ ## Unreleased -- ... +### New DB table for dynamic data sources + +For new subgraph deployments, dynamic data sources will be recorded under the `sgd*.data_sources$` +table, rather than `subgraphs.dynamic_ethereum_contract_data_source`. As a consequence, +new deployments will not work correctly on earlier graph node versions, so +_downgrading to an earlier graph node version is not supported_. +See issue #3405 for more details. ## 0.27.0 diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs index 11d7f43a8af..56be161858d 100644 --- a/chain/arweave/src/data_source.rs +++ b/chain/arweave/src/data_source.rs @@ -215,6 +215,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -261,6 +262,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -292,6 +294,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + unreachable!("arweave does not support dynamic data sources") + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/cosmos/src/data_source.rs b/chain/cosmos/src/data_source.rs index c3806b01d58..768ba272391 100644 --- a/chain/cosmos/src/data_source.rs +++ b/chain/cosmos/src/data_source.rs @@ -282,6 +282,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -325,6 +326,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { Err(anyhow!(TEMPLATE_ERROR)) } @@ -342,6 +344,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { unimplemented!("{}", TEMPLATE_ERROR); } + + fn manifest_idx(&self) -> u32 { + unimplemented!("{}", TEMPLATE_ERROR); + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 099b864f9a6..246fd468096 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -40,6 +40,7 @@ pub struct DataSource { pub kind: String, pub network: Option, pub name: String, + pub manifest_idx: u32, pub address: Option
, pub start_block: BlockNumber, pub mapping: Mapping, @@ -92,6 +93,7 @@ impl blockchain::DataSource for DataSource { kind, network, name, + manifest_idx, address, mapping, context, @@ -109,6 +111,7 @@ impl blockchain::DataSource for DataSource { kind == &other.kind && network == &other.network && name == &other.name + && manifest_idx == &other.manifest_idx && address == &other.address && mapping.abis == other.mapping.abis && mapping.event_handlers == other.mapping.event_handlers @@ -120,7 +123,7 @@ impl blockchain::DataSource for DataSource { fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource { let param = self.address.map(|addr| addr.0.into()); StoredDynamicDataSource { - name: self.name.to_owned(), + manifest_idx: self.manifest_idx, param, context: self .context @@ -136,7 +139,7 @@ impl blockchain::DataSource for DataSource { stored: StoredDynamicDataSource, ) -> Result { let StoredDynamicDataSource { - name: _, + manifest_idx, param, context, creation_block, @@ -151,6 +154,7 @@ impl blockchain::DataSource for DataSource { kind: template.kind.to_string(), network: template.network.as_ref().map(|s| s.to_string()), name: template.name.clone(), + manifest_idx, address, start_block: 0, mapping: template.mapping.clone(), @@ -232,6 +236,7 @@ impl DataSource { source: Source, mapping: Mapping, context: Option, + manifest_idx: u32, ) -> Result { // Data sources in the manifest are created "before genesis" so they have no creation block. let creation_block = None; @@ -243,6 +248,7 @@ impl DataSource { kind, network, name, + manifest_idx, address: source.address, start_block: source.start_block, mapping, @@ -722,6 +728,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -736,7 +743,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { let mapping = mapping.resolve(&*resolver, logger).await?; - DataSource::from_manifest(kind, network, name, source, mapping, context) + DataSource::from_manifest(kind, network, name, source, mapping, context, manifest_idx) } } @@ -778,6 +785,7 @@ impl TryFrom> for DataSource { kind: template.kind, network: template.network, name: template.name, + manifest_idx: template.manifest_idx, address: Some(address), start_block: 0, mapping: template.mapping, @@ -789,16 +797,23 @@ impl TryFrom> for DataSource { } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] -pub struct BaseDataSourceTemplate { +pub struct UnresolvedDataSourceTemplate { pub kind: String, pub network: Option, pub name: String, pub source: TemplateSource, - pub mapping: M, + pub mapping: UnresolvedMapping, } -pub type UnresolvedDataSourceTemplate = BaseDataSourceTemplate; -pub type DataSourceTemplate = BaseDataSourceTemplate; +#[derive(Clone, Debug)] +pub struct DataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub manifest_idx: u32, + pub source: TemplateSource, + pub mapping: Mapping, +} #[async_trait] impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { @@ -806,6 +821,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -821,6 +837,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem kind, network, name, + manifest_idx, source, mapping: mapping.resolve(resolver, logger).await?, }) @@ -839,6 +856,10 @@ 
impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + self.manifest_idx + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs index bafc36d46ef..15d9ca84eb4 100644 --- a/chain/near/src/data_source.rs +++ b/chain/near/src/data_source.rs @@ -272,6 +272,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -346,6 +347,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -377,6 +379,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + unreachable!("near does not support dynamic data sources") + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index 86a29390291..1d3c13c8fa9 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -162,6 +162,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { Ok(DataSource { kind: SUBSTREAMS_KIND.into(), @@ -205,6 +206,10 @@ impl blockchain::DataSourceTemplate for NoopDataSourceTemplate { fn runtime(&self) -> Option>> { unimplemented!("{}", TEMPLATE_ERROR); } + + fn manifest_idx(&self) -> u32 { + todo!() + } } #[async_trait] @@ -213,6 +218,7 @@ impl blockchain::UnresolvedDataSourceTemplate for NoopDataSourceTemplate self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { unimplemented!("{}", TEMPLATE_ERROR) } @@ -260,7 +266,7 @@ mod test { let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap(); let link_resolver: Arc = Arc::new(NoopLinkResolver {}); let logger = Logger::root(Discard, o!()); - let ds: DataSource = ds.resolve(&link_resolver, &logger).await.unwrap(); + let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap(); let expected = DataSource { kind: SUBSTREAMS_KIND.into(), network: Some("mainnet".into()), diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index f9e70b1e047..b507e2bf6ad 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -21,4 +21,7 @@ pub struct IndexingInputs { pub templates: Arc>, pub unified_api_version: UnifiedMappingApiVersion, pub static_filters: bool, + + // Correspondence between data source or template position in the manifest and name. 
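+ // For example, a hypothetical manifest declaring data sources `A` and `B` plus template `T`
+ // would yield `[(0, "A"), (1, "B"), (2, "T")]`: data sources are numbered first, templates after them.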
+ pub manifest_idx_and_name: Vec<(u32, String)>, } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 76d1421de24..6aa09e67e94 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -7,8 +7,9 @@ use crate::subgraph::metrics::{ use crate::subgraph::runner::SubgraphRunner; use crate::subgraph::SubgraphInstance; use graph::blockchain::block_stream::BlockStreamMetrics; -use graph::blockchain::Blockchain; +use graph::blockchain::DataSource; use graph::blockchain::NodeCapabilities; +use graph::blockchain::{Blockchain, DataSourceTemplate}; use graph::blockchain::{BlockchainKind, TriggerFilter}; use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *}; use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator}; @@ -147,7 +148,7 @@ impl SubgraphInstanceManager { // that is done store.start_subgraph_deployment(&logger).await?; - let manifest: SubgraphManifest = { + let (manifest, manifest_idx_and_name) = { info!(logger, "Resolve subgraph files using IPFS"); let mut manifest = SubgraphManifest::resolve_from_raw( @@ -161,9 +162,28 @@ impl SubgraphInstanceManager { .await .context("Failed to resolve subgraph from IPFS")?; - let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest) - .await - .context("Failed to load dynamic data sources")?; + let manifest_idx_and_name: Vec<(u32, String)> = manifest + .data_sources + .iter() + .map(|ds: &C::DataSource| ds.name().to_owned()) + .chain( + manifest + .templates + .iter() + .map(|t: &C::DataSourceTemplate| t.name().to_owned()), + ) + .enumerate() + .map(|(idx, name)| (idx as u32, name)) + .collect(); + + let data_sources = load_dynamic_data_sources( + store.clone(), + logger.clone(), + &manifest, + manifest_idx_and_name.clone(), + ) + .await + .context("Failed to load dynamic data sources")?; info!(logger, "Successfully resolved subgraph files using IPFS"); @@ -176,7 +196,7 @@ impl SubgraphInstanceManager { manifest.data_sources.len() ); - manifest + (manifest, manifest_idx_and_name) }; let required_capabilities = C::NodeCapabilities::from_data_sources(&manifest.data_sources); @@ -240,7 +260,7 @@ impl SubgraphInstanceManager { // Initialize deployment_head with current deployment head. Any sort of trouble in // getting the deployment head ptr leads to initializing with 0 - let deployment_head = store.block_ptr().await.map(|ptr| ptr.number).unwrap_or(0) as f64; + let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64; block_stream_metrics.deployment_head.set(deployment_head); let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new( @@ -273,6 +293,7 @@ impl SubgraphInstanceManager { templates, unified_api_version, static_filters: self.static_filters, + manifest_idx_and_name, }; // The subgraph state tracks the state of the subgraph instance over time diff --git a/core/src/subgraph/loader.rs b/core/src/subgraph/loader.rs index be61306764c..442cc4d15f2 100644 --- a/core/src/subgraph/loader.rs +++ b/core/src/subgraph/loader.rs @@ -8,17 +8,21 @@ pub async fn load_dynamic_data_sources( store: Arc, logger: Logger, manifest: &SubgraphManifest, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, Error> { let start_time = Instant::now(); let mut data_sources: Vec = vec![]; - for stored in store.load_dynamic_data_sources().await? { + for stored in store + .load_dynamic_data_sources(manifest_idx_and_name) + .await? 
+ { let template = manifest .templates .iter() - .find(|template| template.name() == stored.name.as_str()) - .ok_or_else(|| anyhow!("no template named `{}` was found", stored.name))?; + .find(|template| template.manifest_idx() == stored.manifest_idx) + .ok_or_else(|| anyhow!("no template with idx `{}` was found", stored.manifest_idx))?; let ds = C::DataSource::from_stored_dynamic_data_source(&template, stored)?; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7fe46acbe34..f4f74246130 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -7,6 +7,7 @@ use graph::blockchain::BlockchainKind; use graph::blockchain::BlockchainMap; use graph::components::store::{DeploymentId, DeploymentLocator, SubscriptionManager}; use graph::data::subgraph::schema::DeploymentCreate; +use graph::data::subgraph::Graft; use graph::prelude::{ CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, SubgraphRegistrar as SubgraphRegistrarTrait, *, @@ -267,7 +268,8 @@ where hash: DeploymentHash, node_id: NodeId, debug_fork: Option, - start_block: Option, + start_block_override: Option, + graft_block_override: Option, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. For logging purposes, make up a @@ -303,7 +305,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -319,7 +322,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -335,7 +339,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -351,7 +356,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -469,25 +475,22 @@ async fn start_subgraph( } } -/// Resolves the subgraph's earliest block and the manifest's graft base block -async fn resolve_subgraph_chain_blocks( +/// Resolves the subgraph's earliest block +async fn resolve_start_block( manifest: &SubgraphManifest, - chain: Arc, + chain: &impl Blockchain, logger: &Logger, -) -> Result<(Option, Option<(DeploymentHash, BlockPtr)>), SubgraphRegistrarError> { - let logger1 = logger.clone(); - let graft = manifest.graft.clone(); - +) -> Result, SubgraphRegistrarError> { // If the minimum start block is 0 (i.e. the genesis block), // return `None` to start indexing from the genesis block. Otherwise // return a block pointer for the block with number `min_start_block - 1`. 
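// For example (hypothetical numbers), a manifest whose earliest data source starts at block 100
// resolves to the pointer for block 99, so indexing begins by processing block 100 itself.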
- let start_block_ptr = match manifest + match manifest .start_blocks() .into_iter() .min() .expect("cannot identify minimum start block because there are no data sources") { - 0 => None, + 0 => Ok(None), min_start_block => chain .block_pointer_from_number(logger, min_start_block - 1) .await @@ -496,31 +499,27 @@ async fn resolve_subgraph_chain_blocks( SubgraphRegistrarError::ManifestValidationError(vec![ SubgraphManifestValidationError::BlockNotFound(min_start_block.to_string()), ]) - })?, - }; + }), + } +} - let base_ptr = { - match graft { - None => None, - Some(base) => { - let base_block = base.block; - - chain - .block_pointer_from_number(&logger1, base.block) - .await - .map(|ptr| Some((base.base, ptr))) - .map_err(move |_| { - SubgraphRegistrarError::ManifestValidationError(vec![ - SubgraphManifestValidationError::BlockNotFound(format!( - "graft base block {} not found", - base_block - )), - ]) - })? - } - } - }; - Ok((start_block_ptr, base_ptr)) +/// Resolves the manifest's graft base block +async fn resolve_graft_block( + base: &Graft, + chain: &impl Blockchain, + logger: &Logger, +) -> Result { + chain + .block_pointer_from_number(logger, base.block) + .await + .map_err(|_| { + SubgraphRegistrarError::ManifestValidationError(vec![ + SubgraphManifestValidationError::BlockNotFound(format!( + "graft base block {} not found", + base.block + )), + ]) + }) } async fn create_subgraph_version( @@ -529,7 +528,8 @@ async fn create_subgraph_version( chains: Arc, name: SubgraphName, deployment: DeploymentHash, - start_block_ptr: Option, + start_block_override: Option, + graft_block_override: Option, raw: serde_yaml::Mapping, node_id: NodeId, debug_fork: Option, @@ -571,12 +571,20 @@ async fn create_subgraph_version( return Err(SubgraphRegistrarError::NameNotFound(name.to_string())); } - let (manifest_start_block, base_block) = - resolve_subgraph_chain_blocks(&manifest, chain, &logger.clone()).await?; - - let start_block = match start_block_ptr { + let start_block = match start_block_override { Some(block) => Some(block), - None => manifest_start_block, + None => resolve_start_block(&manifest, &*chain, &logger).await?, + }; + + let base_block = match &manifest.graft { + None => None, + Some(graft) => Some(( + graft.base.clone(), + match graft_block_override { + Some(block) => block, + None => resolve_graft_block(&graft, &*chain, &logger).await?, + }, + )), }; info!( diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 18ba7b3448d..dc1c9351745 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -72,7 +72,7 @@ where // If a subgraph failed for deterministic reasons, before start indexing, we first // revert the deployment head. It should lead to the same result since the error was // deterministic. 
- if let Some(current_ptr) = self.inputs.store.block_ptr().await { + if let Some(current_ptr) = self.inputs.store.block_ptr() { if let Some(parent_ptr) = self .inputs .triggers_adapter @@ -375,6 +375,7 @@ where &self.metrics.host.stopwatch, data_sources, deterministic_errors, + self.inputs.manifest_idx_and_name.clone(), ) .await .context("Failed to transact block operations")?; @@ -677,6 +678,13 @@ where } } + if let Some(stop_block) = &self.inputs.stop_block { + if block_ptr.number >= *stop_block { + info!(self.logger, "stop block reached for subgraph"); + return Ok(Action::Stop); + } + } + if matches!(action, Action::Restart) { // Cancel the stream for real self.ctx @@ -689,13 +697,6 @@ where return Ok(Action::Restart); } - if let Some(stop_block) = &self.inputs.stop_block { - if block_ptr.number >= *stop_block { - info!(self.logger, "stop block reached for subgraph"); - return Ok(Action::Stop); - } - } - return Ok(Action::Continue); } Err(BlockProcessingError::Canceled) => { @@ -797,7 +798,7 @@ where // // Safe unwrap because in a Revert event we're sure the subgraph has // advanced at least once. - let subgraph_ptr = self.inputs.store.block_ptr().await.unwrap(); + let subgraph_ptr = self.inputs.store.block_ptr().unwrap(); if revert_to_ptr.number >= subgraph_ptr.number { info!(&self.logger, "Block to revert is higher than subgraph pointer, nothing to do"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr); return Ok(Action::Continue); diff --git a/core/src/subgraph/stream.rs b/core/src/subgraph/stream.rs index e97e8e01e58..9733f0d206d 100644 --- a/core/src/subgraph/stream.rs +++ b/core/src/subgraph/stream.rs @@ -18,12 +18,12 @@ pub async fn new_block_stream( false => BUFFERED_BLOCK_STREAM_SIZE, }; - let current_ptr = inputs.store.block_ptr().await; + let current_ptr = inputs.store.block_ptr(); let block_stream = match is_firehose { true => inputs.chain.new_firehose_block_stream( inputs.deployment.clone(), - inputs.store.block_cursor().await, + inputs.store.block_cursor(), inputs.start_blocks.clone(), current_ptr, Arc::new(filter.clone()), diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index fd48f59fc25..d519a29078d 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -129,6 +129,7 @@ impl UnresolvedDataSource for MockUnresolvedDataSource { self, _resolver: &Arc, _logger: &slog::Logger, + _manifest_idx: u32, ) -> Result { todo!() } @@ -149,6 +150,10 @@ impl DataSourceTemplate for MockDataSourceTemplate { fn name(&self) -> &str { todo!() } + + fn manifest_idx(&self) -> u32 { + todo!() + } } #[derive(Clone, Default, Deserialize)] @@ -160,6 +165,7 @@ impl UnresolvedDataSourceTemplate for MockUnresolvedDataSource self, _resolver: &Arc, _logger: &slog::Logger, + _manifest_idx: u32, ) -> Result { todo!() } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 6225a8dd244..71c44270ab9 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -241,6 +241,7 @@ pub trait UnresolvedDataSourceTemplate: self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result; } @@ -248,6 +249,7 @@ pub trait DataSourceTemplate: Send + Sync + Debug { fn api_version(&self) -> semver::Version; fn runtime(&self) -> Option>>; fn name(&self) -> &str; + fn manifest_idx(&self) -> u32; } #[async_trait] @@ -258,6 +260,7 @@ pub trait UnresolvedDataSource: self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result; } diff --git a/graph/src/components/store/cache.rs 
b/graph/src/components/store/cache.rs index 547d699b332..8207c23c5fc 100644 --- a/graph/src/components/store/cache.rs +++ b/graph/src/components/store/cache.rs @@ -4,6 +4,7 @@ use std::fmt::{self, Debug}; use std::sync::Arc; use crate::blockchain::BlockPtr; +use crate::blockchain::DataSource; use crate::components::store::{ self as s, Entity, EntityKey, EntityOp, EntityOperation, EntityType, }; @@ -188,7 +189,7 @@ impl EntityCache { } /// Add a dynamic data source - pub fn add_data_source(&mut self, data_source: &impl s::DataSource) { + pub fn add_data_source(&mut self, data_source: &impl DataSource) { self.data_sources .push(data_source.as_stored_dynamic_data_source()); } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 285dde2590c..5c342e4eb2f 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; -use crate::blockchain::DataSource; use crate::blockchain::{Block, Blockchain}; use crate::data::store::scalar::Bytes; use crate::data::store::*; @@ -822,9 +821,9 @@ pub enum UnfailOutcome { Unfailed, } -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct StoredDynamicDataSource { - pub name: String, + pub manifest_idx: u32, pub param: Option, pub context: Option, pub creation_block: Option, @@ -1048,7 +1047,7 @@ pub enum DeploymentSchemaVersion { impl DeploymentSchemaVersion { // Latest schema version supported by this version of graph node. - pub const LATEST: Self = Self::V0; + pub const LATEST: Self = Self::V1; pub fn private_data_sources(self) -> bool { use DeploymentSchemaVersion::*; diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 223bfe55b77..a7122b1afc8 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -150,11 +150,11 @@ pub trait SubgraphStore: Send + Sync + 'static { #[async_trait] pub trait WritableStore: Send + Sync + 'static { /// Get a pointer to the most recently processed block in the subgraph. - async fn block_ptr(&self) -> Option; + fn block_ptr(&self) -> Option; /// Returns the Firehose `cursor` this deployment is currently at in the block stream of events. This /// is used when re-connecting a Firehose stream to start back exactly where we left off. - async fn block_cursor(&self) -> FirehoseCursor; + fn block_cursor(&self) -> FirehoseCursor; /// Start an existing subgraph deployment. async fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>; @@ -204,6 +204,7 @@ pub trait WritableStore: Send + Sync + 'static { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError>; /// Look up multiple entities as of the latest block. Returns a map of @@ -225,7 +226,10 @@ pub trait WritableStore: Send + Sync + 'static { fn unassign_subgraph(&self) -> Result<(), StoreError>; /// Load the dynamic data sources for the given deployment - async fn load_dynamic_data_sources(&self) -> Result, StoreError>; + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError>; /// Report the name of the shard in which the subgraph is stored. 
This /// should only be used for reporting and monitoring diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index c96b9a22b40..cfb2c2ffa2c 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -42,7 +42,8 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { hash: DeploymentHash, assignment_node_id: NodeId, debug_fork: Option, - start_block: Option, + start_block_override: Option, + graft_block_override: Option, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index ef7b202638e..ae1d634e053 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -764,16 +764,27 @@ impl UnresolvedSubgraphManifest { )); } + let ds_count = data_sources.len(); + if ds_count as u64 + templates.len() as u64 > u32::MAX as u64 { + return Err(anyhow!( + "Subgraph has too many declared data sources and templates", + )); + } + let (schema, data_sources, templates, offchain_data_sources) = try_join4( schema.resolve(id.clone(), &resolver, logger), data_sources .into_iter() - .map(|ds| ds.resolve(&resolver, logger)) + .enumerate() + .map(|(idx, ds)| ds.resolve(&resolver, logger, idx as u32)) .collect::>() .try_collect::>(), templates .into_iter() - .map(|template| template.resolve(&resolver, logger)) + .enumerate() + .map(|(idx, template)| { + template.resolve(&resolver, logger, ds_count as u32 + idx as u32) + }) .collect::>() .try_collect::>(), offchain_data_sources diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 598eea0339a..d3f3ec72d83 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -46,11 +46,11 @@ impl MockStore { #[async_trait] impl WritableStore for MockStore { - async fn block_ptr(&self) -> Option { + fn block_ptr(&self) -> Option { unimplemented!() } - async fn block_cursor(&self) -> FirehoseCursor { + fn block_cursor(&self) -> FirehoseCursor { unimplemented!() } @@ -107,6 +107,7 @@ impl WritableStore for MockStore { _: &StopwatchMetrics, _: Vec, _: Vec, + _: Vec<(u32, String)>, ) -> Result<(), StoreError> { unimplemented!() } @@ -126,7 +127,10 @@ unimplemented!() } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + _manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { unimplemented!() } diff --git a/node/src/main.rs b/node/src/main.rs index 4ff21436fae..16f43abddaa 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -490,6 +490,7 @@ async fn main() { node_id, debug_fork, start_block, + None, ) .await } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index dd6e9672ded..809bed6979c 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -205,6 +205,7 @@ pub async fn run( node_id.clone(), None, None, + None, ) .await?; diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index acd51fe2f04..e548a41ebc6 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -30,6 +30,7 @@ fn mock_host_exports( let templates = vec![DataSourceTemplate { kind: String::from("ethereum/contract"), name: String::from("example template"), + manifest_idx: 0, network: Some(String::from("mainnet")), source: TemplateSource { abi: String::from("foo"), @@ -120,6 +121,7 @@ pub fn mock_data_source(path:
&str, api_version: Version) -> DataSource { DataSource { kind: String::from("ethereum/contract"), name: String::from("example data source"), + manifest_idx: 0, network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index b58bdbe2f1f..d076a8d1c16 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -92,6 +92,7 @@ impl JsonRpcServer { // Here it doesn't make sense to receive another // startBlock, we'll use the one from the manifest. None, + None, ) .await { diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 66ef865b5ba..dfa7f17ebdd 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -599,6 +599,15 @@ impl Connection { target_block: BlockPtr, ) -> Result { let logger = logger.new(o!("dst" => dst.site.namespace.to_string())); + + if src.site.schema_version != dst.site.schema_version { + return Err(StoreError::ConstraintViolation(format!( + "attempted to copy between different schema versions, \ + source version is {} but destination version is {}", + src.site.schema_version, dst.site.schema_version + ))); + } + let mut last_log = Instant::now(); let conn = pool.get_fdw(&logger, || { if last_log.elapsed() > LOG_INTERVAL { diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 1fb86666afa..fdc04af7574 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -973,6 +973,7 @@ impl DeploymentStore { stopwatch: &StopwatchMetrics, data_sources: &[StoredDynamicDataSource], deterministic_errors: &[SubgraphError], + manifest_idx_and_name: &[(u32, String)], ) -> Result { // All operations should apply only to data or metadata for this subgraph if mods @@ -1010,7 +1011,13 @@ impl DeploymentStore { )?; section.end(); - dynds::insert(&conn, &site, data_sources, block_ptr_to)?; + dynds::insert( + &conn, + &site, + data_sources, + block_ptr_to, + manifest_idx_and_name, + )?; if !deterministic_errors.is_empty() { deployment::insert_subgraph_errors( @@ -1178,9 +1185,10 @@ impl DeploymentStore { &self, site: Arc, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { self.with_conn(move |conn, _| { - conn.transaction(|| crate::dynds::load(&conn, &site, block)) + conn.transaction(|| crate::dynds::load(&conn, &site, block, manifest_idx_and_name)) .map_err(Into::into) }) .await @@ -1250,7 +1258,7 @@ impl DeploymentStore { conn.transaction(|| -> Result<(), StoreError> { // Copy dynamic data sources and adjust their ID let start = Instant::now(); - let count = dynds::copy(&conn, &src.site, &dst.site, &block)?; + let count = dynds::copy(&conn, &src.site, &dst.site, block.number)?; info!(logger, "Copied {} dynamic data sources", count; "time_ms" => start.elapsed().as_millis()); diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index f241624b7bc..2f4ebcea766 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -15,10 +15,11 @@ pub fn load( conn: &PgConnection, site: &Site, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { match site.schema_version.private_data_sources() { true => DataSourcesTable::new(site.namespace.clone()).load(conn, block), - false => shared::load(conn, site.deployment.as_str(), block), + false => shared::load(conn, site.deployment.as_str(), block, 
manifest_idx_and_name), } } @@ -27,6 +28,7 @@ pub(crate) fn insert( site: &Site, data_sources: &[StoredDynamicDataSource], block_ptr: &BlockPtr, + manifest_idx_and_name: &[(u32, String)], ) -> Result { match site.schema_version.private_data_sources() { true => DataSourcesTable::new(site.namespace.clone()).insert( @@ -34,7 +36,13 @@ pub(crate) fn insert( data_sources, block_ptr.number, ), - false => shared::insert(conn, &site.deployment, data_sources, block_ptr), + false => shared::insert( + conn, + &site.deployment, + data_sources, + block_ptr, + manifest_idx_and_name, + ), } } @@ -42,17 +50,14 @@ pub(crate) fn copy( conn: &PgConnection, src: &Site, dst: &Site, - target_block: &BlockPtr, + target_block: BlockNumber, ) -> Result { - if src.schema_version != dst.schema_version { - return Err(StoreError::ConstraintViolation(format!( - "attempted to copy between different schema versions, \ - source version is {} but destination version is {}", - src.schema_version, dst.schema_version - ))); - } match src.schema_version.private_data_sources() { - true => todo!(), + true => DataSourcesTable::new(src.namespace.clone()).copy_to( + conn, + &DataSourcesTable::new(dst.namespace.clone()), + target_block, + ), false => shared::copy(conn, src, dst, target_block), } } diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index a253b2d5a5f..75246c2f6ee 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,15 +1,15 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] +use std::ops::Bound; use diesel::{ pg::types::sql_types, + sql_query, sql_types::{Binary, Integer, Jsonb, Nullable}, - types::Bytea, PgConnection, QueryDsl, RunQueryDsl, }; + use graph::{ components::store::StoredDynamicDataSource, + constraint_violation, prelude::{serde_json, BlockNumber, StoreError}, }; @@ -24,7 +24,7 @@ pub(crate) struct DataSourcesTable { qname: String, table: DynTable, block_range: DynColumn>, - causality_region: DynColumn, + _causality_region: DynColumn, manifest_idx: DynColumn, param: DynColumn>, context: DynColumn>, @@ -41,7 +41,7 @@ impl DataSourcesTable { qname: format!("{}.{}", namespace, Self::TABLE_NAME), namespace, block_range: table.column("block_range"), - causality_region: table.column("causality_region"), + _causality_region: table.column("causality_region"), manifest_idx: table.column("manifest_idx"), param: table.column("param"), context: table.column("context"), @@ -78,16 +78,41 @@ impl DataSourcesTable { conn: &PgConnection, block: BlockNumber, ) -> Result, StoreError> { - // self.table - // .select(( - // self.block_range, - // self.manifest_idx, - // self.param, - // self.context, - // )) - // .load(conn)?; - - todo!() + type Tuple = ( + (Bound, Bound), + i32, + Option>, + Option, + ); + let tuples = self + .table + .clone() + .filter(diesel::dsl::sql("block_range @> ").bind::(block)) + .select(( + &self.block_range, + &self.manifest_idx, + &self.param, + &self.context, + )) + .load::(conn)?; + + Ok(tuples + .into_iter() + .map(|(block_range, manifest_idx, param, context)| { + let creation_block = match block_range.0 { + Bound::Included(block) => Some(block), + + // Should never happen. 
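+ // `insert` below only ever writes ranges of the form `int4range(creation_block, null)`,
+ // so the lower bound is always a concrete, inclusive block number.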
+ Bound::Excluded(_) | Bound::Unbounded => unreachable!("data source with unbounded creation block range"), + }; + StoredDynamicDataSource { + manifest_idx: manifest_idx as u32, + param: param.map(|p| p.into()), + context, + creation_block, + } + }) + .collect()) } pub(crate) fn insert( @@ -96,10 +121,95 @@ impl DataSourcesTable { data_sources: &[StoredDynamicDataSource], block: BlockNumber, ) -> Result { - todo!() + // Currently all data sources share the same causality region. + let causality_region = 0; + + let mut inserted_total = 0; + + for ds in data_sources { + let StoredDynamicDataSource { + manifest_idx, + param, + context, + creation_block, + } = ds; + + if creation_block != &Some(block) { + return Err(constraint_violation!( + "mismatching creation blocks `{:?}` and `{}`", + creation_block, + block + )); + } + + let query = format!( + "insert into {}(block_range, manifest_idx, causality_region, param, context) \ + values (int4range($1, null), $2, $3, $4, $5)", + self.qname + ); + + inserted_total += sql_query(query) + .bind::, _>(creation_block) + .bind::(*manifest_idx as i32) + .bind::(causality_region) + .bind::, _>(param.as_ref().map(|p| &**p)) + .bind::, _>(context) + .execute(conn)?; + } + + Ok(inserted_total) } pub(crate) fn revert(&self, conn: &PgConnection, block: BlockNumber) -> Result<(), StoreError> { - todo!() + // Use `@>` to leverage the gist index. + // This assumes all ranges are of the form [x, +inf). + let query = format!( + "delete from {} where block_range @> $1 and lower(block_range) = $1", + self.qname + ); + sql_query(query).bind::(block).execute(conn)?; + Ok(()) + } + + /// Copy the dynamic data sources from `self` to `dst`. All data sources that + /// were created up to and including `target_block` will be copied. + pub(super) fn copy_to( + &self, + conn: &PgConnection, + dst: &DataSourcesTable, + target_block: BlockNumber, + ) -> Result { + // Check whether `dst` already contains data sources, which would indicate that we already copied + let count = dst.table.clone().count().get_result::(conn)?; + if count > 0 { + return Ok(count as usize); + } + + let query = format!( + "\ + insert into {dst}(block_range, causality_region, manifest_idx, parent, id, param, context) + select case + when upper(e.block_range) <= $1 then e.block_range + else int4range(lower(e.block_range), null) + end, + e.causality_region, e.manifest_idx, e.parent, e.id, e.param, e.context + from {src} e + where lower(e.block_range) <= $1 + ", + src = self.qname, + dst = dst.qname + ); + + let count = sql_query(&query) + .bind::(target_block) + .execute(conn)?; + + // Test that both tables have the same contents. + debug_assert!( + self.load(conn, target_block).map_err(|e| e.to_string()) + == dst.load(conn, target_block).map_err(|e| e.to_string()) + ); + + Ok(count) } } diff --git a/store/postgres/src/dynds/shared.rs b/store/postgres/src/dynds/shared.rs index 9303953bb30..961a28161fb 100644 --- a/store/postgres/src/dynds/shared.rs +++ b/store/postgres/src/dynds/shared.rs @@ -40,6 +40,7 @@ pub(super) fn load( conn: &PgConnection, id: &str, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { use dynamic_ethereum_contract_data_source as decds; @@ -69,9 +70,14 @@ )); } + let manifest_idx = manifest_idx_and_name + .iter() + .find(|(_, manifest_name)| manifest_name == &name) + .ok_or_else(|| constraint_violation!("data source name {} not found", name))?
+ .0; let creation_block = creation_block.to_i32(); let data_source = StoredDynamicDataSource { - name, + manifest_idx, param: Some(address.into()), context: context.map(|ctx| serde_json::from_str(&ctx)).transpose()?, creation_block, }; @@ -93,6 +99,7 @@ pub(super) fn insert( deployment: &DeploymentHash, data_sources: &[StoredDynamicDataSource], block_ptr: &BlockPtr, + manifest_idx_and_name: &[(u32, String)], ) -> Result { use dynamic_ethereum_contract_data_source as decds; @@ -105,7 +112,7 @@ .into_iter() .map(|ds| { let StoredDynamicDataSource { - name, + manifest_idx: _, param, context, creation_block: _, @@ -114,11 +121,16 @@ Some(param) => param, None => { return Err(constraint_violation!( - "dynamic data sources must have an address, but `{}` has none", - name + "dynamic data sources must have an address", )); } }; + let name = manifest_idx_and_name + .iter() + .find(|(idx, _)| *idx == ds.manifest_idx) + .ok_or_else(|| constraint_violation!("manifest idx {} not found", ds.manifest_idx))? + .1 + .clone(); Ok(( decds::deployment.eq(deployment.as_str()), decds::name.eq(name), @@ -146,7 +158,7 @@ pub(super) fn copy( conn: &PgConnection, src: &Site, dst: &Site, - target_block: &BlockPtr, + target_block: BlockNumber, ) -> Result { use dynamic_ethereum_contract_data_source as decds; @@ -183,7 +195,7 @@ Ok(sql_query(&query) .bind::(src.deployment.as_str()) .bind::(dst.deployment.as_str()) - .bind::(target_block.number) + .bind::(target_block) .execute(conn)?) } diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index c54d75323b9..90717784a2e 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1087,12 +1087,12 @@ impl<'a> Connection<'a> { shard: Shard, subgraph: &DeploymentHash, network: String, + schema_version: DeploymentSchemaVersion, ) -> Result { if let Some(site) = queries::find_active_site(self.conn.as_ref(), subgraph)? { return Ok(site); } - let schema_version = DeploymentSchemaVersion::LATEST; self.create_site(shard, subgraph.clone(), network, schema_version, true) } @@ -1110,14 +1110,6 @@ return Ok(site); } - if src.schema_version != DeploymentSchemaVersion::LATEST { - return Err(StoreError::Unknown(anyhow!( - "Attempted to copy from deployment {} which is on an old schema version.
- This means a schema migration is ongoing, please try again later.", - src.id - ))); - } - self.create_site( shard, src.deployment.clone(), diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 3457ff500c2..a47723b8310 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -15,7 +15,10 @@ use graph::{ cheap_clone::CheapClone, components::{ server::index_node::VersionInfo, - store::{self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, SubgraphFork}, + store::{ + self, BlockStore, DeploymentLocator, DeploymentSchemaVersion, + EnsLookup as EnsLookupTrait, SubgraphFork, + }, }, constraint_violation, data::query::QueryTarget, @@ -495,6 +498,12 @@ impl SubgraphStoreInner { #[cfg(not(debug_assertions))] assert!(!replace); + let graft_base = deployment + .graft_base + .as_ref() + .map(|base| self.layout(base)) + .transpose()?; + let (site, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs @@ -507,19 +516,17 @@ impl SubgraphStoreInner { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id)?; + let schema_version = match &graft_base { + None => DeploymentSchemaVersion::LATEST, + Some(src_layout) => src_layout.site.schema_version, + }; let conn = self.primary_conn()?; - let site = conn.allocate_site(shard, &schema.id, network_name)?; + let site = conn.allocate_site(shard, &schema.id, network_name, schema_version)?; let node_id = conn.assigned_node(&site)?.unwrap_or(node_id); (site, node_id) }; let site = Arc::new(site); - let graft_base = deployment - .graft_base - .as_ref() - .map(|base| self.layout(base)) - .transpose()?; - if let Some(graft_base) = &graft_base { self.primary_conn()? 
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index d0e29693541..947e8869db5 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -252,6 +252,7 @@ impl SyncStore { stopwatch: &StopwatchMetrics, data_sources: &[StoredDynamicDataSource], deterministic_errors: &[SubgraphError], + manifest_idx_and_name: &[(u32, String)], ) -> Result<(), StoreError> { fn same_subgraph(mods: &[EntityModification], id: &DeploymentHash) -> bool { mods.iter().all(|md| &md.entity_key().subgraph_id == id) @@ -270,6 +271,7 @@ impl SyncStore { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, )?; let _section = stopwatch.start_section("send_store_event"); @@ -311,10 +313,15 @@ impl SyncStore { async fn load_dynamic_data_sources( &self, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { self.retry_async("load_dynamic_data_sources", || async { self.writable - .load_dynamic_data_sources(self.site.cheap_clone(), block) + .load_dynamic_data_sources( + self.site.cheap_clone(), + block, + manifest_idx_and_name.clone(), + ) .await }) .await @@ -420,6 +427,7 @@ enum Request { mods: Vec, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, }, RevertTo { store: Arc, @@ -440,6 +448,7 @@ impl Request { mods, data_sources, deterministic_errors, + manifest_idx_and_name, } => store.transact_block_operations( block_ptr_to, firehose_cursor, @@ -447,6 +456,7 @@ impl Request { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, ), Request::RevertTo { store, @@ -734,7 +744,10 @@ impl Queue { } /// Load dynamic data sources by looking at both the queue and the store - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { // See the implementation of `get` for how we handle reverts let mut tracker = BlockTracker::new(); @@ -768,7 +781,7 @@ impl Queue { let mut dds = self .store - .load_dynamic_data_sources(tracker.query_block()) + .load_dynamic_data_sources(tracker.query_block(), manifest_idx_and_name) .await?; dds.append(&mut queue_dds); @@ -805,6 +818,7 @@ impl Writer { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { match self { Writer::Sync(store) => store.transact_block_operations( @@ -814,6 +828,7 @@ impl Writer { &stopwatch, &data_sources, &deterministic_errors, + &manifest_idx_and_name, ), Writer::Async(queue) => { let req = Request::Write { @@ -824,6 +839,7 @@ impl Writer { mods, data_sources, deterministic_errors, + manifest_idx_and_name, }; queue.push(req).await } @@ -872,10 +888,17 @@ impl Writer { } } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { match self { - Writer::Sync(store) => store.load_dynamic_data_sources(BLOCK_NUMBER_MAX).await, - Writer::Async(queue) => queue.load_dynamic_data_sources().await, + Writer::Sync(store) => { + store + .load_dynamic_data_sources(BLOCK_NUMBER_MAX, manifest_idx_and_name) + .await + } + Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await, } } } @@ -915,11 +938,11 @@ impl WritableStore { #[async_trait::async_trait] impl WritableStoreTrait for 
WritableStore { - async fn block_ptr(&self) -> Option { + fn block_ptr(&self) -> Option { self.block_ptr.lock().unwrap().clone() } - async fn block_cursor(&self) -> FirehoseCursor { + fn block_cursor(&self) -> FirehoseCursor { self.block_cursor.lock().unwrap().clone() } @@ -995,6 +1018,7 @@ impl WritableStoreTrait for WritableStore { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { self.writer .write( @@ -1004,6 +1028,7 @@ impl WritableStoreTrait for WritableStore { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, ) .await?; @@ -1032,8 +1057,13 @@ impl WritableStoreTrait for WritableStore { self.store.unassign_subgraph() } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { - self.writer.load_dynamic_data_sources().await + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { + self.writer + .load_dynamic_data_sources(manifest_idx_and_name) + .await } fn shard(&self) -> &str { diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs index 5f9cda3f210..f74f9878299 100644 --- a/store/postgres/tests/store.rs +++ b/store/postgres/tests/store.rs @@ -1148,6 +1148,7 @@ fn mock_data_source() -> graph_chain_ethereum::DataSource { graph_chain_ethereum::DataSource { kind: String::from("ethereum/contract"), name: String::from("example data source"), + manifest_idx: 0, network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, @@ -1209,7 +1210,8 @@ fn revert_block_with_dynamic_data_source_operations() { let original_user = writable.get(&user_key).unwrap().expect("missing entity"); // Create operations to add a dynamic data source - let data_source = mock_data_source(); + let mut data_source = mock_data_source(); + let manifest_idx_and_name = vec![(0, "example data source".to_string())]; let ops = vec![EntityOperation::Set { key: user_key.clone(), @@ -1217,12 +1219,14 @@ fn revert_block_with_dynamic_data_source_operations() { }]; // Add user and dynamic data source to the store + data_source.creation_block = Some(TEST_BLOCK_3_PTR.number); transact_entities_and_dynamic_data_sources( &subgraph_store, deployment.clone(), TEST_BLOCK_3_PTR.clone(), vec![data_source.as_stored_dynamic_data_source()], ops, + manifest_idx_and_name.clone(), ) .await .unwrap(); @@ -1234,7 +1238,10 @@ fn revert_block_with_dynamic_data_source_operations() { ); // Verify that the dynamic data source exists afterwards - let loaded_dds = writable.load_dynamic_data_sources().await.unwrap(); + let loaded_dds = writable + .load_dynamic_data_sources(manifest_idx_and_name.clone()) + .await + .unwrap(); assert_eq!(1, loaded_dds.len()); assert_eq!( data_source.address.unwrap().0, @@ -1253,7 +1260,10 @@ fn revert_block_with_dynamic_data_source_operations() { ); // Verify that the dynamic data source is gone after the reversion - let loaded_dds = writable.load_dynamic_data_sources().await.unwrap(); + let loaded_dds = writable + .load_dynamic_data_sources(manifest_idx_and_name) + .await + .unwrap(); assert_eq!(0, loaded_dds.len()); // Verify that the right change events were emitted for the reversion @@ -1567,6 +1577,7 @@ fn handle_large_string_with_index() { &stopwatch_metrics, Vec::new(), Vec::new(), + Vec::new(), ) .await .expect("Failed to insert large text"); @@ -1664,6 +1675,7 @@ fn handle_large_bytea_with_index() { &stopwatch_metrics, 
Vec::new(), Vec::new(), + Vec::new(), ) .await .expect("Failed to insert large text"); diff --git a/store/postgres/tests/subgraph.rs b/store/postgres/tests/subgraph.rs index 68b0bf4e03e..c1f4c0f564f 100644 --- a/store/postgres/tests/subgraph.rs +++ b/store/postgres/tests/subgraph.rs @@ -59,7 +59,6 @@ async fn latest_block(store: &Store, deployment_id: DeploymentId) -> BlockPtr { .await .expect("can get writable") .block_ptr() - .await .unwrap() } diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 350e6f90326..b69be23b240 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -233,6 +233,7 @@ pub async fn transact_errors( &stopwatch_metrics, Vec::new(), errs, + Vec::new(), ) .await?; flush(deployment).await @@ -245,8 +246,15 @@ pub async fn transact_entity_operations( block_ptr_to: BlockPtr, ops: Vec, ) -> Result<(), StoreError> { - transact_entities_and_dynamic_data_sources(store, deployment.clone(), block_ptr_to, vec![], ops) - .await + transact_entities_and_dynamic_data_sources( + store, + deployment.clone(), + block_ptr_to, + vec![], + ops, + vec![], + ) + .await } /// Convenience to transact EntityOperation instead of EntityModification and wait for the store to process the operations @@ -262,6 +270,7 @@ pub async fn transact_and_wait( block_ptr_to, vec![], ops, + vec![], ) .await?; flush(deployment).await @@ -273,6 +282,7 @@ pub async fn transact_entities_and_dynamic_data_sources( block_ptr_to: BlockPtr, data_sources: Vec, ops: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { let store = futures03::executor::block_on(store.cheap_clone().writable(LOGGER.clone(), deployment.id))?; @@ -297,6 +307,7 @@ pub async fn transact_entities_and_dynamic_data_sources( &stopwatch_metrics, data_sources, Vec::new(), + manifest_idx_and_name, ) .await } diff --git a/tests/integration-tests/data-source-revert/grafted.yaml b/tests/integration-tests/data-source-revert/grafted.yaml new file mode 100644 index 00000000000..3cb184beeec --- /dev/null +++ b/tests/integration-tests/data-source-revert/grafted.yaml @@ -0,0 +1,46 @@ +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +graft: + # Must match the id from building `subgraph.yaml` + base: QmRhW72iAE6AEY6fiL9nPt5ZVffzbq9XswKDbH9LC3JPUh + block: 3 +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.5 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.5 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/integration-tests/data-source-revert/package.json b/tests/integration-tests/data-source-revert/package.json index 2969afb3e33..73208954aa4 100644 --- a/tests/integration-tests/data-source-revert/package.json +++ b/tests/integration-tests/data-source-revert/package.json @@ -3,11 +3,11 @@ "version": "0.1.0", "scripts": { "codegen": "graph codegen", - "create:test": "graph create test/data-source-revert --node $GRAPH_NODE_ADMIN_URI", - "deploy:test": "graph deploy 
test/data-source-revert --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + "deploy:test": "graph deploy test/data-source-revert --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI", + "deploy:test-grafted": "graph deploy test/data-source-revert-grafted grafted.yaml --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" }, "devDependencies": { "@graphprotocol/graph-cli": "https://github.com/graphprotocol/graph-cli#main", "@graphprotocol/graph-ts": "https://github.com/graphprotocol/graph-ts#main" } -} +} \ No newline at end of file diff --git a/tests/integration-tests/data-source-revert/subgraph.yaml b/tests/integration-tests/data-source-revert/subgraph.yaml index 9cacaac58bb..87f85563093 100644 --- a/tests/integration-tests/data-source-revert/subgraph.yaml +++ b/tests/integration-tests/data-source-revert/subgraph.yaml @@ -1,9 +1,6 @@ specVersion: 0.0.4 -repository: https://github.com/graphprotocol/example-subgraph schema: file: ./schema.graphql -features: - - nonFatalErrors dataSources: - kind: ethereum/contract name: Contract diff --git a/tests/src/fixture.rs b/tests/src/fixture.rs index b8be34364ca..6489cb70eed 100644 --- a/tests/src/fixture.rs +++ b/tests/src/fixture.rs @@ -3,6 +3,7 @@ pub mod ethereum; use std::marker::PhantomData; use std::process::Command; use std::sync::Mutex; +use std::time::Duration; use crate::helpers::run_cmd; use anyhow::Error; @@ -21,8 +22,9 @@ use graph::env::ENV_VARS; use graph::ipfs_client::IpfsClient; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::{ - async_trait, BlockNumber, DeploymentHash, LoggerFactory, MetricsRegistry, NodeId, SubgraphName, - SubgraphRegistrar, SubgraphStore as _, SubgraphVersionSwitchingMode, + async_trait, BlockNumber, DeploymentHash, LoggerFactory, MetricsRegistry, NodeId, + SubgraphAssignmentProvider, SubgraphName, SubgraphRegistrar, SubgraphStore as _, + SubgraphVersionSwitchingMode, }; use graph_core::{ LinkResolver, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, @@ -32,7 +34,7 @@ use graph_mock::MockMetricsRegistry; use graph_node::manager::PanicSubscriptionManager; use graph_node::{config::Config, store_builder::StoreBuilder}; use graph_store_postgres::{ChainHeadUpdateListener, ChainStore, Store, SubgraphStore}; -use slog::Logger; +use slog::{info, Logger}; use std::env::VarError; use std::pin::Pin; use std::sync::Arc; @@ -42,6 +44,10 @@ use tokio::fs::read_to_string; const NODE_ID: &str = "default"; pub async fn build_subgraph(dir: &str) -> DeploymentHash { + build_subgraph_with_yarn_cmd(dir, "deploy:test").await +} + +pub async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash { // Test that IPFS is up. IpfsClient::localhost() .test() @@ -64,7 +70,7 @@ pub async fn build_subgraph(dir: &str) -> DeploymentHash { // is fake and the actual deploy call is meant to fail. 
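// Pointing GRAPH_NODE_ADMIN_URI at port 0 below guarantees that the deploy step cannot
// reach a node, so only the build and IPFS upload can succeed.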
let deploy_output = run_cmd( Command::new("yarn") - .arg("deploy:test") + .arg(yarn_cmd) .env("IPFS_URI", "http://127.0.0.1:5001") .env("GRAPH_NODE_ADMIN_URI", "http://localhost:0") .current_dir(dir), @@ -89,14 +95,27 @@ pub fn test_ptr(n: BlockNumber) -> BlockPtr { } } pub struct TestContext { - pub logger_factory: LoggerFactory, + pub logger: Logger, pub provider: Arc< IpfsSubgraphAssignmentProvider< SubgraphInstanceManager, >, >, pub store: Arc, - pub deployment_locator: DeploymentLocator, + pub deployment: DeploymentLocator, +} + +impl TestContext { + pub async fn start_and_sync_to(&self, stop_block: BlockPtr) { + self.provider + .start(self.deployment.clone(), Some(stop_block.number)) + .await + .expect("unable to start subgraph"); + + wait_for_sync(&self.logger, &self.store, &self.deployment.hash, stop_block) + .await + .unwrap(); + } } pub struct Stores { @@ -162,7 +181,8 @@ pub async fn setup( subgraph_name: SubgraphName, hash: &DeploymentHash, stores: &Stores, - chain: C, + chain: Arc, + graft_block: Option, ) -> TestContext { let logger = graph::log::logger(true); let logger_factory = LoggerFactory::new(logger.clone(), None); @@ -174,7 +194,7 @@ pub async fn setup( cleanup(&subgraph_store, &subgraph_name, hash); let mut blockchain_map = BlockchainMap::new(); - blockchain_map.insert(stores.network_name.clone(), Arc::new(chain)); + blockchain_map.insert(stores.network_name.clone(), chain); let static_filters = ENV_VARS.experimental_static_filters; @@ -215,22 +235,23 @@ pub async fn setup( .await .expect("unable to create subgraph"); - let deployment_locator = SubgraphRegistrar::create_subgraph_version( + let deployment = SubgraphRegistrar::create_subgraph_version( subgraph_registrar.as_ref(), subgraph_name.clone(), hash.clone(), node_id.clone(), None, None, + graft_block, ) .await .expect("failed to create subgraph version"); TestContext { - logger_factory, + logger: logger_factory.subgraph_logger(&deployment), provider: subgraph_provider, store: subgraph_store, - deployment_locator, + deployment, } } @@ -242,6 +263,37 @@ pub fn cleanup(subgraph_store: &SubgraphStore, name: &SubgraphName, hash: &Deplo } } +pub async fn wait_for_sync( + logger: &Logger, + store: &SubgraphStore, + hash: &DeploymentHash, + stop_block: BlockPtr, +) -> Result<(), Error> { + let mut err_count = 0; + while err_count < 10 { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let block_ptr = match store.least_block_ptr(&hash).await { + Ok(Some(ptr)) => ptr, + res => { + info!(&logger, "{:?}", res); + err_count += 1; + continue; + } + }; + + if block_ptr == stop_block { + break; + } + + if !store.is_healthy(&hash).await.unwrap() { + return Err(anyhow::anyhow!("subgraph failed unexpectedly")); + } + } + + Ok(()) +} + /// `chain` is the sequence of chain heads to be processed. If the next block to be processed in the /// chain is not a descendant of the previous one, reorgs will be emitted until it is. 
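/// For example, a hypothetical head sequence `[b0, b1, b1']`, where `b1'` is a sibling of `b1`,
/// yields `b1`, then a revert back to `b0`, then `b1'`.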
diff --git a/tests/tests/runner.rs b/tests/tests/runner.rs
index f927326810f..421ecd85302 100644
--- a/tests/tests/runner.rs
+++ b/tests/tests/runner.rs
@@ -1,76 +1,57 @@
-use graph_tests::fixture::ethereum::{chain, empty_block, genesis};
-use graph_tests::fixture::{self, stores, test_ptr};
-use std::time::Duration;
+use std::sync::Arc;
 
-use anyhow::anyhow;
 use graph::blockchain::{Block, BlockPtr};
 use graph::prelude::ethabi::ethereum_types::H256;
-use graph::prelude::{SubgraphAssignmentProvider, SubgraphName, SubgraphStore as _};
-use slog::{debug, info};
+use graph::prelude::{SubgraphAssignmentProvider, SubgraphName};
+use graph_tests::fixture::ethereum::{chain, empty_block, genesis};
+use graph_tests::fixture::{self, stores, test_ptr};
 
 #[tokio::test]
 async fn data_source_revert() -> anyhow::Result<()> {
-    let subgraph_name = SubgraphName::new("data-source-revert")
-        .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'");
+    let stores = stores("./integration-tests/config.simple.toml").await;
 
+    let subgraph_name = SubgraphName::new("data-source-revert").unwrap();
     let hash = {
         let test_dir = format!("./integration-tests/{}", subgraph_name);
         fixture::build_subgraph(&test_dir).await
     };
 
     let blocks = {
-        let block_0 = genesis();
-        let block_1 = empty_block(block_0.ptr(), test_ptr(1));
-        let block_1_reorged_ptr = BlockPtr {
+        let block0 = genesis();
+        let block1 = empty_block(block0.ptr(), test_ptr(1));
+        let block1_reorged_ptr = BlockPtr {
             number: 1,
             hash: H256::from_low_u64_be(12).into(),
         };
-        let block_1_reorged = empty_block(block_0.ptr(), block_1_reorged_ptr);
-        vec![block_0, block_1, block_1_reorged]
+        let block1_reorged = empty_block(block0.ptr(), block1_reorged_ptr.clone());
+        let block2 = empty_block(block1_reorged_ptr, test_ptr(2));
+        let block3 = empty_block(block2.ptr(), test_ptr(3));
+        let block4 = empty_block(block3.ptr(), test_ptr(4));
+        vec![block0, block1, block1_reorged, block2, block3, block4]
     };
 
-    let stop_block = blocks.last().unwrap().block.ptr();
-
-    let stores = stores("./integration-tests/config.simple.toml").await;
-    let chain = chain(blocks, &stores).await;
-    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain).await;
-
-    let provider = ctx.provider.clone();
-    let store = ctx.store.clone();
-
-    let logger = ctx.logger_factory.subgraph_logger(&ctx.deployment_locator);
-
-    SubgraphAssignmentProvider::start(provider.as_ref(), ctx.deployment_locator.clone(), None)
-        .await
-        .expect("unable to start subgraph");
-
-    loop {
-        tokio::time::sleep(Duration::from_millis(1000)).await;
-
-        let block_ptr = match store.least_block_ptr(&hash).await {
-            Ok(Some(ptr)) => ptr,
-            res => {
-                info!(&logger, "{:?}", res);
-                continue;
-            }
-        };
-
-        debug!(&logger, "subgraph block: {:?}", block_ptr);
-
-        if block_ptr == stop_block {
-            info!(
-                &logger,
-                "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block
-            );
-            break;
-        }
-
-        if !store.is_healthy(&hash).await.unwrap() {
-            return Err(anyhow!("subgraph failed unexpectedly"));
-        }
-    }
-
-    assert!(store.is_healthy(&hash).await.unwrap());
+    let chain = Arc::new(chain(blocks.clone(), &stores).await);
+    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain.clone(), None).await;
+
+    let stop_block = test_ptr(2);
+    ctx.start_and_sync_to(stop_block).await;
+    ctx.provider.stop(ctx.deployment.clone()).await.unwrap();
+
+    // Test loading data sources from DB.
+    let stop_block = test_ptr(3);
+    ctx.start_and_sync_to(stop_block).await;
+
+    // Test grafted version
+    let subgraph_name = SubgraphName::new("data-source-revert-grafted").unwrap();
+    let hash = fixture::build_subgraph_with_yarn_cmd(
+        "./integration-tests/data-source-revert",
+        "deploy:test-grafted",
+    )
+    .await;
+    let graft_block = Some(test_ptr(3));
+    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, graft_block).await;
+    let stop_block = test_ptr(4);
+    ctx.start_and_sync_to(stop_block).await;
 
     fixture::cleanup(&ctx.store, &subgraph_name, &hash);
 
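The block list in this test encodes a one-block reorg: both candidate blocks at height 1 descend from `block0`, so after emitting `block1` the static stream must revert to `block0` before continuing along the reorged branch. A sketch of the fork, with assertions one could add before the blocks are moved into the vec (names as in the test above):

```rust
// Fork shape driven by the test's block list:
//
//   block0 ──┬── block1                       (abandoned after the revert)
//            └── block1_reorged ── block2 ── block3 ── block4
//
// Same height, different hash: that mismatch is what forces the revert
// when block1_reorged is emitted after block1.
assert_eq!(block1.ptr().number, block1_reorged.ptr().number);
assert_ne!(block1.ptr().hash, block1_reorged.ptr().hash);
```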
@@ -103,45 +84,10 @@ async fn typename() -> anyhow::Result<()> {
     let stop_block = blocks.last().unwrap().block.ptr();
 
     let stores = stores("./integration-tests/config.simple.toml").await;
-    let chain = chain(blocks, &stores).await;
-    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain).await;
-
-    let provider = ctx.provider.clone();
-    let store = ctx.store.clone();
-
-    let logger = ctx.logger_factory.subgraph_logger(&ctx.deployment_locator);
-
-    SubgraphAssignmentProvider::start(provider.as_ref(), ctx.deployment_locator.clone(), None)
-        .await
-        .expect("unable to start subgraph");
-
-    loop {
-        tokio::time::sleep(Duration::from_millis(1000)).await;
-
-        let block_ptr = match store.least_block_ptr(&hash).await {
-            Ok(Some(ptr)) => ptr,
-            res => {
-                info!(&logger, "{:?}", res);
-                continue;
-            }
-        };
-
-        debug!(&logger, "subgraph block: {:?}", block_ptr);
-
-        if block_ptr == stop_block {
-            info!(
-                &logger,
-                "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block
-            );
-            break;
-        }
-
-        if !store.is_healthy(&hash).await.unwrap() {
-            return Err(anyhow!("subgraph failed unexpectedly"));
-        }
-    }
+    let chain = Arc::new(chain(blocks, &stores).await);
+    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, None).await;
 
-    assert!(store.is_healthy(&hash).await.unwrap());
+    ctx.start_and_sync_to(stop_block).await;
 
     fixture::cleanup(&ctx.store, &subgraph_name, &hash);
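Both tests now follow the same skeleton, which future runner tests can copy. A minimal sketch under the fixture API above; the subgraph name and directory are hypothetical:

```rust
#[tokio::test]
async fn my_runner_test() -> anyhow::Result<()> {
    // One Stores instance per test, pointed at the shared test config.
    let stores = stores("./integration-tests/config.simple.toml").await;

    // Build and deploy via the package's yarn scripts.
    let subgraph_name = SubgraphName::new("my-subgraph").unwrap();
    let hash = fixture::build_subgraph("./integration-tests/my-subgraph").await;

    // A static two-block chain; reorgs are emitted only if a block's
    // successor is not its descendant.
    let blocks = {
        let block0 = genesis();
        let block1 = empty_block(block0.ptr(), test_ptr(1));
        vec![block0, block1]
    };

    let chain = Arc::new(chain(blocks, &stores).await);
    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, None).await;
    ctx.start_and_sync_to(test_ptr(1)).await;

    fixture::cleanup(&ctx.store, &subgraph_name, &hash);
    Ok(())
}
```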