From 0a595bedd5991b184ac5ee31bf621e6af6b4fb27 Mon Sep 17 00:00:00 2001
From: gibsondan
Date: Tue, 19 Jan 2021 12:22:48 -0600
Subject: [PATCH] Unpin alembic

Summary: Based on the suggestion here: https://github.com/sqlalchemy/alembic/issues/782
(always pass in a connection, not an engine)

Also stop using the deprecated `type` param that was removed (it is `type_` now).

Test Plan: BK

Reviewers: alangenfeld, prha, nate

Reviewed By: alangenfeld

Differential Revision: https://dagster.phacility.com/D6040
---
 .../sqlite/consolidated_sqlite_event_log.py        | 13 +++++++------
 .../storage/event_log/sqlite/sqlite_event_log.py   |  8 ++++----
 .../core/storage/runs/sqlite/sqlite_run_storage.py | 12 ++++++------
 .../schedules/sqlite/sqlite_schedule_storage.py    | 13 +++++++------
 python_modules/dagster/dagster/core/storage/sql.py | 11 +++++------
 python_modules/dagster/setup.py                    |  2 +-
 .../dagster_postgres/event_log/alembic/env.py      | 11 +++++------
 .../versions/8f8dba68fd3b_cascade_run_deletion.py  |  4 ++--
 .../dagster_postgres/event_log/event_log.py        |  5 +++--
 .../dagster_postgres/run_storage/alembic/env.py    | 11 +++++------
 .../versions/8f8dba68fd3b_cascade_run_deletion.py  |  4 ++--
 .../dagster_postgres/run_storage/run_storage.py    |  5 +++--
 .../schedule_storage/alembic/env.py                | 11 +++++------
 .../versions/8f8dba68fd3b_cascade_run_deletion.py  |  4 ++--
 .../schedule_storage/schedule_storage.py           |  5 +++--
 15 files changed, 60 insertions(+), 59 deletions(-)

diff --git a/python_modules/dagster/dagster/core/storage/event_log/sqlite/consolidated_sqlite_event_log.py b/python_modules/dagster/dagster/core/storage/event_log/sqlite/consolidated_sqlite_event_log.py
index 6e62326cb272..dc19600fd9f7 100644
--- a/python_modules/dagster/dagster/core/storage/event_log/sqlite/consolidated_sqlite_event_log.py
+++ b/python_modules/dagster/dagster/core/storage/event_log/sqlite/consolidated_sqlite_event_log.py
@@ -75,12 +75,13 @@ def _init_db(self):
         mkdir_p(self._base_dir)
         engine = create_engine(self._conn_string, poolclass=NullPool)
         alembic_config = get_alembic_config(__file__)
-        connection = engine.connect()
-        db_revision, head_revision = check_alembic_revision(alembic_config, connection)
-        if not (db_revision and head_revision):
-            SqlEventLogStorageMetadata.create_all(engine)
-            engine.execute("PRAGMA journal_mode=WAL;")
-            stamp_alembic_rev(alembic_config, engine)
+
+        with engine.connect() as connection:
+            db_revision, head_revision = check_alembic_revision(alembic_config, connection)
+            if not (db_revision and head_revision):
+                SqlEventLogStorageMetadata.create_all(engine)
+                engine.execute("PRAGMA journal_mode=WAL;")
+                stamp_alembic_rev(alembic_config, connection)
 
     @contextmanager
     def connect(self, run_id=None):
diff --git a/python_modules/dagster/dagster/core/storage/event_log/sqlite/sqlite_event_log.py b/python_modules/dagster/dagster/core/storage/event_log/sqlite/sqlite_event_log.py
index 3178a589f65b..50a853bb9bb0 100644
--- a/python_modules/dagster/dagster/core/storage/event_log/sqlite/sqlite_event_log.py
+++ b/python_modules/dagster/dagster/core/storage/event_log/sqlite/sqlite_event_log.py
@@ -121,10 +121,10 @@ def _initdb(self, engine):
 
                 with engine.connect() as connection:
                     db_revision, head_revision = check_alembic_revision(alembic_config, connection)
-                if not (db_revision and head_revision):
-                    SqlEventLogStorageMetadata.create_all(engine)
-                    engine.execute("PRAGMA journal_mode=WAL;")
-                    stamp_alembic_rev(alembic_config, engine)
+                    if not (db_revision and head_revision):
+                        SqlEventLogStorageMetadata.create_all(engine)
+                        engine.execute("PRAGMA journal_mode=WAL;")
+                        stamp_alembic_rev(alembic_config, connection)
 
                 break
             except (db.exc.DatabaseError, sqlite3.DatabaseError, sqlite3.OperationalError) as exc:
diff --git a/python_modules/dagster/dagster/core/storage/runs/sqlite/sqlite_run_storage.py b/python_modules/dagster/dagster/core/storage/runs/sqlite/sqlite_run_storage.py
index 31319ba5c0c1..cb00d9ac6a77 100644
--- a/python_modules/dagster/dagster/core/storage/runs/sqlite/sqlite_run_storage.py
+++ b/python_modules/dagster/dagster/core/storage/runs/sqlite/sqlite_run_storage.py
@@ -69,12 +69,12 @@ def from_local(base_dir, inst_data=None):
         conn_string = create_db_conn_string(base_dir, "runs")
         engine = create_engine(conn_string, poolclass=NullPool)
         alembic_config = get_alembic_config(__file__)
-        connection = engine.connect()
-        db_revision, head_revision = check_alembic_revision(alembic_config, connection)
-        if not (db_revision and head_revision):
-            RunStorageSqlMetadata.create_all(engine)
-            engine.execute("PRAGMA journal_mode=WAL;")
-            stamp_alembic_rev(alembic_config, engine)
+        with engine.connect() as connection:
+            db_revision, head_revision = check_alembic_revision(alembic_config, connection)
+            if not (db_revision and head_revision):
+                RunStorageSqlMetadata.create_all(engine)
+                engine.execute("PRAGMA journal_mode=WAL;")
+                stamp_alembic_rev(alembic_config, connection)
 
         return SqliteRunStorage(conn_string, inst_data)
 
diff --git a/python_modules/dagster/dagster/core/storage/schedules/sqlite/sqlite_schedule_storage.py b/python_modules/dagster/dagster/core/storage/schedules/sqlite/sqlite_schedule_storage.py
index bffe9d7b7a44..161bbd851590 100644
--- a/python_modules/dagster/dagster/core/storage/schedules/sqlite/sqlite_schedule_storage.py
+++ b/python_modules/dagster/dagster/core/storage/schedules/sqlite/sqlite_schedule_storage.py
@@ -45,13 +45,14 @@ def from_local(base_dir, inst_data=None):
         mkdir_p(base_dir)
         conn_string = create_db_conn_string(base_dir, "schedules")
         engine = create_engine(conn_string, poolclass=NullPool)
-        connection = engine.connect()
         alembic_config = get_alembic_config(__file__)
-        db_revision, head_revision = check_alembic_revision(alembic_config, connection)
-        if not (db_revision and head_revision):
-            ScheduleStorageSqlMetadata.create_all(engine)
-            engine.execute("PRAGMA journal_mode=WAL;")
-            stamp_alembic_rev(alembic_config, engine)
+
+        with engine.connect() as connection:
+            db_revision, head_revision = check_alembic_revision(alembic_config, connection)
+            if not (db_revision and head_revision):
+                ScheduleStorageSqlMetadata.create_all(engine)
+                engine.execute("PRAGMA journal_mode=WAL;")
+                stamp_alembic_rev(alembic_config, connection)
 
         return SqliteScheduleStorage(conn_string, inst_data)
 
diff --git a/python_modules/dagster/dagster/core/storage/sql.py b/python_modules/dagster/dagster/core/storage/sql.py
index cf12888c5b67..a1b875d16390 100644
--- a/python_modules/dagster/dagster/core/storage/sql.py
+++ b/python_modules/dagster/dagster/core/storage/sql.py
@@ -116,16 +116,15 @@ def run_migrations_online(context, config, target_metadata):
     and associate a connection with the context.
 
     """
-    connectable = config.attributes.get("connection", None)
+    connection = config.attributes.get("connection", None)
 
-    if connectable is None:
+    if connection is None:
         raise Exception(
             "No connection set in alembic config. If you are trying to run this script from the "
             "command line, STOP and read the README."
         )
 
-    with connectable.connect() as connection:
-        context.configure(connection=connection, target_metadata=target_metadata)
+    context.configure(connection=connection, target_metadata=target_metadata)
 
-        with context.begin_transaction():
-            context.run_migrations()
+    with context.begin_transaction():
+        context.run_migrations()
diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py
index abdc3a7d6208..cf0c6be442b5 100644
--- a/python_modules/dagster/setup.py
+++ b/python_modules/dagster/setup.py
@@ -61,7 +61,7 @@ def get_version() -> str:
             "coloredlogs>=6.1, <=14.0",
             "PyYAML",
             # core (not explicitly expressed atm)
-            "alembic>=1.2.1, <1.5.0",
+            "alembic>=1.2.1",
             "croniter>=0.3.34",
             "grpcio>=1.32.0",  # ensure version we require is >= that with which we generated the grpc code (set in dev-requirements)
             "grpcio-health-checking>=1.32.0",
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/env.py b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/env.py
index 510b09998fcc..8a8653769d58 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/env.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/env.py
@@ -51,19 +51,18 @@ def run_migrations_online():
     and associate a connection with the context.
 
     """
-    connectable = config.attributes.get("connection", None)
+    connection = config.attributes.get("connection", None)
 
-    if connectable is None:
+    if connection is None:
         raise Exception(
             "No connection set in alembic config. If you are trying to run this script from the "
             "command line, STOP and read the README."
         )
 
-    with connectable.connect() as connection:
-        context.configure(connection=connection, target_metadata=target_metadata)
+    context.configure(connection=connection, target_metadata=target_metadata)
 
-        with context.begin_transaction():
-            context.run_migrations()
+    with context.begin_transaction():
+        context.run_migrations()
 
 
 if context.is_offline_mode():
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
index c791d36ad198..d10a6307fb04 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
@@ -24,7 +24,7 @@ def upgrade():
     has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
@@ -41,7 +41,7 @@ def downgrade():
     has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py
index cd8c09e76a49..0c296b194cca 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/event_log/event_log.py
@@ -77,7 +77,7 @@ def __init__(self, postgres_url, inst_data=None):
                 retry_pg_creation_fn(lambda: SqlEventLogStorageMetadata.create_all(conn))
 
                 # This revision may be shared by any other dagster storage classes using the same DB
-                stamp_alembic_rev(alembic_config, self._engine)
+                stamp_alembic_rev(alembic_config, conn)
 
     def optimize_for_dagit(self, statement_timeout):
         # When running in dagit, hold an open connection and set statement_timeout
@@ -90,7 +90,8 @@ def optimize_for_dagit(self, statement_timeout):
 
     def upgrade(self):
         alembic_config = get_alembic_config(__file__)
-        run_alembic_upgrade(alembic_config, self._engine)
+        with self.connect() as conn:
+            run_alembic_upgrade(alembic_config, conn)
 
     @property
     def inst_data(self):
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/env.py b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/env.py
index 3410dfd41795..d44231b89f7a 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/env.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/env.py
@@ -51,19 +51,18 @@ def run_migrations_online():
     and associate a connection with the context.
 
    """
-    connectable = config.attributes.get("connection", None)
+    connection = config.attributes.get("connection", None)
 
-    if connectable is None:
+    if connection is None:
         raise Exception(
             "No connection set in alembic config. If you are trying to run this script from the "
             "command line, STOP and read the README."
         )
 
-    with connectable.connect() as connection:
-        context.configure(connection=connection, target_metadata=target_metadata)
+    context.configure(connection=connection, target_metadata=target_metadata)
 
-        with context.begin_transaction():
-            context.run_migrations()
+    with context.begin_transaction():
+        context.run_migrations()
 
 
 if context.is_offline_mode():
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
index c791d36ad198..d10a6307fb04 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
@@ -24,7 +24,7 @@ def upgrade():
     has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
@@ -41,7 +41,7 @@ def downgrade():
     has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py
index 23a048ae89c7..2e5c857d8b99 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/run_storage/run_storage.py
@@ -59,7 +59,7 @@ def __init__(self, postgres_url, inst_data=None):
                 retry_pg_creation_fn(lambda: RunStorageSqlMetadata.create_all(conn))
 
                 # This revision may be shared by any other dagster storage classes using the same DB
-                stamp_alembic_rev(alembic_config, self._engine)
+                stamp_alembic_rev(alembic_config, conn)
 
     def optimize_for_dagit(self, statement_timeout):
         # When running in dagit, hold 1 open connection and set statement_timeout
@@ -100,7 +100,8 @@ def connect(self):
 
     def upgrade(self):
         alembic_config = get_alembic_config(__file__)
-        run_alembic_upgrade(alembic_config, self._engine)
+        with self.connect() as conn:
+            run_alembic_upgrade(alembic_config, conn)
 
     def has_built_index(self, migration_name):
         if migration_name not in self._index_migration_cache:
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/env.py b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/env.py
index 3410dfd41795..d44231b89f7a 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/env.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/env.py
@@ -51,19 +51,18 @@ def run_migrations_online():
     and associate a connection with the context.
 
     """
-    connectable = config.attributes.get("connection", None)
+    connection = config.attributes.get("connection", None)
 
-    if connectable is None:
+    if connection is None:
         raise Exception(
             "No connection set in alembic config. If you are trying to run this script from the "
             "command line, STOP and read the README."
         )
 
-    with connectable.connect() as connection:
-        context.configure(connection=connection, target_metadata=target_metadata)
+    context.configure(connection=connection, target_metadata=target_metadata)
 
-        with context.begin_transaction():
-            context.run_migrations()
+    with context.begin_transaction():
+        context.run_migrations()
 
 
 if context.is_offline_mode():
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
index c791d36ad198..d10a6307fb04 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/alembic/versions/8f8dba68fd3b_cascade_run_deletion.py
@@ -24,7 +24,7 @@ def upgrade():
     has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
@@ -41,7 +41,7 @@ def downgrade():
    has_tables = inspector.get_table_names()
 
     if "runs" in has_tables and "run_tags" in has_tables:
-        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
+        op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
         op.create_foreign_key(
             "run_tags_run_id_fkey",
             source_table="run_tags",
diff --git a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/schedule_storage.py b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/schedule_storage.py
index d8ffec333e55..0052a880c838 100644
--- a/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/schedule_storage.py
+++ b/python_modules/libraries/dagster-postgres/dagster_postgres/schedule_storage/schedule_storage.py
@@ -56,7 +56,7 @@ def __init__(self, postgres_url, inst_data=None):
                 retry_pg_creation_fn(lambda: ScheduleStorageSqlMetadata.create_all(conn))
 
                 # This revision may be shared by any other dagster storage classes using the same DB
-                stamp_alembic_rev(alembic_config, self._engine)
+                stamp_alembic_rev(alembic_config, conn)
 
     def optimize_for_dagit(self, statement_timeout):
         # When running in dagit, hold an open connection and set statement_timeout
@@ -97,4 +97,5 @@ def connect(self, run_id=None):  # pylint: disable=arguments-differ, unused-argument
 
     def upgrade(self):
         alembic_config = get_alembic_config(__file__)
-        run_alembic_upgrade(alembic_config, self._engine)
+        with self.connect() as conn:
+            run_alembic_upgrade(alembic_config, conn)
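
Note (illustrative, not part of the patch): the SQLite storages above all converge on the same
pattern — open a connection explicitly and hand alembic the connection rather than the engine.
A minimal sketch of that pattern follows, assuming the helpers get_alembic_config,
check_alembic_revision, and stamp_alembic_rev are importable from dagster.core.storage.sql as they
are used in the files touched here; init_sqlite_db is a made-up name for the example.

    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool

    # Assumed import path for the dagster alembic helpers used throughout the diff above.
    from dagster.core.storage.sql import (
        check_alembic_revision,
        get_alembic_config,
        stamp_alembic_rev,
    )


    def init_sqlite_db(conn_string, metadata):
        # Hypothetical helper mirroring the _init_db / from_local logic in the SQLite storages.
        engine = create_engine(conn_string, poolclass=NullPool)
        alembic_config = get_alembic_config(__file__)

        # Per the linked alembic issue, newer alembic (the 1.5 series this patch unpins to allow)
        # expects a Connection rather than an Engine here, so keep every alembic call inside the
        # `with` block and pass `connection` through.
        with engine.connect() as connection:
            db_revision, head_revision = check_alembic_revision(alembic_config, connection)
            if not (db_revision and head_revision):
                metadata.create_all(engine)
                engine.execute("PRAGMA journal_mode=WAL;")
                stamp_alembic_rev(alembic_config, connection)

The type_="foreignkey" change in the three migration files is independent of this: per the summary,
op.drop_constraint's bare "type" alias was deprecated and removed, and type_ is the supported name.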