Migration: add columns action_type and selector_id to bulk_actions (#7995)

selector_id: a way to filter backfills by Job

action_type: will hold an enum of types (BACKFILL and TERMINATION for now)
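For reference, the enum described above might look like the following sketch. The class name BulkActionType and its module are illustrative (the values are taken from the commit message), not necessarily the exact definition Dagster ships:

from enum import Enum


class BulkActionType(Enum):
    # Hypothetical enum persisted in the new action_type column.
    BACKFILL = "BACKFILL"
    TERMINATION = "TERMINATION"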
johannkm committed Jun 6, 2022
1 parent 75e295d commit 1b20e2e
Showing 12 changed files with 182 additions and 1 deletion.
@@ -0,0 +1,58 @@
"""add columns action_type and selector_id to bulk_actions
Revision ID: 6860f830e40c
Revises: 721d858e1dda
Create Date: 2022-05-20 15:00:01.260860
"""
import sqlalchemy as db
from alembic import op

from dagster.core.storage.migration.utils import has_column, has_index, has_table

# revision identifiers, used by Alembic.
revision = "6860f830e40c"
down_revision = "721d858e1dda"
branch_labels = None
depends_on = None


def upgrade():
    if not has_table("bulk_actions"):
        return

    if not has_column("bulk_actions", "action_type"):
        op.add_column("bulk_actions", db.Column("action_type", db.String(32), nullable=True))

    if not has_column("bulk_actions", "selector_id"):
        op.add_column("bulk_actions", db.Column("selector_id", db.Text, nullable=True))

    if not has_index("bulk_actions", "idx_bulk_actions_action_type"):
        op.create_index(
            "idx_bulk_actions_action_type",
            "bulk_actions",
            ["action_type"],
            unique=False,
            mysql_length={"action_type": 32},
        )

    if not has_index("bulk_actions", "idx_bulk_actions_selector_id"):
        op.create_index(
            "idx_bulk_actions_selector_id",
            "bulk_actions",
            ["selector_id"],
            unique=False,
            mysql_length={"selector_id": 64},
        )


def downgrade():
    with op.batch_alter_table("bulk_actions") as batch_op:
        if has_index("bulk_actions", "idx_bulk_actions_action_type"):
            batch_op.drop_index("idx_bulk_actions_action_type")
        if has_index("bulk_actions", "idx_bulk_actions_selector_id"):
            batch_op.drop_index("idx_bulk_actions_selector_id")
        if has_column("bulk_actions", "action_type"):
            batch_op.drop_column("action_type")
        if has_column("bulk_actions", "selector_id"):
            batch_op.drop_column("selector_id")
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/core/storage/runs/schema.py
@@ -86,6 +86,8 @@
db.Column("status", db.String(255), nullable=False),
db.Column("timestamp", db.types.TIMESTAMP, nullable=False),
db.Column("body", db.Text),
db.Column("action_type", db.String(32)),
db.Column("selector_id", db.Text),
)

InstanceInfo = db.Table(
@@ -98,6 +100,8 @@
db.Index("idx_run_partitions", RunsTable.c.partition_set, RunsTable.c.partition, mysql_length=64)
db.Index("idx_bulk_actions", BulkActionsTable.c.key, mysql_length=32)
db.Index("idx_bulk_actions_status", BulkActionsTable.c.status, mysql_length=32)
db.Index("idx_bulk_actions_action_type", BulkActionsTable.c.action_type, mysql_length=32)
db.Index("idx_bulk_actions_selector_id", BulkActionsTable.c.selector_id, mysql_length=64)
db.Index("idx_run_status", RunsTable.c.status, mysql_length=32)
db.Index(
    "idx_run_range",
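A note on the mysql_length arguments in the index definitions above: MySQL cannot index an unbounded TEXT column (or a long VARCHAR) without a key prefix length, and SQLAlchemy's mysql_length dialect argument supplies one; other backends ignore it. A minimal, self-contained sketch with an illustrative table name:

import sqlalchemy as db

metadata = db.MetaData()

# Hypothetical table mirroring the shape of bulk_actions above.
example_actions = db.Table(
    "example_actions",
    metadata,
    db.Column("id", db.Integer, primary_key=True),
    db.Column("selector_id", db.Text),
)

# On MySQL this renders roughly as:
#   CREATE INDEX idx_example_selector_id ON example_actions (selector_id(64))
# SQLite and Postgres simply ignore the mysql_length dialect argument.
db.Index("idx_example_selector_id", example_actions.c.selector_id, mysql_length=64)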
Binary files not shown (×5).
@@ -902,3 +902,44 @@ def test_repo_label_tag_migration():

    count = instance.get_runs_count(job_repo_filter)
    assert count == 2


def test_add_bulk_actions_columns():
    src_dir = file_relative_path(__file__, "snapshot_0_14_16_bulk_actions_columns/sqlite")

    with copy_directory(src_dir) as test_dir:
        db_path = os.path.join(test_dir, "history", "runs.db")
        assert {"id", "key", "status", "timestamp", "body"} == set(
            get_sqlite3_columns(db_path, "bulk_actions")
        )
        assert "idx_bulk_actions_action_type" not in get_sqlite3_indexes(db_path, "bulk_actions")
        assert "idx_bulk_actions_selector_id" not in get_sqlite3_indexes(db_path, "bulk_actions")

        with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
            instance.upgrade()

            assert {
                "id",
                "key",
                "status",
                "timestamp",
                "body",
                "action_type",
                "selector_id",
            } == set(get_sqlite3_columns(db_path, "bulk_actions"))
            assert "idx_bulk_actions_action_type" in get_sqlite3_indexes(db_path, "bulk_actions")
            assert "idx_bulk_actions_selector_id" in get_sqlite3_indexes(db_path, "bulk_actions")

            instance._run_storage._alembic_downgrade(rev="721d858e1dda")

            assert get_current_alembic_version(db_path) == "721d858e1dda"
            assert {"id", "key", "status", "timestamp", "body"} == set(
                get_sqlite3_columns(db_path, "bulk_actions")
            )
            assert "idx_bulk_actions_action_type" not in get_sqlite3_indexes(
                db_path, "bulk_actions"
            )
            assert "idx_bulk_actions_selector_id" not in get_sqlite3_indexes(
                db_path, "bulk_actions"
            )
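The get_sqlite3_columns and get_sqlite3_indexes helpers used above are imported from the test suite's utilities and are not shown in this diff. One plausible implementation reads SQLite's PRAGMA metadata directly; a sketch, assuming the helpers return plain lists of names:

import sqlite3


def get_sqlite3_columns(db_path, table_name):
    with sqlite3.connect(db_path) as conn:
        # PRAGMA table_info yields one row per column; the name is field 1.
        return [row[1] for row in conn.execute(f"PRAGMA table_info({table_name})")]


def get_sqlite3_indexes(db_path, table_name):
    with sqlite3.connect(db_path) as conn:
        # PRAGMA index_list yields one row per index; the name is field 1.
        return [row[1] for row in conn.execute(f"PRAGMA index_list({table_name})")]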
@@ -4,14 +4,22 @@
import subprocess
import tempfile

from sqlalchemy import create_engine
from sqlalchemy import create_engine, inspect

from dagster import AssetKey, AssetObservation, Output, job, op
from dagster.core.instance import DagsterInstance
from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
from dagster.utils import file_relative_path


def get_columns(instance, table_name: str):
    return set(c["name"] for c in inspect(instance.run_storage._engine).get_columns(table_name))


def get_indexes(instance, table_name: str):
    return set(c["name"] for c in inspect(instance.run_storage._engine).get_indexes(table_name))


def _reconstruct_from_file(hostname, conn_string, path, _username="root", _password="test"):
    engine = create_engine(conn_string)
    engine.execute("drop schema test;")
@@ -171,3 +179,32 @@ def test_jobs_selector_id_migration(hostname, conn_string):
        .where(JobTickTable.c.selector_id.isnot(None))
    )[0][0]
    assert migrated_tick_count == legacy_tick_count


def test_add_bulk_actions_columns(hostname, conn_string):
    new_columns = {"selector_id", "action_type"}
    new_indexes = {"idx_bulk_actions_action_type", "idx_bulk_actions_selector_id"}

    _reconstruct_from_file(
        hostname,
        conn_string,
        # use an old snapshot: it has the bulk_actions table but not the new columns
        file_relative_path(__file__, "snapshot_0_14_6_post_schema_pre_data_migration.sql"),
    )

    with tempfile.TemporaryDirectory() as tempdir:
        with open(
            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
        ) as template_fd:
            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as target_fd:
                template = template_fd.read().format(hostname=hostname)
                target_fd.write(template)

        with DagsterInstance.from_config(tempdir) as instance:
            assert get_columns(instance, "bulk_actions") & new_columns == set()
            assert get_indexes(instance, "bulk_actions") & new_indexes == set()

            instance.upgrade()
            assert new_columns <= get_columns(instance, "bulk_actions")
            assert new_indexes <= get_indexes(instance, "bulk_actions")
@@ -7,6 +7,7 @@

import pytest
import sqlalchemy as db
from sqlalchemy import inspect

from dagster import (
AssetKey,
@@ -27,6 +28,14 @@
from dagster.utils import file_relative_path


def get_columns(instance, table_name: str):
    return set(c["name"] for c in inspect(instance.run_storage._engine).get_columns(table_name))


def get_indexes(instance, table_name: str):
    return set(c["name"] for c in inspect(instance.run_storage._engine).get_indexes(table_name))


def test_0_7_6_postgres_pre_add_pipeline_snapshot(hostname, conn_string):
    _reconstruct_from_file(
        hostname,
@@ -570,3 +579,35 @@ def test_jobs_selector_id_migration(hostname, conn_string):
        .where(JobTickTable.c.selector_id.isnot(None))
    )[0][0]
    assert migrated_tick_count == legacy_tick_count


def test_add_bulk_actions_columns(hostname, conn_string):
    new_columns = {"selector_id", "action_type"}
    new_indexes = {"idx_bulk_actions_action_type", "idx_bulk_actions_selector_id"}

    _reconstruct_from_file(
        hostname,
        conn_string,
        file_relative_path(
            # use an old snapshot: it has the bulk_actions table but not the new columns
            __file__,
            "snapshot_0_14_6_post_schema_pre_data_migration/postgres/pg_dump.txt",
        ),
    )

    with tempfile.TemporaryDirectory() as tempdir:
        with open(
            file_relative_path(__file__, "dagster.yaml"), "r", encoding="utf8"
        ) as template_fd:
            with open(os.path.join(tempdir, "dagster.yaml"), "w", encoding="utf8") as target_fd:
                template = template_fd.read().format(hostname=hostname)
                target_fd.write(template)

        with DagsterInstance.from_config(tempdir) as instance:
            assert get_columns(instance, "bulk_actions") & new_columns == set()
            assert get_indexes(instance, "bulk_actions") & new_indexes == set()

            instance.upgrade()
            assert new_columns <= get_columns(instance, "bulk_actions")
            assert new_indexes <= get_indexes(instance, "bulk_actions")
