Skip to content

Commit

Permalink
add CLI command to print the storage schema version (#7910)
Browse files Browse the repository at this point in the history
* add CLI command to print the storage schema

* update instance info cli to show schema info
  • Loading branch information
prha committed May 18, 2022
1 parent f27ee8b commit 445194b
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 8 deletions.
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/cli/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ def info_command():

click.echo("$DAGSTER_HOME: {}\n".format(home))

click.echo("\nInstance configuration:\n-----------------------")
click.echo(instance.info_str())

click.echo("\nStorage schema state:\n---------------------")
click.echo(instance.schema_str())


@instance_cli.command(name="migrate", help="Automatically migrate an out of date instance.")
def migrate_command():
Expand Down
22 changes: 22 additions & 0 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ def info_dict(self):
def info_str(self) -> str:
return yaml.dump(self.info_dict(), default_flow_style=False, sort_keys=False)

def schema_str(self) -> str:
def _schema_dict(alembic_version):
if not alembic_version:
return None
db_revision, head_revision = alembic_version
return {
"current": db_revision,
"latest": head_revision,
}

return yaml.dump(
{
"schema": {
"event_log_storage": _schema_dict(self._event_storage.alembic_version()),
"run_storage": _schema_dict(self._event_storage.alembic_version()),
"schedule_storage": _schema_dict(self._event_storage.alembic_version()),
}
},
default_flow_style=False,
sort_keys=False,
)

@property
def run_storage(self) -> "RunStorage":
return self._run_storage
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/storage/event_log/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ def get_materialization_count_by_partition(
) -> Mapping[AssetKey, Mapping[str, int]]:
pass

def alembic_version(self):
return None


def extract_asset_events_cursor(cursor, before_cursor, after_cursor, ascending):
if cursor:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ def dispose(self):
self._obs.stop()
self._obs.join(timeout=15)

def alembic_version(self):
alembic_config = get_alembic_config(__file__)
with self.index_connection() as conn:
return check_alembic_revision(alembic_config, conn)


class SqliteEventLogStorageWatchdog(PatternMatchingEventHandler):
def __init__(self, event_log_storage, run_id, callback, start_cursor, **kwargs):
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,6 @@ def add_backfill(self, partition_backfill: PartitionBackfill):
@abstractmethod
def update_backfill(self, partition_backfill: PartitionBackfill):
"""Update a partition backfill in run storage"""

def alembic_version(self):
return None
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,8 @@ def delete_run(self, run_id):
with self.connect() as conn:
conn.execute(remove_tags)
conn.execute(remove_run)

def alembic_version(self):
alembic_config = get_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/storage/schedules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ def optimize(self, print_fn: Optional[Callable] = None, force_rebuild_all: bool

def optimize_for_dagit(self, statement_timeout: int):
"""Allows for optimizing database connection / use in the context of a long lived dagit process"""

def alembic_version(self):
return None
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ def upgrade(self):
alembic_config = get_alembic_config(__file__)
with self.connect() as conn:
run_alembic_upgrade(alembic_config, conn)

def alembic_version(self):
alembic_config = get_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
SqlPollingEventWatcher,
)
from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
from dagster.core.storage.sql import stamp_alembic_rev # pylint: disable=unused-import
from dagster.core.storage.sql import create_engine, run_alembic_upgrade
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData

from ..utils import (
Expand Down Expand Up @@ -181,3 +185,8 @@ def dispose(self):
if not self._disposed:
self._disposed = True
self._event_watcher.close()

def alembic_version(self):
alembic_config = mysql_alembic_config(__file__)
with self._connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
RunStorageSqlMetadata,
SqlRunStorage,
)
from dagster.core.storage.sql import stamp_alembic_rev # pylint: disable=unused-import
from dagster.core.storage.sql import create_engine, run_alembic_upgrade
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from dagster.utils import utc_datetime_from_timestamp

Expand Down Expand Up @@ -144,3 +148,8 @@ def add_daemon_heartbeat(self, daemon_heartbeat):
body=serialize_dagster_namedtuple(daemon_heartbeat),
)
)

def alembic_version(self):
alembic_config = mysql_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import dagster._check as check
from dagster.core.storage.schedules import ScheduleStorageSqlMetadata, SqlScheduleStorage
from dagster.core.storage.schedules.schema import InstigatorsTable
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple

from ..utils import (
Expand Down Expand Up @@ -126,3 +131,8 @@ def _add_or_update_instigators_table(self, conn, state):
update_timestamp=pendulum.now("UTC"),
)
)

def alembic_version(self):
alembic_config = mysql_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
SqlEventLogStorageTable,
)
from dagster.core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData, deserialize_as

from ..utils import (
Expand Down Expand Up @@ -262,3 +267,8 @@ def dispose(self):
self._disposed = True
if self._event_watcher:
self._event_watcher.close()

def alembic_version(self):
alembic_config = pg_alembic_config(__file__)
with self._connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
RunStorageSqlMetadata,
SqlRunStorage,
)
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple
from dagster.utils import utc_datetime_from_timestamp

Expand Down Expand Up @@ -157,3 +162,8 @@ def add_daemon_heartbeat(self, daemon_heartbeat):
},
)
)

def alembic_version(self):
alembic_config = pg_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import dagster._check as check
from dagster.core.storage.schedules import ScheduleStorageSqlMetadata, SqlScheduleStorage
from dagster.core.storage.schedules.schema import InstigatorsTable
from dagster.core.storage.sql import create_engine, run_alembic_upgrade, stamp_alembic_rev
from dagster.core.storage.sql import (
check_alembic_revision,
create_engine,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster.serdes import ConfigurableClass, ConfigurableClassData, serialize_dagster_namedtuple

from ..utils import (
Expand Down Expand Up @@ -134,3 +139,8 @@ def _add_or_update_instigators_table(self, conn, state):
},
)
)

def alembic_version(self):
alembic_config = pg_alembic_config(__file__)
with self.connect() as conn:
return check_alembic_revision(alembic_config, conn)

0 comments on commit 445194b

Please sign in to comment.