diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 11622dca2da..cd93531cb4c 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use diesel::sql_query; use diesel::Connection; +use diesel::PgConnection; use diesel::RunQueryDsl; use graph::blockchain::BlockHash; use graph::blockchain::BlockPtr; @@ -23,6 +24,7 @@ use graph_chain_ethereum::chain::BlockFinality; use graph_chain_ethereum::EthereumAdapter; use graph_chain_ethereum::EthereumAdapterTrait as _; use graph_store_postgres::add_chain; +use graph_store_postgres::block_store::primary::Chain as PrimaryChain; use graph_store_postgres::find_chain; use graph_store_postgres::update_chain_name; use graph_store_postgres::BlockStore; @@ -218,6 +220,31 @@ pub async fn update_chain_genesis( Ok(()) } +struct ChainSwapOutcome { + latest_backup_name: String, + previous_backup_final_name: Option, + reused_previous_backup: bool, + allocated_chain: Option, +} + +fn next_backup_name(conn: &mut PgConnection, chain: &str, base: &str) -> Result { + let mut suffix = 0usize; + + loop { + let candidate = if suffix == 0 { + format!("{chain}-{base}") + } else { + format!("{chain}-{base}-{suffix}") + }; + + if find_chain(conn, &candidate)?.is_none() { + return Ok(candidate); + } + + suffix += 1; + } +} + pub fn change_block_cache_shard( primary_store: ConnectionPool, store: Arc, @@ -231,49 +258,112 @@ pub fn change_block_cache_shard( let chain = find_chain(&mut conn, &chain_name)? .ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?; let old_shard = chain.shard; + let canonical_backup_name = format!("{chain_name}-old"); + + let existing_backup = find_chain(&mut conn, &canonical_backup_name)?; + let had_existing_backup = existing_backup.is_some(); + let existing_backup_store = store.chain_store(&canonical_backup_name); println!("Current shard: {}", old_shard); let chain_store = store .chain_store(&chain_name) .ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?; - let new_name = format!("{}-old", &chain_name); let ident = chain_store.chain_identifier()?; + let target_shard = Shard::new(shard.clone())?; + let reuse_existing_backup = existing_backup + .as_ref() + .map(|backup| backup.shard.as_str() == target_shard.as_str()) + .unwrap_or(false); - conn.transaction(|conn| -> Result<(), StoreError> { - let shard = Shard::new(shard.to_string())?; - - let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident)?; - - store.add_chain_store(&chain,ChainStatus::Ingestible, true)?; - - // Drop the foreign key constraint on deployment_schemas + let outcome = conn.transaction(|conn| -> Result { sql_query( "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", ) .execute(conn)?; - // Update the current chain name to chain-old - update_chain_name(conn, &chain_name, &new_name)?; + let mut previous_backup_final_name: Option = None; + + let temp_backup_name = if let Some(backup) = existing_backup.as_ref() { + let temp_name = next_backup_name(conn, &chain_name, "old")?; + update_chain_name(conn, &backup.name, &temp_name)?; + previous_backup_final_name = Some(temp_name.clone()); + Some(temp_name) + } else { + None + }; + + let latest_backup_name = next_backup_name(conn, &chain_name, "old")?; + update_chain_name(conn, &chain_name, &latest_backup_name)?; + let mut allocated_chain = None; - // Create a new chain with the name in the destination shard - let _ = add_chain(conn, &chain_name, &shard, ident)?; + if reuse_existing_backup { + if let Some(temp_name) = temp_backup_name.as_ref() { + update_chain_name(conn, temp_name, &chain_name)?; + previous_backup_final_name = Some(chain_name.clone()); + } + } else { + let created_chain = add_chain(conn, &chain_name, &target_shard, ident.clone())?; + allocated_chain = Some(created_chain); + } - // Re-add the foreign key constraint sql_query( "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);", ) - .execute(conn)?; - Ok(()) + .execute(conn)?; + + Ok(ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup: reuse_existing_backup, + allocated_chain, + }) })?; - chain_store.update_name(&new_name)?; + let ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup, + allocated_chain, + } = outcome; + + chain_store.update_name(&latest_backup_name)?; + + if let (Some(backup_store), Some(final_name)) = ( + existing_backup_store.as_ref(), + previous_backup_final_name.as_ref(), + ) { + backup_store.update_name(final_name)?; + } + + if let Some(ref chain) = allocated_chain { + store.add_chain_store(chain, ChainStatus::Ingestible, true)?; + } println!( "Changed block cache shard for {} from {} to {}", chain_name, old_shard, shard ); + println!("Latest backup recorded as `{}`", latest_backup_name); + + if reused_previous_backup { + println!( + "Reused existing backup `{}` as the active `{}` chain", + canonical_backup_name, chain_name + ); + } else if let Some(ref preserved) = previous_backup_final_name { + if had_existing_backup { + println!("Preserved earlier backup as `{}`", preserved); + } + } + + if allocated_chain.is_some() { + println!( + "Allocated new chain state for `{}` on shard {}", + chain_name, shard + ); + } Ok(()) }