Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

## Unreleased

- ...
### New DB table for dynamic data sources

For new subgraph deployments, dynamic data sources will be recorded under the `sgd*.data_sources$`
table, rather than `subgraphs.dynamic_ethereum_contract_data_source`. As a consequence
new deployments will not work correctly on earlier graph node versions, so
_downgrading to an earlier graph node version is not supported_.
See issue #3405 for other details.

## 0.27.0

Expand Down
6 changes: 6 additions & 0 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource, Error> {
let UnresolvedDataSource {
kind,
Expand Down Expand Up @@ -261,6 +262,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSourceTemplate, Error> {
let UnresolvedDataSourceTemplate {
kind,
Expand Down Expand Up @@ -292,6 +294,10 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}

fn manifest_idx(&self) -> u32 {
unreachable!("arweave does not support dynamic data sources")
}
}

#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource> {
let UnresolvedDataSource {
kind,
Expand Down Expand Up @@ -325,6 +326,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
self,
_resolver: &Arc<dyn LinkResolver>,
_logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSourceTemplate> {
Err(anyhow!(TEMPLATE_ERROR))
}
Expand All @@ -342,6 +344,10 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
unimplemented!("{}", TEMPLATE_ERROR);
}

fn manifest_idx(&self) -> u32 {
unimplemented!("{}", TEMPLATE_ERROR);
}
}

#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
Expand Down
35 changes: 28 additions & 7 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct DataSource {
pub kind: String,
pub network: Option<String>,
pub name: String,
pub manifest_idx: u32,
pub address: Option<Address>,
pub start_block: BlockNumber,
pub mapping: Mapping,
Expand Down Expand Up @@ -92,6 +93,7 @@ impl blockchain::DataSource<Chain> for DataSource {
kind,
network,
name,
manifest_idx,
address,
mapping,
context,
Expand All @@ -109,6 +111,7 @@ impl blockchain::DataSource<Chain> for DataSource {
kind == &other.kind
&& network == &other.network
&& name == &other.name
&& manifest_idx == &other.manifest_idx
&& address == &other.address
&& mapping.abis == other.mapping.abis
&& mapping.event_handlers == other.mapping.event_handlers
Expand All @@ -120,7 +123,7 @@ impl blockchain::DataSource<Chain> for DataSource {
fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource {
let param = self.address.map(|addr| addr.0.into());
StoredDynamicDataSource {
name: self.name.to_owned(),
manifest_idx: self.manifest_idx,
param,
context: self
.context
Expand All @@ -136,7 +139,7 @@ impl blockchain::DataSource<Chain> for DataSource {
stored: StoredDynamicDataSource,
) -> Result<Self, Error> {
let StoredDynamicDataSource {
name: _,
manifest_idx,
param,
context,
creation_block,
Expand All @@ -151,6 +154,7 @@ impl blockchain::DataSource<Chain> for DataSource {
kind: template.kind.to_string(),
network: template.network.as_ref().map(|s| s.to_string()),
name: template.name.clone(),
manifest_idx,
address,
start_block: 0,
mapping: template.mapping.clone(),
Expand Down Expand Up @@ -232,6 +236,7 @@ impl DataSource {
source: Source,
mapping: Mapping,
context: Option<DataSourceContext>,
manifest_idx: u32,
) -> Result<Self, Error> {
// Data sources in the manifest are created "before genesis" so they have no creation block.
let creation_block = None;
Expand All @@ -243,6 +248,7 @@ impl DataSource {
kind,
network,
name,
manifest_idx,
address: source.address,
start_block: source.start_block,
mapping,
Expand Down Expand Up @@ -722,6 +728,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
) -> Result<DataSource, anyhow::Error> {
let UnresolvedDataSource {
kind,
Expand All @@ -736,7 +743,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {

let mapping = mapping.resolve(&*resolver, logger).await?;

DataSource::from_manifest(kind, network, name, source, mapping, context)
DataSource::from_manifest(kind, network, name, source, mapping, context, manifest_idx)
}
}

Expand Down Expand Up @@ -778,6 +785,7 @@ impl TryFrom<DataSourceTemplateInfo<Chain>> for DataSource {
kind: template.kind,
network: template.network,
name: template.name,
manifest_idx: template.manifest_idx,
address: Some(address),
start_block: 0,
mapping: template.mapping,
Expand All @@ -789,23 +797,31 @@ impl TryFrom<DataSourceTemplateInfo<Chain>> for DataSource {
}

#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
pub struct BaseDataSourceTemplate<M> {
pub struct UnresolvedDataSourceTemplate {
pub kind: String,
pub network: Option<String>,
pub name: String,
pub source: TemplateSource,
pub mapping: M,
pub mapping: UnresolvedMapping,
}

pub type UnresolvedDataSourceTemplate = BaseDataSourceTemplate<UnresolvedMapping>;
pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
#[derive(Clone, Debug)]
pub struct DataSourceTemplate {
pub kind: String,
pub network: Option<String>,
pub name: String,
pub manifest_idx: u32,
pub source: TemplateSource,
pub mapping: Mapping,
}

#[async_trait]
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
async fn resolve(
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
manifest_idx: u32,
) -> Result<DataSourceTemplate, anyhow::Error> {
let UnresolvedDataSourceTemplate {
kind,
Expand All @@ -821,6 +837,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
kind,
network,
name,
manifest_idx,
source,
mapping: mapping.resolve(resolver, logger).await?,
})
Expand All @@ -839,6 +856,10 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}

fn manifest_idx(&self) -> u32 {
self.manifest_idx
}
}

#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource, Error> {
let UnresolvedDataSource {
kind,
Expand Down Expand Up @@ -346,6 +347,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSourceTemplate, Error> {
let UnresolvedDataSourceTemplate {
kind,
Expand Down Expand Up @@ -377,6 +379,10 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}

fn manifest_idx(&self) -> u32 {
unreachable!("near does not support dynamic data sources")
}
}

#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)]
Expand Down
8 changes: 7 additions & 1 deletion chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
self,
_resolver: &Arc<dyn LinkResolver>,
_logger: &Logger,
_manifest_idx: u32,
) -> Result<DataSource, Error> {
Ok(DataSource {
kind: SUBSTREAMS_KIND.into(),
Expand Down Expand Up @@ -205,6 +206,10 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
unimplemented!("{}", TEMPLATE_ERROR);
}

fn manifest_idx(&self) -> u32 {
todo!()
}
}

#[async_trait]
Expand All @@ -213,6 +218,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate
self,
_resolver: &Arc<dyn LinkResolver>,
_logger: &Logger,
_manifest_idx: u32,
) -> Result<NoopDataSourceTemplate, anyhow::Error> {
unimplemented!("{}", TEMPLATE_ERROR)
}
Expand Down Expand Up @@ -260,7 +266,7 @@ mod test {
let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap();
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
let logger = Logger::root(Discard, o!());
let ds: DataSource = ds.resolve(&link_resolver, &logger).await.unwrap();
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
let expected = DataSource {
kind: SUBSTREAMS_KIND.into(),
network: Some("mainnet".into()),
Expand Down
3 changes: 3 additions & 0 deletions core/src/subgraph/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub templates: Arc<Vec<C::DataSourceTemplate>>,
pub unified_api_version: UnifiedMappingApiVersion,
pub static_filters: bool,

// Correspondence between data source or template position in the manifest and name.
pub manifest_idx_and_name: Vec<(u32, String)>,
}
35 changes: 28 additions & 7 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::subgraph::metrics::{
use crate::subgraph::runner::SubgraphRunner;
use crate::subgraph::SubgraphInstance;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::Blockchain;
use graph::blockchain::DataSource;
use graph::blockchain::NodeCapabilities;
use graph::blockchain::{Blockchain, DataSourceTemplate};
use graph::blockchain::{BlockchainKind, TriggerFilter};
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
Expand Down Expand Up @@ -147,7 +148,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// that is done
store.start_subgraph_deployment(&logger).await?;

let manifest: SubgraphManifest<C> = {
let (manifest, manifest_idx_and_name) = {
info!(logger, "Resolve subgraph files using IPFS");

let mut manifest = SubgraphManifest::resolve_from_raw(
Expand All @@ -161,9 +162,28 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
.await
.context("Failed to resolve subgraph from IPFS")?;

let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
.await
.context("Failed to load dynamic data sources")?;
let manifest_idx_and_name: Vec<(u32, String)> = manifest
.data_sources
.iter()
.map(|ds: &C::DataSource| ds.name().to_owned())
.chain(
manifest
.templates
.iter()
.map(|t: &C::DataSourceTemplate| t.name().to_owned()),
)
.enumerate()
.map(|(idx, name)| (idx as u32, name))
.collect();

let data_sources = load_dynamic_data_sources(
store.clone(),
logger.clone(),
&manifest,
manifest_idx_and_name.clone(),
)
.await
.context("Failed to load dynamic data sources")?;

info!(logger, "Successfully resolved subgraph files using IPFS");

Expand All @@ -176,7 +196,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest.data_sources.len()
);

manifest
(manifest, manifest_idx_and_name)
};

let required_capabilities = C::NodeCapabilities::from_data_sources(&manifest.data_sources);
Expand Down Expand Up @@ -240,7 +260,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {

// Initialize deployment_head with current deployment head. Any sort of trouble in
// getting the deployment head ptr leads to initializing with 0
let deployment_head = store.block_ptr().await.map(|ptr| ptr.number).unwrap_or(0) as f64;
let deployment_head = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0) as f64;
block_stream_metrics.deployment_head.set(deployment_head);

let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new(
Expand Down Expand Up @@ -273,6 +293,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
templates,
unified_api_version,
static_filters: self.static_filters,
manifest_idx_and_name,
};

// The subgraph state tracks the state of the subgraph instance over time
Expand Down
10 changes: 7 additions & 3 deletions core/src/subgraph/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ pub async fn load_dynamic_data_sources<C: Blockchain>(
store: Arc<dyn WritableStore>,
logger: Logger,
manifest: &SubgraphManifest<C>,
manifest_idx_and_name: Vec<(u32, String)>,
) -> Result<Vec<C::DataSource>, Error> {
let start_time = Instant::now();

let mut data_sources: Vec<C::DataSource> = vec![];

for stored in store.load_dynamic_data_sources().await? {
for stored in store
.load_dynamic_data_sources(manifest_idx_and_name)
.await?
{
let template = manifest
.templates
.iter()
.find(|template| template.name() == stored.name.as_str())
.ok_or_else(|| anyhow!("no template named `{}` was found", stored.name))?;
.find(|template| template.manifest_idx() == stored.manifest_idx)
.ok_or_else(|| anyhow!("no template with idx `{}` was found", stored.manifest_idx))?;

let ds = C::DataSource::from_stored_dynamic_data_source(&template, stored)?;

Expand Down
Loading