Skip to content

Commit

Permalink
add schema for instigators table, keyed by selector 2/5 (#7185)
Browse files Browse the repository at this point in the history
* add secondary index table migration to track schedule data migrations

* schema migration stuff

* fix sqlite migration

* add schema for instigators table, keyed by selector

* switch check for migration - fix mysql

* fix mysql backcompat tests to start from clean slate

* fix up comments, saving repository_selector_id
  • Loading branch information
prha committed Apr 1, 2022
1 parent 8503e6a commit ae93b45
Show file tree
Hide file tree
Showing 12 changed files with 2,841 additions and 1 deletion.
26 changes: 26 additions & 0 deletions python_modules/dagster/dagster/core/storage/migration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sqlalchemy.engine import reflection

from dagster import check
from dagster.core.storage.sql import get_current_timestamp


def get_inspector():
Expand Down Expand Up @@ -259,3 +260,28 @@ def create_schedule_secondary_index_table():
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
db.Column("migration_completed", db.DateTime),
)


def create_instigators_table():
    """Create the ``instigators`` table and add ``selector_id`` to the legacy tables.

    Returns without touching the database when neither ``instigators`` nor
    ``jobs`` exists, which is treated as "not a schedule storage db". Every
    step is guarded by an existence check, so a step whose table or column is
    already present is skipped.
    """
    if not (has_table("instigators") or has_table("jobs")):
        # not a schedule storage db
        return

    if not has_table("instigators"):
        instigator_columns = [
            db.Column("id", db.Integer, primary_key=True, autoincrement=True),
            db.Column("selector_id", db.String(255), unique=True),
            db.Column("repository_selector_id", db.String(255)),
            db.Column("status", db.String(63)),
            db.Column("instigator_type", db.String(63), index=True),
            db.Column("instigator_body", db.Text),
            db.Column("create_timestamp", db.DateTime, server_default=get_current_timestamp()),
            db.Column("update_timestamp", db.DateTime, server_default=get_current_timestamp()),
        ]
        op.create_table("instigators", *instigator_columns)

    # The legacy tables each gain a plain (non-unique, unindexed) selector_id column.
    for legacy_table in ("jobs", "job_ticks"):
        if has_column(legacy_table, "selector_id"):
            continue
        op.add_column(legacy_table, db.Column("selector_id", db.String(255)))
15 changes: 15 additions & 0 deletions python_modules/dagster/dagster/core/storage/schedules/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ScheduleStorageSqlMetadata,
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column("job_origin_id", db.String(255), unique=True),
db.Column("selector_id", db.String(255)),
db.Column("repository_origin_id", db.String(255)),
db.Column("status", db.String(63)),
db.Column("job_type", db.String(63), index=True),
Expand All @@ -17,11 +18,25 @@
db.Column("update_timestamp", db.DateTime, server_default=get_current_timestamp()),
)

# Table definition for "instigators", registered on ScheduleStorageSqlMetadata.
# The column set matches what the `create_instigators_table` migration helper
# creates, so a freshly created schema and a migrated one agree.
InstigatorsTable = db.Table(
    "instigators",
    ScheduleStorageSqlMetadata,
    db.Column("id", db.Integer, primary_key=True, autoincrement=True),
    # selector_id carries a unique constraint on this table.
    db.Column("selector_id", db.String(255), unique=True),
    db.Column("repository_selector_id", db.String(255)),
    db.Column("status", db.String(63)),
    # instigator_type is indexed.
    db.Column("instigator_type", db.String(63), index=True),
    db.Column("instigator_body", db.Text),
    # Both timestamps take their server-side default from get_current_timestamp().
    db.Column("create_timestamp", db.DateTime, server_default=get_current_timestamp()),
    db.Column("update_timestamp", db.DateTime, server_default=get_current_timestamp()),
)

JobTickTable = db.Table(
"job_ticks",
ScheduleStorageSqlMetadata,
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column("job_origin_id", db.String(255), index=True),
db.Column("selector_id", db.String(255)),
db.Column("status", db.String(63)),
db.Column("type", db.String(63)),
db.Column("timestamp", db.types.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ def _add_filter_limit(self, query, before=None, after=None, limit=None, statuses
def supports_batch_queries(self):
return True

def has_instigators_table(self):
    """Return True if the connected database has a table named ``instigators``."""
    with self.connect() as conn:
        inspector = db.inspect(conn)
        return "instigators" in inspector.get_table_names()

def get_batch_ticks(
self,
origin_ids: Sequence[str],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add instigators table
Revision ID: c892b3fe0a9f
Revises: 54666da3db5c
Create Date: 2022-03-18 16:16:21.007430
"""
from dagster.core.storage.migration.utils import create_instigators_table

# revision identifiers, used by Alembic.
revision = "c892b3fe0a9f"
down_revision = "54666da3db5c"
branch_labels = None
depends_on = None


def upgrade():
    """Apply this revision by delegating to ``create_instigators_table``."""
    create_instigators_table()


def downgrade():
    """No-op: this revision performs no downgrade steps."""
    pass
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,22 @@ def test_schedule_secondary_index_table_backcompat():
instance.upgrade()

assert "secondary_indexes" in get_sqlite3_tables(db_path)


def test_instigators_table_backcompat():
    """Upgrading the 0.14.6 sqlite snapshot adds the instigators schema.

    Before the upgrade the schedules db sits at alembic revision
    ``54666da3db5c`` with no ``instigators`` table and no ``selector_id``
    column on ``jobs`` / ``job_ticks``; after ``instance.upgrade()`` all three
    are present.
    """
    snapshot_dir = file_relative_path(__file__, "snapshot_0_14_6_instigators_table/sqlite")
    legacy_tables = ("jobs", "job_ticks")

    with copy_directory(snapshot_dir) as test_dir:
        db_path = os.path.join(test_dir, "schedules", "schedules.db")

        # Preconditions: the snapshot predates the instigators migration.
        assert get_current_alembic_version(db_path) == "54666da3db5c"
        assert "instigators" not in get_sqlite3_tables(db_path)
        for table_name in legacy_tables:
            assert "selector_id" not in set(get_sqlite3_columns(db_path, table_name))

        with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
            instance.upgrade()

            # Postconditions: new table and columns exist.
            assert "instigators" in get_sqlite3_tables(db_path)
            for table_name in legacy_tables:
                assert "selector_id" in set(get_sqlite3_columns(db_path, table_name))
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add instigators table
Revision ID: 5b467f7af3f6
Revises: d538c9496c01
Create Date: 2022-03-18 16:17:20.338259
"""
from dagster.core.storage.migration.utils import create_instigators_table

# revision identifiers, used by Alembic.
revision = "5b467f7af3f6"
down_revision = "d538c9496c01"
branch_labels = None
depends_on = None


def upgrade():
    """Apply this revision by delegating to ``create_instigators_table``."""
    create_instigators_table()


def downgrade():
    """No-op: this revision performs no downgrade steps."""
    pass

0 comments on commit ae93b45

Please sign in to comment.