Skip to content

Commit

Permalink
Update eager condition to not retry as frequently on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 13, 2024
1 parent fc7420a commit edb5d0f
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ def parent_newer() -> "ParentNewerCondition":
return ParentNewerCondition()

@staticmethod
def eager() -> "SchedulingCondition":
def eager(
*,
failure_retry_delta: datetime.timedelta = datetime.timedelta(hours=1),
) -> "SchedulingCondition":
"""Returns a condition which will "eagerly" fill in missing partitions as they are created,
and ensures unpartitioned assets are updated whenever their dependencies are updated (either
via scheduled execution or ad-hoc runs).
Expand All @@ -201,6 +204,10 @@ def eager() -> "SchedulingCondition":
- None of its parent partitions are missing
- None of its parent partitions are currently part of an in-progress run
- It is not currently part of an in-progress run
It will also refuse to materialize an asset partition if the latest materialization attempt
failed and has not already been scheduled within the failure_retry_delta, to prevent
repeated failures.
"""
missing_or_parent_updated = (
SchedulingCondition.parent_newer()
Expand All @@ -213,12 +220,16 @@ def eager() -> "SchedulingCondition":
any_parent_in_progress = SchedulingCondition.any_deps_match(
SchedulingCondition.in_progress()
)
failed_recently = SchedulingCondition.failed() & SchedulingCondition.scheduled_since(
failure_retry_delta
)
return (
SchedulingCondition.in_latest_time_window()
& missing_or_parent_updated
& ~any_parent_missing
& ~any_parent_in_progress
& ~SchedulingCondition.in_progress()
& ~failed_recently
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import datetime

from dagster import SchedulingCondition

from dagster_tests.definitions_tests.auto_materialize_tests.base_scenario import run_request

from ..scenario_specs import hourly_partitions_def, two_assets_in_sequence
from .asset_condition_scenario import AssetConditionScenarioState


def test_eager_unpartitioned() -> None:
state = AssetConditionScenarioState(
two_assets_in_sequence,
asset_condition=SchedulingCondition.eager(),
ensure_empty_result=False,
)

# parent hasn't updated yet
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# parent updated, now can execute
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
state = state.with_runs(
*(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions)
)

# now B has been materialized, so don't execute again
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized again before the hour, execute B again
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
# however, B fails
state = state.with_failed_run_for_asset("B")

# do not try to materialize B again immediately
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# now it's been over an hour since B was requested, try again
state = state.with_current_time_advanced(hours=1, seconds=1)
state, result = state.evaluate("B")
assert result.true_subset.size == 1


def test_eager_hourly_partitioned() -> None:
state = (
AssetConditionScenarioState(
two_assets_in_sequence,
asset_condition=SchedulingCondition.eager(
failure_retry_delta=datetime.timedelta(minutes=10)
),
ensure_empty_result=False,
)
.with_asset_properties(partitions_def=hourly_partitions_def)
.with_current_time("2020-02-02T01:05:00")
)

# parent hasn't updated yet
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# historical parent updated, doesn't matter
state = state.with_runs(run_request("A", "2019-07-05-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# latest parent updated, now can execute
state = state.with_runs(run_request("A", "2020-02-02-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
state = state.with_runs(
*(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions)
)

# now B has been materialized, so don't execute again
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# new partition comes into being, parent hasn't been materialized yet
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# parent gets materialized, B requested
state = state.with_runs(run_request("A", "2020-02-02-01:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
# but it fails
state = state.with_failed_run_for_asset("B", "2020-02-02-01:00")

# B does not get immediately requested again
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# now it's been over 10 minutes since B was requested, try again
state = state.with_current_time_advanced(minutes=10, seconds=1)
state, result = state.evaluate("B")
assert result.true_subset.size == 1

0 comments on commit edb5d0f

Please sign in to comment.