From 70224c6010a47bde1d603e593588e1c5c0d4972e Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 3 Jun 2022 15:21:13 -0700 Subject: [PATCH 01/11] dynds: Take just block number in `copy` --- store/postgres/src/deployment_store.rs | 2 +- store/postgres/src/dynds/mod.rs | 2 +- store/postgres/src/dynds/shared.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 1fb86666afa..a4cf8c87033 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1250,7 +1250,7 @@ impl DeploymentStore { conn.transaction(|| -> Result<(), StoreError> { // Copy dynamic data sources and adjust their ID let start = Instant::now(); - let count = dynds::copy(&conn, &src.site, &dst.site, &block)?; + let count = dynds::copy(&conn, &src.site, &dst.site, block.number)?; info!(logger, "Copied {} dynamic data sources", count; "time_ms" => start.elapsed().as_millis()); diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index f241624b7bc..b37fbf04107 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -42,7 +42,7 @@ pub(crate) fn copy( conn: &PgConnection, src: &Site, dst: &Site, - target_block: &BlockPtr, + target_block: BlockNumber, ) -> Result { if src.schema_version != dst.schema_version { return Err(StoreError::ConstraintViolation(format!( diff --git a/store/postgres/src/dynds/shared.rs b/store/postgres/src/dynds/shared.rs index 9303953bb30..e2adba70a3f 100644 --- a/store/postgres/src/dynds/shared.rs +++ b/store/postgres/src/dynds/shared.rs @@ -146,7 +146,7 @@ pub(super) fn copy( conn: &PgConnection, src: &Site, dst: &Site, - target_block: &BlockPtr, + target_block: BlockNumber, ) -> Result { use dynamic_ethereum_contract_data_source as decds; @@ -183,7 +183,7 @@ pub(super) fn copy( Ok(sql_query(&query) .bind::(src.deployment.as_str()) .bind::(dst.deployment.as_str()) - .bind::(target_block.number) + .bind::(target_block) .execute(conn)?) } From f887de66da43ae881f630d591a61bd8159ae6fe9 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Sat, 11 Jun 2022 18:20:29 -0700 Subject: [PATCH 02/11] *: Implement private data sources This required plumbing through a map between manifest index and template name for backwards compatibility with the shared data sources table. --- chain/arweave/src/data_source.rs | 6 ++ chain/cosmos/src/data_source.rs | 6 ++ chain/ethereum/src/data_source.rs | 35 ++++-- chain/near/src/data_source.rs | 6 ++ core/src/subgraph/inputs.rs | 3 + core/src/subgraph/instance_manager.rs | 33 ++++-- core/src/subgraph/loader.rs | 10 +- core/src/subgraph/runner.rs | 1 + graph/src/blockchain/mock.rs | 6 ++ graph/src/blockchain/mod.rs | 3 + graph/src/components/store/cache.rs | 3 +- graph/src/components/store/mod.rs | 5 +- graph/src/components/store/traits.rs | 6 +- graph/src/data/subgraph/mod.rs | 15 ++- graph/tests/entity_cache.rs | 6 +- runtime/test/src/common.rs | 2 + store/postgres/src/deployment_store.rs | 12 ++- store/postgres/src/dynds/mod.rs | 18 +++- store/postgres/src/dynds/private.rs | 141 ++++++++++++++++++++++--- store/postgres/src/dynds/shared.rs | 20 +++- store/postgres/src/writable.rs | 46 ++++++-- store/postgres/tests/store.rs | 18 +++- store/test-store/src/store.rs | 15 ++- 23 files changed, 355 insertions(+), 61 deletions(-) diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs index 11d7f43a8af..56be161858d 100644 --- a/chain/arweave/src/data_source.rs +++ b/chain/arweave/src/data_source.rs @@ -215,6 +215,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -261,6 +262,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -292,6 +294,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + unreachable!("arweave does not support dynamic data sources") + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/cosmos/src/data_source.rs b/chain/cosmos/src/data_source.rs index c3806b01d58..768ba272391 100644 --- a/chain/cosmos/src/data_source.rs +++ b/chain/cosmos/src/data_source.rs @@ -282,6 +282,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -325,6 +326,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { Err(anyhow!(TEMPLATE_ERROR)) } @@ -342,6 +344,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { unimplemented!("{}", TEMPLATE_ERROR); } + + fn manifest_idx(&self) -> u32 { + unimplemented!("{}", TEMPLATE_ERROR); + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 099b864f9a6..246fd468096 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -40,6 +40,7 @@ pub struct DataSource { pub kind: String, pub network: Option, pub name: String, + pub manifest_idx: u32, pub address: Option
, pub start_block: BlockNumber, pub mapping: Mapping, @@ -92,6 +93,7 @@ impl blockchain::DataSource for DataSource { kind, network, name, + manifest_idx, address, mapping, context, @@ -109,6 +111,7 @@ impl blockchain::DataSource for DataSource { kind == &other.kind && network == &other.network && name == &other.name + && manifest_idx == &other.manifest_idx && address == &other.address && mapping.abis == other.mapping.abis && mapping.event_handlers == other.mapping.event_handlers @@ -120,7 +123,7 @@ impl blockchain::DataSource for DataSource { fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource { let param = self.address.map(|addr| addr.0.into()); StoredDynamicDataSource { - name: self.name.to_owned(), + manifest_idx: self.manifest_idx, param, context: self .context @@ -136,7 +139,7 @@ impl blockchain::DataSource for DataSource { stored: StoredDynamicDataSource, ) -> Result { let StoredDynamicDataSource { - name: _, + manifest_idx, param, context, creation_block, @@ -151,6 +154,7 @@ impl blockchain::DataSource for DataSource { kind: template.kind.to_string(), network: template.network.as_ref().map(|s| s.to_string()), name: template.name.clone(), + manifest_idx, address, start_block: 0, mapping: template.mapping.clone(), @@ -232,6 +236,7 @@ impl DataSource { source: Source, mapping: Mapping, context: Option, + manifest_idx: u32, ) -> Result { // Data sources in the manifest are created "before genesis" so they have no creation block. let creation_block = None; @@ -243,6 +248,7 @@ impl DataSource { kind, network, name, + manifest_idx, address: source.address, start_block: source.start_block, mapping, @@ -722,6 +728,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -736,7 +743,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { let mapping = mapping.resolve(&*resolver, logger).await?; - DataSource::from_manifest(kind, network, name, source, mapping, context) + DataSource::from_manifest(kind, network, name, source, mapping, context, manifest_idx) } } @@ -778,6 +785,7 @@ impl TryFrom> for DataSource { kind: template.kind, network: template.network, name: template.name, + manifest_idx: template.manifest_idx, address: Some(address), start_block: 0, mapping: template.mapping, @@ -789,16 +797,23 @@ impl TryFrom> for DataSource { } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] -pub struct BaseDataSourceTemplate { +pub struct UnresolvedDataSourceTemplate { pub kind: String, pub network: Option, pub name: String, pub source: TemplateSource, - pub mapping: M, + pub mapping: UnresolvedMapping, } -pub type UnresolvedDataSourceTemplate = BaseDataSourceTemplate; -pub type DataSourceTemplate = BaseDataSourceTemplate; +#[derive(Clone, Debug)] +pub struct DataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub manifest_idx: u32, + pub source: TemplateSource, + pub mapping: Mapping, +} #[async_trait] impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { @@ -806,6 +821,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -821,6 +837,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem kind, network, name, + manifest_idx, source, mapping: mapping.resolve(resolver, logger).await?, }) @@ -839,6 +856,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + self.manifest_idx + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs index bafc36d46ef..15d9ca84eb4 100644 --- a/chain/near/src/data_source.rs +++ b/chain/near/src/data_source.rs @@ -272,6 +272,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSource { kind, @@ -346,6 +347,7 @@ impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTem self, resolver: &Arc, logger: &Logger, + _manifest_idx: u32, ) -> Result { let UnresolvedDataSourceTemplate { kind, @@ -377,6 +379,10 @@ impl blockchain::DataSourceTemplate for DataSourceTemplate { fn runtime(&self) -> Option>> { Some(self.mapping.runtime.cheap_clone()) } + + fn manifest_idx(&self) -> u32 { + unreachable!("near does not support dynamic data sources") + } } #[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index f9e70b1e047..b507e2bf6ad 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -21,4 +21,7 @@ pub struct IndexingInputs { pub templates: Arc>, pub unified_api_version: UnifiedMappingApiVersion, pub static_filters: bool, + + // Correspondence between data source or template position in the manifest and name. + pub manifest_idx_and_name: Vec<(u32, String)>, } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 76d1421de24..aa28ba19da6 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -7,8 +7,9 @@ use crate::subgraph::metrics::{ use crate::subgraph::runner::SubgraphRunner; use crate::subgraph::SubgraphInstance; use graph::blockchain::block_stream::BlockStreamMetrics; -use graph::blockchain::Blockchain; +use graph::blockchain::DataSource; use graph::blockchain::NodeCapabilities; +use graph::blockchain::{Blockchain, DataSourceTemplate}; use graph::blockchain::{BlockchainKind, TriggerFilter}; use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *}; use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator}; @@ -147,7 +148,7 @@ impl SubgraphInstanceManager { // that is done store.start_subgraph_deployment(&logger).await?; - let manifest: SubgraphManifest = { + let (manifest, manifest_idx_and_name) = { info!(logger, "Resolve subgraph files using IPFS"); let mut manifest = SubgraphManifest::resolve_from_raw( @@ -161,9 +162,28 @@ impl SubgraphInstanceManager { .await .context("Failed to resolve subgraph from IPFS")?; - let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest) - .await - .context("Failed to load dynamic data sources")?; + let manifest_idx_and_name: Vec<(u32, String)> = manifest + .data_sources + .iter() + .map(|ds: &C::DataSource| ds.name().to_owned()) + .chain( + manifest + .templates + .iter() + .map(|t: &C::DataSourceTemplate| t.name().to_owned()), + ) + .enumerate() + .map(|(idx, name)| (idx as u32, name)) + .collect(); + + let data_sources = load_dynamic_data_sources( + store.clone(), + logger.clone(), + &manifest, + manifest_idx_and_name.clone(), + ) + .await + .context("Failed to load dynamic data sources")?; info!(logger, "Successfully resolved subgraph files using IPFS"); @@ -176,7 +196,7 @@ impl SubgraphInstanceManager { manifest.data_sources.len() ); - manifest + (manifest, manifest_idx_and_name) }; let required_capabilities = C::NodeCapabilities::from_data_sources(&manifest.data_sources); @@ -273,6 +293,7 @@ impl SubgraphInstanceManager { templates, unified_api_version, static_filters: self.static_filters, + manifest_idx_and_name, }; // The subgraph state tracks the state of the subgraph instance over time diff --git a/core/src/subgraph/loader.rs b/core/src/subgraph/loader.rs index be61306764c..442cc4d15f2 100644 --- a/core/src/subgraph/loader.rs +++ b/core/src/subgraph/loader.rs @@ -8,17 +8,21 @@ pub async fn load_dynamic_data_sources( store: Arc, logger: Logger, manifest: &SubgraphManifest, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, Error> { let start_time = Instant::now(); let mut data_sources: Vec = vec![]; - for stored in store.load_dynamic_data_sources().await? { + for stored in store + .load_dynamic_data_sources(manifest_idx_and_name) + .await? + { let template = manifest .templates .iter() - .find(|template| template.name() == stored.name.as_str()) - .ok_or_else(|| anyhow!("no template named `{}` was found", stored.name))?; + .find(|template| template.manifest_idx() == stored.manifest_idx) + .ok_or_else(|| anyhow!("no template with idx `{}` was found", stored.manifest_idx))?; let ds = C::DataSource::from_stored_dynamic_data_source(&template, stored)?; diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 18ba7b3448d..68bd619dc03 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -375,6 +375,7 @@ where &self.metrics.host.stopwatch, data_sources, deterministic_errors, + self.inputs.manifest_idx_and_name.clone(), ) .await .context("Failed to transact block operations")?; diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index fd48f59fc25..d519a29078d 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -129,6 +129,7 @@ impl UnresolvedDataSource for MockUnresolvedDataSource { self, _resolver: &Arc, _logger: &slog::Logger, + _manifest_idx: u32, ) -> Result { todo!() } @@ -149,6 +150,10 @@ impl DataSourceTemplate for MockDataSourceTemplate { fn name(&self) -> &str { todo!() } + + fn manifest_idx(&self) -> u32 { + todo!() + } } #[derive(Clone, Default, Deserialize)] @@ -160,6 +165,7 @@ impl UnresolvedDataSourceTemplate for MockUnresolvedDataSource self, _resolver: &Arc, _logger: &slog::Logger, + _manifest_idx: u32, ) -> Result { todo!() } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 6225a8dd244..71c44270ab9 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -241,6 +241,7 @@ pub trait UnresolvedDataSourceTemplate: self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result; } @@ -248,6 +249,7 @@ pub trait DataSourceTemplate: Send + Sync + Debug { fn api_version(&self) -> semver::Version; fn runtime(&self) -> Option>>; fn name(&self) -> &str; + fn manifest_idx(&self) -> u32; } #[async_trait] @@ -258,6 +260,7 @@ pub trait UnresolvedDataSource: self, resolver: &Arc, logger: &Logger, + manifest_idx: u32, ) -> Result; } diff --git a/graph/src/components/store/cache.rs b/graph/src/components/store/cache.rs index 547d699b332..8207c23c5fc 100644 --- a/graph/src/components/store/cache.rs +++ b/graph/src/components/store/cache.rs @@ -4,6 +4,7 @@ use std::fmt::{self, Debug}; use std::sync::Arc; use crate::blockchain::BlockPtr; +use crate::blockchain::DataSource; use crate::components::store::{ self as s, Entity, EntityKey, EntityOp, EntityOperation, EntityType, }; @@ -188,7 +189,7 @@ impl EntityCache { } /// Add a dynamic data source - pub fn add_data_source(&mut self, data_source: &impl s::DataSource) { + pub fn add_data_source(&mut self, data_source: &impl DataSource) { self.data_sources .push(data_source.as_stored_dynamic_data_source()); } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 285dde2590c..bc6384e5677 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; -use crate::blockchain::DataSource; use crate::blockchain::{Block, Blockchain}; use crate::data::store::scalar::Bytes; use crate::data::store::*; @@ -822,9 +821,9 @@ pub enum UnfailOutcome { Unfailed, } -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct StoredDynamicDataSource { - pub name: String, + pub manifest_idx: u32, pub param: Option, pub context: Option, pub creation_block: Option, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 223bfe55b77..a17a0e4a07d 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -204,6 +204,7 @@ pub trait WritableStore: Send + Sync + 'static { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError>; /// Look up multiple entities as of the latest block. Returns a map of @@ -225,7 +226,10 @@ pub trait WritableStore: Send + Sync + 'static { fn unassign_subgraph(&self) -> Result<(), StoreError>; /// Load the dynamic data sources for the given deployment - async fn load_dynamic_data_sources(&self) -> Result, StoreError>; + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError>; /// Report the name of the shard in which the subgraph is stored. This /// should only be used for reporting and monitoring diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index ef7b202638e..ae1d634e053 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -764,16 +764,27 @@ impl UnresolvedSubgraphManifest { )); } + let ds_count = data_sources.len(); + if ds_count as u64 + templates.len() as u64 > u32::MAX as u64 { + return Err(anyhow!( + "Subgraph has too many declared data sources and templates", + )); + } + let (schema, data_sources, templates, offchain_data_sources) = try_join4( schema.resolve(id.clone(), &resolver, logger), data_sources .into_iter() - .map(|ds| ds.resolve(&resolver, logger)) + .enumerate() + .map(|(idx, ds)| ds.resolve(&resolver, logger, idx as u32)) .collect::>() .try_collect::>(), templates .into_iter() - .map(|template| template.resolve(&resolver, logger)) + .enumerate() + .map(|(idx, template)| { + template.resolve(&resolver, logger, ds_count as u32 + idx as u32) + }) .collect::>() .try_collect::>(), offchain_data_sources diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 598eea0339a..70e715c66a8 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -107,6 +107,7 @@ impl WritableStore for MockStore { _: &StopwatchMetrics, _: Vec, _: Vec, + _: Vec<(u32, String)>, ) -> Result<(), StoreError> { unimplemented!() } @@ -126,7 +127,10 @@ impl WritableStore for MockStore { unimplemented!() } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + _manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { unimplemented!() } diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index acd51fe2f04..e548a41ebc6 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -30,6 +30,7 @@ fn mock_host_exports( let templates = vec![DataSourceTemplate { kind: String::from("ethereum/contract"), name: String::from("example template"), + manifest_idx: 0, network: Some(String::from("mainnet")), source: TemplateSource { abi: String::from("foo"), @@ -120,6 +121,7 @@ pub fn mock_data_source(path: &str, api_version: Version) -> DataSource { DataSource { kind: String::from("ethereum/contract"), name: String::from("example data source"), + manifest_idx: 0, network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index a4cf8c87033..fdc04af7574 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -973,6 +973,7 @@ impl DeploymentStore { stopwatch: &StopwatchMetrics, data_sources: &[StoredDynamicDataSource], deterministic_errors: &[SubgraphError], + manifest_idx_and_name: &[(u32, String)], ) -> Result { // All operations should apply only to data or metadata for this subgraph if mods @@ -1010,7 +1011,13 @@ impl DeploymentStore { )?; section.end(); - dynds::insert(&conn, &site, data_sources, block_ptr_to)?; + dynds::insert( + &conn, + &site, + data_sources, + block_ptr_to, + manifest_idx_and_name, + )?; if !deterministic_errors.is_empty() { deployment::insert_subgraph_errors( @@ -1178,9 +1185,10 @@ impl DeploymentStore { &self, site: Arc, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { self.with_conn(move |conn, _| { - conn.transaction(|| crate::dynds::load(&conn, &site, block)) + conn.transaction(|| crate::dynds::load(&conn, &site, block, manifest_idx_and_name)) .map_err(Into::into) }) .await diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index b37fbf04107..c34055bb167 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -15,10 +15,11 @@ pub fn load( conn: &PgConnection, site: &Site, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { match site.schema_version.private_data_sources() { true => DataSourcesTable::new(site.namespace.clone()).load(conn, block), - false => shared::load(conn, site.deployment.as_str(), block), + false => shared::load(conn, site.deployment.as_str(), block, manifest_idx_and_name), } } @@ -27,6 +28,7 @@ pub(crate) fn insert( site: &Site, data_sources: &[StoredDynamicDataSource], block_ptr: &BlockPtr, + manifest_idx_and_name: &[(u32, String)], ) -> Result { match site.schema_version.private_data_sources() { true => DataSourcesTable::new(site.namespace.clone()).insert( @@ -34,7 +36,13 @@ pub(crate) fn insert( data_sources, block_ptr.number, ), - false => shared::insert(conn, &site.deployment, data_sources, block_ptr), + false => shared::insert( + conn, + &site.deployment, + data_sources, + block_ptr, + manifest_idx_and_name, + ), } } @@ -52,7 +60,11 @@ pub(crate) fn copy( ))); } match src.schema_version.private_data_sources() { - true => todo!(), + true => DataSourcesTable::new(src.namespace.clone()).copy_to( + conn, + &DataSourcesTable::new(dst.namespace.clone()), + target_block, + ), false => shared::copy(conn, src, dst, target_block), } } diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index a253b2d5a5f..560af9e1bd8 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -2,18 +2,22 @@ #![allow(unused_variables)] #![allow(unused_imports)] +use std::ops::Bound; + use diesel::{ pg::types::sql_types, + sql_query, sql_types::{Binary, Integer, Jsonb, Nullable}, - types::Bytea, - PgConnection, QueryDsl, RunQueryDsl, + ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; + use graph::{ components::store::StoredDynamicDataSource, + constraint_violation, prelude::{serde_json, BlockNumber, StoreError}, }; -use crate::primary::Namespace; +use crate::{layout_for_tests::BlockRange, primary::Namespace}; type DynTable = diesel_dynamic_schema::Table; type DynColumn = diesel_dynamic_schema::Column; @@ -78,16 +82,40 @@ impl DataSourcesTable { conn: &PgConnection, block: BlockNumber, ) -> Result, StoreError> { - // self.table - // .select(( - // self.block_range, - // self.manifest_idx, - // self.param, - // self.context, - // )) - // .load(conn)?; - - todo!() + type Tuple = ( + (Bound, Bound), + i32, + Option>, + Option, + ); + let tuples = self + .table + .clone() + .select(( + &self.block_range, + &self.manifest_idx, + &self.param, + &self.context, + )) + .load::(conn)?; + + Ok(tuples + .into_iter() + .map(|(block_range, manifest_idx, param, context)| { + let creation_block = match block_range.0 { + Bound::Included(block) => Some(block), + + // Should never happen. + Bound::Excluded(_) | Bound::Unbounded => None, + }; + StoredDynamicDataSource { + manifest_idx: manifest_idx as u32, + param: param.map(|p| p.into()), + context, + creation_block, + } + }) + .collect()) } pub(crate) fn insert( @@ -96,10 +124,93 @@ impl DataSourcesTable { data_sources: &[StoredDynamicDataSource], block: BlockNumber, ) -> Result { - todo!() + // Currently all data sources share the same causality region. + let causality_region = 0; + + let mut inserted_total = 0; + + for ds in data_sources { + let StoredDynamicDataSource { + manifest_idx, + param, + context, + creation_block, + } = ds; + + if creation_block != &Some(block) { + return Err(constraint_violation!( + "mismatching creation blocks `{:?}` and `{}`", + creation_block, + block + )); + } + + let query = format!( + "insert into {}(block_range, manifest_idx, causality_region, param, context) \ + values (int4range($1, null), $2, $3, $4, $5)", + self.qname + ); + + inserted_total += sql_query(query) + .bind::, _>(creation_block) + .bind::(*manifest_idx as i32) + .bind::(causality_region) + .bind::, _>(param.as_ref().map(|p| &**p)) + .bind::, _>(context) + .execute(conn)?; + } + + Ok(inserted_total) } pub(crate) fn revert(&self, conn: &PgConnection, block: BlockNumber) -> Result<(), StoreError> { - todo!() + // Use `@>` to leverage the gist index. + // This assumes all ranges are of the form [x, +inf). + let query = format!( + "delete from {} where block_range @> $1 and lower(block_range) = $1", + self.qname + ); + sql_query(query).bind::(block).execute(conn)?; + Ok(()) + } + + /// Copy the dynamic data sources from `self` to `dst`. All data sources that + /// were created up to and including `target_block` will be copied. + pub(super) fn copy_to( + &self, + conn: &PgConnection, + dst: &DataSourcesTable, + target_block: BlockNumber, + ) -> Result { + // Check if there are any data sources for dst which indicates we already copied + let count = dst.table.clone().count().get_result::(conn)?; + if count > 0 { + return Ok(count as usize); + } + + // Assumes all ranges are of the form `[n, +inf)`. + let query = format!( + "\ + insert into {dst}(block_range, causality_region, manifest_idx, parent, id, param, context) + select int4range(lower(e.block_range), null), e.causality_region, e.manifest_idx, + e.parent, e.id, e.param, e.context + from {src} e + where lower(e.block_range) <= $1 + ", + src = self.qname, + dst = dst.qname + ); + + let count = sql_query(&query) + .bind::(target_block) + .execute(conn)?; + + // Test that both tables have the same contents. + debug_assert!( + self.load(conn, target_block).map_err(|e| e.to_string()) + == dst.load(conn, target_block).map_err(|e| e.to_string()) + ); + + Ok(count) } } diff --git a/store/postgres/src/dynds/shared.rs b/store/postgres/src/dynds/shared.rs index e2adba70a3f..961a28161fb 100644 --- a/store/postgres/src/dynds/shared.rs +++ b/store/postgres/src/dynds/shared.rs @@ -40,6 +40,7 @@ pub(super) fn load( conn: &PgConnection, id: &str, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { use dynamic_ethereum_contract_data_source as decds; @@ -69,9 +70,14 @@ pub(super) fn load( )); } + let manifest_idx = manifest_idx_and_name + .iter() + .find(|(_, manifest_name)| manifest_name == &name) + .ok_or_else(|| constraint_violation!("data source name {} not found", name))? + .0; let creation_block = creation_block.to_i32(); let data_source = StoredDynamicDataSource { - name, + manifest_idx, param: Some(address.into()), context: context.map(|ctx| serde_json::from_str(&ctx)).transpose()?, creation_block, @@ -93,6 +99,7 @@ pub(super) fn insert( deployment: &DeploymentHash, data_sources: &[StoredDynamicDataSource], block_ptr: &BlockPtr, + manifest_idx_and_name: &[(u32, String)], ) -> Result { use dynamic_ethereum_contract_data_source as decds; @@ -105,7 +112,7 @@ pub(super) fn insert( .into_iter() .map(|ds| { let StoredDynamicDataSource { - name, + manifest_idx: _, param, context, creation_block: _, @@ -114,11 +121,16 @@ pub(super) fn insert( Some(param) => param, None => { return Err(constraint_violation!( - "dynamic data sources must have an address, but `{}` has none", - name + "dynamic data sources must have an addres", )); } }; + let name = manifest_idx_and_name + .iter() + .find(|(idx, _)| *idx == ds.manifest_idx) + .ok_or_else(|| constraint_violation!("manifest idx {} not found", ds.manifest_idx))? + .1 + .clone(); Ok(( decds::deployment.eq(deployment.as_str()), decds::name.eq(name), diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index d0e29693541..392649fda4a 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -252,6 +252,7 @@ impl SyncStore { stopwatch: &StopwatchMetrics, data_sources: &[StoredDynamicDataSource], deterministic_errors: &[SubgraphError], + manifest_idx_and_name: &[(u32, String)], ) -> Result<(), StoreError> { fn same_subgraph(mods: &[EntityModification], id: &DeploymentHash) -> bool { mods.iter().all(|md| &md.entity_key().subgraph_id == id) @@ -270,6 +271,7 @@ impl SyncStore { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, )?; let _section = stopwatch.start_section("send_store_event"); @@ -311,10 +313,15 @@ impl SyncStore { async fn load_dynamic_data_sources( &self, block: BlockNumber, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result, StoreError> { self.retry_async("load_dynamic_data_sources", || async { self.writable - .load_dynamic_data_sources(self.site.cheap_clone(), block) + .load_dynamic_data_sources( + self.site.cheap_clone(), + block, + manifest_idx_and_name.clone(), + ) .await }) .await @@ -420,6 +427,7 @@ enum Request { mods: Vec, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, }, RevertTo { store: Arc, @@ -440,6 +448,7 @@ impl Request { mods, data_sources, deterministic_errors, + manifest_idx_and_name, } => store.transact_block_operations( block_ptr_to, firehose_cursor, @@ -447,6 +456,7 @@ impl Request { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, ), Request::RevertTo { store, @@ -734,7 +744,10 @@ impl Queue { } /// Load dynamic data sources by looking at both the queue and the store - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { // See the implementation of `get` for how we handle reverts let mut tracker = BlockTracker::new(); @@ -768,7 +781,7 @@ impl Queue { let mut dds = self .store - .load_dynamic_data_sources(tracker.query_block()) + .load_dynamic_data_sources(tracker.query_block(), manifest_idx_and_name) .await?; dds.append(&mut queue_dds); @@ -805,6 +818,7 @@ impl Writer { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { match self { Writer::Sync(store) => store.transact_block_operations( @@ -814,6 +828,7 @@ impl Writer { &stopwatch, &data_sources, &deterministic_errors, + &manifest_idx_and_name, ), Writer::Async(queue) => { let req = Request::Write { @@ -824,6 +839,7 @@ impl Writer { mods, data_sources, deterministic_errors, + manifest_idx_and_name, }; queue.push(req).await } @@ -872,10 +888,17 @@ impl Writer { } } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { match self { - Writer::Sync(store) => store.load_dynamic_data_sources(BLOCK_NUMBER_MAX).await, - Writer::Async(queue) => queue.load_dynamic_data_sources().await, + Writer::Sync(store) => { + store + .load_dynamic_data_sources(BLOCK_NUMBER_MAX, manifest_idx_and_name) + .await + } + Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await, } } } @@ -995,6 +1018,7 @@ impl WritableStoreTrait for WritableStore { stopwatch: &StopwatchMetrics, data_sources: Vec, deterministic_errors: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { self.writer .write( @@ -1004,6 +1028,7 @@ impl WritableStoreTrait for WritableStore { stopwatch, data_sources, deterministic_errors, + manifest_idx_and_name, ) .await?; @@ -1032,8 +1057,13 @@ impl WritableStoreTrait for WritableStore { self.store.unassign_subgraph() } - async fn load_dynamic_data_sources(&self) -> Result, StoreError> { - self.writer.load_dynamic_data_sources().await + async fn load_dynamic_data_sources( + &self, + manifest_idx_and_name: Vec<(u32, String)>, + ) -> Result, StoreError> { + self.writer + .load_dynamic_data_sources(manifest_idx_and_name) + .await } fn shard(&self) -> &str { diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs index 5f9cda3f210..f74f9878299 100644 --- a/store/postgres/tests/store.rs +++ b/store/postgres/tests/store.rs @@ -1148,6 +1148,7 @@ fn mock_data_source() -> graph_chain_ethereum::DataSource { graph_chain_ethereum::DataSource { kind: String::from("ethereum/contract"), name: String::from("example data source"), + manifest_idx: 0, network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, @@ -1209,7 +1210,8 @@ fn revert_block_with_dynamic_data_source_operations() { let original_user = writable.get(&user_key).unwrap().expect("missing entity"); // Create operations to add a dynamic data source - let data_source = mock_data_source(); + let mut data_source = mock_data_source(); + let manifest_idx_and_name = vec![(0, "example data source".to_string())]; let ops = vec![EntityOperation::Set { key: user_key.clone(), @@ -1217,12 +1219,14 @@ fn revert_block_with_dynamic_data_source_operations() { }]; // Add user and dynamic data source to the store + data_source.creation_block = Some(TEST_BLOCK_3_PTR.number); transact_entities_and_dynamic_data_sources( &subgraph_store, deployment.clone(), TEST_BLOCK_3_PTR.clone(), vec![data_source.as_stored_dynamic_data_source()], ops, + manifest_idx_and_name.clone(), ) .await .unwrap(); @@ -1234,7 +1238,10 @@ fn revert_block_with_dynamic_data_source_operations() { ); // Verify that the dynamic data source exists afterwards - let loaded_dds = writable.load_dynamic_data_sources().await.unwrap(); + let loaded_dds = writable + .load_dynamic_data_sources(manifest_idx_and_name.clone()) + .await + .unwrap(); assert_eq!(1, loaded_dds.len()); assert_eq!( data_source.address.unwrap().0, @@ -1253,7 +1260,10 @@ fn revert_block_with_dynamic_data_source_operations() { ); // Verify that the dynamic data source is gone after the reversion - let loaded_dds = writable.load_dynamic_data_sources().await.unwrap(); + let loaded_dds = writable + .load_dynamic_data_sources(manifest_idx_and_name) + .await + .unwrap(); assert_eq!(0, loaded_dds.len()); // Verify that the right change events were emitted for the reversion @@ -1567,6 +1577,7 @@ fn handle_large_string_with_index() { &stopwatch_metrics, Vec::new(), Vec::new(), + Vec::new(), ) .await .expect("Failed to insert large text"); @@ -1664,6 +1675,7 @@ fn handle_large_bytea_with_index() { &stopwatch_metrics, Vec::new(), Vec::new(), + Vec::new(), ) .await .expect("Failed to insert large text"); diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 350e6f90326..b69be23b240 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -233,6 +233,7 @@ pub async fn transact_errors( &stopwatch_metrics, Vec::new(), errs, + Vec::new(), ) .await?; flush(deployment).await @@ -245,8 +246,15 @@ pub async fn transact_entity_operations( block_ptr_to: BlockPtr, ops: Vec, ) -> Result<(), StoreError> { - transact_entities_and_dynamic_data_sources(store, deployment.clone(), block_ptr_to, vec![], ops) - .await + transact_entities_and_dynamic_data_sources( + store, + deployment.clone(), + block_ptr_to, + vec![], + ops, + vec![], + ) + .await } /// Convenience to transact EntityOperation instead of EntityModification and wait for the store to process the operations @@ -262,6 +270,7 @@ pub async fn transact_and_wait( block_ptr_to, vec![], ops, + vec![], ) .await?; flush(deployment).await @@ -273,6 +282,7 @@ pub async fn transact_entities_and_dynamic_data_sources( block_ptr_to: BlockPtr, data_sources: Vec, ops: Vec, + manifest_idx_and_name: Vec<(u32, String)>, ) -> Result<(), StoreError> { let store = futures03::executor::block_on(store.cheap_clone().writable(LOGGER.clone(), deployment.id))?; @@ -297,6 +307,7 @@ pub async fn transact_entities_and_dynamic_data_sources( &stopwatch_metrics, data_sources, Vec::new(), + manifest_idx_and_name, ) .await } From 3440eb7673ff01932b91ce3a94dca251d821eeae Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Wed, 13 Jul 2022 13:00:12 -0300 Subject: [PATCH 03/11] store: Bump DeploymentSchemaVersion::Latest, allow copies for all versions --- graph/src/components/store/mod.rs | 2 +- store/postgres/src/primary.rs | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index bc6384e5677..5c342e4eb2f 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1047,7 +1047,7 @@ pub enum DeploymentSchemaVersion { impl DeploymentSchemaVersion { // Latest schema version supported by this version of graph node. - pub const LATEST: Self = Self::V0; + pub const LATEST: Self = Self::V1; pub fn private_data_sources(self) -> bool { use DeploymentSchemaVersion::*; diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index c54d75323b9..3da80ed6f2d 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1110,14 +1110,6 @@ impl<'a> Connection<'a> { return Ok(site); } - if src.schema_version != DeploymentSchemaVersion::LATEST { - return Err(StoreError::Unknown(anyhow!( - "Attempted to copy from deployment {} which is on an old schema version. - This means a schema migration is ongoing, please try again later.", - src.id - ))); - } - self.create_site( shard, src.deployment.clone(), From 0440366d9a4bc65404dc909e772fd2c795a8e8ba Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 15 Jul 2022 15:07:35 -0300 Subject: [PATCH 04/11] tests: Test loading data sources from DB --- core/src/subgraph/runner.rs | 14 ++--- tests/src/fixture.rs | 32 ++++++++++- tests/tests/runner.rs | 104 +++++++++++------------------------- 3 files changed, 70 insertions(+), 80 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 68bd619dc03..a0d2cea4010 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -678,6 +678,13 @@ where } } + if let Some(stop_block) = &self.inputs.stop_block { + if block_ptr.number >= *stop_block { + info!(self.logger, "stop block reached for subgraph"); + return Ok(Action::Stop); + } + } + if matches!(action, Action::Restart) { // Cancel the stream for real self.ctx @@ -690,13 +697,6 @@ where return Ok(Action::Restart); } - if let Some(stop_block) = &self.inputs.stop_block { - if block_ptr.number >= *stop_block { - info!(self.logger, "stop block reached for subgraph"); - return Ok(Action::Stop); - } - } - return Ok(Action::Continue); } Err(BlockProcessingError::Canceled) => { diff --git a/tests/src/fixture.rs b/tests/src/fixture.rs index b8be34364ca..4b3bdb46663 100644 --- a/tests/src/fixture.rs +++ b/tests/src/fixture.rs @@ -3,6 +3,7 @@ pub mod ethereum; use std::marker::PhantomData; use std::process::Command; use std::sync::Mutex; +use std::time::Duration; use crate::helpers::run_cmd; use anyhow::Error; @@ -32,7 +33,7 @@ use graph_mock::MockMetricsRegistry; use graph_node::manager::PanicSubscriptionManager; use graph_node::{config::Config, store_builder::StoreBuilder}; use graph_store_postgres::{ChainHeadUpdateListener, ChainStore, Store, SubgraphStore}; -use slog::Logger; +use slog::{info, Logger}; use std::env::VarError; use std::pin::Pin; use std::sync::Arc; @@ -242,6 +243,35 @@ pub fn cleanup(subgraph_store: &SubgraphStore, name: &SubgraphName, hash: &Deplo } } +pub async fn wait_for_sync( + logger: &Logger, + store: &SubgraphStore, + hash: &DeploymentHash, + stop_block: BlockPtr, +) -> Result<(), Error> { + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let block_ptr = match store.least_block_ptr(&hash).await { + Ok(Some(ptr)) => ptr, + res => { + info!(&logger, "{:?}", res); + continue; + } + }; + + if block_ptr == stop_block { + break; + } + + if !store.is_healthy(&hash).await.unwrap() { + return Err(anyhow::anyhow!("subgraph failed unexpectedly")); + } + } + + Ok(()) +} + /// `chain` is the sequence of chain heads to be processed. If the next block to be processed in the /// chain is not a descendant of the previous one, reorgs will be emitted until it is. /// See also: static-stream-builder diff --git a/tests/tests/runner.rs b/tests/tests/runner.rs index f927326810f..b1c5c863be7 100644 --- a/tests/tests/runner.rs +++ b/tests/tests/runner.rs @@ -1,12 +1,8 @@ -use graph_tests::fixture::ethereum::{chain, empty_block, genesis}; -use graph_tests::fixture::{self, stores, test_ptr}; -use std::time::Duration; - -use anyhow::anyhow; use graph::blockchain::{Block, BlockPtr}; use graph::prelude::ethabi::ethereum_types::H256; -use graph::prelude::{SubgraphAssignmentProvider, SubgraphName, SubgraphStore as _}; -use slog::{debug, info}; +use graph::prelude::{SubgraphAssignmentProvider, SubgraphName}; +use graph_tests::fixture::ethereum::{chain, empty_block, genesis}; +use graph_tests::fixture::{self, stores, test_ptr, wait_for_sync}; #[tokio::test] async fn data_source_revert() -> anyhow::Result<()> { @@ -19,59 +15,48 @@ async fn data_source_revert() -> anyhow::Result<()> { }; let blocks = { - let block_0 = genesis(); - let block_1 = empty_block(block_0.ptr(), test_ptr(1)); - let block_1_reorged_ptr = BlockPtr { + let block0 = genesis(); + let block1 = empty_block(block0.ptr(), test_ptr(1)); + let block1_reorged_ptr = BlockPtr { number: 1, hash: H256::from_low_u64_be(12).into(), }; - let block_1_reorged = empty_block(block_0.ptr(), block_1_reorged_ptr); - vec![block_0, block_1, block_1_reorged] + let block1_reorged = empty_block(block0.ptr(), block1_reorged_ptr.clone()); + let block2 = empty_block(block1_reorged_ptr, test_ptr(2)); + let block3 = empty_block(block2.ptr(), test_ptr(3)); + vec![block0, block1, block1_reorged, block2, block3] }; - let stop_block = blocks.last().unwrap().block.ptr(); - let stores = stores("./integration-tests/config.simple.toml").await; let chain = chain(blocks, &stores).await; let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain).await; let provider = ctx.provider.clone(); let store = ctx.store.clone(); + let locator = &ctx.deployment_locator; + let logger = ctx.logger_factory.subgraph_logger(locator); - let logger = ctx.logger_factory.subgraph_logger(&ctx.deployment_locator); - - SubgraphAssignmentProvider::start(provider.as_ref(), ctx.deployment_locator.clone(), None) + let stop_block = test_ptr(2); + provider + .start(locator.clone(), Some(stop_block.number)) .await .expect("unable to start subgraph"); - loop { - tokio::time::sleep(Duration::from_millis(1000)).await; - - let block_ptr = match store.least_block_ptr(&hash).await { - Ok(Some(ptr)) => ptr, - res => { - info!(&logger, "{:?}", res); - continue; - } - }; - - debug!(&logger, "subgraph block: {:?}", block_ptr); - - if block_ptr == stop_block { - info!( - &logger, - "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block - ); - break; - } - - if !store.is_healthy(&hash).await.unwrap() { - return Err(anyhow!("subgraph failed unexpectedly")); - } - } + wait_for_sync(&logger, &store, &hash, stop_block.clone()) + .await + .unwrap(); + provider.stop(locator.clone()).await.unwrap(); - assert!(store.is_healthy(&hash).await.unwrap()); + // Test loading data sources from DB. + let stop_block = test_ptr(3); + provider + .start(locator.clone(), Some(stop_block.number)) + .await + .expect("unabel to start subgraph"); + wait_for_sync(&logger, &store, &hash, stop_block.clone()) + .await + .unwrap(); fixture::cleanup(&ctx.store, &subgraph_name, &hash); Ok(()) @@ -113,36 +98,11 @@ async fn typename() -> anyhow::Result<()> { SubgraphAssignmentProvider::start(provider.as_ref(), ctx.deployment_locator.clone(), None) .await - .expect("unable to start subgraph"); - - loop { - tokio::time::sleep(Duration::from_millis(1000)).await; - - let block_ptr = match store.least_block_ptr(&hash).await { - Ok(Some(ptr)) => ptr, - res => { - info!(&logger, "{:?}", res); - continue; - } - }; - - debug!(&logger, "subgraph block: {:?}", block_ptr); - - if block_ptr == stop_block { - info!( - &logger, - "subgraph now at block {}, reached stop block {}", block_ptr.number, stop_block - ); - break; - } - - if !store.is_healthy(&hash).await.unwrap() { - return Err(anyhow!("subgraph failed unexpectedly")); - } - } - - assert!(store.is_healthy(&hash).await.unwrap()); + .expect("unabel to start subgraph"); + wait_for_sync(&logger, &store, &hash, stop_block.clone()) + .await + .unwrap(); fixture::cleanup(&ctx.store, &subgraph_name, &hash); Ok(()) From 271ef0ffaee6c94b6142a4d65f9a4126d117f0be Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 15 Jul 2022 16:57:22 -0300 Subject: [PATCH 05/11] store: Clean dead code --- store/postgres/src/dynds/private.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index 560af9e1bd8..69b7d780c06 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,14 +1,10 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] - use std::ops::Bound; use diesel::{ pg::types::sql_types, sql_query, sql_types::{Binary, Integer, Jsonb, Nullable}, - ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, + PgConnection, QueryDsl, RunQueryDsl, }; use graph::{ @@ -17,7 +13,7 @@ use graph::{ prelude::{serde_json, BlockNumber, StoreError}, }; -use crate::{layout_for_tests::BlockRange, primary::Namespace}; +use crate::primary::Namespace; type DynTable = diesel_dynamic_schema::Table; type DynColumn = diesel_dynamic_schema::Column; @@ -28,7 +24,7 @@ pub(crate) struct DataSourcesTable { qname: String, table: DynTable, block_range: DynColumn>, - causality_region: DynColumn, + _causality_region: DynColumn, manifest_idx: DynColumn, param: DynColumn>, context: DynColumn>, @@ -45,7 +41,7 @@ impl DataSourcesTable { qname: format!("{}.{}", namespace, Self::TABLE_NAME), namespace, block_range: table.column("block_range"), - causality_region: table.column("causality_region"), + _causality_region: table.column("causality_region"), manifest_idx: table.column("manifest_idx"), param: table.column("param"), context: table.column("context"), @@ -91,6 +87,7 @@ impl DataSourcesTable { let tuples = self .table .clone() + .filter(diesel::dsl::sql("block_range @> ").bind::(block)) .select(( &self.block_range, &self.manifest_idx, From 2205ff8254c8804bafd264fa9925575aaeda854c Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Mon, 18 Jul 2022 18:38:52 -0300 Subject: [PATCH 06/11] store: Allow graft from legacy schema versions --- store/postgres/src/copy.rs | 9 +++++++++ store/postgres/src/dynds/mod.rs | 7 ------- store/postgres/src/primary.rs | 2 +- store/postgres/src/subgraph_store.rs | 23 +++++++++++++++-------- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 66ef865b5ba..dfa7f17ebdd 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -599,6 +599,15 @@ impl Connection { target_block: BlockPtr, ) -> Result { let logger = logger.new(o!("dst" => dst.site.namespace.to_string())); + + if src.site.schema_version != dst.site.schema_version { + return Err(StoreError::ConstraintViolation(format!( + "attempted to copy between different schema versions, \ + source version is {} but destination version is {}", + src.site.schema_version, dst.site.schema_version + ))); + } + let mut last_log = Instant::now(); let conn = pool.get_fdw(&logger, || { if last_log.elapsed() > LOG_INTERVAL { diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index c34055bb167..2f4ebcea766 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -52,13 +52,6 @@ pub(crate) fn copy( dst: &Site, target_block: BlockNumber, ) -> Result { - if src.schema_version != dst.schema_version { - return Err(StoreError::ConstraintViolation(format!( - "attempted to copy between different schema versions, \ - source version is {} but destination version is {}", - src.schema_version, dst.schema_version - ))); - } match src.schema_version.private_data_sources() { true => DataSourcesTable::new(src.namespace.clone()).copy_to( conn, diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 3da80ed6f2d..90717784a2e 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1087,12 +1087,12 @@ impl<'a> Connection<'a> { shard: Shard, subgraph: &DeploymentHash, network: String, + schema_version: DeploymentSchemaVersion, ) -> Result { if let Some(site) = queries::find_active_site(self.conn.as_ref(), subgraph)? { return Ok(site); } - let schema_version = DeploymentSchemaVersion::LATEST; self.create_site(shard, subgraph.clone(), network, schema_version, true) } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 3457ff500c2..a47723b8310 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -15,7 +15,10 @@ use graph::{ cheap_clone::CheapClone, components::{ server::index_node::VersionInfo, - store::{self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, SubgraphFork}, + store::{ + self, BlockStore, DeploymentLocator, DeploymentSchemaVersion, + EnsLookup as EnsLookupTrait, SubgraphFork, + }, }, constraint_violation, data::query::QueryTarget, @@ -495,6 +498,12 @@ impl SubgraphStoreInner { #[cfg(not(debug_assertions))] assert!(!replace); + let graft_base = deployment + .graft_base + .as_ref() + .map(|base| self.layout(base)) + .transpose()?; + let (site, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs @@ -507,19 +516,17 @@ impl SubgraphStoreInner { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id)?; + let schema_version = match &graft_base { + None => DeploymentSchemaVersion::LATEST, + Some(src_layout) => src_layout.site.schema_version, + }; let conn = self.primary_conn()?; - let site = conn.allocate_site(shard, &schema.id, network_name)?; + let site = conn.allocate_site(shard, &schema.id, network_name, schema_version)?; let node_id = conn.assigned_node(&site)?.unwrap_or(node_id); (site, node_id) }; let site = Arc::new(site); - let graft_base = deployment - .graft_base - .as_ref() - .map(|base| self.layout(base)) - .transpose()?; - if let Some(graft_base) = &graft_base { self.primary_conn()? .record_active_copy(graft_base.site.as_ref(), site.as_ref())?; From bf93e4e25da55926f8015128ce6d5a24e3cef405 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Thu, 21 Jul 2022 10:50:08 -0300 Subject: [PATCH 07/11] Update NEWS.md --- NEWS.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 71318656522..dcd569eddee 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,7 +2,13 @@ ## Unreleased -- ... +### New DB table for dynamic data sources + +For new subgraph deployments, dynamic data sources will be recorded under the `sgd*.data_sources$` +table, rather than `subgraphs.ethereum_contract_data_source`. As a consequence +new deployments will not work correctly on earlier graph node versions, so +_downgrading to an earlier graph node version is not supported_. +See issue #3405 for other details. ## 0.27.0 From 188295dcd77bb2776dc32d0f37469219f58656ba Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Tue, 26 Jul 2022 16:20:27 -0300 Subject: [PATCH 08/11] dynds: Rebase and review --- NEWS.md | 2 +- chain/substreams/src/data_source.rs | 8 +++++++- store/postgres/src/dynds/private.rs | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/NEWS.md b/NEWS.md index dcd569eddee..497c56de70c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,7 +5,7 @@ ### New DB table for dynamic data sources For new subgraph deployments, dynamic data sources will be recorded under the `sgd*.data_sources$` -table, rather than `subgraphs.ethereum_contract_data_source`. As a consequence +table, rather than `subgraphs.dynamic_ethereum_contract_data_source`. As a consequence new deployments will not work correctly on earlier graph node versions, so _downgrading to an earlier graph node version is not supported_. See issue #3405 for other details. diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index 86a29390291..1d3c13c8fa9 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -162,6 +162,7 @@ impl blockchain::UnresolvedDataSource for UnresolvedDataSource { self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { Ok(DataSource { kind: SUBSTREAMS_KIND.into(), @@ -205,6 +206,10 @@ impl blockchain::DataSourceTemplate for NoopDataSourceTemplate { fn runtime(&self) -> Option>> { unimplemented!("{}", TEMPLATE_ERROR); } + + fn manifest_idx(&self) -> u32 { + todo!() + } } #[async_trait] @@ -213,6 +218,7 @@ impl blockchain::UnresolvedDataSourceTemplate for NoopDataSourceTemplate self, _resolver: &Arc, _logger: &Logger, + _manifest_idx: u32, ) -> Result { unimplemented!("{}", TEMPLATE_ERROR) } @@ -260,7 +266,7 @@ mod test { let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap(); let link_resolver: Arc = Arc::new(NoopLinkResolver {}); let logger = Logger::root(Discard, o!()); - let ds: DataSource = ds.resolve(&link_resolver, &logger).await.unwrap(); + let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap(); let expected = DataSource { kind: SUBSTREAMS_KIND.into(), network: Some("mainnet".into()), diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index 69b7d780c06..b5ac6367f19 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -103,7 +103,7 @@ impl DataSourcesTable { Bound::Included(block) => Some(block), // Should never happen. - Bound::Excluded(_) | Bound::Unbounded => None, + Bound::Excluded(_) | Bound::Unbounded => unreachable!("dds with open creation"), }; StoredDynamicDataSource { manifest_idx: manifest_idx as u32, From db357c7efb3e39b6da88c8cccac3da1e12ed1d89 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Tue, 2 Aug 2022 22:56:47 +0100 Subject: [PATCH 09/11] refactor(store): Remove uncessary async --- core/src/subgraph/instance_manager.rs | 2 +- core/src/subgraph/runner.rs | 4 ++-- core/src/subgraph/stream.rs | 4 ++-- graph/src/components/store/traits.rs | 4 ++-- graph/tests/entity_cache.rs | 4 ++-- store/postgres/src/writable.rs | 4 ++-- store/postgres/tests/subgraph.rs | 1 - 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index aa28ba19da6..6aa09e67e94 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -260,7 +260,7 @@ impl SubgraphInstanceManager { // Initialize deployment_head with current deployment head. Any sort of trouble in // getting the deployment head ptr leads to initializing with 0 - let deployment_head = store.block_ptr().await.map(|ptr| ptr.number).unwrap_or(0) as f64; + let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64; block_stream_metrics.deployment_head.set(deployment_head); let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new( diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index a0d2cea4010..dc1c9351745 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -72,7 +72,7 @@ where // If a subgraph failed for deterministic reasons, before start indexing, we first // revert the deployment head. It should lead to the same result since the error was // deterministic. - if let Some(current_ptr) = self.inputs.store.block_ptr().await { + if let Some(current_ptr) = self.inputs.store.block_ptr() { if let Some(parent_ptr) = self .inputs .triggers_adapter @@ -798,7 +798,7 @@ where // // Safe unwrap because in a Revert event we're sure the subgraph has // advanced at least once. - let subgraph_ptr = self.inputs.store.block_ptr().await.unwrap(); + let subgraph_ptr = self.inputs.store.block_ptr().unwrap(); if revert_to_ptr.number >= subgraph_ptr.number { info!(&self.logger, "Block to revert is higher than subgraph pointer, nothing to do"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr); return Ok(Action::Continue); diff --git a/core/src/subgraph/stream.rs b/core/src/subgraph/stream.rs index e97e8e01e58..9733f0d206d 100644 --- a/core/src/subgraph/stream.rs +++ b/core/src/subgraph/stream.rs @@ -18,12 +18,12 @@ pub async fn new_block_stream( false => BUFFERED_BLOCK_STREAM_SIZE, }; - let current_ptr = inputs.store.block_ptr().await; + let current_ptr = inputs.store.block_ptr(); let block_stream = match is_firehose { true => inputs.chain.new_firehose_block_stream( inputs.deployment.clone(), - inputs.store.block_cursor().await, + inputs.store.block_cursor(), inputs.start_blocks.clone(), current_ptr, Arc::new(filter.clone()), diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index a17a0e4a07d..a7122b1afc8 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -150,11 +150,11 @@ pub trait SubgraphStore: Send + Sync + 'static { #[async_trait] pub trait WritableStore: Send + Sync + 'static { /// Get a pointer to the most recently processed block in the subgraph. - async fn block_ptr(&self) -> Option; + fn block_ptr(&self) -> Option; /// Returns the Firehose `cursor` this deployment is currently at in the block stream of events. This /// is used when re-connecting a Firehose stream to start back exactly where we left off. - async fn block_cursor(&self) -> FirehoseCursor; + fn block_cursor(&self) -> FirehoseCursor; /// Start an existing subgraph deployment. async fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError>; diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 70e715c66a8..d3f3ec72d83 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -46,11 +46,11 @@ impl MockStore { #[async_trait] impl WritableStore for MockStore { - async fn block_ptr(&self) -> Option { + fn block_ptr(&self) -> Option { unimplemented!() } - async fn block_cursor(&self) -> FirehoseCursor { + fn block_cursor(&self) -> FirehoseCursor { unimplemented!() } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 392649fda4a..947e8869db5 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -938,11 +938,11 @@ impl WritableStore { #[async_trait::async_trait] impl WritableStoreTrait for WritableStore { - async fn block_ptr(&self) -> Option { + fn block_ptr(&self) -> Option { self.block_ptr.lock().unwrap().clone() } - async fn block_cursor(&self) -> FirehoseCursor { + fn block_cursor(&self) -> FirehoseCursor { self.block_cursor.lock().unwrap().clone() } diff --git a/store/postgres/tests/subgraph.rs b/store/postgres/tests/subgraph.rs index 68b0bf4e03e..c1f4c0f564f 100644 --- a/store/postgres/tests/subgraph.rs +++ b/store/postgres/tests/subgraph.rs @@ -59,7 +59,6 @@ async fn latest_block(store: &Store, deployment_id: DeploymentId) -> BlockPtr { .await .expect("can get writable") .block_ptr() - .await .unwrap() } From 52c3d1093a35f7f33b309d3a0c916501cf68bc26 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Thu, 4 Aug 2022 15:33:04 +0100 Subject: [PATCH 10/11] tests: Test dynamic data sources graft --- core/src/subgraph/registrar.rs | 96 ++++++++++--------- graph/src/components/subgraph/registrar.rs | 3 +- node/src/main.rs | 1 + node/src/manager/commands/run.rs | 1 + server/json-rpc/src/lib.rs | 1 + .../data-source-revert/grafted.yaml | 46 +++++++++ .../data-source-revert/package.json | 6 +- .../data-source-revert/subgraph.yaml | 3 - tests/src/fixture.rs | 44 ++++++--- tests/tests/runner.rs | 70 ++++++-------- 10 files changed, 167 insertions(+), 104 deletions(-) create mode 100644 tests/integration-tests/data-source-revert/grafted.yaml diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7fe46acbe34..f4f74246130 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -7,6 +7,7 @@ use graph::blockchain::BlockchainKind; use graph::blockchain::BlockchainMap; use graph::components::store::{DeploymentId, DeploymentLocator, SubscriptionManager}; use graph::data::subgraph::schema::DeploymentCreate; +use graph::data::subgraph::Graft; use graph::prelude::{ CreateSubgraphResult, SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, SubgraphRegistrar as SubgraphRegistrarTrait, *, @@ -267,7 +268,8 @@ where hash: DeploymentHash, node_id: NodeId, debug_fork: Option, - start_block: Option, + start_block_override: Option, + graft_block_override: Option, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. For logging purposes, make up a @@ -303,7 +305,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -319,7 +322,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -335,7 +339,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -351,7 +356,8 @@ where self.chains.cheap_clone(), name.clone(), hash.cheap_clone(), - start_block, + start_block_override, + graft_block_override, raw, node_id, debug_fork, @@ -469,25 +475,22 @@ async fn start_subgraph( } } -/// Resolves the subgraph's earliest block and the manifest's graft base block -async fn resolve_subgraph_chain_blocks( +/// Resolves the subgraph's earliest block +async fn resolve_start_block( manifest: &SubgraphManifest, - chain: Arc, + chain: &impl Blockchain, logger: &Logger, -) -> Result<(Option, Option<(DeploymentHash, BlockPtr)>), SubgraphRegistrarError> { - let logger1 = logger.clone(); - let graft = manifest.graft.clone(); - +) -> Result, SubgraphRegistrarError> { // If the minimum start block is 0 (i.e. the genesis block), // return `None` to start indexing from the genesis block. Otherwise // return a block pointer for the block with number `min_start_block - 1`. - let start_block_ptr = match manifest + match manifest .start_blocks() .into_iter() .min() .expect("cannot identify minimum start block because there are no data sources") { - 0 => None, + 0 => Ok(None), min_start_block => chain .block_pointer_from_number(logger, min_start_block - 1) .await @@ -496,31 +499,27 @@ async fn resolve_subgraph_chain_blocks( SubgraphRegistrarError::ManifestValidationError(vec![ SubgraphManifestValidationError::BlockNotFound(min_start_block.to_string()), ]) - })?, - }; + }), + } +} - let base_ptr = { - match graft { - None => None, - Some(base) => { - let base_block = base.block; - - chain - .block_pointer_from_number(&logger1, base.block) - .await - .map(|ptr| Some((base.base, ptr))) - .map_err(move |_| { - SubgraphRegistrarError::ManifestValidationError(vec![ - SubgraphManifestValidationError::BlockNotFound(format!( - "graft base block {} not found", - base_block - )), - ]) - })? - } - } - }; - Ok((start_block_ptr, base_ptr)) +/// Resolves the manifest's graft base block +async fn resolve_graft_block( + base: &Graft, + chain: &impl Blockchain, + logger: &Logger, +) -> Result { + chain + .block_pointer_from_number(logger, base.block) + .await + .map_err(|_| { + SubgraphRegistrarError::ManifestValidationError(vec![ + SubgraphManifestValidationError::BlockNotFound(format!( + "graft base block {} not found", + base.block + )), + ]) + }) } async fn create_subgraph_version( @@ -529,7 +528,8 @@ async fn create_subgraph_version( chains: Arc, name: SubgraphName, deployment: DeploymentHash, - start_block_ptr: Option, + start_block_override: Option, + graft_block_override: Option, raw: serde_yaml::Mapping, node_id: NodeId, debug_fork: Option, @@ -571,12 +571,20 @@ async fn create_subgraph_version( return Err(SubgraphRegistrarError::NameNotFound(name.to_string())); } - let (manifest_start_block, base_block) = - resolve_subgraph_chain_blocks(&manifest, chain, &logger.clone()).await?; - - let start_block = match start_block_ptr { + let start_block = match start_block_override { Some(block) => Some(block), - None => manifest_start_block, + None => resolve_start_block(&manifest, &*chain, &logger).await?, + }; + + let base_block = match &manifest.graft { + None => None, + Some(graft) => Some(( + graft.base.clone(), + match graft_block_override { + Some(block) => block, + None => resolve_graft_block(&graft, &*chain, &logger).await?, + }, + )), }; info!( diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index c96b9a22b40..cfb2c2ffa2c 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -42,7 +42,8 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { hash: DeploymentHash, assignment_node_id: NodeId, debug_fork: Option, - start_block: Option, + start_block_block: Option, + graft_block_override: Option, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/node/src/main.rs b/node/src/main.rs index 4ff21436fae..16f43abddaa 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -490,6 +490,7 @@ async fn main() { node_id, debug_fork, start_block, + None, ) .await } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index dd6e9672ded..809bed6979c 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -205,6 +205,7 @@ pub async fn run( node_id.clone(), None, None, + None, ) .await?; diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index b58bdbe2f1f..d076a8d1c16 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -92,6 +92,7 @@ impl JsonRpcServer { // Here it doesn't make sense to receive another // startBlock, we'll use the one from the manifest. None, + None, ) .await { diff --git a/tests/integration-tests/data-source-revert/grafted.yaml b/tests/integration-tests/data-source-revert/grafted.yaml new file mode 100644 index 00000000000..3cb184beeec --- /dev/null +++ b/tests/integration-tests/data-source-revert/grafted.yaml @@ -0,0 +1,46 @@ +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +graft: + # Must match the id from building `subgraph.yaml` + base: QmRhW72iAE6AEY6fiL9nPt5ZVffzbq9XswKDbH9LC3JPUh + block: 3 +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.5 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.5 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/integration-tests/data-source-revert/package.json b/tests/integration-tests/data-source-revert/package.json index 2969afb3e33..73208954aa4 100644 --- a/tests/integration-tests/data-source-revert/package.json +++ b/tests/integration-tests/data-source-revert/package.json @@ -3,11 +3,11 @@ "version": "0.1.0", "scripts": { "codegen": "graph codegen", - "create:test": "graph create test/data-source-revert --node $GRAPH_NODE_ADMIN_URI", - "deploy:test": "graph deploy test/data-source-revert --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + "deploy:test": "graph deploy test/data-source-revert --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI", + "deploy:test-grafted": "graph deploy test/data-source-revert-grafted grafted.yaml --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" }, "devDependencies": { "@graphprotocol/graph-cli": "https://github.com/graphprotocol/graph-cli#main", "@graphprotocol/graph-ts": "https://github.com/graphprotocol/graph-ts#main" } -} +} \ No newline at end of file diff --git a/tests/integration-tests/data-source-revert/subgraph.yaml b/tests/integration-tests/data-source-revert/subgraph.yaml index 9cacaac58bb..87f85563093 100644 --- a/tests/integration-tests/data-source-revert/subgraph.yaml +++ b/tests/integration-tests/data-source-revert/subgraph.yaml @@ -1,9 +1,6 @@ specVersion: 0.0.4 -repository: https://github.com/graphprotocol/example-subgraph schema: file: ./schema.graphql -features: - - nonFatalErrors dataSources: - kind: ethereum/contract name: Contract diff --git a/tests/src/fixture.rs b/tests/src/fixture.rs index 4b3bdb46663..6489cb70eed 100644 --- a/tests/src/fixture.rs +++ b/tests/src/fixture.rs @@ -22,8 +22,9 @@ use graph::env::ENV_VARS; use graph::ipfs_client::IpfsClient; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::{ - async_trait, BlockNumber, DeploymentHash, LoggerFactory, MetricsRegistry, NodeId, SubgraphName, - SubgraphRegistrar, SubgraphStore as _, SubgraphVersionSwitchingMode, + async_trait, BlockNumber, DeploymentHash, LoggerFactory, MetricsRegistry, NodeId, + SubgraphAssignmentProvider, SubgraphName, SubgraphRegistrar, SubgraphStore as _, + SubgraphVersionSwitchingMode, }; use graph_core::{ LinkResolver, SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, @@ -43,6 +44,10 @@ use tokio::fs::read_to_string; const NODE_ID: &str = "default"; pub async fn build_subgraph(dir: &str) -> DeploymentHash { + build_subgraph_with_yarn_cmd(dir, "deploy:test").await +} + +pub async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash { // Test that IPFS is up. IpfsClient::localhost() .test() @@ -65,7 +70,7 @@ pub async fn build_subgraph(dir: &str) -> DeploymentHash { // is fake and the actual deploy call is meant to fail. let deploy_output = run_cmd( Command::new("yarn") - .arg("deploy:test") + .arg(yarn_cmd) .env("IPFS_URI", "http://127.0.0.1:5001") .env("GRAPH_NODE_ADMIN_URI", "http://localhost:0") .current_dir(dir), @@ -90,14 +95,27 @@ pub fn test_ptr(n: BlockNumber) -> BlockPtr { } } pub struct TestContext { - pub logger_factory: LoggerFactory, + pub logger: Logger, pub provider: Arc< IpfsSubgraphAssignmentProvider< SubgraphInstanceManager, >, >, pub store: Arc, - pub deployment_locator: DeploymentLocator, + pub deployment: DeploymentLocator, +} + +impl TestContext { + pub async fn start_and_sync_to(&self, stop_block: BlockPtr) { + self.provider + .start(self.deployment.clone(), Some(stop_block.number)) + .await + .expect("unable to start subgraph"); + + wait_for_sync(&self.logger, &self.store, &self.deployment.hash, stop_block) + .await + .unwrap(); + } } pub struct Stores { @@ -163,7 +181,8 @@ pub async fn setup( subgraph_name: SubgraphName, hash: &DeploymentHash, stores: &Stores, - chain: C, + chain: Arc, + graft_block: Option, ) -> TestContext { let logger = graph::log::logger(true); let logger_factory = LoggerFactory::new(logger.clone(), None); @@ -175,7 +194,7 @@ pub async fn setup( cleanup(&subgraph_store, &subgraph_name, hash); let mut blockchain_map = BlockchainMap::new(); - blockchain_map.insert(stores.network_name.clone(), Arc::new(chain)); + blockchain_map.insert(stores.network_name.clone(), chain); let static_filters = ENV_VARS.experimental_static_filters; @@ -216,22 +235,23 @@ pub async fn setup( .await .expect("unable to create subgraph"); - let deployment_locator = SubgraphRegistrar::create_subgraph_version( + let deployment = SubgraphRegistrar::create_subgraph_version( subgraph_registrar.as_ref(), subgraph_name.clone(), hash.clone(), node_id.clone(), None, None, + graft_block, ) .await .expect("failed to create subgraph version"); TestContext { - logger_factory, + logger: logger_factory.subgraph_logger(&deployment), provider: subgraph_provider, store: subgraph_store, - deployment_locator, + deployment, } } @@ -249,13 +269,15 @@ pub async fn wait_for_sync( hash: &DeploymentHash, stop_block: BlockPtr, ) -> Result<(), Error> { - loop { + let mut err_count = 0; + while err_count < 10 { tokio::time::sleep(Duration::from_millis(1000)).await; let block_ptr = match store.least_block_ptr(&hash).await { Ok(Some(ptr)) => ptr, res => { info!(&logger, "{:?}", res); + err_count += 1; continue; } }; diff --git a/tests/tests/runner.rs b/tests/tests/runner.rs index b1c5c863be7..421ecd85302 100644 --- a/tests/tests/runner.rs +++ b/tests/tests/runner.rs @@ -1,14 +1,16 @@ +use std::sync::Arc; + use graph::blockchain::{Block, BlockPtr}; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::{SubgraphAssignmentProvider, SubgraphName}; use graph_tests::fixture::ethereum::{chain, empty_block, genesis}; -use graph_tests::fixture::{self, stores, test_ptr, wait_for_sync}; +use graph_tests::fixture::{self, stores, test_ptr}; #[tokio::test] async fn data_source_revert() -> anyhow::Result<()> { - let subgraph_name = SubgraphName::new("data-source-revert") - .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); + let stores = stores("./integration-tests/config.simple.toml").await; + let subgraph_name = SubgraphName::new("data-source-revert").unwrap(); let hash = { let test_dir = format!("./integration-tests/{}", subgraph_name); fixture::build_subgraph(&test_dir).await @@ -24,39 +26,33 @@ async fn data_source_revert() -> anyhow::Result<()> { let block1_reorged = empty_block(block0.ptr(), block1_reorged_ptr.clone()); let block2 = empty_block(block1_reorged_ptr, test_ptr(2)); let block3 = empty_block(block2.ptr(), test_ptr(3)); - vec![block0, block1, block1_reorged, block2, block3] + let block4 = empty_block(block3.ptr(), test_ptr(4)); + vec![block0, block1, block1_reorged, block2, block3, block4] }; - let stores = stores("./integration-tests/config.simple.toml").await; - let chain = chain(blocks, &stores).await; - let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain).await; - - let provider = ctx.provider.clone(); - let store = ctx.store.clone(); - let locator = &ctx.deployment_locator; - let logger = ctx.logger_factory.subgraph_logger(locator); + let chain = Arc::new(chain(blocks.clone(), &stores).await); + let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain.clone(), None).await; let stop_block = test_ptr(2); - provider - .start(locator.clone(), Some(stop_block.number)) - .await - .expect("unable to start subgraph"); - - wait_for_sync(&logger, &store, &hash, stop_block.clone()) - .await - .unwrap(); - provider.stop(locator.clone()).await.unwrap(); + ctx.start_and_sync_to(stop_block).await; + ctx.provider.stop(ctx.deployment.clone()).await.unwrap(); // Test loading data sources from DB. let stop_block = test_ptr(3); - provider - .start(locator.clone(), Some(stop_block.number)) - .await - .expect("unabel to start subgraph"); - - wait_for_sync(&logger, &store, &hash, stop_block.clone()) - .await - .unwrap(); + ctx.start_and_sync_to(stop_block).await; + + // Test grafted version + let subgraph_name = SubgraphName::new("data-source-revert-grafted").unwrap(); + let hash = fixture::build_subgraph_with_yarn_cmd( + "./integration-tests/data-source-revert", + "deploy:test-grafted", + ) + .await; + let graft_block = Some(test_ptr(3)); + let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, graft_block).await; + let stop_block = test_ptr(4); + ctx.start_and_sync_to(stop_block).await; + fixture::cleanup(&ctx.store, &subgraph_name, &hash); Ok(()) @@ -88,21 +84,11 @@ async fn typename() -> anyhow::Result<()> { let stop_block = blocks.last().unwrap().block.ptr(); let stores = stores("./integration-tests/config.simple.toml").await; - let chain = chain(blocks, &stores).await; - let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain).await; - - let provider = ctx.provider.clone(); - let store = ctx.store.clone(); - - let logger = ctx.logger_factory.subgraph_logger(&ctx.deployment_locator); + let chain = Arc::new(chain(blocks, &stores).await); + let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, chain, None).await; - SubgraphAssignmentProvider::start(provider.as_ref(), ctx.deployment_locator.clone(), None) - .await - .expect("unabel to start subgraph"); + ctx.start_and_sync_to(stop_block).await; - wait_for_sync(&logger, &store, &hash, stop_block.clone()) - .await - .unwrap(); fixture::cleanup(&ctx.store, &subgraph_name, &hash); Ok(()) From be98a6dbacb406f3d5090361dce10726ffa3aeca Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Thu, 4 Aug 2022 15:36:07 +0100 Subject: [PATCH 11/11] dynds: Future proof `copy_to` for closed ranges --- store/postgres/src/dynds/private.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index b5ac6367f19..75246c2f6ee 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -185,12 +185,14 @@ impl DataSourcesTable { return Ok(count as usize); } - // Assumes all ranges are of the form `[n, +inf)`. let query = format!( "\ insert into {dst}(block_range, causality_region, manifest_idx, parent, id, param, context) - select int4range(lower(e.block_range), null), e.causality_region, e.manifest_idx, - e.parent, e.id, e.param, e.context + select case + when upper(e.block_range) <= $1 then e.block_range + else int4range(lower(e.block_range), null) + end, + e.causality_region, e.manifest_idx, e.parent, e.id, e.param, e.context from {src} e where lower(e.block_range) <= $1 ",