diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py index dedb01116c201..ec4560c5d9831 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py @@ -187,7 +187,10 @@ def parent_newer() -> "ParentNewerCondition": return ParentNewerCondition() @staticmethod - def eager() -> "SchedulingCondition": + def eager( + *, + failure_retry_delta: datetime.timedelta = datetime.timedelta(hours=1), + ) -> "SchedulingCondition": """Returns a condition which will "eagerly" fill in missing partitions as they are created, and ensures unpartitioned assets are updated whenever their dependencies are updated (either via scheduled execution or ad-hoc runs). @@ -201,6 +204,10 @@ def eager() -> "SchedulingCondition": - None of its parent partitions are missing - None of its parent partitions are currently part of an in-progress run - It is not currently part of an in-progress run + + It will also refuse to materialize an asset partition if the latest materialization attempt + failed and has not already been scheduled within the failure_retry_delta, to prevent + repeated failures. """ missing_or_parent_updated = ( SchedulingCondition.parent_newer() @@ -213,12 +220,16 @@ def eager() -> "SchedulingCondition": any_parent_in_progress = SchedulingCondition.any_deps_match( SchedulingCondition.in_progress() ) + failed_recently = SchedulingCondition.failed() & SchedulingCondition.scheduled_since( + failure_retry_delta + ) return ( SchedulingCondition.in_latest_time_window() & missing_or_parent_updated & ~any_parent_missing & ~any_parent_in_progress & ~SchedulingCondition.in_progress() + & ~failed_recently ) @staticmethod diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_eager_condition.py new file mode 100644 index 0000000000000..48e3e756f80aa --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_eager_condition.py @@ -0,0 +1,104 @@ +import datetime + +from dagster import SchedulingCondition + +from dagster_tests.definitions_tests.auto_materialize_tests.base_scenario import run_request + +from ..scenario_specs import hourly_partitions_def, two_assets_in_sequence +from .asset_condition_scenario import AssetConditionScenarioState + + +def test_eager_unpartitioned() -> None: + state = AssetConditionScenarioState( + two_assets_in_sequence, + asset_condition=SchedulingCondition.eager(), + ensure_empty_result=False, + ) + + # parent hasn't updated yet + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # parent updated, now can execute + state = state.with_runs(run_request("A")) + state, result = state.evaluate("B") + assert result.true_subset.size == 1 + state = state.with_runs( + *(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions) + ) + + # now B has been materialized, so don't execute again + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # A gets materialized again before the hour, execute B again + state = state.with_runs(run_request("A")) + state, result = state.evaluate("B") + assert result.true_subset.size == 1 + # however, B fails + state = state.with_failed_run_for_asset("B") + + # do not try to materialize B again immediately + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # now it's been over an hour since B was requested, try again + state = state.with_current_time_advanced(hours=1, seconds=1) + state, result = state.evaluate("B") + assert result.true_subset.size == 1 + + +def test_eager_hourly_partitioned() -> None: + state = ( + AssetConditionScenarioState( + two_assets_in_sequence, + asset_condition=SchedulingCondition.eager( + failure_retry_delta=datetime.timedelta(minutes=10) + ), + ensure_empty_result=False, + ) + .with_asset_properties(partitions_def=hourly_partitions_def) + .with_current_time("2020-02-02T01:05:00") + ) + + # parent hasn't updated yet + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # historical parent updated, doesn't matter + state = state.with_runs(run_request("A", "2019-07-05-00:00")) + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # latest parent updated, now can execute + state = state.with_runs(run_request("A", "2020-02-02-00:00")) + state, result = state.evaluate("B") + assert result.true_subset.size == 1 + state = state.with_runs( + *(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions) + ) + + # now B has been materialized, so don't execute again + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # new partition comes into being, parent hasn't been materialized yet + state = state.with_current_time_advanced(hours=1) + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # parent gets materialized, B requested + state = state.with_runs(run_request("A", "2020-02-02-01:00")) + state, result = state.evaluate("B") + assert result.true_subset.size == 1 + # but it fails + state = state.with_failed_run_for_asset("B", "2020-02-02-01:00") + + # B does not get immediately requested again + state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # now it's been over 10 minutes since B was requested, try again + state = state.with_current_time_advanced(minutes=10, seconds=1) + state, result = state.evaluate("B") + assert result.true_subset.size == 1