Skip to content

Commit

Permalink
remove handle_schema_errors that wrap db errors with schema outdated …
Browse files Browse the repository at this point in the history
…exceptions (#7886)
  • Loading branch information
prha committed May 16, 2022
1 parent fe59b68 commit 80153e0
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 85 deletions.
17 changes: 0 additions & 17 deletions python_modules/dagster/dagster/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,23 +429,6 @@ def __init__(self, *args, **kwargs):
super(DagsterBackfillFailedError, self).__init__(*args, **kwargs)


class DagsterInstanceSchemaOutdated(DagsterError):
"""Indicates that the dagster instance must be migrated."""

def __init__(self, db_revision=None, head_revision=None):
super(DagsterInstanceSchemaOutdated, self).__init__(
"Raised an exception that may indicate that the Dagster database needs to be be migrated."
"{revision_clause} To migrate, run `dagster instance migrate`.".format(
revision_clause=(
" Database is at revision {db_revision}, head is "
"{head_revision}.".format(db_revision=db_revision, head_revision=head_revision)
if db_revision or head_revision
else ""
),
)
)


class DagsterRunAlreadyExists(DagsterError):
"""Indicates that a pipeline run already exists in a run storage."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
check_alembic_revision,
create_engine,
get_alembic_config,
handle_schema_errors,
run_alembic_upgrade,
stamp_alembic_rev,
)
Expand Down Expand Up @@ -98,8 +97,7 @@ def _connect(self):
engine = create_engine(self._conn_string, poolclass=NullPool)
conn = engine.connect()
try:
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
yield conn
finally:
conn.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
check_alembic_revision,
create_engine,
get_alembic_config,
handle_schema_errors,
run_alembic_upgrade,
stamp_alembic_rev,
)
Expand Down Expand Up @@ -201,8 +200,7 @@ def _connect(self, shard):
conn = engine.connect()

try:
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
yield conn
finally:
conn.close()
engine.dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
check_alembic_revision,
create_engine,
get_alembic_config,
handle_schema_errors,
run_alembic_downgrade,
run_alembic_upgrade,
stamp_alembic_rev,
Expand Down Expand Up @@ -101,8 +100,7 @@ def connect(self):
engine = create_engine(self._conn_string, poolclass=NullPool)
conn = engine.connect()
try:
with handle_schema_errors(conn, get_alembic_config(__file__)):
yield conn
yield conn
finally:
conn.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
check_alembic_revision,
create_engine,
get_alembic_config,
handle_schema_errors,
run_alembic_upgrade,
stamp_alembic_rev,
)
Expand Down Expand Up @@ -74,11 +73,7 @@ def connect(self):
engine = create_engine(self._conn_string, poolclass=NullPool)
conn = engine.connect()
try:
with handle_schema_errors(
conn,
get_alembic_config(__file__),
):
yield conn
yield conn
finally:
conn.close()

Expand Down
26 changes: 0 additions & 26 deletions python_modules/dagster/dagster/core/storage/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# pylint chokes on the perfectly ok import from alembic.migration
import threading
from contextlib import contextmanager
from functools import lru_cache

import sqlalchemy as db
Expand All @@ -10,7 +9,6 @@
from alembic.script import ScriptDirectory
from sqlalchemy.ext.compiler import compiles

from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.utils import file_relative_path
from dagster.utils.log import quieten

Expand Down Expand Up @@ -62,30 +60,6 @@ def check_alembic_revision(alembic_config, conn):
return (db_revision, head_revision)


@contextmanager
def handle_schema_errors(conn, alembic_config):
try:
yield
except (db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError) as e:
db_revision, head_revision = (None, None)

try:
with quieten():
db_revision, head_revision = check_alembic_revision(alembic_config, conn)
# If exceptions were raised during the revision check, we want to swallow them and
# allow the original exception to fall through
except Exception:
pass

if db_revision != head_revision:
raise DagsterInstanceSchemaOutdated(
db_revision=db_revision,
head_revision=head_revision,
) from e

raise


def run_migrations_offline(context, config, target_metadata):
"""Run migrations in 'offline' mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from typing import NamedTuple, Optional, Union

import pytest
import sqlalchemy as db

from dagster import AssetKey, AssetMaterialization, Output
from dagster import _check as check
from dagster import execute_pipeline, file_relative_path, job, pipeline, solid
from dagster.cli.debug import DebugRunPayload
from dagster.core.definitions.dependency import NodeHandle
from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.core.events import DagsterEvent
from dagster.core.events.log import EventLogEntry
from dagster.core.instance import DagsterInstance, InstanceRef
Expand Down Expand Up @@ -161,8 +161,7 @@ def noop_pipeline():
noop_solid()

with pytest.raises(
DagsterInstanceSchemaOutdated,
match=_run_storage_migration_regex(current_revision="9fe9e746268c"),
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
execute_pipeline(noop_pipeline, instance=instance)

Expand Down Expand Up @@ -822,7 +821,6 @@ def test_instigators_table_backcompat():

def test_jobs_selector_id_migration():
src_dir = file_relative_path(__file__, "snapshot_0_14_6_post_schema_pre_data_migration/sqlite")
import sqlalchemy as db

from dagster.core.storage.schedules.migration import SCHEDULE_JOBS_SELECTOR_ID
from dagster.core.storage.schedules.schema import InstigatorsTable, JobTable, JobTickTable
Expand Down Expand Up @@ -875,7 +873,6 @@ def test_jobs_selector_id_migration():

def test_tick_selector_index_migration():
src_dir = file_relative_path(__file__, "snapshot_0_14_6_post_schema_pre_data_migration/sqlite")
import sqlalchemy as db # pylint: disable=unused-import

with copy_directory(src_dir) as test_dir:
db_path = os.path.join(test_dir, "schedules", "schedules.db")
Expand Down
5 changes: 2 additions & 3 deletions python_modules/libraries/dagster-mysql/dagster_mysql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from dagster import Field, IntSource, Selector, StringSource
from dagster import _check as check
from dagster.core.storage.sql import get_alembic_config, handle_schema_errors
from dagster.core.storage.sql import get_alembic_config

# 1 hr - anything less than 8 hrs (MySQL's default `wait_timeout` should work)
MYSQL_POOL_RECYCLE = 3600
Expand Down Expand Up @@ -161,8 +161,7 @@ def create_mysql_connection(engine, dunder_file, storage_type_desc=None):
try:
# Retry connection to gracefully handle transient connection issues
conn = retry_mysql_connection_fn(engine.connect)
with handle_schema_errors(conn, mysql_alembic_config(dunder_file)):
yield conn
yield conn
finally:
if conn:
conn.close()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dagster import Field, IntSource, Permissive, StringSource
from dagster import _check as check
from dagster.core.definitions.policy import Backoff, Jitter, calculate_delay
from dagster.core.storage.sql import get_alembic_config, handle_schema_errors
from dagster.core.storage.sql import get_alembic_config


class DagsterPostgresException(Exception):
Expand Down Expand Up @@ -153,7 +153,7 @@ def pg_alembic_config(dunder_file, script_location=None):


@contextmanager
def create_pg_connection(engine, alembic_config, storage_type_desc=None):
def create_pg_connection(engine, _alembic_config, storage_type_desc=None):
check.inst_param(engine, "engine", sqlalchemy.engine.Engine)
check.opt_str_param(storage_type_desc, "storage_type_desc", "")

Expand All @@ -166,8 +166,7 @@ def create_pg_connection(engine, alembic_config, storage_type_desc=None):
try:
# Retry connection to gracefully handle transient connection issues
conn = retry_pg_connection_fn(engine.connect)
with handle_schema_errors(conn, alembic_config):
yield conn
yield conn
finally:
if conn:
conn.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tempfile

import pytest
from sqlalchemy import create_engine
import sqlalchemy as db

from dagster import (
AssetKey,
Expand All @@ -20,7 +20,6 @@
reconstructable,
solid,
)
from dagster.core.errors import DagsterInstanceSchemaOutdated
from dagster.core.instance import DagsterInstance
from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
from dagster.core.storage.pipeline_run import RunsFilter
Expand Down Expand Up @@ -58,7 +57,7 @@ def noop_pipeline():
noop_solid()

with pytest.raises(
DagsterInstanceSchemaOutdated, match=_migration_regex(current_revision=None)
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
execute_pipeline(noop_pipeline, instance=instance)

Expand Down Expand Up @@ -118,8 +117,7 @@ def asset_pipeline():
asset_solid()

with pytest.raises(
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="c9159e740d7e"),
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
execute_pipeline(asset_pipeline, instance=instance)

Expand Down Expand Up @@ -157,8 +155,7 @@ def simple_pipeline():
tags = {PARTITION_NAME_TAG: "my_partition", PARTITION_SET_TAG: "my_partition_set"}

with pytest.raises(
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="3e0770016702"),
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
execute_pipeline(simple_pipeline, tags=tags, instance=instance)

Expand Down Expand Up @@ -204,7 +201,9 @@ def test_0_10_6_add_bulk_actions_table(hostname, conn_string):
template = template_fd.read().format(hostname=hostname)
target_fd.write(template)

with pytest.raises(DagsterInstanceSchemaOutdated):
with pytest.raises(
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
with DagsterInstance.from_config(tempdir) as instance:
instance.get_backfills()

Expand Down Expand Up @@ -233,8 +232,7 @@ def test_0_11_0_add_asset_details(hostname, conn_string):
with DagsterInstance.from_config(tempdir) as instance:
storage = instance._event_storage
with pytest.raises(
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="3e71cf573ba6"),
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
storage.all_asset_keys()
instance.upgrade()
Expand Down Expand Up @@ -279,8 +277,7 @@ def noop_pipeline():
# Ensure that migration required exception throws, since you are trying to use the
# migration-required column.
with pytest.raises(
DagsterInstanceSchemaOutdated,
match=_migration_regex(current_revision="7cba9eeaaf1d"),
(db.exc.OperationalError, db.exc.ProgrammingError, db.exc.StatementError)
):
instance.get_runs(filters=RunsFilter(mode="the_mode"))

Expand Down Expand Up @@ -380,7 +377,7 @@ def asset_job():


def _reconstruct_from_file(hostname, conn_string, path, username="test", password="test"):
engine = create_engine(conn_string)
engine = db.create_engine(conn_string)
engine.execute("drop schema public cascade;")
engine.execute("create schema public;")
env = os.environ.copy()
Expand Down Expand Up @@ -513,8 +510,6 @@ def test_instigators_table_backcompat(hostname, conn_string):


def test_jobs_selector_id_migration(hostname, conn_string):
import sqlalchemy as db

from dagster.core.storage.schedules.migration import SCHEDULE_JOBS_SELECTOR_ID
from dagster.core.storage.schedules.schema import InstigatorsTable, JobTable, JobTickTable

Expand Down

0 comments on commit 80153e0

Please sign in to comment.