Skip to content

Commit

Permalink
[DA] Update eager condition (#22746)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Two things:

### update eager definition

This subtly changes the definition of the `eager()` condition to deal
with a particular annoying case. Essentially, imagine you have assets A
-> B -> C, each with an eager policy, and starting off missing.

On the first tick, all assets are detected as missing, and so a run is
created targeting all of them. Now, this run progresses, and A is
materialized.

At this point in time, if we evaluate `B`, we will notice that `A` has
now been newly updated, which would then result in `B` being
materialized again (similar things would happen for `C`).

In the AMP world, this is handled by fundamentally redefining the
"parent updated" condition to ignore cases where the child will be
materialized in the same run. While the same thing could be done here, I
would argue that this pollutes the simplicity of the system and will
result in a fair amount of confusion (there will be cases where people
want the "pure" behavior, and the history page would now indicate that a
parent _hadn't_ updated even though it did).

The alternative path, taken here, is to avoid materializing an asset if
there's already an in-progress run for it, and only materialize if
there's been a parent update since the last time the asset was requested
_or materialized_. This means that, while that initial run is
progressing, the asset will not be re-materialized, and once that run
completes, the asset will be in a "good state" again, as its parent has
not updated more recently than the last time it was updated (or
requested)

### removes / renames some properties

This removes the "since_last_x" static constructors. We can always add
them back in, but they were (in my opinion) a bit "too cute". They don't
cover a high enough percentage of use cases to be worth having their own
dedicated static methods, and just pollute the typeahead namespace.

Also just renames "on_cron" to "cron" -- again a reversible decision but
feels a bit cleaner.

## How I Tested These Changes
  • Loading branch information
OwenKephart authored and alangenfeld committed Jun 28, 2024
1 parent 939cb25 commit ef82147
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from dagster._annotations import experimental
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetConditionEvaluation,
Expand Down Expand Up @@ -114,40 +113,6 @@ def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":

return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

def since_last_updated(self) -> "SinceCondition":
"""Returns a AutomationCondition that is true if this condition has become true since the
last time this asset was updated.
"""
from .operands import NewlyUpdatedCondition
from .operators import SinceCondition

return SinceCondition(trigger_condition=self, reset_condition=NewlyUpdatedCondition())

def since_last_requested(self) -> "SinceCondition":
"""Returns a AutomationCondition that is true if this condition has become true since the
last time this asset was updated.
"""
from .operands import NewlyRequestedCondition
from .operators import SinceCondition

return SinceCondition(trigger_condition=self, reset_condition=NewlyRequestedCondition())

def since_last_cron_tick(
self, cron_schedule: str, cron_timezone: str = "UTC"
) -> "SinceCondition":
"""Returns a AutomationCondition that is true if this condition has become true since the
latest tick of the given cron schedule.
"""
from .operands import CronTickPassedCondition
from .operators import SinceCondition

return SinceCondition(
trigger_condition=self,
reset_condition=CronTickPassedCondition(
cron_schedule=cron_schedule, cron_timezone=cron_timezone
),
)

def newly_true(self) -> "NewlyTrueCondition":
"""Returns a AutomationCondition that is true only on the tick that this condition goes
from false to true for a given asset partition.
Expand All @@ -170,11 +135,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
return AnyDepsCondition(operand=condition)

@staticmethod
def all_deps_match(
condition: "AutomationCondition",
include_selection: Optional[AssetSelection] = None,
exclude_selection: Optional[AssetSelection] = None,
) -> "AllDepsCondition":
def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
"""Returns a AutomationCondition that is true for an asset partition if at least one partition
of all of its dependencies evaluate to True for the given condition.
Expand Down Expand Up @@ -286,13 +247,16 @@ def eager() -> "AutomationCondition":
)
return (
AutomationCondition.in_latest_time_window()
& became_missing_or_any_deps_updated.since_last_requested()
& became_missing_or_any_deps_updated.since(
AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
& ~any_parent_missing
& ~any_parent_in_progress
& ~AutomationCondition.in_progress()
)

@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
def cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
"""Returns a condition which will materialize asset partitions within the latest time window
on a given cron schedule, after their parents have been updated. For example, if the
cron_schedule is set to "0 0 * * *" (every day at midnight), then this rule will not become
Expand All @@ -306,15 +270,14 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondit
schedule, or will be requested this tick
- The asset partition has not been requested since the latest tick of the provided cron schedule
"""
cron_tick_passed = AutomationCondition.cron_tick_passed(cron_schedule, cron_timezone)
all_deps_updated_since_cron = AutomationCondition.all_deps_match(
AutomationCondition.newly_updated().since_last_cron_tick(cron_schedule, cron_timezone)
AutomationCondition.newly_updated().since(cron_tick_passed)
| AutomationCondition.will_be_requested()
)
return (
AutomationCondition.in_latest_time_window()
& AutomationCondition.cron_tick_passed(
cron_schedule, cron_timezone
).since_last_requested()
& cron_tick_passed.since(AutomationCondition.newly_requested())
& all_deps_updated_since_cron
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def test_missing_time_partitioned() -> None:
def test_serialize_definitions_with_asset_condition() -> None:
amp = AutoMaterializePolicy.from_asset_condition(
AutomationCondition.eager()
& ~AutomationCondition.newly_updated().since_last_cron_tick(
cron_schedule="0 * * * *", cron_timezone="UTC"
& ~AutomationCondition.newly_updated().since(
AutomationCondition.cron_tick_passed(cron_schedule="0 * * * *", cron_timezone="UTC")
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
def test_on_cron_unpartitioned() -> None:
state = AutomationConditionScenarioState(
two_assets_in_sequence,
automation_condition=AutomationCondition.on_cron(cron_schedule="0 * * * *"),
automation_condition=AutomationCondition.cron(cron_schedule="0 * * * *"),
ensure_empty_result=False,
).with_current_time("2020-02-02T00:55:00")

Expand Down Expand Up @@ -54,7 +54,7 @@ def test_on_cron_hourly_partitioned() -> None:
state = (
AutomationConditionScenarioState(
two_assets_in_sequence,
automation_condition=AutomationCondition.on_cron(cron_schedule="0 * * * *"),
automation_condition=AutomationCondition.cron(cron_schedule="0 * * * *"),
ensure_empty_result=False,
)
.with_asset_properties(partitions_def=hourly_partitions_def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@
[
# cron condition returns a unique value hash if parents change, if schedule changes, if the
# partitions def changes, or if an asset is materialized
("b965fde7adb65aefeaceccb72d1924f7", SC.on_cron("0 * * * *"), one_parent, False),
("455fa56d35fd9ae07bc9ee891ea109d7", SC.on_cron("0 * * * *"), one_parent, True),
("e038e2ffef6417fe048dbdb927b56fdf", SC.on_cron("0 0 * * *"), one_parent, False),
("80742dcd71a359a366d8312dfa283ffb", SC.on_cron("0 * * * *"), one_parent_daily, False),
("0179e633e3c1aac0d7af0dd3a3889f1a", SC.on_cron("0 * * * *"), two_parents, False),
("72bf7d1e533896a459ea3f46d30540d6", SC.on_cron("0 * * * *"), two_parents_daily, False),
("b965fde7adb65aefeaceccb72d1924f7", SC.cron("0 * * * *"), one_parent, False),
("455fa56d35fd9ae07bc9ee891ea109d7", SC.cron("0 * * * *"), one_parent, True),
("e038e2ffef6417fe048dbdb927b56fdf", SC.cron("0 0 * * *"), one_parent, False),
("80742dcd71a359a366d8312dfa283ffb", SC.cron("0 * * * *"), one_parent_daily, False),
("0179e633e3c1aac0d7af0dd3a3889f1a", SC.cron("0 * * * *"), two_parents, False),
("72bf7d1e533896a459ea3f46d30540d6", SC.cron("0 * * * *"), two_parents_daily, False),
# same as above
("67b021dba2eb717b1d4436417b2de6f4", SC.eager(), one_parent, False),
("da76d728a9fbeda3c69199faada031dc", SC.eager(), one_parent, True),
("1af68f634579fd18181f2af6b3b93aaa", SC.eager(), one_parent_daily, False),
("065ea22c39b86160cdad9e7cc86d241e", SC.eager(), two_parents, False),
("7819068ab1f9c2212d4c5622f2b7313c", SC.eager(), two_parents_daily, False),
("b60a8bd378adc06d0f6b20d521e64a86", SC.eager(), one_parent, False),
("c8d5928ae9965d3dc4c271b20121680d", SC.eager(), one_parent, True),
("ea699de7aef5356433e435dcaf4ab51e", SC.eager(), one_parent_daily, False),
("2819ba2e50803da9f146fd034e0df412", SC.eager(), two_parents, False),
("5f94b12ce4e5c9c424b9f37335d8cb82", SC.eager(), two_parents_daily, False),
# missing condition is invariant to changes other than partitions def changes
("651bece3ee8bb50d1616924f0a65f3fd", SC.missing(), one_parent, False),
("651bece3ee8bb50d1616924f0a65f3fd", SC.missing(), one_parent, True),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
def test_updated_since_cron_unpartitioned() -> None:
state = AutomationConditionScenarioState(
one_asset,
automation_condition=AutomationCondition.newly_updated().since_last_cron_tick(
cron_schedule="0 * * * *", cron_timezone="UTC"
automation_condition=AutomationCondition.newly_updated().since(
AutomationCondition.cron_tick_passed(cron_schedule="0 * * * *", cron_timezone="UTC")
),
).with_current_time("2020-02-02T00:55:00")

Expand Down Expand Up @@ -40,8 +40,8 @@ def test_updated_since_cron_partitioned() -> None:
state = (
AutomationConditionScenarioState(
one_asset,
automation_condition=AutomationCondition.newly_updated().since_last_cron_tick(
cron_schedule="0 * * * *", cron_timezone="UTC"
automation_condition=AutomationCondition.newly_updated().since(
AutomationCondition.cron_tick_passed(cron_schedule="0 * * * *", cron_timezone="UTC")
),
)
.with_asset_properties(partitions_def=two_partitions_def)
Expand Down

0 comments on commit ef82147

Please sign in to comment.