Skip to content

Commit

Permalink
add tick selector index migration (#7198)
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Apr 1, 2022
1 parent 6262a51 commit 8b8ce0d
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 0 deletions.
15 changes: 15 additions & 0 deletions python_modules/dagster/dagster/core/storage/migration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,18 @@ def create_instigators_table():

if not has_column("job_ticks", "selector_id"):
op.add_column("job_ticks", db.Column("selector_id", db.String(255)))


def create_tick_selector_index():
if not has_table("job_ticks"):
# not a schedule storage db
return

indexes = [x.get("name") for x in get_inspector().get_indexes("job_ticks")]
if "idx_tick_selector_timestamp" in indexes:
# already migrated
return

op.create_index(
"idx_tick_selector_timestamp", "job_ticks", ["selector_id", "timestamp"], unique=False
)
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@
mysql_length=32,
)
db.Index("idx_job_tick_timestamp", JobTickTable.c.job_origin_id, JobTickTable.c.timestamp)
db.Index("idx_tick_selector_timestamp", JobTickTable.c.selector_id, JobTickTable.c.timestamp)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add tick selector index
Revision ID: 721d858e1dda
Revises: c892b3fe0a9f
Create Date: 2022-03-25 10:28:29.065161
"""
from dagster.core.storage.migration.utils import create_tick_selector_index

# revision identifiers, used by Alembic.
revision = "721d858e1dda"
down_revision = "c892b3fe0a9f"
branch_labels = None
depends_on = None


def upgrade():
create_tick_selector_index()


def downgrade():
pass
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ def get_sqlite3_columns(db_path, table_name):
return [r[1] for r in cursor.fetchall()]


def get_sqlite3_indexes(db_path, table_name):
con = sqlite3.connect(db_path)
cursor = con.cursor()
cursor.execute('PRAGMA index_list("{}");'.format(table_name))
return [r[1] for r in cursor.fetchall()]


def test_snapshot_0_7_6_pre_add_pipeline_snapshot():
run_id = "fb0b3905-068b-4444-8f00-76fcbaef7e8b"
src_dir = file_relative_path(__file__, "snapshot_0_7_6_pre_add_pipeline_snapshot/sqlite")
Expand Down Expand Up @@ -869,3 +876,18 @@ def test_jobs_selector_id_migration():
.where(JobTickTable.c.selector_id.isnot(None))
)[0][0]
assert migrated_tick_count == legacy_tick_count


def test_tick_selector_index_migration():
src_dir = file_relative_path(__file__, "snapshot_0_14_6_post_schema_pre_data_migration/sqlite")
import sqlalchemy as db

with copy_directory(src_dir) as test_dir:
db_path = os.path.join(test_dir, "schedules", "schedules.db")

assert get_current_alembic_version(db_path) == "c892b3fe0a9f"

with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
assert "idx_tick_selector_timestamp" not in get_sqlite3_indexes(db_path, "job_ticks")
instance.upgrade()
assert "idx_tick_selector_timestamp" in get_sqlite3_indexes(db_path, "job_ticks")
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add tick selector index
Revision ID: d32d1d6de793
Revises: 5b467f7af3f6
Create Date: 2022-03-25 10:29:10.895341
"""
from dagster.core.storage.migration.utils import create_tick_selector_index

# revision identifiers, used by Alembic.
revision = "d32d1d6de793"
down_revision = "5b467f7af3f6"
branch_labels = None
depends_on = None


def upgrade():
create_tick_selector_index()


def downgrade():
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add tick selector index
Revision ID: b601eb913efa
Revises: 16e3115a602a
Create Date: 2022-03-25 10:28:53.372766
"""
from dagster.core.storage.migration.utils import create_tick_selector_index

# revision identifiers, used by Alembic.
revision = "b601eb913efa"
down_revision = "16e3115a602a"
branch_labels = None
depends_on = None


def upgrade():
create_tick_selector_index()


def downgrade():
pass

0 comments on commit 8b8ce0d

Please sign in to comment.