Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use graph::{
info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS,
},
schema::EntityType,
slog::error,
slog::{debug, error},
};
use itertools::Itertools;

Expand Down Expand Up @@ -113,16 +113,33 @@ table! {
}

/// Return `true` if the site is the source of a copy operation. The copy
/// operation might be just queued or in progress already
pub fn is_source(conn: &mut PgConnection, site: &Site) -> Result<bool, StoreError> {
/// operation might be just queued or in progress already. This method will
/// block until a fdw connection becomes available.
pub fn is_source(logger: &Logger, pool: &ConnectionPool, site: &Site) -> Result<bool, StoreError> {
use active_copies as ac;

// We use a fdw connection to check if the site is being copied. If we
// used an ordinary connection and there are many calls to this method,
// postgres_fdw might open an unmanageable number of connections into
// the primary, which makes the primary run out of connections
let mut last_log = Instant::now();
let mut conn = pool.get_fdw(&logger, || {
if last_log.elapsed() > LOG_INTERVAL {
last_log = Instant::now();
debug!(
logger,
"Waiting for fdw connection to check if site {} is being copied", site.namespace
);
}
false
})?;

select(diesel::dsl::exists(
ac::table
.filter(ac::src.eq(site.id))
.filter(ac::cancelled_at.is_null()),
))
.get_result::<bool>(conn)
.get_result::<bool>(&mut conn)
.map_err(StoreError::from)
}

Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1235,8 +1235,7 @@ impl DeploymentStore {
req: PruneRequest,
) -> Result<(), StoreError> {
{
let mut conn = store.get_conn()?;
if copy::is_source(&mut conn, &site)? {
if copy::is_source(&logger, &store.pool, &site)? {
debug!(
logger,
"Skipping pruning since this deployment is being copied"
Expand Down
Loading