Skip to content

Commit

Permalink
Unpin alembic
Browse files Browse the repository at this point in the history
Summary:
Based on the suggestion here: sqlalchemy/alembic#782 (always pass in a connection, not an engine)

Also stop using the deprecated `type` keyword argument to `op.drop_constraint` (removed in newer alembic versions; replaced by `type_`)

Test Plan: BK

Reviewers: alangenfeld, prha, nate

Reviewed By: alangenfeld

Differential Revision: https://dagster.phacility.com/D6040
  • Loading branch information
gibsondan committed Jan 20, 2021
1 parent e573617 commit 0a595be
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 59 deletions.
Expand Up @@ -75,12 +75,13 @@ def _init_db(self):
mkdir_p(self._base_dir)
engine = create_engine(self._conn_string, poolclass=NullPool)
alembic_config = get_alembic_config(__file__)
connection = engine.connect()
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
SqlEventLogStorageMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, engine)

with engine.connect() as connection:
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
SqlEventLogStorageMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, connection)

@contextmanager
def connect(self, run_id=None):
Expand Down
Expand Up @@ -121,10 +121,10 @@ def _initdb(self, engine):
with engine.connect() as connection:
db_revision, head_revision = check_alembic_revision(alembic_config, connection)

if not (db_revision and head_revision):
SqlEventLogStorageMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, engine)
if not (db_revision and head_revision):
SqlEventLogStorageMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, connection)

break
except (db.exc.DatabaseError, sqlite3.DatabaseError, sqlite3.OperationalError) as exc:
Expand Down
Expand Up @@ -69,12 +69,12 @@ def from_local(base_dir, inst_data=None):
conn_string = create_db_conn_string(base_dir, "runs")
engine = create_engine(conn_string, poolclass=NullPool)
alembic_config = get_alembic_config(__file__)
connection = engine.connect()
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
RunStorageSqlMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, engine)
with engine.connect() as connection:
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
RunStorageSqlMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, connection)

return SqliteRunStorage(conn_string, inst_data)

Expand Down
Expand Up @@ -45,13 +45,14 @@ def from_local(base_dir, inst_data=None):
mkdir_p(base_dir)
conn_string = create_db_conn_string(base_dir, "schedules")
engine = create_engine(conn_string, poolclass=NullPool)
connection = engine.connect()
alembic_config = get_alembic_config(__file__)
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
ScheduleStorageSqlMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, engine)

with engine.connect() as connection:
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
ScheduleStorageSqlMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, connection)

return SqliteScheduleStorage(conn_string, inst_data)

Expand Down
11 changes: 5 additions & 6 deletions python_modules/dagster/dagster/core/storage/sql.py
Expand Up @@ -116,16 +116,15 @@ def run_migrations_online(context, config, target_metadata):
and associate a connection with the context.
"""
connectable = config.attributes.get("connection", None)
connection = config.attributes.get("connection", None)

if connectable is None:
if connection is None:
raise Exception(
"No connection set in alembic config. If you are trying to run this script from the "
"command line, STOP and read the README."
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()
with context.begin_transaction():
context.run_migrations()
2 changes: 1 addition & 1 deletion python_modules/dagster/setup.py
Expand Up @@ -61,7 +61,7 @@ def get_version() -> str:
"coloredlogs>=6.1, <=14.0",
"PyYAML",
# core (not explicitly expressed atm)
"alembic>=1.2.1, <1.5.0",
"alembic>=1.2.1",
"croniter>=0.3.34",
"grpcio>=1.32.0", # ensure version we require is >= that with which we generated the grpc code (set in dev-requirements)
"grpcio-health-checking>=1.32.0",
Expand Down
Expand Up @@ -51,19 +51,18 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = config.attributes.get("connection", None)
connection = config.attributes.get("connection", None)

if connectable is None:
if connection is None:
raise Exception(
"No connection set in alembic config. If you are trying to run this script from the "
"command line, STOP and read the README."
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()
with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
Expand Down
Expand Up @@ -24,7 +24,7 @@ def upgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand All @@ -41,7 +41,7 @@ def downgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand Down
Expand Up @@ -77,7 +77,7 @@ def __init__(self, postgres_url, inst_data=None):
retry_pg_creation_fn(lambda: SqlEventLogStorageMetadata.create_all(conn))

# This revision may be shared by any other dagster storage classes using the same DB
stamp_alembic_rev(alembic_config, self._engine)
stamp_alembic_rev(alembic_config, conn)

def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold an open connection and set statement_timeout
Expand All @@ -90,7 +90,8 @@ def optimize_for_dagit(self, statement_timeout):

def upgrade(self):
alembic_config = get_alembic_config(__file__)
run_alembic_upgrade(alembic_config, self._engine)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)

@property
def inst_data(self):
Expand Down
Expand Up @@ -51,19 +51,18 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = config.attributes.get("connection", None)
connection = config.attributes.get("connection", None)

if connectable is None:
if connection is None:
raise Exception(
"No connection set in alembic config. If you are trying to run this script from the "
"command line, STOP and read the README."
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()
with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
Expand Down
Expand Up @@ -24,7 +24,7 @@ def upgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand All @@ -41,7 +41,7 @@ def downgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand Down
Expand Up @@ -59,7 +59,7 @@ def __init__(self, postgres_url, inst_data=None):
retry_pg_creation_fn(lambda: RunStorageSqlMetadata.create_all(conn))

# This revision may be shared by any other dagster storage classes using the same DB
stamp_alembic_rev(alembic_config, self._engine)
stamp_alembic_rev(alembic_config, conn)

def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold 1 open connection and set statement_timeout
Expand Down Expand Up @@ -100,7 +100,8 @@ def connect(self):

def upgrade(self):
alembic_config = get_alembic_config(__file__)
run_alembic_upgrade(alembic_config, self._engine)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)

def has_built_index(self, migration_name):
if migration_name not in self._index_migration_cache:
Expand Down
Expand Up @@ -51,19 +51,18 @@ def run_migrations_online():
and associate a connection with the context.
"""
connectable = config.attributes.get("connection", None)
connection = config.attributes.get("connection", None)

if connectable is None:
if connection is None:
raise Exception(
"No connection set in alembic config. If you are trying to run this script from the "
"command line, STOP and read the README."
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()
with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
Expand Down
Expand Up @@ -24,7 +24,7 @@ def upgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand All @@ -41,7 +41,7 @@ def downgrade():
has_tables = inspector.get_table_names()

if "runs" in has_tables and "run_tags" in has_tables:
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type="foreignkey")
op.drop_constraint("run_tags_run_id_fkey", table_name="run_tags", type_="foreignkey")
op.create_foreign_key(
"run_tags_run_id_fkey",
source_table="run_tags",
Expand Down
Expand Up @@ -56,7 +56,7 @@ def __init__(self, postgres_url, inst_data=None):
retry_pg_creation_fn(lambda: ScheduleStorageSqlMetadata.create_all(conn))

# This revision may be shared by any other dagster storage classes using the same DB
stamp_alembic_rev(alembic_config, self._engine)
stamp_alembic_rev(alembic_config, conn)

def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold an open connection and set statement_timeout
Expand Down Expand Up @@ -97,4 +97,5 @@ def connect(self, run_id=None): # pylint: disable=arguments-differ, unused-argu

def upgrade(self):
alembic_config = get_alembic_config(__file__)
run_alembic_upgrade(alembic_config, self._engine)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)

0 comments on commit 0a595be

Please sign in to comment.