Skip to content

Commit

Permalink
fix: Propagate migration errors and show an informative message (#2994)
Browse files Browse the repository at this point in the history
* Propagate errors on failed migrations

* Update migration error message

* Don't block with queue.get

* Use get_nowait instead for clarity

* Fix annotations
  • Loading branch information
anticorrelator committed Apr 26, 2024
1 parent 88103af commit 3718e10
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
49 changes: 35 additions & 14 deletions src/phoenix/db/migrate.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
from pathlib import Path
from queue import Empty, Queue
from threading import Thread
from typing import Optional

from alembic import command
from alembic.config import Config
from sqlalchemy import URL

from phoenix.exceptions import PhoenixMigrationError
from phoenix.settings import Settings

logger = logging.getLogger(__name__)
Expand All @@ -16,27 +19,32 @@ def printif(condition: bool, text: str) -> None:
print(text)


def migrate(url: URL) -> None:
def migrate(url: URL, error_queue: Optional["Queue[Exception]"] = None) -> None:
"""
Runs migrations on the database.
NB: Migrate only works on non-memory databases.
Args:
url: The database URL.
"""
log_migrations = Settings.log_migrations
printif(log_migrations, "🏃‍♀️‍➡️ Running migrations on the database.")
printif(log_migrations, "---------------------------")
config_path = str(Path(__file__).parent.resolve() / "alembic.ini")
alembic_cfg = Config(config_path)
try:
log_migrations = Settings.log_migrations
printif(log_migrations, "🏃‍♀️‍➡️ Running migrations on the database.")
printif(log_migrations, "---------------------------")
config_path = str(Path(__file__).parent.resolve() / "alembic.ini")
alembic_cfg = Config(config_path)

# Explicitly set the migration directory
scripts_location = str(Path(__file__).parent.resolve() / "migrations")
alembic_cfg.set_main_option("script_location", scripts_location)
alembic_cfg.set_main_option("sqlalchemy.url", str(url))
command.upgrade(alembic_cfg, "head")
printif(log_migrations, "---------------------------")
printif(log_migrations, "✅ Migrations complete.")
# Explicitly set the migration directory
scripts_location = str(Path(__file__).parent.resolve() / "migrations")
alembic_cfg.set_main_option("script_location", scripts_location)
alembic_cfg.set_main_option("sqlalchemy.url", str(url))
command.upgrade(alembic_cfg, "head")
printif(log_migrations, "---------------------------")
printif(log_migrations, "✅ Migrations complete.")
except Exception as e:
if error_queue:
error_queue.put(e)
raise e


def migrate_in_thread(url: URL) -> None:
Expand All @@ -45,6 +53,19 @@ def migrate_in_thread(url: URL) -> None:
This is needed because depending on the context (notebook)
the migration process can fail to execute in the main thread.
"""
t = Thread(target=migrate, args=(url,))
error_queue: Queue[Exception] = Queue()
t = Thread(target=migrate, args=(url, error_queue))
t.start()
t.join()

try:
result = error_queue.get_nowait()
except Empty:
return

if result is not None:
error_message = (
"\n\nUnable to migrate configured Phoenix DB. Original error:\n"
f"{type(result).__name__}: {str(result)}"
)
raise PhoenixMigrationError(error_message) from result
4 changes: 2 additions & 2 deletions src/phoenix/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ class PhoenixException(Exception):
pass


class PhoenixContextLimitExceeded(PhoenixException):
class PhoenixEvaluationNameIsMissing(PhoenixException):
pass


class PhoenixEvaluationNameIsMissing(PhoenixException):
class PhoenixMigrationError(PhoenixException):
pass
20 changes: 19 additions & 1 deletion src/phoenix/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from phoenix.core.model_schema import Model
from phoenix.db.bulk_inserter import BulkInserter
from phoenix.db.engines import create_engine
from phoenix.exceptions import PhoenixMigrationError
from phoenix.pointcloud.umap_parameters import UMAPParameters
from phoenix.server.api.context import Context, DataLoaders
from phoenix.server.api.dataloaders import (
Expand Down Expand Up @@ -242,11 +243,28 @@ def create_app(
)
)
initial_batch_of_evaluations = () if initial_evaluations is None else initial_evaluations
engine = create_engine(database_url)
try:
engine = create_engine(database_url)
except PhoenixMigrationError as e:
msg = (
"\n\n⚠️⚠️ Phoenix failed to migrate the database to the latest version. ⚠️⚠️\n\n"
"The database may be in a dirty state. To resolve this, the Alembic CLI can be used\n"
"from the `src/phoenix/db` directory inside the Phoenix project root. From here,\n"
"revert any partial migrations and run `alembic stamp` to reset the migration state,\n"
"then try starting Phoenix again.\n\n"
"If issues persist, please reach out for support in the Arize community Slack:\n"
"https://arize-ai.slack.com\n\n"
"You can also refer to the Alembic documentation for more information:\n"
"https://alembic.sqlalchemy.org/en/latest/tutorial.html\n\n"
""
)
raise PhoenixMigrationError(msg) from e

if is_server_instrumentation_enabled():
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)

db = _db(engine)
bulk_inserter = BulkInserter(
db,
Expand Down

0 comments on commit 3718e10

Please sign in to comment.