From 8ac707c9e133f67c4347c3455c2e0579f0763561 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 18:07:31 +0530 Subject: [PATCH 1/8] core: Make SubgraphRegistrar.provider public --- core/src/subgraph/registrar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6f7ae17425f..d9a4c8410a8 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -32,7 +32,7 @@ pub struct SubgraphRegistrar { logger: Logger, logger_factory: LoggerFactory, resolver: Arc, - provider: Arc

, + pub provider: Arc

, store: Arc, subscription_manager: Arc, chains: Arc, From 3d9b7b3a7bbf662bf9afc63b8a89ecb987182177 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 18:13:24 +0530 Subject: [PATCH 2/8] graph: Make SubgraphAssignmentProvider.start take a link resolver override --- core/src/subgraph/provider.rs | 9 +++++++-- core/src/subgraph/registrar.rs | 2 +- graph/src/components/subgraph/provider.rs | 1 + node/src/manager/commands/run.rs | 2 +- tests/src/fixture/mod.rs | 4 ++-- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 00d379db01f..67c1f216610 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -74,6 +74,7 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss &self, loc: DeploymentLocator, stop_block: Option, + link_resolver_override: Option>, ) -> Result<(), SubgraphAssignmentProviderError> { let logger = self.logger_factory.subgraph_logger(&loc); @@ -86,8 +87,12 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss )); } - let file_bytes = self - .link_resolver + let link_resolver = match link_resolver_override { + Some(link_resolver) => link_resolver, + None => self.link_resolver.clone(), + }; + + let file_bytes = link_resolver .cat(&logger, &loc.hash.to_ipfs_link()) .await .map_err(SubgraphAssignmentProviderError::ResolveError)?; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index d9a4c8410a8..b03fa58bf95 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -479,7 +479,7 @@ async fn start_subgraph( trace!(logger, "Start subgraph"); let start_time = Instant::now(); - let result = provider.start(deployment.clone(), None).await; + let result = provider.start(deployment.clone(), None, None).await; debug!( logger, diff --git a/graph/src/components/subgraph/provider.rs b/graph/src/components/subgraph/provider.rs index 5edc22391c8..cb2ccac8b34 100644 --- 
a/graph/src/components/subgraph/provider.rs +++ b/graph/src/components/subgraph/provider.rs @@ -9,6 +9,7 @@ pub trait SubgraphAssignmentProvider: Send + Sync + 'static { &self, deployment: DeploymentLocator, stop_block: Option, + link_resolver_override: Option>, ) -> Result<(), SubgraphAssignmentProviderError>; async fn stop( &self, diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 2c6bfdcb148..c3186bcc5bc 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -218,7 +218,7 @@ pub async fn run( let locator = locate(subgraph_store.as_ref(), &hash)?; - SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block)) + SubgraphAssignmentProvider::start(subgraph_provider.as_ref(), locator, Some(stop_block), None) .await?; loop { diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index b8151857db3..8f875d2ae8d 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -285,7 +285,7 @@ impl TestContext { self.provider.stop(self.deployment.clone()).await.unwrap(); self.provider - .start(self.deployment.clone(), Some(stop_block.number)) + .start(self.deployment.clone(), Some(stop_block.number), None) .await .expect("unable to start subgraph"); @@ -306,7 +306,7 @@ impl TestContext { self.provider.stop(self.deployment.clone()).await.unwrap(); self.provider - .start(self.deployment.clone(), None) + .start(self.deployment.clone(), None, None) .await .expect("unable to start subgraph"); From eb6c048af2c13b9734fc340dd064918d0ea8b0ec Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 18:20:33 +0530 Subject: [PATCH 3/8] graph: Make SubgraphInstanceManager methods take a link resolver override --- core/src/subgraph/instance_manager.rs | 18 ++++++++++++++---- core/src/subgraph/provider.rs | 2 +- .../components/subgraph/instance_manager.rs | 3 ++- tests/src/fixture/mod.rs | 2 ++ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git 
a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 8c2b76e5b6c..632b7294df5 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -62,6 +62,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< loc: DeploymentLocator, manifest: serde_yaml::Mapping, stop_block: Option, + link_resolver_override: Option>, ) { let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst); @@ -89,6 +90,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< stop_block, Box::new(SubgraphTriggerProcessor {}), deployment_status_metric, + link_resolver_override, ) .await?; @@ -104,6 +106,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< stop_block, Box::new(SubgraphTriggerProcessor {}), deployment_status_metric, + link_resolver_override, ) .await?; @@ -119,6 +122,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< stop_block, Box::new(SubgraphTriggerProcessor {}), deployment_status_metric, + link_resolver_override, ) .await?; @@ -136,6 +140,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< loc.clone(), )), deployment_status_metric, + link_resolver_override, ) .await?; @@ -247,6 +252,7 @@ impl SubgraphInstanceManager { stop_block: Option, tp: Box>>, deployment_status_metric: DeploymentStatusMetric, + link_resolver_override: Option>, ) -> anyhow::Result>> where C: Blockchain, @@ -261,6 +267,7 @@ impl SubgraphInstanceManager { tp, deployment_status_metric, false, + link_resolver_override, ) .await } @@ -275,6 +282,7 @@ impl SubgraphInstanceManager { tp: Box>>, deployment_status_metric: DeploymentStatusMetric, is_runner_test: bool, + link_resolver_override: Option>, ) -> anyhow::Result>> where C: Blockchain, @@ -287,7 +295,10 @@ impl SubgraphInstanceManager { let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?; // Allow for infinite retries for subgraph definition files. 
- let link_resolver = Arc::from(self.link_resolver.with_retries()); + let link_resolver: Arc = match link_resolver_override { + Some(link_resolver) => Arc::from(link_resolver.with_retries()), + None => Arc::from(self.link_resolver.with_retries()), + }; // Make sure the `raw_yaml` is present on both this subgraph and the graft base. self.subgraph_store @@ -295,8 +306,7 @@ impl SubgraphInstanceManager { .await?; if let Some(graft) = &manifest.graft { if self.subgraph_store.is_deployed(&graft.base)? { - let file_bytes = self - .link_resolver + let file_bytes = link_resolver .cat(&logger, &graft.base.to_ipfs_link()) .await?; let yaml = String::from_utf8(file_bytes)?; @@ -482,7 +492,7 @@ impl SubgraphInstanceManager { let (runtime_adapter, decoder_hook) = chain.runtime()?; let host_builder = graph_runtime_wasm::RuntimeHostBuilder::new( runtime_adapter, - self.link_resolver.cheap_clone(), + link_resolver.cheap_clone(), subgraph_store.ens_lookup(), ); diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 67c1f216610..c538304ce05 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -102,7 +102,7 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss self.instance_manager .cheap_clone() - .start_subgraph(loc, raw, stop_block) + .start_subgraph(loc, raw, stop_block, Some(link_resolver)) .await; Ok(()) diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs index c04fd5237b4..4bafdd3ad3e 100644 --- a/graph/src/components/subgraph/instance_manager.rs +++ b/graph/src/components/subgraph/instance_manager.rs @@ -1,4 +1,4 @@ -use crate::prelude::BlockNumber; +use crate::prelude::{BlockNumber, LinkResolver}; use std::sync::Arc; use crate::components::store::DeploymentLocator; @@ -15,6 +15,7 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static { deployment: DeploymentLocator, manifest: serde_yaml::Mapping, stop_block: Option, + link_resolver_override: Option>, 
); async fn stop_subgraph(&self, deployment: DeploymentLocator); } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 8f875d2ae8d..3cef91da24d 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -228,6 +228,7 @@ impl TestContext { tp, deployment_status_metric, true, + None, ) .await .unwrap() @@ -259,6 +260,7 @@ impl TestContext { tp, deployment_status_metric, true, + None, ) .await .unwrap() From 7a51c934f7989beadeafce5d297c5eb0abd158c9 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 20:32:50 +0530 Subject: [PATCH 4/8] core: Make create_subgraph_version take a link resolver override --- core/src/subgraph/registrar.rs | 14 ++++++++------ graph/src/components/subgraph/registrar.rs | 1 + node/src/dev/helpers.rs | 1 + node/src/launcher.rs | 1 + node/src/manager/commands/run.rs | 1 + server/json-rpc/src/lib.rs | 1 + tests/src/fixture/mod.rs | 1 + 7 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index b03fa58bf95..11e317a4cd7 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -278,6 +278,7 @@ where start_block_override: Option, graft_block_override: Option, history_blocks: Option, + link_resolver_override: Option>, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. For logging purposes, make up a @@ -286,9 +287,10 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); + let link_resolver = link_resolver_override.unwrap_or_else(|| self.resolver.clone()); + let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver + let file_bytes = link_resolver .cat(&logger, &hash.to_ipfs_link()) .await .map_err(|e| { @@ -323,7 +325,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &link_resolver, history_blocks, ) .await? 
@@ -341,7 +343,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &link_resolver, history_blocks, ) .await? @@ -359,7 +361,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &link_resolver, history_blocks, ) .await? @@ -377,7 +379,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &link_resolver, history_blocks, ) .await? diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index 691c341e38b..9022b62ed69 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -45,6 +45,7 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { start_block_block: Option, graft_block_override: Option, history_blocks: Option, + link_resolver_override: Option>, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 45f7af9b75e..ec7b05e7025 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -55,6 +55,7 @@ async fn deploy_subgraph( start_block, None, None, + None, ) .await .and_then(|locator| { diff --git a/node/src/launcher.rs b/node/src/launcher.rs index d82a5d0fcbf..232ecf8ccb1 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -253,6 +253,7 @@ fn deploy_subgraph_from_flag( start_block, None, None, + None, ) .await } diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index c3186bcc5bc..21043692622 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -213,6 +213,7 @@ pub async fn run( None, None, None, + None, ) .await?; diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs index 103d36f806c..f8b960bad79 100644 --- a/server/json-rpc/src/lib.rs +++ b/server/json-rpc/src/lib.rs @@ -133,6 +133,7 @@ impl ServerState { None, None, params.history_blocks, + None, ) .await { diff 
--git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 3cef91da24d..8aad928979a 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -614,6 +614,7 @@ pub async fn setup_inner( None, graft_block, None, + None, ) .await .expect("failed to create subgraph version"); From f4dfd7112f3037ac519e95c888c7f2a9e1795c1a Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 18:36:55 +0530 Subject: [PATCH 5/8] node: Do not start subgraphs normally when in dev mode --- node/src/launcher.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 232ecf8ccb1..d8b9b05eb2e 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -352,6 +352,7 @@ fn build_graphql_server( pub async fn run(opt: Opt, env_vars: Arc, dev_ctx: Option) { // Set up logger let logger = logger(opt.debug); + let is_dev_mode = dev_ctx.is_some(); // Log version information info!( @@ -536,12 +537,14 @@ pub async fn run(opt: Opt, env_vars: Arc, dev_ctx: Option Date: Sat, 10 May 2025 19:15:23 +0530 Subject: [PATCH 6/8] node: Make gnd work with multiple subgraphs --- node/src/bin/dev.rs | 71 +++++++----------- node/src/dev/watcher.rs | 156 ++++++++++++++++++++++++++++++++++------ 2 files changed, 160 insertions(+), 67 deletions(-) diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 1545c4bad4c..e0d68f2d63c 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -7,10 +7,11 @@ use graph::{ components::link_resolver::FileLinkResolver, env::EnvVars, log::logger, + slog::info, tokio::{self, sync::mpsc}, }; use graph_node::{ - dev::{helpers::DevModeContext, watcher::watch_subgraph_dir}, + dev::{helpers::DevModeContext, watcher::watch_subgraphs}, launcher, opt::Opt, }; @@ -39,9 +40,17 @@ pub struct DevOpt { #[clap( long, help = "The location of the subgraph manifest file.", - default_value = "./build/subgraph.yaml" + default_value = "./build/subgraph.yaml", + value_delimiter = ',' )] - 
pub manifest: String, + pub manifests: Vec, + + #[clap( + long, + help = "The location of the database directory.", + default_value = "./build" + )] + pub database_dir: String, #[clap( long, @@ -63,7 +72,7 @@ pub struct DevOpt { } /// Builds the Graph Node options from DevOpt -fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result { +fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { let mut args = vec!["gnd".to_string()]; if !dev_opt.ipfs.is_empty() { @@ -76,16 +85,6 @@ fn build_args(dev_opt: &DevOpt, db_url: &str, manifest_path: &str) -> Result Result Result { - let manifest_path = Path::new(manifest_path_str); - - if !manifest_path.exists() { - anyhow::bail!("Subgraph manifest file not found at {}", manifest_path_str); - } - - let dir = manifest_path - .parent() - .context("Failed to get parent directory of manifest")?; - - dir.canonicalize() - .context("Failed to canonicalize build directory path") -} - async fn run_graph_node(opt: Opt, ctx: Option) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); @@ -122,18 +105,23 @@ async fn main() -> Result<()> { env_logger::init(); let dev_opt = DevOpt::parse(); - let build_dir = get_build_dir(&dev_opt.manifest)?; + let database_dir = Path::new(&dev_opt.database_dir); + + let logger = logger(true); + + info!(logger, "Starting Graph Node Dev"); + info!(logger, "Database directory: {}", database_dir.display()); let db = PgTempDBBuilder::new() - .with_data_dir_prefix(build_dir.clone()) + .with_data_dir_prefix(database_dir) .with_initdb_param("-E", "UTF8") .with_initdb_param("--locale", "C") .start_async() .await; let (tx, rx) = mpsc::channel(1); - let opt = build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?; - let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir)); + let opt = build_args(&dev_opt, &db.connection_uri())?; + let file_link_resolver = 
Arc::new(FileLinkResolver::with_base_dir(database_dir)); let ctx = DevModeContext { watch: dev_opt.watch, @@ -141,11 +129,6 @@ async fn main() -> Result<()> { updates_rx: rx, }; - let subgraph = opt.subgraph.clone().unwrap(); - - // Set up logger - let logger = logger(opt.debug); - // Run graph node graph::spawn(async move { let _ = run_graph_node(opt, Some(ctx)).await; @@ -153,14 +136,8 @@ async fn main() -> Result<()> { if dev_opt.watch { graph::spawn_blocking(async move { - watch_subgraph_dir( - &logger, - build_dir, - subgraph, - vec!["pgtemp-*".to_string()], - tx, - ) - .await; + let _ = + watch_subgraphs(&logger, dev_opt.manifests, vec!["pgtemp-*".to_string()], tx).await; }); } diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 53fbd729bcd..df0b70d6bf0 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -1,28 +1,102 @@ use globset::{Glob, GlobSet, GlobSetBuilder}; use graph::prelude::{DeploymentHash, SubgraphName}; -use graph::slog::{error, info, Logger}; +use graph::slog::{self, error, info, Logger}; use graph::tokio::sync::mpsc::Sender; use notify::{recommended_watcher, Event, RecursiveMode, Watcher}; +use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; const WATCH_DELAY: Duration = Duration::from_secs(5); +/// Maps manifest files to their parent directories and returns a mapping of deployment hashes to directories +fn deployment_hash_to_dir( + manifests: Vec, + logger: &Logger, +) -> BTreeMap { + let mut hash_to_dir = BTreeMap::new(); + + for manifest in manifests { + info!(logger, "Validating manifest: {}", manifest); + let manifest_path = Path::new(&manifest); + let manifest_path = match manifest_path.canonicalize() { + Ok(canon_path) => canon_path, + Err(e) => { + error!( + logger, + "Failed to canonicalize path for manifest {}: {}", manifest, e + ); + continue; + } + }; + + let dir = match manifest_path.parent() { + Some(parent) => match parent.canonicalize() 
{ + Ok(canon_path) => canon_path, + Err(e) => { + error!( + logger, + "Failed to canonicalize path for manifest {}: {}", manifest, e + ); + continue; + } + }, + None => { + error!( + logger, + "Failed to get parent directory for manifest: {}", manifest + ); + continue; + } + }; + + info!(logger, "Watching manifest: {}", manifest_path.display()); + + hash_to_dir.insert( + DeploymentHash::new(manifest_path.display().to_string()) + .expect("Failed to create deployment hash"), + dir.to_path_buf(), + ); + } + + hash_to_dir +} + /// Sets up a watcher for the given directory with optional exclusions. /// Exclusions can include glob patterns like "pgtemp-*". -pub async fn watch_subgraph_dir( +pub async fn watch_subgraphs( + logger: &Logger, + manifests: Vec, + exclusions: Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) { + let logger = logger.new(slog::o!("component" => ">>>>> Watcher")); + info!(logger, "Watching subgraphs: {}", manifests.join(", ")); + let hash_to_dir = deployment_hash_to_dir(manifests, &logger); + + watch_subgraph_dirs(&logger, hash_to_dir, exclusions, sender).await; +} + +/// Sets up a watcher for the given directories with optional exclusions. +/// Exclusions can include glob patterns like "pgtemp-*". 
+pub async fn watch_subgraph_dirs( logger: &Logger, - dir: PathBuf, - id: String, + hash_to_dir: BTreeMap, exclusions: Vec, sender: Sender<(DeploymentHash, SubgraphName)>, ) { + if hash_to_dir.is_empty() { + info!(logger, "No directories to watch"); + return; + } + info!( logger, - "Watching for changes in directory: {}", - dir.display() + "Watching for changes in {} directories", + hash_to_dir.len() ); + if !exclusions.is_empty() { info!(logger, "Excluding patterns: {}", exclusions.join(", ")); } @@ -33,7 +107,6 @@ pub async fn watch_subgraph_dir( // Create a channel to receive the events let (tx, rx) = mpsc::channel(); - // Create a watcher object let mut watcher = match recommended_watcher(tx) { Ok(w) => w, Err(e) => { @@ -42,28 +115,39 @@ pub async fn watch_subgraph_dir( } }; - if let Err(e) = watcher.watch(&dir, RecursiveMode::Recursive) { - error!(logger, "Error watching directory {}: {}", dir.display(), e); - return; + for (_, dir) in hash_to_dir.iter() { + if let Err(e) = watcher.watch(dir, RecursiveMode::Recursive) { + error!(logger, "Error watching directory {}: {}", dir.display(), e); + std::process::exit(1); + } + info!(logger, "Watching directory: {}", dir.display()); } - let watch_dir = dir.clone(); - let watch_exclusion_set = exclusion_set.clone(); + // Process file change events + process_file_events(logger, rx, &exclusion_set, &hash_to_dir, sender).await; +} +/// Processes file change events and triggers redeployments +async fn process_file_events( + logger: &Logger, + rx: mpsc::Receiver>, + exclusion_set: &GlobSet, + hash_to_dir: &BTreeMap, + sender: Sender<(DeploymentHash, SubgraphName)>, +) { loop { - let first_event = match rx.recv() { - Ok(Ok(e)) if should_process_event(&e, &watch_dir, &watch_exclusion_set) => Some(e), + // Wait for an event + let event = match rx.recv() { + Ok(Ok(e)) => e, Ok(_) => continue, Err(_) => break, }; - if first_event.is_none() { + if !is_relevant_event(&event, hash_to_dir, exclusion_set) { continue; } - // Once we 
receive an event, wait for a short period of time to allow for multiple events to be received - // This is because running graph build writes multiple files at once - // Which triggers multiple events, we only need to react to it once + // Once we receive an event, wait for a short period to batch multiple related events let start = std::time::Instant::now(); while start.elapsed() < WATCH_DELAY { match rx.try_recv() { @@ -73,12 +157,44 @@ pub async fn watch_subgraph_dir( } } + // Redeploy all subgraphs + redeploy_all_subgraphs(logger, hash_to_dir, &sender).await; + } +} + +/// Checks if an event is relevant for any of the watched directories +fn is_relevant_event( + event: &Event, + watched_dirs: &BTreeMap, + exclusion_set: &GlobSet, +) -> bool { + for path in event.paths.iter() { + for dir in watched_dirs.values() { + if path.starts_with(dir) && should_process_event(event, dir, exclusion_set) { + return true; + } + } + } + false +} + +/// Redeploys all subgraphs in the order defined by the BTreeMap +async fn redeploy_all_subgraphs( + logger: &Logger, + hash_to_dir: &BTreeMap, + sender: &Sender<(DeploymentHash, SubgraphName)>, +) { + info!(logger, "File change detected, redeploying all subgraphs"); + let mut count = 0; + for id in hash_to_dir.keys() { let _ = sender .send(( - DeploymentHash::new(id.clone()).unwrap(), - SubgraphName::new("test").unwrap(), + id.clone(), + SubgraphName::new(format!("subgraph-{}", count)) + .expect("Failed to create subgraph name"), )) .await; + count += 1; } } From 9854a7ba40ffeeaa99f851a4b0e9dbb8713b33c7 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 20:48:51 +0530 Subject: [PATCH 7/8] node: Use subgraph registrar to directly start and stop subgraphs in dev mode --- node/src/dev/helpers.rs | 102 +++++++++++++++++++++++++++------------- node/src/dev/watcher.rs | 21 +++++---- 2 files changed, 82 insertions(+), 41 deletions(-) diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index ec7b05e7025..9b6d0c4c91a 
100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -1,43 +1,67 @@ +use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use graph::components::link_resolver::FileLinkResolver; use graph::prelude::{ - BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, + BlockPtr, DeploymentHash, NodeId, SubgraphRegistrar as SubgraphRegistrarTrait, + SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, }; use graph::slog::{error, info, Logger}; use graph::tokio::sync::mpsc::Receiver; use graph::{ components::store::DeploymentLocator, - prelude::{SubgraphName, SubgraphRegistrar}, + prelude::{SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, SubgraphName}, }; -use graph_store_postgres::SubgraphStore; +use graph_core::{SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar}; +use graph_store_postgres::{SubgraphStore, SubscriptionManager}; + +type SubgraphRegistrarType = SubgraphRegistrar< + SubgraphAssignmentProvider>, + SubgraphStore, + SubscriptionManager, +>; + +pub struct DevSubgraph { + pub hash: DeploymentHash, + pub name: SubgraphName, + pub build_dir: PathBuf, +} pub struct DevModeContext { pub watch: bool, pub file_link_resolver: Arc, - pub updates_rx: Receiver<(DeploymentHash, SubgraphName)>, + pub updates_rx: Receiver, } /// Cleanup a subgraph /// This is used to remove a subgraph before redeploying it when using the watch flag -fn cleanup_dev_subgraph( +async fn cleanup_dev_subgraph( logger: &Logger, subgraph_store: &SubgraphStore, - name: &SubgraphName, - locator: &DeploymentLocator, + subgraph_registrar: &Arc, + subgraph: &DevSubgraph, ) -> Result<()> { - info!(logger, "Removing subgraph"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string()); - subgraph_store.remove_subgraph(name.clone())?; - subgraph_store.unassign_subgraph(locator)?; - subgraph_store.remove_deployment(locator.id.into())?; - info!(logger, "Subgraph 
removed"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string()); + let hash = subgraph.hash.clone(); + let locator = subgraph_store.active_locator(&hash)?; + + if let Some(locator) = locator { + info!(logger, "Removing subgraph"; "name" => subgraph.name.to_string(), "id" => locator.id.to_string(), "hash" => hash.to_string()); + + subgraph_registrar.provider.stop(locator.clone()).await?; + subgraph_store.remove_subgraph(subgraph.name.clone())?; + subgraph_store.unassign_subgraph(&locator)?; + subgraph_store.remove_deployment(locator.id.into())?; + + info!(logger, "Subgraph removed"; "name" => subgraph.name.to_string(), "id" => locator.id.to_string(), "hash" => hash.to_string()); + } + Ok(()) } async fn deploy_subgraph( logger: &Logger, - subgraph_registrar: Arc, + subgraph_registrar: &Arc, name: SubgraphName, subgraph_id: DeploymentHash, node_id: NodeId, @@ -67,28 +91,39 @@ async fn deploy_subgraph( pub async fn drop_and_recreate_subgraph( logger: &Logger, subgraph_store: Arc, - subgraph_registrar: Arc, - name: SubgraphName, - subgraph_id: DeploymentHash, + subgraph_registrar: Arc, node_id: NodeId, - hash: DeploymentHash, + dev_subgraph: DevSubgraph, ) -> Result { - let locator = subgraph_store.active_locator(&hash)?; - if let Some(locator) = locator.clone() { - cleanup_dev_subgraph(logger, &subgraph_store, &name, &locator)?; - } + let name = dev_subgraph.name.clone(); + let hash = dev_subgraph.hash.clone(); + + let link_resolver = Arc::new(FileLinkResolver::with_base_dir( + dev_subgraph.build_dir.clone(), + )); + + cleanup_dev_subgraph(logger, &subgraph_store, &subgraph_registrar, &dev_subgraph).await?; - deploy_subgraph( + let locator = deploy_subgraph( logger, - subgraph_registrar, - name, - subgraph_id, + &subgraph_registrar, + name.clone(), + hash.clone(), node_id, None, None, ) .await - .map_err(|e| anyhow::anyhow!("Failed to deploy subgraph: {}", e)) + .map_err(|e| anyhow::anyhow!("Failed to deploy subgraph: {}", 
e))?; + + info!(logger, "Starting subgraph"; "name" => name.to_string(), "id" => hash.to_string(), "locator" => locator.to_string()); + subgraph_registrar + .provider + .start(locator.clone(), None, Some(link_resolver)) + .await?; + info!(logger, "Subgraph started"; "name" => name.to_string(), "id" => hash.to_string(), "locator" => locator.to_string()); + + Ok(locator) } /// Watch for subgraph updates, drop and recreate them @@ -97,19 +132,21 @@ pub async fn drop_and_recreate_subgraph( pub async fn watch_subgraph_updates( logger: &Logger, subgraph_store: Arc, - subgraph_registrar: Arc, + subgraph_registrar: Arc, node_id: NodeId, - mut rx: Receiver<(DeploymentHash, SubgraphName)>, + mut rx: Receiver, ) { - while let Some((hash, name)) = rx.recv().await { + while let Some(dev_subgraph) = rx.recv().await { + let name = dev_subgraph.name.clone(); + let hash = dev_subgraph.hash.clone(); + let build_dir = dev_subgraph.build_dir.clone(); + let res = drop_and_recreate_subgraph( logger, subgraph_store.clone(), subgraph_registrar.clone(), - name.clone(), - hash.clone(), node_id.clone(), - hash.clone(), + dev_subgraph, ) .await; @@ -117,6 +154,7 @@ pub async fn watch_subgraph_updates( error!(logger, "Failed to drop and recreate subgraph"; "name" => name.to_string(), "hash" => hash.to_string(), + "build_dir" => format!("{}", build_dir.display()), "error" => e.to_string() ); } diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index df0b70d6bf0..385ea9e6034 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -8,6 +8,8 @@ use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; +use super::helpers::DevSubgraph; + const WATCH_DELAY: Duration = Duration::from_secs(5); /// Maps manifest files to their parent directories and returns a mapping of deployment hashes to directories @@ -69,7 +71,7 @@ pub async fn watch_subgraphs( logger: &Logger, manifests: Vec, exclusions: Vec, - sender: Sender<(DeploymentHash, SubgraphName)>, + sender: 
Sender, ) { let logger = logger.new(slog::o!("component" => ">>>>> Watcher")); info!(logger, "Watching subgraphs: {}", manifests.join(", ")); @@ -84,7 +86,7 @@ pub async fn watch_subgraph_dirs( logger: &Logger, hash_to_dir: BTreeMap, exclusions: Vec, - sender: Sender<(DeploymentHash, SubgraphName)>, + sender: Sender, ) { if hash_to_dir.is_empty() { info!(logger, "No directories to watch"); @@ -133,7 +135,7 @@ async fn process_file_events( rx: mpsc::Receiver>, exclusion_set: &GlobSet, hash_to_dir: &BTreeMap, - sender: Sender<(DeploymentHash, SubgraphName)>, + sender: Sender, ) { loop { // Wait for an event @@ -182,17 +184,18 @@ fn is_relevant_event( async fn redeploy_all_subgraphs( logger: &Logger, hash_to_dir: &BTreeMap, - sender: &Sender<(DeploymentHash, SubgraphName)>, + sender: &Sender, ) { info!(logger, "File change detected, redeploying all subgraphs"); let mut count = 0; - for id in hash_to_dir.keys() { + for (id, build_dir) in hash_to_dir.iter() { let _ = sender - .send(( - id.clone(), - SubgraphName::new(format!("subgraph-{}", count)) + .send(DevSubgraph { + hash: id.clone(), + name: SubgraphName::new(format!("subgraph-{}", count)) .expect("Failed to create subgraph name"), - )) + build_dir: build_dir.clone(), + }) .await; count += 1; } From 4246e9da88405e3678b325e4510854180cbc7b16 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Sat, 10 May 2025 21:28:35 +0530 Subject: [PATCH 8/8] node: override link resolver for gnd --- node/src/dev/helpers.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs index 9b6d0c4c91a..6147f93fd2c 100644 --- a/node/src/dev/helpers.rs +++ b/node/src/dev/helpers.rs @@ -63,27 +63,29 @@ async fn deploy_subgraph( logger: &Logger, subgraph_registrar: &Arc, name: SubgraphName, - subgraph_id: DeploymentHash, + hash: DeploymentHash, node_id: NodeId, debug_fork: Option, start_block: Option, + link_resolver: Arc, ) -> Result { - info!(logger, "Re-deploying 
subgraph"; "name" => name.to_string(), "id" => subgraph_id.to_string()); + info!(logger, "Re-deploying subgraph"; "name" => name.to_string(), "id" => hash.to_string()); + subgraph_registrar.create_subgraph(name.clone()).await?; subgraph_registrar .create_subgraph_version( name.clone(), - subgraph_id.clone(), + hash.clone(), node_id, debug_fork, start_block, None, None, - None, + Some(link_resolver), ) .await .and_then(|locator| { - info!(logger, "Subgraph deployed"; "name" => name.to_string(), "id" => subgraph_id.to_string(), "locator" => locator.to_string()); + info!(logger, "Subgraph deployed"; "name" => name.to_string(), "id" => hash.to_string(), "locator" => locator.to_string()); Ok(locator) }) } @@ -112,6 +114,7 @@ pub async fn drop_and_recreate_subgraph( node_id, None, None, + link_resolver.clone(), ) .await .map_err(|e| anyhow::anyhow!("Failed to deploy subgraph: {}", e))?;