Skip to content

Commit

Permalink
Create cron composite condition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent f098c04 commit 6b6127f
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ def eager() -> "SchedulingCondition":
& ~SchedulingCondition.in_progress()
)

@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "SchedulingCondition":
"""Returns a condition which will materialize asset partitions within the latest time window
on a given cron schedule, after their parents have been updated. For example, if the
cron_schedule is set to "0 0 * * *" (every day at midnight), then this rule will not become
true on a given day until all of its parents have been updated during that same day.
Specifically, this is a composite SchedulingCondition which is true for an asset partition
if all of the following are true:
- The asset partition is within the latest time window
- All parent asset partitions have been updated since the latest tick of the provided cron
schedule, or will be requested this tick
- The asset partition has not been updated since the latest tick of the provided cron schedule
- The asset partition is not currently part of an in-progress run
"""
all_parents_updated_or_will_update = ~SchedulingCondition.any_deps_match(
~(
SchedulingCondition.updated_since_cron(cron_schedule, cron_timezone)
| SchedulingCondition.requested_this_tick()
)
)
return (
SchedulingCondition.in_latest_time_window()
& all_parents_updated_or_will_update
& ~SchedulingCondition.updated_since_cron(cron_schedule, cron_timezone)
& ~SchedulingCondition.in_progress()
)


class SchedulingResult(DagsterModel):
condition: SchedulingCondition
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from dagster import SchedulingCondition

from dagster_tests.definitions_tests.auto_materialize_tests.base_scenario import run_request

from ..scenario_specs import hourly_partitions_def, two_assets_in_sequence
from .asset_condition_scenario import AssetConditionScenarioState


def test_on_cron_unpartitioned() -> None:
state = AssetConditionScenarioState(
two_assets_in_sequence,
asset_condition=SchedulingCondition.on_cron(cron_schedule="0 * * * *"),
).with_current_time("2020-02-02T01:05:00")

# parent hasn't updated yet
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# parent updated, now can execute
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
state = state.with_runs(
*(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions)
)

# now B has been materialized, so don't execute again
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized again before the hour, so don't execute B again
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# now a new cron tick, but A still hasn't been materialized since the hour
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized again after the hour, so execute B again
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1


def test_on_cron_hourly_partitioned() -> None:
state = (
AssetConditionScenarioState(
two_assets_in_sequence,
asset_condition=SchedulingCondition.on_cron(cron_schedule="0 * * * *"),
)
.with_asset_properties(partitions_def=hourly_partitions_def)
.with_current_time("2020-02-02T01:05:00")
)

# parent hasn't updated yet
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# historical parent updated, doesn't matter
state = state.with_runs(run_request("A", "2019-07-05-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# latest parent updated, now can execute
state = state.with_runs(run_request("A", "2020-02-02-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1
state = state.with_runs(
*(run_request(ak, pk) for ak, pk in result.true_subset.asset_partitions)
)

# now B has been materialized, so don't execute again
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# now a new cron tick, but A still hasn't been materialized since the hour
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized with the previous partition after the hour, but that doesn't matter
state = state.with_runs(run_request("A", "2020-02-02-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized with the latest partition, fire
state = state.with_runs(run_request("A", "2020-02-02-01:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 1

0 comments on commit 6b6127f

Please sign in to comment.