Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DS][40/n] Create an as_auto_materialize_policy method on SchedulingCondition #21788

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,21 @@ def skip_rules(self) -> AbstractSet["AutoMaterializeRule"]:

@staticmethod
def from_asset_condition(asset_condition: SchedulingCondition) -> "AutoMaterializePolicy":
return AutoMaterializePolicy.from_scheduling_condition(asset_condition)

@staticmethod
def from_scheduling_condition(
scheduling_condition: SchedulingCondition,
) -> "AutoMaterializePolicy":
"""Constructs an AutoMaterializePolicy which will materialize an asset partition whenever
the provided asset_condition evaluates to True.
the provided scheduling_condition evaluates to True.

Args:
asset_condition (AssetCondition): The condition which determines whether an asset
scheduling_condition (SchedulingCondition): The condition which determines whether an asset
partition should be materialized.
"""
return AutoMaterializePolicy(
rules=set(), max_materializations_per_minute=None, asset_condition=asset_condition
rules=set(), max_materializations_per_minute=None, asset_condition=scheduling_condition
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from dagster._utils.security import non_secure_md5_hash_str

if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

from .operands import (
FailedSchedulingCondition,
InLatestTimeWindowCondition,
Expand Down Expand Up @@ -62,6 +64,12 @@ def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]
parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
return non_secure_md5_hash_str("".join(parts).encode())

def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
"""Returns an AutoMaterializePolicy which contains this condition."""
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

return AutoMaterializePolicy.from_scheduling_condition(self)

@abstractmethod
def evaluate(self, context: "SchedulingContext") -> "SchedulingResult":
raise NotImplementedError()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import dagster._check as check
from dagster import (
AutoMaterializePolicy,
Definitions,
SchedulingCondition,
asset,
deserialize_value,
serialize_value,
)


def test_round_trip_conversion() -> None:
policy = SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy()
serialized_policy = serialize_value(policy)
deserialized_policy = deserialize_value(serialized_policy, AutoMaterializePolicy)
assert policy == deserialized_policy
assert deserialized_policy.asset_condition == SchedulingCondition.eager_with_rate_limit()


def test_defs() -> None:
@asset(
auto_materialize_policy=SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy()
)
def my_asset() -> None: ...

defs = Definitions(assets=[my_asset])

asset_graph_amp = defs.get_asset_graph().get(my_asset.key).auto_materialize_policy
assert (
check.not_none(asset_graph_amp).to_scheduling_condition()
== SchedulingCondition.eager_with_rate_limit()
)