Skip to content

Commit

Permalink
Create FailedSchedulingCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 13, 2024
1 parent b801b27 commit fc7420a
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ def compute_in_progress_asset_slice(self, *, asset_key: "AssetKey") -> "AssetSli
self, self._queryer.get_in_progress_asset_subset(asset_key=asset_key)
)

@cached_method
def compute_failed_asset_slice(self, *, asset_key: "AssetKey") -> "AssetSlice":
    """Return the slice of ``asset_key`` that the instance queryer reports as failed.

    The subset is obtained from ``self._queryer.get_failed_asset_subset`` and wrapped into an
    ``AssetSlice`` bound to this view.
    """
    failed_subset = self._queryer.get_failed_asset_subset(asset_key=asset_key)
    return _slice_from_subset(self, failed_subset)

@cached_method
def compute_updated_since_cursor_slice(
self, *, asset_key: AssetKey, cursor: Optional[int]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .legacy import RuleCondition as RuleCondition
from .legacy.asset_condition import AssetCondition as AssetCondition
from .operands import (
FailedSchedulingCondition as FailedSchedulingCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressSchedulingCondition as InProgressSchedulingCondition,
MissingSchedulingCondition as MissingSchedulingCondition,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .parent_newer_condition import ParentNewerCondition as ParentNewerCondition
from .scheduled_since_condition import ScheduledSinceCondition as ScheduledSinceCondition
from .slice_conditions import (
FailedSchedulingCondition as FailedSchedulingCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressSchedulingCondition as InProgressSchedulingCondition,
MissingSchedulingCondition as MissingSchedulingCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ def compute_slice(self, context: SchedulingContext) -> AssetSlice:
return context.asset_graph_view.compute_in_progress_asset_slice(asset_key=context.asset_key)


@whitelist_for_serdes
class FailedSchedulingCondition(SliceSchedulingCondition):
    """Slice-based condition backed by the asset graph view's failed asset slice."""

    @property
    def description(self) -> str:
        """Human-readable summary of this condition."""
        return "Latest run failed"

    def compute_slice(self, context: SchedulingContext) -> AssetSlice:
        """Return the failed slice for the asset currently being evaluated in ``context``."""
        graph_view = context.asset_graph_view
        return graph_view.compute_failed_asset_slice(asset_key=context.asset_key)


@whitelist_for_serdes
class RequestedThisTickCondition(SliceSchedulingCondition):
@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

if TYPE_CHECKING:
from .operands import (
FailedSchedulingCondition,
InLatestTimeWindowCondition,
InProgressSchedulingCondition,
MissingSchedulingCondition,
Expand Down Expand Up @@ -120,6 +121,13 @@ def in_progress() -> "InProgressSchedulingCondition":

return InProgressSchedulingCondition()

@staticmethod
def failed() -> "FailedSchedulingCondition":
    """Returns a SchedulingCondition that is true for an asset partition if its latest run
    failed.
    """
    # Imported at call time: at module level this name is only imported under TYPE_CHECKING.
    from .operands import FailedSchedulingCondition

    condition = FailedSchedulingCondition()
    return condition

@staticmethod
def updated_since_cron(
cron_schedule: str, cron_timezone: str = "UTC"
Expand Down
27 changes: 27 additions & 0 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from dagster._core.storage.dagster_run import (
IN_PROGRESS_RUN_STATUSES,
DagsterRun,
DagsterRunStatus,
RunRecord,
)
from dagster._core.storage.tags import PARTITION_NAME_TAG
Expand Down Expand Up @@ -193,6 +194,32 @@ def get_in_progress_asset_subset(self, *, asset_key: AssetKey) -> ValidAssetSubs

return ValidAssetSubset(asset_key=asset_key, value=value)

@cached_method
def get_failed_asset_subset(self, *, asset_key: AssetKey) -> ValidAssetSubset:
    """Returns an AssetSubset representing the subset of the asset that failed to be
    materialized in its most recent run.

    For a partitioned asset the value is the failed partition subset deserialized from the
    value returned by ``_get_updated_cache_value`` (an empty subset when that value is None).
    For an unpartitioned asset the value is a bool: True only when the run behind the latest
    planned materialization exists and has status ``DagsterRunStatus.FAILURE``.
    """
    partitions_def = self.asset_graph.get(asset_key).partitions_def

    if not partitions_def:
        # ideally, unpartitioned assets would also be handled by the asset status cache
        latest_planned = (
            self.instance.event_log_storage.get_latest_planned_materialization_info(asset_key)
        )
        if not latest_planned:
            # nothing has ever been planned for this asset, so nothing can have failed
            return ValidAssetSubset(asset_key=asset_key, value=False)

        run = self.instance.get_run_by_id(latest_planned.run_id)
        run_failed = run is not None and run.status == DagsterRunStatus.FAILURE
        return ValidAssetSubset(asset_key=asset_key, value=run_failed)

    cached = self._get_updated_cache_value(asset_key=asset_key)
    if cached is None:
        failed_partitions = partitions_def.empty_subset()
    else:
        failed_partitions = cached.deserialize_failed_partition_subsets(partitions_def)
    return ValidAssetSubset(asset_key=asset_key, value=failed_partitions)

####################
# ASSET RECORDS / STORAGE IDS
####################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from dagster import SchedulingCondition
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.events import AssetKeyPartitionKey

from ..base_scenario import run_request
from ..scenario_specs import one_asset, two_partitions_def
from .asset_condition_scenario import AssetConditionScenarioState


def test_failed_unpartitioned() -> None:
    """The failed condition follows the latest run of an unpartitioned asset."""
    condition = SchedulingCondition.failed()
    scenario = AssetConditionScenarioState(one_asset, asset_condition=condition)

    # no failed runs yet
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 0

    # now a run for the asset fails
    scenario = scenario.with_failed_run_for_asset("A")
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 1

    # the next run completes successfully
    scenario = scenario.with_runs(run_request("A"))
    _, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 0


def test_in_progress_static_partitioned() -> None:
    """The failed condition tracks failures per partition of a two-partition asset.

    NOTE(review): despite the "in_progress" in its name, this test exercises
    ``SchedulingCondition.failed()``; the name looks like a copy/paste leftover.
    """
    partition_keys = ("1", "2")
    base_scenario = AssetConditionScenarioState(
        one_asset, asset_condition=SchedulingCondition.failed()
    )
    scenario = base_scenario.with_asset_properties(partitions_def=two_partitions_def)

    # no failed runs
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 0

    # now one partition fails
    scenario = scenario.with_failed_run_for_asset("A", partition_key="1")
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 1
    assert evaluation.true_subset.asset_partitions == {
        AssetKeyPartitionKey(AssetKey("A"), "1")
    }

    # now that partition succeeds
    scenario = scenario.with_runs(run_request("A", partition_key="1"))
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 0

    # now both partitions fail
    for key in partition_keys:
        scenario = scenario.with_failed_run_for_asset("A", partition_key=key)
    scenario, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 2

    # now both partitions succeed
    scenario = scenario.with_runs(
        *[run_request("A", partition_key=key) for key in partition_keys]
    )
    _, evaluation = scenario.evaluate("A")
    assert evaluation.true_subset.size == 0
assert result.true_subset.size == 0
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import namedtuple
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterable, NamedTuple, Optional, Sequence, Union, cast
from typing import AbstractSet, Iterable, NamedTuple, Optional, Sequence, Union, cast

import mock
import pendulum
Expand Down Expand Up @@ -259,28 +259,46 @@ def with_asset_properties(
self, scenario_spec=self.scenario_spec.with_asset_properties(keys, **kwargs)
)

def with_in_progress_run_for_asset(
self, asset_key: CoercibleToAssetKey, partition_key: Optional[str] = None
def _with_run_with_status_for_assets(
self,
asset_keys: AbstractSet[AssetKey],
partition_key: Optional[str],
status: DagsterRunStatus,
) -> Self:
in_progress_run_id = make_new_run_id()
run_id = make_new_run_id()
with pendulum_freeze_time(self.current_time):
asset_key = AssetKey.from_coercible(asset_key)
job_def = self.scenario_spec.defs.get_implicit_job_def_for_assets(
asset_keys=[asset_key]
asset_keys=list(asset_keys)
)
assert job_def
execution_plan = create_execution_plan(job_def, run_config={})
self.instance.create_run_for_job(
job_def=job_def,
run_id=in_progress_run_id,
status=DagsterRunStatus.STARTED,
asset_selection=frozenset({AssetKey.from_coercible(asset_key)}),
run_id=run_id,
status=status,
asset_selection=frozenset(asset_keys),
execution_plan=execution_plan,
tags={PARTITION_NAME_TAG: partition_key} if partition_key else None,
)
assert self.instance.get_run_by_id(in_progress_run_id)
assert self.instance.get_run_by_id(run_id)
return self

def with_in_progress_run_for_asset(
    self, asset_key: CoercibleToAssetKey, partition_key: Optional[str] = None
) -> Self:
    """Add a run with status ``DagsterRunStatus.STARTED`` that targets ``asset_key``.

    ``asset_key`` is coerced to an ``AssetKey``; ``partition_key`` is forwarded unchanged to
    ``_with_run_with_status_for_assets``.
    """
    resolved_key = AssetKey.from_coercible(asset_key)
    return self._with_run_with_status_for_assets(
        asset_keys={resolved_key},
        partition_key=partition_key,
        status=DagsterRunStatus.STARTED,
    )

def with_failed_run_for_asset(
    self, asset_key: CoercibleToAssetKey, partition_key: Optional[str] = None
) -> Self:
    """Add a run with status ``DagsterRunStatus.FAILURE`` that targets ``asset_key``.

    ``asset_key`` is coerced to an ``AssetKey``; ``partition_key`` is forwarded unchanged to
    ``_with_run_with_status_for_assets``.
    """
    resolved_key = AssetKey.from_coercible(asset_key)
    return self._with_run_with_status_for_assets(
        asset_keys={resolved_key},
        partition_key=partition_key,
        status=DagsterRunStatus.FAILURE,
    )

def with_runs(self, *run_requests: RunRequest) -> Self:
start = datetime.datetime.now()

Expand Down

0 comments on commit fc7420a

Please sign in to comment.