Skip to content

Commit

Permalink
add data migration for run repo tags (#7815)
Browse files Browse the repository at this point in the history
* add migration for runs

* pylint

* pylint

* update to sort by id, not run_id

* make sure migration supports old sqlalchemy syntax

* add subquery alias
  • Loading branch information
prha committed May 16, 2022
1 parent ea27cdc commit 9e786a1
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 4 deletions.
73 changes: 70 additions & 3 deletions python_modules/dagster/dagster/core/storage/runs/migration.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
from contextlib import ExitStack

import sqlalchemy as db
from tqdm import tqdm

import dagster._check as check
from dagster.serdes import deserialize_as

from ..pipeline_run import PipelineRunStatus
from ..pipeline_run import PipelineRun, PipelineRunStatus
from ..runs.base import RunStorage
from ..runs.schema import RunsTable
from ..tags import PARTITION_NAME_TAG, PARTITION_SET_TAG
from ..runs.schema import RunTagsTable, RunsTable
from ..tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, REPOSITORY_LABEL_TAG

RUN_PARTITIONS = "run_partitions"
RUN_START_END = "run_start_end_overwritten" # was run_start_end, but renamed to overwrite bad timestamps written
RUN_REPO_LABEL_TAGS = "run_repo_label_tags"

# for `dagster instance migrate`, paired with schema changes
REQUIRED_DATA_MIGRATIONS = {
RUN_PARTITIONS: lambda: migrate_run_partition,
RUN_REPO_LABEL_TAGS: lambda: migrate_run_repo_tags,
}
# for `dagster instance reindex`, optionally run for better read performance
OPTIONAL_DATA_MIGRATIONS = {
Expand Down Expand Up @@ -138,3 +142,66 @@ def add_run_stats(run_storage: RunStorage, run_id: str) -> None:
end_time=run_stats.end_time,
)
)


def migrate_run_repo_tags(run_storage: RunStorage, print_fn=None):
from dagster.core.storage.runs.sql_run_storage import SqlRunStorage

if not isinstance(run_storage, SqlRunStorage):
return

if print_fn:
print_fn("Querying run storage.")

subquery = (
db.select([RunTagsTable.c.run_id.label("tags_run_id")])
.where(RunTagsTable.c.key == REPOSITORY_LABEL_TAG)
.alias("tag_subquery")
)
base_query = (
db.select([RunsTable.c.run_body, RunsTable.c.id])
.select_from(
RunsTable.join(subquery, RunsTable.c.run_id == subquery.c.tags_run_id, isouter=True)
)
.where(subquery.c.tags_run_id == None)
.order_by(db.asc(RunsTable.c.id))
.limit(RUN_CHUNK_SIZE)
)

cursor = None
has_more = True
while has_more:
if cursor:
query = base_query.where(RunsTable.c.id > cursor)
else:
query = base_query

with run_storage.connect() as conn:
result_proxy = conn.execute(query)
rows = result_proxy.fetchall()
result_proxy.close()

has_more = len(rows) >= RUN_CHUNK_SIZE
for row in rows:
run = deserialize_as(row[0], PipelineRun)
cursor = row[1]
write_repo_tag(conn, run)


def write_repo_tag(conn, run: PipelineRun):
if not run.external_pipeline_origin:
# nothing to do
return

repository_label = run.external_pipeline_origin.external_repository_origin.get_label()
try:
conn.execute(
RunTagsTable.insert().values( # pylint: disable=no-value-for-parameter
run_id=run.run_id,
key=REPOSITORY_LABEL_TAG,
value=repository_label,
)
)
except db.exc.IntegrityError:
# tag already exists, swallow
pass
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from dagster.core.storage.event_log.migration import migrate_event_log_data
from dagster.core.storage.event_log.sql_event_log import SqlEventLogStorage
from dagster.core.storage.migration.utils import upgrading_instance
from dagster.core.storage.pipeline_run import DagsterRun, DagsterRunStatus
from dagster.core.storage.pipeline_run import DagsterRun, DagsterRunStatus, RunsFilter
from dagster.core.storage.tags import REPOSITORY_LABEL_TAG
from dagster.serdes import DefaultNamedTupleSerializer, create_snapshot_id
from dagster.serdes.serdes import (
WhitelistMap,
Expand Down Expand Up @@ -885,3 +886,22 @@ def test_tick_selector_index_migration():
assert "idx_tick_selector_timestamp" not in get_sqlite3_indexes(db_path, "job_ticks")
instance.upgrade()
assert "idx_tick_selector_timestamp" in get_sqlite3_indexes(db_path, "job_ticks")


def test_repo_label_tag_migration():
src_dir = file_relative_path(__file__, "snapshot_0_14_14_pre_repo_label_tags/sqlite")

with copy_directory(src_dir) as test_dir:
with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
job_repo_filter = RunsFilter(
job_name="hammer",
tags={REPOSITORY_LABEL_TAG: "toys_repository@dagster_test.graph_job_op_toys.repo"},
)

count = instance.get_runs_count(job_repo_filter)
assert count == 0

instance.upgrade()

count = instance.get_runs_count(job_repo_filter)
assert count == 2

0 comments on commit 9e786a1

Please sign in to comment.