Skip to content

Commit

Permalink
[DS][39/n] Use FailedSchedulingCondition and ScheduledSince condition…
Browse files Browse the repository at this point in the history
… in eager policy (#21741)

## Summary & Motivation

As title. These conditions were alluded to in the initial eager policy PR; this change puts them into action. In the case that the latest run for an asset partition failed, we will not request the asset if we've already requested it within the last hour. This puts a natural "rate limit" on the asset in the case that it is repeatedly failing, but does not interfere if the asset is materializing as expected.

## How I Tested These Changes
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent 9eb7b35 commit e02a249
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ def parent_newer() -> "ParentNewerCondition":
return ParentNewerCondition()

@staticmethod
def eager() -> "SchedulingCondition":
def eager_with_rate_limit(
*,
failure_retry_delta: datetime.timedelta = datetime.timedelta(hours=1),
) -> "SchedulingCondition":
"""Returns a condition which will "eagerly" fill in missing partitions as they are created,
and ensures unpartitioned assets are updated whenever their dependencies are updated (either
via scheduled execution or ad-hoc runs).
Expand All @@ -201,6 +204,10 @@ def eager() -> "SchedulingCondition":
- None of its parent partitions are missing
- None of its parent partitions are currently part of an in-progress run
- It is not currently part of an in-progress run
It will also refuse to materialize an asset partition if the latest materialization attempt
failed and it has already been scheduled within the failure_retry_delta, to prevent
repeated failures.
"""
missing_or_parent_updated = (
SchedulingCondition.parent_newer()
Expand All @@ -213,12 +220,16 @@ def eager() -> "SchedulingCondition":
any_parent_in_progress = SchedulingCondition.any_deps_match(
SchedulingCondition.in_progress()
)
failed_recently = SchedulingCondition.failed() & SchedulingCondition.scheduled_since(
failure_retry_delta
)
return (
SchedulingCondition.in_latest_time_window()
& missing_or_parent_updated
& ~any_parent_missing
& ~any_parent_in_progress
& ~SchedulingCondition.in_progress()
& ~failed_recently
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import datetime

from dagster import SchedulingCondition

from dagster_tests.definitions_tests.auto_materialize_tests.base_scenario import run_request

from ..scenario_specs import hourly_partitions_def, two_assets_in_sequence
from .asset_condition_scenario import AssetConditionScenarioState


def test_eager_with_rate_limit_unpartitioned() -> None:
    """Check the rate-limited eager condition on an unpartitioned A -> B chain.

    Walks B through: no request before A runs, a request after A runs, no
    request once B has run, a request after A runs again, no immediate
    re-request after B's run fails, and a new request once just over an hour
    has passed.
    """
    scenario = AssetConditionScenarioState(
        two_assets_in_sequence,
        asset_condition=SchedulingCondition.eager_with_rate_limit(),
        ensure_empty_result=False,
    )

    # A has not run yet, so nothing should be requested for B.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # Once A has run, B becomes eligible.
    scenario = scenario.with_runs(run_request("A"))
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1

    # Launch runs for everything the condition just selected.
    requested_runs = [
        run_request(asset_key, partition_key)
        for asset_key, partition_key in evaluation.true_subset.asset_partitions
    ]
    scenario = scenario.with_runs(*requested_runs)

    # B is now materialized, so it should not be requested a second time.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # A runs again before an hour has passed; B should be requested again.
    scenario = scenario.with_runs(run_request("A"))
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1

    # ...but this time the run for B fails.
    scenario = scenario.with_failed_run_for_asset("B")

    # B must not be re-requested right after the failure.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # More than an hour after B was requested, it may be tried again.
    scenario = scenario.with_current_time_advanced(hours=1, seconds=1)
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1


def test_eager_with_rate_limit_hourly_partitioned() -> None:
    """Check the rate-limited eager condition on an hourly-partitioned A -> B chain.

    Uses a 10-minute ``failure_retry_delta`` and starts the scenario clock at
    2020-02-02T01:05:00. Verifies that only updates to the latest parent
    partition trigger B, that a failed run for B is not re-requested
    immediately, and that it is re-requested once just over 10 minutes have
    passed.
    """
    retry_delta = datetime.timedelta(minutes=10)
    scenario = AssetConditionScenarioState(
        two_assets_in_sequence,
        asset_condition=SchedulingCondition.eager_with_rate_limit(
            failure_retry_delta=retry_delta
        ),
        ensure_empty_result=False,
    )
    scenario = scenario.with_asset_properties(partitions_def=hourly_partitions_def)
    scenario = scenario.with_current_time("2020-02-02T01:05:00")

    # A has not run yet, so nothing should be requested for B.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # A run for an old partition of A is irrelevant to B.
    scenario = scenario.with_runs(run_request("A", "2019-07-05-00:00"))
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # A run for the latest partition of A makes B eligible.
    scenario = scenario.with_runs(run_request("A", "2020-02-02-00:00"))
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1

    # Launch runs for everything the condition just selected.
    requested_runs = [
        run_request(asset_key, partition_key)
        for asset_key, partition_key in evaluation.true_subset.asset_partitions
    ]
    scenario = scenario.with_runs(*requested_runs)

    # B is now materialized, so it should not be requested a second time.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # An hour later a new partition exists, but its parent has not run yet.
    scenario = scenario.with_current_time_advanced(hours=1)
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # The parent partition runs, so B is requested for the new partition.
    scenario = scenario.with_runs(run_request("A", "2020-02-02-01:00"))
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1

    # ...but the run for B fails.
    scenario = scenario.with_failed_run_for_asset("B", "2020-02-02-01:00")

    # B must not be re-requested right after the failure.
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 0

    # More than 10 minutes after B was requested, it may be tried again.
    scenario = scenario.with_current_time_advanced(minutes=10, seconds=1)
    scenario, evaluation = scenario.evaluate("B")
    assert evaluation.true_subset.size == 1

0 comments on commit e02a249

Please sign in to comment.