Skip to content

Commit

Permalink
[asset-reconciliation] Factor in more run statuses (#12412)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Feb 17, 2023
1 parent 866a100 commit 82901d3
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.event_log.sql_event_log import AssetEventTagsTable
from dagster._core.storage.pipeline_run import (
IN_PROGRESS_RUN_STATUSES,
FINISHED_STATUSES,
DagsterRun,
DagsterRunStatus,
RunRecord,
Expand Down Expand Up @@ -685,11 +685,18 @@ def get_used_data_times_for_record(
}

@cached_method
def _get_in_progress_run_ids(self) -> Sequence[str]:
def _get_in_progress_run_ids(self, current_time: datetime.datetime) -> Sequence[str]:
return [
record.pipeline_run.run_id
for record in self._instance.get_run_records(
filters=RunsFilter(statuses=IN_PROGRESS_RUN_STATUSES), limit=25
filters=RunsFilter(
statuses=[
status for status in DagsterRunStatus if status not in FINISHED_STATUSES
],
# ignore old runs that may be stuck in an unfinished state
created_after=current_time - datetime.timedelta(days=1),
),
limit=25,
)
]

Expand Down Expand Up @@ -763,7 +770,7 @@ def get_in_progress_data_times_for_key(
"""
in_progress_times: Dict[AssetKey, datetime.datetime] = {}

for run_id in self._get_in_progress_run_ids():
for run_id in self._get_in_progress_run_ids(current_time=current_time):
if not self.is_asset_in_run(run_id=run_id, asset=asset_key):
continue

Expand Down

0 comments on commit 82901d3

Please sign in to comment.