Skip to content

Commit

Permalink
Add table to keep track of schedule data migrations 1/5 (#7182)
Browse files Browse the repository at this point in the history
* add secondary index table migration to track schedule data migrations

* schema migration stuff

* fix sqlite migration
  • Loading branch information
prha committed Apr 1, 2022
1 parent 60059f9 commit 8503e6a
Show file tree
Hide file tree
Showing 16 changed files with 1,589 additions and 7 deletions.
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ def upgrade(self, print_fn=None):
if print_fn:
print_fn("Updating schedule storage...")
self._schedule_storage.upgrade()
self._schedule_storage.migrate(print_fn)

def optimize_for_dagit(self, statement_timeout):
if self._schedule_storage:
Expand All @@ -654,6 +655,7 @@ def reindex(self, print_fn=lambda _: None):
self._event_storage.reindex_events(print_fn)
self._event_storage.reindex_assets(print_fn)
self._run_storage.optimize(print_fn)
self._schedule_storage.optimize(print_fn)
print_fn("Done.")

def dispose(self):
Expand Down
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/core/storage/migration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,17 @@ def drop_run_record_start_end_timestamps():

op.drop_column("runs", "start_time")
op.drop_column("runs", "end_time")


def create_schedule_secondary_index_table():
if not has_table("jobs"):
return

if not has_table("secondary_indexes"):
op.create_table(
"secondary_indexes",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column("name", db.String, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
db.Column("migration_completed", db.DateTime),
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Iterable, List, Mapping, Optional, Sequence
from typing import Callable, Iterable, List, Mapping, Optional, Sequence

from dagster.core.definitions.run_request import InstigatorType
from dagster.core.instance import MayHaveInstanceWeakref
Expand Down Expand Up @@ -123,5 +123,11 @@ def get_tick_stats(self, origin_id: str):
def upgrade(self):
"""Perform any needed migrations"""

def migrate(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False):
"""Call this method to run any required data migrations"""

def optimize(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False):
"""Call this method to run any optional data migrations for optimized reads"""

def optimize_for_dagit(self, statement_timeout: int):
"""Allows for optimizing database connection / use in the context of a long lived dagit process"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from typing import Callable, Mapping

REQUIRED_SCHEDULE_DATA_MIGRATIONS: Mapping[str, Callable] = {}
OPTIONAL_SCHEDULE_DATA_MIGRATIONS: Mapping[str, Callable] = {}
13 changes: 12 additions & 1 deletion python_modules/dagster/dagster/core/storage/schedules/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sqlalchemy as db

from ..sql import get_current_timestamp
from ..sql import MySQLCompatabilityTypes, get_current_timestamp

ScheduleStorageSqlMetadata = db.MetaData()

Expand Down Expand Up @@ -30,6 +30,17 @@
db.Column("update_timestamp", db.DateTime, server_default=get_current_timestamp()),
)

# Secondary Index migration table, used to track data migrations, event_logs and runs.
# This schema should match the schema in the event_log storage, run schema
SecondaryIndexMigrationTable = db.Table(
"secondary_indexes",
ScheduleStorageSqlMetadata,
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column("name", MySQLCompatabilityTypes.UniqueText, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=get_current_timestamp()),
db.Column("migration_completed", db.DateTime),
)

db.Index(
"idx_job_tick_status",
JobTickTable.c.job_origin_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import abstractmethod
from collections import defaultdict
from typing import Iterable, Mapping, Optional, Sequence, cast
from datetime import datetime
from typing import Callable, Iterable, Mapping, Optional, Sequence, cast

import sqlalchemy as db

Expand All @@ -18,7 +19,8 @@
from dagster.utils import utc_datetime_from_timestamp

from .base import ScheduleStorage
from .schema import JobTable, JobTickTable
from .migration import OPTIONAL_SCHEDULE_DATA_MIGRATIONS, REQUIRED_SCHEDULE_DATA_MIGRATIONS
from .schema import JobTable, JobTickTable, SecondaryIndexMigrationTable


class SqlScheduleStorage(ScheduleStorage):
Expand Down Expand Up @@ -299,3 +301,65 @@ def wipe(self):
# https://stackoverflow.com/a/54386260/324449
conn.execute(JobTable.delete()) # pylint: disable=no-value-for-parameter
conn.execute(JobTickTable.delete()) # pylint: disable=no-value-for-parameter

# MIGRATIONS

def has_secondary_index_table(self):
with self.connect() as conn:
return "secondary_indexes" in db.inspect(conn).get_table_names()

def has_built_index(self, migration_name: str) -> bool:
if not self.has_secondary_index_table():
return False

query = (
db.select([1])
.where(SecondaryIndexMigrationTable.c.name == migration_name)
.where(SecondaryIndexMigrationTable.c.migration_completed != None)
.limit(1)
)
with self.connect() as conn:
results = conn.execute(query).fetchall()

return len(results) > 0

def mark_index_built(self, migration_name: str):
query = (
SecondaryIndexMigrationTable.insert().values( # pylint: disable=no-value-for-parameter
name=migration_name,
migration_completed=datetime.now(),
)
)
with self.connect() as conn:
try:
conn.execute(query)
except db.exc.IntegrityError:
conn.execute(
SecondaryIndexMigrationTable.update() # pylint: disable=no-value-for-parameter
.where(SecondaryIndexMigrationTable.c.name == migration_name)
.values(migration_completed=datetime.now())
)

def _execute_data_migrations(
self, migrations, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False
):
for migration_name, migration_fn in migrations.items():
if self.has_built_index(migration_name):
if not force_rebuild_all:
continue
if print_fn:
print_fn(f"Starting data migration: {migration_name}")
migration_fn()(self, print_fn)
self.mark_index_built(migration_name)
if print_fn:
print_fn(f"Finished data migration: {migration_name}")

def migrate(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False):
self._execute_data_migrations(
REQUIRED_SCHEDULE_DATA_MIGRATIONS, print_fn, force_rebuild_all
)

def optimize(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool = False):
self._execute_data_migrations(
OPTIONAL_SCHEDULE_DATA_MIGRATIONS, print_fn, force_rebuild_all
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add migration table
Revision ID: 54666da3db5c
Revises: 0da417ae1b81
Create Date: 2022-03-23 12:58:43.144576
"""
from dagster.core.storage.migration.utils import create_schedule_secondary_index_table

# revision identifiers, used by Alembic.
revision = "54666da3db5c"
down_revision = "0da417ae1b81"
branch_labels = None
depends_on = None


def upgrade():
create_schedule_secondary_index_table()


def downgrade():
pass
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,29 @@ def config_type(cls):
def from_config_value(inst_data, config_value):
return SqliteScheduleStorage.from_local(inst_data=inst_data, **config_value)

@staticmethod
def from_local(base_dir, inst_data=None):
@classmethod
def from_local(cls, base_dir, inst_data=None):
check.str_param(base_dir, "base_dir")
mkdir_p(base_dir)
conn_string = create_db_conn_string(base_dir, "schedules")
engine = create_engine(conn_string, poolclass=NullPool)
alembic_config = get_alembic_config(__file__)

should_migrate_data = False
with engine.connect() as connection:
db_revision, head_revision = check_alembic_revision(alembic_config, connection)
if not (db_revision and head_revision):
ScheduleStorageSqlMetadata.create_all(engine)
engine.execute("PRAGMA journal_mode=WAL;")
stamp_alembic_rev(alembic_config, connection)
should_migrate_data = True

schedule_storage = cls(conn_string, inst_data)
if should_migrate_data:
schedule_storage.migrate()
schedule_storage.optimize()

return SqliteScheduleStorage(conn_string, inst_data)
return schedule_storage

@contextmanager
def connect(self):
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -782,3 +782,18 @@ def __new__(

result = _deserialize_json(storage_str, legacy_env)
assert result.message is not None


def test_schedule_secondary_index_table_backcompat():
src_dir = file_relative_path(__file__, "snapshot_0_14_6_schedule_migration_table/sqlite")
with copy_directory(src_dir) as test_dir:
db_path = os.path.join(test_dir, "schedules", "schedules.db")

assert get_current_alembic_version(db_path) == "0da417ae1b81"

assert "secondary_indexes" not in get_sqlite3_tables(db_path)

with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
instance.upgrade()

assert "secondary_indexes" in get_sqlite3_tables(db_path)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add migration table
Revision ID: d538c9496c01
Revises: 130b087bc274
Create Date: 2022-03-23 13:00:19.789472
"""
from dagster.core.storage.migration.utils import create_schedule_secondary_index_table

# revision identifiers, used by Alembic.
revision = "d538c9496c01"
down_revision = "130b087bc274"
branch_labels = None
depends_on = None


def upgrade():
create_schedule_secondary_index_table()


def downgrade():
pass
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def _init_db(self):
ScheduleStorageSqlMetadata.create_all(conn)
stamp_alembic_rev(mysql_alembic_config(__file__), conn)

# mark all the data migrations as applied
self.migrate()
self.optimize()

def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold an open connection
# https://github.com/dagster-io/dagster/issues/3719
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""add migration table
Revision ID: 8d66aa722f94
Revises: 9c5f00e80ef2
Create Date: 2022-03-23 12:59:59.571272
"""
from dagster.core.storage.migration.utils import create_schedule_secondary_index_table

# revision identifiers, used by Alembic.
revision = "8d66aa722f94"
down_revision = "9c5f00e80ef2"
branch_labels = None
depends_on = None


def upgrade():
create_schedule_secondary_index_table()


def downgrade():
pass
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def _init_db(self):
ScheduleStorageSqlMetadata.create_all(conn)
stamp_alembic_rev(pg_alembic_config(__file__), conn)

# mark all the data migrations as applied
self.migrate()
self.optimize()

def optimize_for_dagit(self, statement_timeout):
# When running in dagit, hold an open connection and set statement_timeout
self._engine = create_engine(
Expand Down

0 comments on commit 8503e6a

Please sign in to comment.