Skip to content

Commit

Permalink
Rename the InstanceMigratedError to be less pushy about migrating (#7141
Browse files Browse the repository at this point in the history
)

Summary:
A lot of the logic here is from a time when every migration was required - but today lots of migrations are not, and the actual error is just as interesting as whether the alembic revision is out of date.

Rename the error and the language to be more like a warning instead of a "you had better migrate right now or else" error.

Test Plan: BK, manually trigger a SQL error while the db is out of date and view the resulting stack trace in dagit
  • Loading branch information
gibsondan committed Mar 22, 2022
1 parent 291fbf5 commit bfb8978
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 78 deletions.
19 changes: 5 additions & 14 deletions python_modules/dagster/dagster/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"""

import sys
import traceback
from contextlib import contextmanager

from dagster import check
Expand Down Expand Up @@ -430,27 +429,19 @@ def __init__(self, *args, **kwargs):
super(DagsterBackfillFailedError, self).__init__(*args, **kwargs)


class DagsterInstanceMigrationRequired(DagsterError):
class DagsterInstanceSchemaOutdated(DagsterError):
"""Indicates that the dagster instance must be migrated."""

def __init__(self, msg=None, db_revision=None, head_revision=None, original_exc_info=None):
super(DagsterInstanceMigrationRequired, self).__init__(
"Instance is out of date and must be migrated{additional_msg}."
"{revision_clause} Please run `dagster instance migrate`.{original_exception_clause}".format(
additional_msg=" ({msg})".format(msg=msg) if msg else "",
def __init__(self, db_revision=None, head_revision=None):
super(DagsterInstanceSchemaOutdated, self).__init__(
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
"{revision_clause} To migrate, run `dagster instance migrate`.".format(
revision_clause=(
" Database is at revision {db_revision}, head is "
"{head_revision}.".format(db_revision=db_revision, head_revision=head_revision)
if db_revision or head_revision
else ""
),
original_exception_clause=(
"\n\nOriginal exception:\n\n{original_exception}".format(
original_exception="".join(traceback.format_exception(*original_exc_info))
)
if original_exc_info
else ""
),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ def _connect(self):
engine = create_engine(self._conn_string, poolclass=NullPool)
conn = engine.connect()
try:
with handle_schema_errors(
conn,
get_alembic_config(__file__),
msg="ConsolidatedSqliteEventLogStorage requires migration",
):
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
finally:
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,7 @@ def _connect(self, shard):
conn = engine.connect()

try:
with handle_schema_errors(
conn,
get_alembic_config(__file__),
msg="SqliteEventLogStorage for shard {shard}".format(shard=shard),
):
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
finally:
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ def connect(self):
engine = create_engine(self._conn_string, poolclass=NullPool)
conn = engine.connect()
try:
with handle_schema_errors(
conn,
get_alembic_config(__file__),
msg="Sqlite run storage requires migration",
):
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
finally:
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def connect(self):
with handle_schema_errors(
conn,
get_alembic_config(__file__),
msg="Sqlite schedule storage requires migration",
):
yield conn
finally:
Expand Down
16 changes: 5 additions & 11 deletions python_modules/dagster/dagster/core/storage/sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# pylint chokes on the perfectly ok import from alembic.migration
import sys
import threading
from contextlib import contextmanager
from functools import lru_cache
Expand All @@ -11,7 +10,7 @@
from alembic.script import ScriptDirectory
from sqlalchemy.ext.compiler import compiles

from dagster.core.errors import DagsterInstanceMigrationRequired
from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.utils import file_relative_path
from dagster.utils.log import quieten

Expand Down Expand Up @@ -58,10 +57,10 @@ def check_alembic_revision(alembic_config, conn):


@contextmanager
def handle_schema_errors(conn, alembic_config, msg=None):
def handle_schema_errors(conn, alembic_config):
try:
yield
except (db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError):
except (db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError) as e:
db_revision, head_revision = (None, None)

try:
Expand All @@ -73,15 +72,10 @@ def handle_schema_errors(conn, alembic_config, msg=None):
pass

if db_revision != head_revision:
# Disable exception chaining since the original exception is included in the
# message, and the fact that the instance needs migrating should be the first
# thing the user sees
raise DagsterInstanceMigrationRequired(
msg=msg,
raise DagsterInstanceSchemaOutdated(
db_revision=db_revision,
head_revision=head_revision,
original_exc_info=sys.exc_info(),
) from None
) from e

raise

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from dagster.cli.debug import DebugRunPayload
from dagster.core.definitions.dependency import NodeHandle
from dagster.core.errors import DagsterInstanceMigrationRequired
from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.core.events import DagsterEvent
from dagster.core.events.log import EventLogEntry
from dagster.core.instance import DagsterInstance, InstanceRef
Expand All @@ -45,7 +45,7 @@


def _migration_regex(warning, current_revision, expected_revision=None):
instruction = re.escape("Please run `dagster instance migrate`.")
instruction = re.escape("To migrate, run `dagster instance migrate`.")
if expected_revision:
revision = re.escape(
"Database is at revision {}, head is {}.".format(current_revision, expected_revision)
Expand All @@ -57,23 +57,21 @@ def _migration_regex(warning, current_revision, expected_revision=None):

def _run_storage_migration_regex(current_revision, expected_revision=None):
warning = re.escape(
"Instance is out of date and must be migrated (Sqlite run storage requires migration)."
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
)
return _migration_regex(warning, current_revision, expected_revision)


def _schedule_storage_migration_regex(current_revision, expected_revision=None):
warning = re.escape(
"Instance is out of date and must be migrated (Sqlite schedule storage requires migration)."
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
)
return _migration_regex(warning, current_revision, expected_revision)


def _event_log_migration_regex(run_id, current_revision, expected_revision=None):
warning = re.escape(
"Instance is out of date and must be migrated (SqliteEventLogStorage for run {}).".format(
run_id
)
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
)
return _migration_regex(warning, current_revision, expected_revision)

Expand Down Expand Up @@ -162,7 +160,7 @@ def noop_pipeline():
noop_solid()

with pytest.raises(
DagsterInstanceMigrationRequired,
DagsterInstanceSchemaOutdated,
match=_run_storage_migration_regex(current_revision="9fe9e746268c"),
):
execute_pipeline(noop_pipeline, instance=instance)
Expand Down Expand Up @@ -555,8 +553,8 @@ class PipelineRun(
):
pass

@_whitelist_for_serdes(legacy_env) # pylint: disable=unused-variable
class PipelineRunStatus(Enum):
@_whitelist_for_serdes(legacy_env)
class PipelineRunStatus(Enum): # pylint: disable=unused-variable
QUEUED = "QUEUED"
NOT_STARTED = "NOT_STARTED"

Expand Down Expand Up @@ -728,7 +726,7 @@ def test_legacy_event_log_load():
whitelist_map=legacy_env,
storage_name="EventLogEntry", # use this to avoid collision with current EventLogEntry
)
class OldEventLogEntry(
class OldEventLogEntry( # pylint: disable=unused-variable
NamedTuple(
"_OldEventLogEntry",
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,7 @@ def create_mysql_connection(engine, dunder_file, storage_type_desc=None):
try:
# Retry connection to gracefully handle transient connection issues
conn = retry_mysql_connection_fn(engine.connect)
with handle_schema_errors(
conn,
mysql_alembic_config(dunder_file),
msg="MySQL {}storage requires migration".format(storage_type_desc),
):
with handle_schema_errors(conn, mysql_alembic_config(dunder_file)):
yield conn
finally:
if conn:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,7 @@ def create_pg_connection(engine, dunder_file, storage_type_desc=None):
try:
# Retry connection to gracefully handle transient connection issues
conn = retry_pg_connection_fn(engine.connect)
with handle_schema_errors(
conn,
pg_alembic_config(dunder_file),
msg="Postgres {}storage requires migration".format(storage_type_desc),
):
with handle_schema_errors(conn, pg_alembic_config(dunder_file)):
yield conn
finally:
if conn:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
reconstructable,
solid,
)
from dagster.core.errors import DagsterInstanceMigrationRequired
from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG
Expand Down Expand Up @@ -53,7 +53,7 @@ def noop_pipeline():
noop_solid()

with pytest.raises(
DagsterInstanceMigrationRequired, match=_migration_regex("run", current_revision=None)
DagsterInstanceSchemaOutdated, match=_migration_regex(current_revision=None)
):
execute_pipeline(noop_pipeline, instance=instance)

Expand Down Expand Up @@ -111,8 +111,8 @@ def asset_pipeline():
asset_solid()

with pytest.raises(
DagsterInstanceMigrationRequired,
match=_migration_regex("run", current_revision="c9159e740d7e"),
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="c9159e740d7e"),
):
execute_pipeline(asset_pipeline, instance=instance)

Expand Down Expand Up @@ -148,8 +148,8 @@ def simple_pipeline():
tags = {PARTITION_NAME_TAG: "my_partition", PARTITION_SET_TAG: "my_partition_set"}

with pytest.raises(
DagsterInstanceMigrationRequired,
match=_migration_regex("run", current_revision="3e0770016702"),
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="3e0770016702"),
):
execute_pipeline(simple_pipeline, tags=tags, instance=instance)

Expand Down Expand Up @@ -191,7 +191,7 @@ def test_0_10_6_add_bulk_actions_table(hostname, conn_string):
template = template_fd.read().format(hostname=hostname)
target_fd.write(template)

with pytest.raises(DagsterInstanceMigrationRequired):
with pytest.raises(DagsterInstanceSchemaOutdated):
with DagsterInstance.from_config(tempdir) as instance:
instance.get_backfills()

Expand All @@ -218,8 +218,8 @@ def test_0_11_0_add_asset_details(hostname, conn_string):
with DagsterInstance.from_config(tempdir) as instance:
storage = instance._event_storage
with pytest.raises(
DagsterInstanceMigrationRequired,
match=_migration_regex("event log", current_revision="3e71cf573ba6"),
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="3e71cf573ba6"),
):
storage.all_asset_keys()
instance.upgrade()
Expand Down Expand Up @@ -262,8 +262,8 @@ def noop_pipeline():
# Ensure that migration required exception throws, since you are trying to use the
# migration-required column.
with pytest.raises(
DagsterInstanceMigrationRequired,
match=_migration_regex("run", current_revision="7cba9eeaaf1d"),
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="7cba9eeaaf1d"),
):
instance.get_runs(filters=RunsFilter(mode="the_mode"))

Expand Down Expand Up @@ -338,11 +338,9 @@ def _reconstruct_from_file(hostname, conn_string, path, username="test", passwor
)


def _migration_regex(storage_name, current_revision, expected_revision=None):
def _migration_regex(current_revision, expected_revision=None):
warning = re.escape(
"Instance is out of date and must be migrated (Postgres {} storage requires migration).".format(
storage_name
)
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
)

if expected_revision:
Expand All @@ -351,7 +349,7 @@ def _migration_regex(storage_name, current_revision, expected_revision=None):
)
else:
revision = "Database is at revision {}, head is [a-z0-9]+.".format(current_revision)
instruction = re.escape("Please run `dagster instance migrate`.")
instruction = re.escape("To migrate, run `dagster instance migrate`.")

return "{} {} {}".format(warning, revision, instruction)

Expand Down

0 comments on commit bfb8978

Please sign in to comment.