Unify all alembic scripts into single directory, creating single history (#7411)

* Unify all alembic scripts into single directory, creating single history

* merge the alembic env script; will cause all metadatas to be marked

* fix scripts

* fix manifest, guard sqlite upgrades

* migration engine

* fix logging overrides

* allow overrides for ursula

* add readme

* fix alembic script location for ursula

* fix pg connection, used by internal
prha committed Apr 28, 2022
1 parent b12af8c commit 6e52818
Showing 109 changed files with 531 additions and 929 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/MANIFEST.in
@@ -2,6 +2,7 @@ include README.rst
include LICENSE
include COPYING
recursive-include dagster *.md
recursive-include dagster/core/storage/alembic *
recursive-include dagster/core/storage/event_log/sqlite/alembic *
recursive-include dagster/core/storage/runs/sqlite/alembic *
recursive-include dagster/core/storage/schedules/sqlite/alembic *
114 changes: 114 additions & 0 deletions python_modules/dagster/dagster/core/storage/alembic/README.md
@@ -0,0 +1,114 @@
# Storage migrations

We use alembic (https://alembic.sqlalchemy.org/en/latest/) to manage the schema migrations for our storage. This adds a directory of migration scripts to your repo and an alembic_version table to your database to keep track of which migration scripts have been applied.

## Adding a schema migration

Migrations are only required when you are altering the schema of an existing table (adding/removing a column, adding an index, etc).

To add a SQL schema migration, follow these steps:

1. Change the schema definition
   1. e.g. add a column `foo` to `RunsTable` in `dagster.core.storage.runs.schema`
1. Add an alembic migration script. You'll typically use a specific engine (e.g. sqlite) to create the script, but the migration will apply to all storage implementations.
   1. e.g. `cd python_modules/dagster/dagster/core/storage/runs/sqlite/alembic; alembic revision -m 'add column foo'`
1. Fill in the upgrade/downgrade parts of the migration using alembic operations, e.g. `op.add_column('runs', sa.Column('foo', sa.String))` (see the sketch below).
1. Make sure that any storage-specific changes are guarded (e.g. if the change should only apply to run storage, do a `runs` table existence check).
1. Make sure that any dialect-specific changes are guarded (e.g. if the change should only apply to MySQL, wrap it in a dialect check).

Users should be prompted to manually run migrations using `dagster instance migrate`.
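
For reference, a migration script following steps 3-5 might look roughly like the sketch below. The `foo` column, revision identifiers, and `Revises` value are placeholders; the guards use the `has_table` / `has_column` helpers from `dagster.core.storage.migration.utils`.

```
"""add column foo
Revision ID: 0123456789ab
Revises: c63a27054f08
Create Date: 2022-04-28 00:00:00.000000
"""
import sqlalchemy as sa
from alembic import op

from dagster.core.storage.migration.utils import has_column, has_table

# revision identifiers, used by Alembic (placeholders for this sketch)
revision = "0123456789ab"
down_revision = "c63a27054f08"
branch_labels = None
depends_on = None


def upgrade():
    # storage-specific guard: this change only applies to run storage,
    # so bail if this database has no runs table
    if not has_table("runs"):
        return
    if not has_column("runs", "foo"):
        op.add_column("runs", sa.Column("foo", sa.String))


def downgrade():
    if not has_table("runs"):
        return
    if has_column("runs", "foo"):
        op.drop_column("runs", "foo")
```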

## Testing a schema migration

For schema migrations, we test migration behavior in sqlite / postgres / mysql.

### sqlite

Migration tests for sqlite can be found here: `python_modules/dagster/dagster_tests/general_tests/compat_tests/test_back_compat.py`

To add a new back-compat test for sqlite, follow these steps:

1. Switch code branches to master or some revision before you've added the schema change.
1. Change your dagster.yaml to use the default sqlite implementation for run/event_log storage.
1. Make sure your configured storage directory (e.g. `$DAGSTER_HOME/history`) is wiped.
1. Start dagit and execute a pipeline run to ensure that both the run db and the per-run event_log dbs are created.
1. Copy runs.db and all per-run event log dbs to the back-compat test directory:
   - `mkdir python_modules/dagster/dagster_tests/general_tests/compat_tests/<my_schema_change>/sqlite/history`
   - `cp $DAGSTER_HOME/history/runs.db* python_modules/dagster/dagster_tests/general_tests/compat_tests/<my_schema_change>/sqlite/history/`
   - `cp -R $DAGSTER_HOME/history/runs python_modules/dagster/dagster_tests/general_tests/compat_tests/<my_schema_change>/sqlite/history/`
1. Write your back-compat test, loading your snapshot directory.
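
A back-compat test built on that snapshot might look roughly like this sketch, assuming the directory layout above; the test name, the `<my_schema_change>` placeholder, and the final assertion are illustrative, not the exact test:

```
import os
import shutil

from dagster.core.instance import DagsterInstance


def test_my_schema_change(tmp_path):
    # copy the checked-in snapshot so the migration never mutates it
    snapshot_dir = os.path.join(os.path.dirname(__file__), "<my_schema_change>", "sqlite")
    test_dir = os.path.join(str(tmp_path), "sqlite")
    shutil.copytree(snapshot_dir, test_dir)

    instance = DagsterInstance.from_config(test_dir)
    instance.upgrade()  # what `dagster instance migrate` runs
    assert instance.get_runs()
```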

### postgres

Migration tests for postgres can be found here: `python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/test_back_compat.py`

To add a new back-compat test for postgres, follow these steps:

1. Switch code branches to master or some revision before you’ve added the schema change.
1. Change your dagster.yaml to use a wiped postgres storage configuration
```
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_url: "postgresql://test:test@localhost:5432/test"
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url: "postgresql://test:test@localhost:5432/test"
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_url: "postgresql://test:test@localhost:5432/test"
```
1. Wipe the existing storage, if you haven't already: `dagster run wipe`
1. Start dagit and execute a pipeline run to ensure that both the run db and the per-run event_log dbs are created.
1. Create a pg dump file
- `mkdir python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/<my_schema_change>/postgres`
- `pg_dump test > python_modules/libraries/dagster-postgres/dagster_postgres_tests/compat_tests/<my_schema_change>/postgres/pg_dump.txt`
1. Write your back-compat test, loading your snapshot directory.
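
Inside the test, the dump is loaded back into the wiped database before an instance is opened against it. A minimal sketch of that restore step, assuming the local test database from the dagster.yaml above:

```
import subprocess


def restore_pg_snapshot(dump_file, conn_string="postgresql://test:test@localhost:5432/test"):
    # load the pre-migration schema and data into the wiped test database
    subprocess.check_call(["psql", conn_string, "-f", dump_file])
```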

### mysql

Migration tests for mysql can be found here: `python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/test_back_compat.py`

To add a new back-compat test for mysql, follow these steps:

1. Switch code branches to master or some revision before you’ve added the schema change.
2. Change your dagster.yaml to use a wiped mysql storage configuration
```
event_log_storage:
  module: dagster_mysql.event_log
  class: MySQLEventLogStorage
  config:
    mysql_url: "mysql+mysqlconnector://test:test@localhost:3306/test"
run_storage:
  module: dagster_mysql.run_storage
  class: MySQLRunStorage
  config:
    mysql_url: "mysql+mysqlconnector://test:test@localhost:3306/test"
schedule_storage:
  module: dagster_mysql.schedule_storage
  class: MySQLScheduleStorage
  config:
    mysql_url: "mysql+mysqlconnector://test:test@localhost:3306/test"
```
3. Wipe the existing storage, if you haven't already: `dagster run wipe`
4. Start dagit and execute a pipeline run to ensure that both the run db and the per-run event_log dbs are created.
5. Create a mysql dump file
- `mkdir python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/<my_schema_change>/mysql`
- `mysqldump -p test > python_modules/libraries/dagster-mysql/dagster_mysql_tests/compat_tests/<my_schema_change>/mysql/mysql_dump.sql`
6. Write your back-compat test, loading your snapshot directory.
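
As with postgres, the dump is restored into the wiped database before the test opens an instance against it. A minimal sketch of that restore step, assuming the local test database from the dagster.yaml above:

```
import subprocess


def restore_mysql_snapshot(dump_file):
    # load the pre-migration schema and data into the wiped test database
    with open(dump_file, "rb") as f:
        subprocess.check_call(
            ["mysql", "--host=localhost", "--user=test", "--password=test", "test"],
            stdin=f,
        )
```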

## Adding a data migration

Generally we do not want to force users to run data migrations, especially over the event log, which might be extremely large and therefore expensive to migrate.

For secondary index tables (e.g. tables derived from the event_log), you can write a custom data migration script and mark the status of the migration in the `secondary_indexes` table. This lets you write guards in your `EventLogStorage` class that read either from the raw event log or from the secondary index table, depending on the status of the migration.

See `EventLogStorage.has_secondary_index` and `EventLogStorage.enable_secondary_index` for more.

Users should be prompted to manually run data migrations using `dagster instance reindex`.
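
A minimal sketch of the guard pattern described above; the migration name and every helper other than `has_secondary_index` / `enable_secondary_index` are illustrative:

```
MY_MIGRATION = "my_derived_table_migration"  # hypothetical entry in the secondary_indexes table


def read_records(event_log_storage):
    if event_log_storage.has_secondary_index(MY_MIGRATION):
        # fast path: the derived table has been fully populated
        return event_log_storage.read_from_derived_table()  # hypothetical helper
    # slow path: scan the raw event log
    return event_log_storage.read_from_event_log()  # hypothetical helper


def run_data_migration(event_log_storage):
    # expensive one-time backfill, triggered by `dagster instance reindex`
    event_log_storage.populate_derived_table()  # hypothetical helper
    event_log_storage.enable_secondary_index(MY_MIGRATION)
```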
@@ -4,11 +4,13 @@
from alembic import context

from dagster.core.storage.event_log import SqlEventLogStorageMetadata
from dagster.core.storage.sqlite import run_migrations_offline, run_migrations_online
from dagster.core.storage.runs import SqlRunStorage
from dagster.core.storage.schedules import SqlScheduleStorage
from dagster.core.storage.sql import run_migrations_offline, run_migrations_online

config = context.config

target_metadata = SqlEventLogStorageMetadata
target_metadata = [SqlEventLogStorageMetadata, SqlRunStorage, SqlScheduleStorage]

if context.is_offline_mode():
    run_migrations_offline(context, config, target_metadata)
@@ -1,7 +1,7 @@
"""Base revision for SQL-backed event log storage
Revision ID: 567bc23fd1ac
Revises:
Revises:
Create Date: 2019-11-21 09:59:57.028730
"""
@@ -10,10 +10,12 @@
# alembic dynamically populates the alembic.context module

import sqlalchemy as sa
from alembic import op
from alembic import context, op
from sqlalchemy import Column
from sqlalchemy.engine import reflection

from dagster.core.storage.event_log import SqlEventLogStorageTable

# revision identifiers, used by Alembic.
revision = "567bc23fd1ac"
down_revision = None
@@ -28,18 +30,12 @@ def upgrade():
    bind = op.get_context().bind

    inspector = reflection.Inspector.from_engine(bind)

    if "postgresql" not in inspector.dialect.dialect_description:
        raise Exception(
            "Bailing: refusing to run a migration for postgres-backed event log storage against "
            "a non-postgres database of dialect {dialect}".format(
                dialect=inspector.dialect.dialect_description
            )
        )

    has_tables = inspector.get_table_names()

    if "event_log" in has_tables:
    if "event_log" not in has_tables:
        return

    if "postgresql" in inspector.dialect.dialect_description:
        op.drop_column(
            table_name="event_log",
            column_name="id",
@@ -75,6 +71,20 @@
        )
        op.drop_table("event_log")

    elif "sqlite" in inspector.dialect.dialect_description:
        has_columns = [col["name"] for col in inspector.get_columns("event_logs")]
        with op.batch_alter_table("event_logs") as batch_op:
            if "row_id" in has_columns:
                batch_op.alter_column(column_name="row_id", new_column_name="id")
            if "run_id" not in has_columns:
                batch_op.add_column(column=sa.Column("run_id", sa.String(255)))

        op.execute(
            SqlEventLogStorageTable.update(None)
            .where(SqlEventLogStorageTable.c.run_id == None)
            .values({"run_id": context.config.attributes.get("run_id", None)})
        )


def downgrade():
    raise Exception("Base revision, no downgrade is possible")
@@ -1,13 +1,13 @@
"""Base revision -- noop
Revision ID: da7cd32b690d
Revises:
Revises: 567bc23fd1ac
Create Date: 2019-11-22 11:02:06.180581
"""
# revision identifiers, used by Alembic.
revision = "da7cd32b690d"
down_revision = None
down_revision = "567bc23fd1ac"
branch_labels = None
depends_on = None

@@ -1,7 +1,7 @@
"""cascade run deletion
Revision ID: 8f8dba68fd3b
Revises: 567bc23fd1ac
Revises: da7cd32b690d
Create Date: 2020-02-10 12:52:49.540462
"""
Expand All @@ -13,7 +13,7 @@

# revision identifiers, used by Alembic.
revision = "8f8dba68fd3b"
down_revision = "567bc23fd1ac"
down_revision = "da7cd32b690d"
branch_labels = None
depends_on = None

@@ -1,7 +1,7 @@
"""cascade run deletion
Revision ID: 9fe9e746268c
Revises: da7cd32b690d
Revises: 8f8dba68fd3b
Create Date: 2020-02-10 18:13:58.993653
"""
Expand All @@ -12,7 +12,7 @@

# revision identifiers, used by Alembic.
revision = "9fe9e746268c"
down_revision = "da7cd32b690d"
down_revision = "8f8dba68fd3b"
branch_labels = None
depends_on = None

@@ -1,7 +1,7 @@
"""add step_key pipeline_name
Revision ID: 1ebdd7a9686f
Revises: 8f8dba68fd3b
Revises: 9fe9e746268c
Create Date: 2020-03-31 11:21:26.811734
"""
Expand All @@ -13,7 +13,7 @@

# revision identifiers, used by Alembic.
revision = "1ebdd7a9686f"
down_revision = "8f8dba68fd3b"
down_revision = "9fe9e746268c"
branch_labels = None
depends_on = None

@@ -1,7 +1,7 @@
"""add step_key pipeline_name
Revision ID: 3b1e175a2be3
Revises: 567bc23fd1ac
Revises: 1ebdd7a9686f
Create Date: 2020-03-31 11:01:42.609069
"""
Expand All @@ -13,7 +13,7 @@

# revision identifiers, used by Alembic.
revision = "3b1e175a2be3"
down_revision = "567bc23fd1ac"
down_revision = "1ebdd7a9686f"
branch_labels = None
depends_on = None

@@ -23,12 +23,16 @@ def upgrade():
    inspector = reflection.Inspector.from_engine(bind)
    has_tables = inspector.get_table_names()
    if "event_logs" in has_tables:
        op.add_column("event_logs", sa.Column("step_key", sa.String))
        columns = [x.get("name") for x in inspector.get_columns("event_logs")]
        if "step_key" not in columns:
            op.add_column("event_logs", sa.Column("step_key", sa.String))


def downgrade():
    bind = op.get_context().bind
    inspector = reflection.Inspector.from_engine(bind)
    has_tables = inspector.get_table_names()
    if "event_logs" in has_tables:
        op.drop_column("event_logs", "step_key")
        columns = [x.get("name") for x in inspector.get_columns("event_logs")]
        if "step_key" in columns:
            op.drop_column("event_logs", "step_key")
@@ -0,0 +1,80 @@
"""add snapshots to run storage
Revision ID: c63a27054f08
Revises: 3b1e175a2be3
Create Date: 2020-04-09 05:57:20.639458
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.engine import reflection

from dagster.core.storage.migration.utils import has_column, has_table

# alembic magic breaks pylint
# pylint: disable=no-member

# revision identifiers, used by Alembic.
revision = "c63a27054f08"
down_revision = "3b1e175a2be3"
branch_labels = None
depends_on = None


def upgrade():
    bind = op.get_context().bind
    inspector = reflection.Inspector.from_engine(bind)

    if not has_table("runs"):
        return

    if not has_table("snapshots"):
        op.create_table(
            "snapshots",
            sa.Column("id", sa.Integer, primary_key=True, autoincrement=True, nullable=False),
            sa.Column("snapshot_id", sa.String(255), unique=True, nullable=False),
            sa.Column("snapshot_body", sa.LargeBinary, nullable=False),
            sa.Column("snapshot_type", sa.String(63), nullable=False),
        )

    if not has_column("runs", "snapshot_id"):
        if "sqlite" in inspector.dialect.dialect_description:
            # Sqlite does not support adding foreign keys to existing
            # tables, so we are forced to fallback on this witchcraft.
            # See https://alembic.sqlalchemy.org/en/latest/batch.html#dealing-with-referencing-foreign-keys
            # for additional context
            with op.batch_alter_table("runs") as batch_op:
                batch_op.execute("PRAGMA foreign_keys = OFF;")
                batch_op.add_column(
                    sa.Column(
                        "snapshot_id",
                        sa.String(255),
                        sa.ForeignKey(
                            "snapshots.snapshot_id", name="fk_runs_snapshot_id_snapshots_snapshot_id"
                        ),
                    ),
                )
            op.execute("PRAGMA foreign_keys = ON;")
        else:
            op.add_column(
                "runs",
                sa.Column("snapshot_id", sa.String(255), sa.ForeignKey("snapshots.snapshot_id")),
            )


def downgrade():
    bind = op.get_context().bind
    inspector = reflection.Inspector.from_engine(bind)

    if not has_table("runs"):
        return

    if has_column("runs", "snapshot_id"):
        if "sqlite" in inspector.dialect.dialect_description:
            with op.batch_alter_table("runs") as batch_op:
                batch_op.drop_column("snapshot_id")
        else:
            op.drop_column("runs", "snapshot_id")

    if has_table("snapshots"):
        op.drop_table("snapshots")
