diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 9e9ba187c6a..e0ae71eab3e 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -37,7 +37,7 @@ use graph::{ info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS, }, schema::EntityType, - slog::error, + slog::{debug, error}, }; use itertools::Itertools; @@ -113,16 +113,33 @@ table! { } /// Return `true` if the site is the source of a copy operation. The copy -/// operation might be just queued or in progress already -pub fn is_source(conn: &mut PgConnection, site: &Site) -> Result { +/// operation might be just queued or in progress already. This method will +/// block until a fdw connection becomes available. +pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result { use active_copies as ac; + // We use a fdw connection to check if the site is being copied. If we + // used an ordinary connection and there are many calls to this method, + // postgres_fdw might open an unmanageable number of connections into + // the primary, which makes the primary run out of connections + let mut last_log = Instant::now(); + let mut conn = pool.get_fdw(&logger, || { + if last_log.elapsed() > LOG_INTERVAL { + last_log = Instant::now(); + debug!( + logger, + "Waiting for fdw connection to check if site {} is being copied", site.namespace + ); + } + false + })?; + select(diesel::dsl::exists( ac::table .filter(ac::src.eq(site.id)) .filter(ac::cancelled_at.is_null()), )) - .get_result::(conn) + .get_result::(&mut conn) .map_err(StoreError::from) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 91230d63b7b..c78b06be46d 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1235,8 +1235,7 @@ impl DeploymentStore { req: PruneRequest, ) -> Result<(), StoreError> { { - let mut conn = store.get_conn()?; - if copy::is_source(&mut conn, &site)? { + if copy::is_source(&logger, &store.pool, &site)? { debug!( logger, "Skipping pruning since this deployment is being copied"