From 69348109fd9049b8799136e76c447a5114e5476d Mon Sep 17 00:00:00 2001 From: Alex TYRODE Date: Fri, 2 May 2025 21:58:58 +0000 Subject: [PATCH] feat: implement Redis-based migration locking mechanism - Introduced run_migrations_with_lock function to manage database migrations with a Redis distributed lock, ensuring that only one instance runs migrations at a time. - Updated main.py to utilize the new migration function, enhancing startup reliability and preventing migration conflicts. - Added detailed logging for migration status and outcomes, improving visibility during the migration process. --- src/backend/database/database.py | 163 ++++++++++++++++++++++++++----- src/backend/main.py | 17 +++- 2 files changed, 150 insertions(+), 30 deletions(-) diff --git a/src/backend/database/database.py b/src/backend/database/database.py index 40c6d13..c08e712 100644 --- a/src/backend/database/database.py +++ b/src/backend/database/database.py @@ -4,7 +4,9 @@ import os import asyncio -from typing import AsyncGenerator +import subprocess +import time +from typing import AsyncGenerator, Optional from urllib.parse import quote_plus as urlquote from pathlib import Path @@ -13,7 +15,6 @@ from sqlalchemy.schema import CreateSchema from fastapi import Depends from alembic.config import Config -from alembic import command from .models import Base, SCHEMA_NAME @@ -39,37 +40,147 @@ engine, class_=AsyncSession, expire_on_commit=False ) -async def run_migrations() -> None: - """Run database migrations using Alembic""" - # Get the path to the alembic.ini file - alembic_ini_path = Path(__file__).parent / "alembic.ini" +async def run_migrations_with_lock(redis_client=None, lock_timeout: int = 120, max_wait_time: int = 300) -> bool: + """ + Run database migrations using Alembic with a Redis distributed lock. + All workers will wait for the migration to complete before proceeding. + + Args: + redis_client: Redis client instance + lock_timeout: How long the lock should be held (seconds) + max_wait_time: Maximum time to wait for migrations to complete (seconds) + + Returns: + bool: True if migrations were run successfully or completed by another instance, False on timeout or error + """ + if redis_client is None: + # Import here to avoid circular imports + from config import get_redis_client + redis_client = get_redis_client() + + # Keys for Redis coordination + lock_name = "alembic_migration_lock" + status_key = "alembic_migration_status" + lock_value = f"instance_{time.time()}" - # Create Alembic configuration - alembic_cfg = Config(str(alembic_ini_path)) + # Check if migrations are already completed + migration_status = redis_client.get(status_key) + if migration_status == "completed": + print("Migrations already completed - continuing startup") + return True - # Set the script_location to the correct path - # This ensures Alembic finds the migrations directory - alembic_cfg.set_main_option('script_location', str(Path(__file__).parent / "migrations")) + # Try to acquire the lock - non-blocking + lock_acquired = redis_client.set( + lock_name, + lock_value, + nx=True, # Only set if key doesn't exist + ex=lock_timeout # Expiry in seconds + ) - # Define a function to run in a separate thread - def run_upgrade(): - # Import the command module here to avoid import issues - from alembic import command + if lock_acquired: + print("This instance will run migrations") + try: + # Set status to in-progress + redis_client.set(status_key, "in_progress", ex=lock_timeout) + + # Run migrations + success = await run_migrations_subprocess() + + if success: + # Set status to completed with a longer expiry (1 hour) + redis_client.set(status_key, "completed", ex=3600) + print("Migration completed successfully - signaling other instances") + return True + else: + # Set status to failed + redis_client.set(status_key, "failed", ex=3600) + print("Migration failed - signaling other instances") + return False + finally: + # Release the lock only if we're the owner + current_value = redis_client.get(lock_name) + if current_value == lock_value: + redis_client.delete(lock_name) + else: + print("Another instance is running migrations - waiting for completion") - # Set attributes that env.py might need - import sys - from pathlib import Path + # Wait for the migration to complete + start_time = time.time() + while time.time() - start_time < max_wait_time: + # Check migration status + status = redis_client.get(status_key) + + if status == "completed": + print("Migrations completed by another instance - continuing startup") + return True + elif status == "failed": + print("Migrations failed in another instance - continuing startup with caution") + return False + elif status is None: + # No status yet, might be a stale lock or not started + # Check if lock exists + if not redis_client.exists(lock_name): + # Lock released but no status - try to acquire the lock ourselves + print("No active migration lock - attempting to acquire") + return await run_migrations_with_lock(redis_client, lock_timeout, max_wait_time) + + # Wait before checking again + await asyncio.sleep(1) - # Add the database directory to sys.path + # Timeout waiting for migration + print(f"Timeout waiting for migrations after {max_wait_time} seconds") + return False + +async def run_migrations_subprocess() -> bool: + """ + Run Alembic migrations using a subprocess + + Returns: + bool: True if migrations were successful, False otherwise + """ + try: + # Get the path to the database directory db_dir = Path(__file__).parent - if str(db_dir) not in sys.path: - sys.path.insert(0, str(db_dir)) - # Run the upgrade command - command.upgrade(alembic_cfg, "head") - - # Run the migrations in a separate thread to avoid blocking the event loop - await asyncio.to_thread(run_upgrade) + # Create a subprocess to run alembic + process = await asyncio.create_subprocess_exec( + 'alembic', 'upgrade', 'head', + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(db_dir) # Run in the database directory + ) + + # Wait for the process to complete with a timeout + try: + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60) + + if process.returncode == 0: + print("Database migrations completed successfully") + if stdout: + print(stdout.decode()) + return True + else: + print(f"Migration failed with error code {process.returncode}") + if stderr: + print(stderr.decode()) + return False + + except asyncio.TimeoutError: + print("Migration timed out after 60 seconds") + # Try to terminate the process + process.terminate() + return False + + except Exception as e: + print(f"Migration failed: {str(e)}") + return False + +async def run_migrations() -> None: + """ + Legacy function to run migrations directly (without lock) + This is kept for backward compatibility + """ + await run_migrations_subprocess() async def init_db() -> None: """Initialize the database with required tables""" diff --git a/src/backend/main.py b/src/backend/main.py index 563ef39..3b51dec 100644 --- a/src/backend/main.py +++ b/src/backend/main.py @@ -18,7 +18,7 @@ from routers.pad_router import pad_router from routers.template_pad_router import template_pad_router from database.service import TemplatePadService -from database.database import async_session, run_migrations +from database.database import async_session, run_migrations_with_lock # Initialize PostHog if API key is available if POSTHOG_API_KEY: @@ -80,10 +80,19 @@ async def lifespan(_: FastAPI): await init_db() print("Database connection established successfully") - # Run database migrations + # Run database migrations with Redis lock + # All workers will wait for the migration to complete before proceeding try: - await run_migrations() - print("Database migrations completed successfully") + migration_success = await run_migrations_with_lock( + redis_client=redis_client, + lock_timeout=120, # 2 minutes timeout for the lock + max_wait_time=300 # 5 minutes maximum wait time + ) + + if migration_success: + print("Database migrations completed successfully or already done") + else: + print("Warning: Migrations failed or timed out - proceeding with caution") except Exception as e: print(f"Warning: Failed to run migrations: {str(e)}")