163 changes: 137 additions & 26 deletions src/backend/database/database.py
@@ -4,7 +4,9 @@

import os
import asyncio
from typing import AsyncGenerator
import subprocess
import time
from typing import AsyncGenerator, Optional
from urllib.parse import quote_plus as urlquote
from pathlib import Path

@@ -13,7 +15,6 @@
from sqlalchemy.schema import CreateSchema
from fastapi import Depends
from alembic.config import Config
from alembic import command

from .models import Base, SCHEMA_NAME

@@ -39,37 +40,147 @@
engine, class_=AsyncSession, expire_on_commit=False
)

async def run_migrations() -> None:
"""Run database migrations using Alembic"""
# Get the path to the alembic.ini file
alembic_ini_path = Path(__file__).parent / "alembic.ini"
async def run_migrations_with_lock(redis_client=None, lock_timeout: int = 120, max_wait_time: int = 300) -> bool:
"""
Run database migrations using Alembic with a Redis distributed lock.
All workers will wait for the migration to complete before proceeding.

Args:
redis_client: Redis client instance
lock_timeout: How long the lock should be held (seconds)
max_wait_time: Maximum time to wait for migrations to complete (seconds)

Returns:
bool: True if migrations were run successfully or completed by another instance, False on timeout or error
"""
if redis_client is None:
# Import here to avoid circular imports
from config import get_redis_client
redis_client = get_redis_client()

# Keys for Redis coordination
lock_name = "alembic_migration_lock"
status_key = "alembic_migration_status"
lock_value = f"instance_{time.time()}"

# Create Alembic configuration
alembic_cfg = Config(str(alembic_ini_path))
# Check if migrations are already completed
migration_status = redis_client.get(status_key)
if migration_status == "completed":
print("Migrations already completed - continuing startup")
return True

# Set the script_location to the correct path
# This ensures Alembic finds the migrations directory
alembic_cfg.set_main_option('script_location', str(Path(__file__).parent / "migrations"))
# Try to acquire the lock - non-blocking
lock_acquired = redis_client.set(
lock_name,
lock_value,
nx=True, # Only set if key doesn't exist
ex=lock_timeout # Expiry in seconds
)

# Define a function to run in a separate thread
def run_upgrade():
# Import the command module here to avoid import issues
from alembic import command
if lock_acquired:
print("This instance will run migrations")
try:
# Set status to in-progress
redis_client.set(status_key, "in_progress", ex=lock_timeout)

# Run migrations
success = await run_migrations_subprocess()

if success:
# Set status to completed with a longer expiry (1 hour)
redis_client.set(status_key, "completed", ex=3600)
print("Migration completed successfully - signaling other instances")
return True
else:
# Set status to failed
redis_client.set(status_key, "failed", ex=3600)
print("Migration failed - signaling other instances")
return False
finally:
# Release the lock only if we're the owner
current_value = redis_client.get(lock_name)
if current_value == lock_value:
redis_client.delete(lock_name)
else:
print("Another instance is running migrations - waiting for completion")

# Set attributes that env.py might need
import sys
from pathlib import Path
# Wait for the migration to complete
start_time = time.time()
while time.time() - start_time < max_wait_time:
# Check migration status
status = redis_client.get(status_key)

if status == "completed":
print("Migrations completed by another instance - continuing startup")
return True
elif status == "failed":
print("Migrations failed in another instance - continuing startup with caution")
return False
elif status is None:
# No status yet, might be a stale lock or not started
# Check if lock exists
if not redis_client.exists(lock_name):
# Lock released but no status - try to acquire the lock ourselves
print("No active migration lock - attempting to acquire")
return await run_migrations_with_lock(redis_client, lock_timeout, max_wait_time)

# Wait before checking again
await asyncio.sleep(1)

# Add the database directory to sys.path
# Timeout waiting for migration
print(f"Timeout waiting for migrations after {max_wait_time} seconds")
return False

async def run_migrations_subprocess() -> bool:
"""
Run Alembic migrations using a subprocess

Returns:
bool: True if migrations were successful, False otherwise
"""
try:
# Get the path to the database directory
db_dir = Path(__file__).parent
if str(db_dir) not in sys.path:
sys.path.insert(0, str(db_dir))

# Run the upgrade command
command.upgrade(alembic_cfg, "head")

# Run the migrations in a separate thread to avoid blocking the event loop
await asyncio.to_thread(run_upgrade)
# Create a subprocess to run alembic
process = await asyncio.create_subprocess_exec(
'alembic', 'upgrade', 'head',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(db_dir) # Run in the database directory
)

# Wait for the process to complete with a timeout
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60)

if process.returncode == 0:
print("Database migrations completed successfully")
if stdout:
print(stdout.decode())
return True
else:
print(f"Migration failed with error code {process.returncode}")
if stderr:
print(stderr.decode())
return False

except asyncio.TimeoutError:
print("Migration timed out after 60 seconds")
# Try to terminate the process
process.terminate()
return False

except Exception as e:
print(f"Migration failed: {str(e)}")
return False

async def run_migrations() -> None:
"""
Legacy entry point that runs migrations directly, without the Redis lock.
Kept for backward compatibility.
"""
await run_migrations_subprocess()

async def init_db() -> None:
"""Initialize the database with required tables"""
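Stripped of the Alembic specifics, run_migrations_with_lock is a run-once-per-deploy coordination pattern: one worker wins a SET NX EX lock, does the work, and publishes the outcome under a status key; every other worker polls that key. The sketch below isolates that pattern under two assumptions not shown in the diff: a synchronous redis-py client created with decode_responses=True (the string comparisons above rely on get() returning str), and an illustrative do_setup callable standing in for the migration run. The key names match the diff.

import time
import uuid

import redis

# Assumed client configuration; decode_responses=True makes get() return str.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

LOCK_KEY = "alembic_migration_lock"
STATUS_KEY = "alembic_migration_status"


def run_once_across_workers(do_setup, lock_timeout: int = 120, max_wait: int = 300) -> bool:
    """One worker wins the lock and runs do_setup(); the rest poll the status key."""
    token = f"instance_{uuid.uuid4()}"

    if r.get(STATUS_KEY) == "completed":
        return True

    # SET NX EX: only one worker acquires the lock, and it expires on its own
    # if that worker dies before releasing it.
    if r.set(LOCK_KEY, token, nx=True, ex=lock_timeout):
        try:
            r.set(STATUS_KEY, "in_progress", ex=lock_timeout)
            ok = do_setup()
            r.set(STATUS_KEY, "completed" if ok else "failed", ex=3600)
            return ok
        finally:
            # Best-effort compare-then-delete release, mirroring the diff;
            # a Lua check-and-delete script would make this atomic.
            if r.get(LOCK_KEY) == token:
                r.delete(LOCK_KEY)

    # The other workers poll until the winner reports a result or max_wait elapses.
    deadline = time.time() + max_wait
    while time.time() < deadline:
        status = r.get(STATUS_KEY)
        if status == "completed":
            return True
        if status == "failed":
            return False
        time.sleep(1)
    return False

The stale-lock recovery branch in the diff (no status key and no lock present, so re-attempt acquisition) is omitted here for brevity.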
17 changes: 13 additions & 4 deletions src/backend/main.py
@@ -18,7 +18,7 @@
from routers.pad_router import pad_router
from routers.template_pad_router import template_pad_router
from database.service import TemplatePadService
from database.database import async_session, run_migrations
from database.database import async_session, run_migrations_with_lock

# Initialize PostHog if API key is available
if POSTHOG_API_KEY:
@@ -80,10 +80,19 @@ async def lifespan(_: FastAPI):
await init_db()
print("Database connection established successfully")

# Run database migrations
# Run database migrations with Redis lock
# All workers will wait for the migration to complete before proceeding
try:
await run_migrations()
print("Database migrations completed successfully")
migration_success = await run_migrations_with_lock(
redis_client=redis_client,
lock_timeout=120, # 2 minutes timeout for the lock
max_wait_time=300 # 5 minutes maximum wait time
)

if migration_success:
print("Database migrations completed successfully or already done")
else:
print("Warning: Migrations failed or timed out - proceeding with caution")
except Exception as e:
print(f"Warning: Failed to run migrations: {str(e)}")

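One wiring detail the lifespan change depends on: run_migrations_with_lock compares redis_client.get(...) results against plain strings, so the client created in main.py must decode responses. With redis-py's default bytes responses the winning worker would still run the migration, but the string comparisons would never match and every other worker would wait out max_wait_time before proceeding. get_redis_client is not part of this diff, so the snippet below is only a sketch of a client configuration that satisfies that assumption, with placeholder connection settings.

import redis

# Sketch only: a client whose get() results are str, matching the string
# comparisons in run_migrations_with_lock. Host and port are placeholders,
# not the project's actual configuration.
example_client = redis.Redis(
    host="localhost",
    port=6379,
    decode_responses=True,  # get() returns str, so `status == "completed"` works
)

# e.g. handed to the startup path shown above:
#     await run_migrations_with_lock(redis_client=example_client)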