Skip to content

Commit

Permalink
Create an as_auto_materialize_policy method on SchedulingCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent e02a249 commit fc4966f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,21 @@ def skip_rules(self) -> AbstractSet["AutoMaterializeRule"]:

@staticmethod
def from_asset_condition(asset_condition: SchedulingCondition) -> "AutoMaterializePolicy":
return AutoMaterializePolicy.from_scheduling_condition(asset_condition)

@staticmethod
def from_scheduling_condition(
scheduling_condition: SchedulingCondition,
) -> "AutoMaterializePolicy":
"""Constructs an AutoMaterializePolicy which will materialize an asset partition whenever
the provided asset_condition evaluates to True.
the provided scheduling_condition evaluates to True.
Args:
asset_condition (AssetCondition): The condition which determines whether an asset
scheduling_condition (SchedulingCondition): The condition which determines whether an asset
partition should be materialized.
"""
return AutoMaterializePolicy(
rules=set(), max_materializations_per_minute=None, asset_condition=asset_condition
rules=set(), max_materializations_per_minute=None, asset_condition=scheduling_condition
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from dagster._utils.security import non_secure_md5_hash_str

if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

from .operands import (
FailedSchedulingCondition,
InLatestTimeWindowCondition,
Expand Down Expand Up @@ -62,6 +64,12 @@ def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]
parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
return non_secure_md5_hash_str("".join(parts).encode())

def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
"""Returns an AutoMaterializePolicy which contains this condition."""
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

return AutoMaterializePolicy.from_scheduling_condition(self)

@abstractmethod
def evaluate(self, context: "SchedulingContext") -> "SchedulingResult":
raise NotImplementedError()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import dagster._check as check
from dagster import (
AutoMaterializePolicy,
Definitions,
SchedulingCondition,
asset,
deserialize_value,
serialize_value,
)


def test_round_trip_conversion() -> None:
policy = SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy()
serialized_policy = serialize_value(policy)
deserialized_policy = deserialize_value(serialized_policy, AutoMaterializePolicy)
assert policy == deserialized_policy
assert deserialized_policy.asset_condition == SchedulingCondition.eager_with_rate_limit()


def test_defs() -> None:
@asset(
auto_materialize_policy=SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy()
)
def my_asset() -> None: ...

defs = Definitions(assets=[my_asset])

asset_graph_amp = defs.get_asset_graph().get(my_asset.key).auto_materialize_policy
assert (
check.not_none(asset_graph_amp).to_scheduling_condition()
== SchedulingCondition.eager_with_rate_limit()
)

0 comments on commit fc4966f

Please sign in to comment.