Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down Expand Up @@ -289,8 +289,8 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down
6 changes: 3 additions & 3 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down Expand Up @@ -339,7 +339,7 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
unimplemented!("{}", TEMPLATE_ERROR);
}

fn runtime(&self) -> &[u8] {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
unimplemented!("{}", TEMPLATE_ERROR);
}
}
Expand Down
8 changes: 4 additions & 4 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down Expand Up @@ -836,8 +836,8 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down
8 changes: 4 additions & 4 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down Expand Up @@ -374,8 +374,8 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
self.mapping.api_version.clone()
}

fn runtime(&self) -> &[u8] {
self.mapping.runtime.as_ref()
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
Some(self.mapping.runtime.cheap_clone())
}
}

Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl blockchain::DataSource<Chain> for DataSource {
todo!()
}

fn runtime(&self) -> &[u8] {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
todo!()
}

Expand Down Expand Up @@ -145,7 +145,7 @@ impl blockchain::DataSourceTemplate<Chain> for DataSourceTemplate {
todo!()
}

fn runtime(&self) -> &[u8] {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
todo!()
}

Expand Down
28 changes: 24 additions & 4 deletions core/src/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,16 @@ where
// we use the same order here as in the subgraph manifest to make the
// event processing behavior predictable
for ds in manifest.data_sources {
let runtime = ds.runtime();
let module_bytes = match runtime {
None => continue,
Some(ref module_bytes) => module_bytes,
};

let host = this.new_host(
logger.cheap_clone(),
ds,
module_bytes,
templates.cheap_clone(),
host_metrics.cheap_clone(),
)?;
Expand All @@ -84,21 +91,23 @@ where
Ok(this)
}

// module_bytes is the same as data_source.runtime().unwrap(), this is to ensure that this
// function is only called for data_sources for which data_source.runtime().is_some() is true.
fn new_host(
&mut self,
logger: Logger,
data_source: C::DataSource,
module_bytes: &Arc<Vec<u8>>,
templates: Arc<Vec<C::DataSourceTemplate>>,
host_metrics: Arc<HostMetrics>,
) -> Result<T::Host, Error> {
let mapping_request_sender = {
let module_bytes = data_source.runtime();
let module_hash = tiny_keccak::keccak256(module_bytes);
let module_hash = tiny_keccak::keccak256(module_bytes.as_ref());
if let Some(sender) = self.module_cache.get(&module_hash) {
sender.clone()
} else {
let sender = T::spawn_mapping(
module_bytes.to_owned(),
module_bytes.as_ref(),
logger,
self.subgraph_id.clone(),
host_metrics.clone(),
Expand Down Expand Up @@ -167,7 +176,18 @@ where
<= data_source.creation_block()
);

let host = Arc::new(self.new_host(logger.clone(), data_source, templates, metrics)?);
let module_bytes = match &data_source.runtime() {
None => return Ok(None),
Some(ref module_bytes) => module_bytes.cheap_clone(),
};

let host = Arc::new(self.new_host(
logger.clone(),
data_source,
&module_bytes,
templates,
metrics,
)?);

Ok(if self.hosts.contains(&host) {
None
Expand Down
4 changes: 2 additions & 2 deletions graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<C: Blockchain> DataSource<C> for MockDataSource {
todo!()
}

fn runtime(&self) -> &[u8] {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
todo!()
}

Expand Down Expand Up @@ -142,7 +142,7 @@ impl<C: Blockchain> DataSourceTemplate<C> for MockDataSourceTemplate {
todo!()
}

fn runtime(&self) -> &[u8] {
fn runtime(&self) -> Option<Arc<Vec<u8>>> {
todo!()
}

Expand Down
4 changes: 2 additions & 2 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub trait DataSource<C: Blockchain>:
fn context(&self) -> Arc<Option<DataSourceContext>>;
fn creation_block(&self) -> Option<BlockNumber>;
fn api_version(&self) -> semver::Version;
fn runtime(&self) -> &[u8];
fn runtime(&self) -> Option<Arc<Vec<u8>>>;

/// Checks if `trigger` matches this data source, and if so decodes it into a `MappingTrigger`.
/// A return of `Ok(None)` mean the trigger does not match.
Expand Down Expand Up @@ -245,7 +245,7 @@ pub trait UnresolvedDataSourceTemplate<C: Blockchain>:

pub trait DataSourceTemplate<C: Blockchain>: Send + Sync + Debug {
fn api_version(&self) -> semver::Version;
fn runtime(&self) -> &[u8];
fn runtime(&self) -> Option<Arc<Vec<u8>>>;
fn name(&self) -> &str;
}

Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/subgraph/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub trait RuntimeHostBuilder<C: Blockchain>: Clone + Send + Sync + 'static {
/// Spawn a mapping and return a channel for mapping requests. The sender should be able to be
/// cached and shared among mappings that use the same wasm file.
fn spawn_mapping(
raw_module: Vec<u8>,
raw_module: &[u8],
logger: Logger,
subgraph_id: DeploymentHash,
metrics: Arc<HostMetrics>,
Expand Down
2 changes: 1 addition & 1 deletion graph/src/data/subgraph/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn detect_ipfs_on_ethereum_contracts<C: Blockchain>(
) -> Result<Option<SubgraphFeature>, InvalidMapping> {
for runtime in manifest.runtimes() {
for function_name in IPFS_ON_ETHEREUM_CONTRACTS_FUNCTION_NAMES {
if calls_host_fn(runtime, function_name).map_err(|_| InvalidMapping)? {
if calls_host_fn(&runtime, function_name).map_err(|_| InvalidMapping)? {
return Ok(Some(SubgraphFeature::IpfsOnEthereumContracts));
}
}
Expand Down
13 changes: 8 additions & 5 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,7 @@ impl<C: Blockchain> SubgraphManifest<C> {
// Assume the manifest has been validated, ensuring network names are homogenous
self.data_sources
.iter()
.filter_map(|d| d.network().map(|n| n.to_string()))
.next()
.find_map(|d| d.network().map(|n| n.to_string()))
.expect("Validated manifest does not have a network defined on any datasource")
}

Expand All @@ -716,11 +715,15 @@ impl<C: Blockchain> SubgraphManifest<C> {
.chain(self.data_sources.iter().map(|source| source.api_version()))
}

pub fn runtimes(&self) -> impl Iterator<Item = &[u8]> + '_ {
pub fn runtimes(&self) -> impl Iterator<Item = Arc<Vec<u8>>> + '_ {
self.templates
.iter()
.map(|template| template.runtime())
.chain(self.data_sources.iter().map(|source| source.runtime()))
.filter_map(|template| template.runtime())
.chain(
self.data_sources
.iter()
.filter_map(|source| source.runtime()),
)
}

pub fn unified_mapping_api_version(
Expand Down
2 changes: 1 addition & 1 deletion runtime/wasm/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<C: Blockchain> RuntimeHostBuilderTrait<C> for RuntimeHostBuilder<C> {
type Req = MappingRequest<C>;

fn spawn_mapping(
raw_module: Vec<u8>,
raw_module: &[u8],
logger: Logger,
subgraph_id: DeploymentHash,
metrics: Arc<HostMetrics>,
Expand Down
4 changes: 2 additions & 2 deletions runtime/wasm/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use std::thread;

/// Spawn a wasm module in its own thread.
pub fn spawn_module<C: Blockchain>(
raw_module: Vec<u8>,
raw_module: &[u8],
logger: Logger,
subgraph_id: DeploymentHash,
host_metrics: Arc<HostMetrics>,
runtime: tokio::runtime::Handle,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<mpsc::Sender<MappingRequest<C>>, anyhow::Error> {
let valid_module = Arc::new(ValidModule::new(&logger, &raw_module)?);
let valid_module = Arc::new(ValidModule::new(&logger, raw_module)?);

// Create channel for event handling requests
let (mapping_request_sender, mapping_request_receiver) = mpsc::channel(100);
Expand Down