From fdfe04215b83fa99e308afcd43a27cc6a19953ab Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 10 May 2024 11:35:53 -0700 Subject: [PATCH] Create an as_auto_materialize_policy method on SchedulingCondition --- .../definitions/auto_materialize_policy.py | 12 ++++++--- .../scheduling_condition.py | 8 ++++++ ...test_auto_materialize_policy_conversion.py | 27 +++++++++++++++++++ 3 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index eb5fd2e7d7585..0b0246c1b6e92 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -176,15 +176,21 @@ def skip_rules(self) -> AbstractSet["AutoMaterializeRule"]: @staticmethod def from_asset_condition(asset_condition: SchedulingCondition) -> "AutoMaterializePolicy": + return AutoMaterializePolicy.from_scheduling_condition(asset_condition) + + @staticmethod + def from_scheduling_condition( + scheduling_condition: SchedulingCondition, + ) -> "AutoMaterializePolicy": """Constructs an AutoMaterializePolicy which will materialize an asset partition whenever - the provided asset_condition evaluates to True. + the provided scheduling_condition evaluates to True. Args: - asset_condition (AssetCondition): The condition which determines whether an asset + scheduling_condition (SchedulingCondition): The condition which determines whether an asset partition should be materialized. """ return AutoMaterializePolicy( - rules=set(), max_materializations_per_minute=None, asset_condition=asset_condition + rules=set(), max_materializations_per_minute=None, asset_condition=scheduling_condition ) @public diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py index ec4560c5d9831..cecf946c1a2a1 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py @@ -18,6 +18,8 @@ from dagster._utils.security import non_secure_md5_hash_str if TYPE_CHECKING: + from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + from .operands import ( FailedSchedulingCondition, InLatestTimeWindowCondition, @@ -62,6 +64,12 @@ def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int] parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description] return non_secure_md5_hash_str("".join(parts).encode()) + def as_auto_materialize_policy(self) -> "AutoMaterializePolicy": + """Returns an AutoMaterializePolicy which contains this condition.""" + from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + + return AutoMaterializePolicy.from_scheduling_condition(self) + @abstractmethod def evaluate(self, context: "SchedulingContext") -> "SchedulingResult": raise NotImplementedError() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py new file mode 100644 index 0000000000000..5907cb10ca0e2 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py @@ -0,0 +1,27 @@ +import dagster._check as check +from dagster import ( + AutoMaterializePolicy, + Definitions, + SchedulingCondition, + asset, + deserialize_value, + serialize_value, +) + + +def test_round_trip_conversion() -> None: + policy = SchedulingCondition.eager().as_auto_materialize_policy() + serialized_policy = serialize_value(policy) + deserialized_policy = deserialize_value(serialized_policy, AutoMaterializePolicy) + assert policy == deserialized_policy + assert deserialized_policy.asset_condition == SchedulingCondition.eager() + + +def test_defs() -> None: + @asset(auto_materialize_policy=SchedulingCondition.eager().as_auto_materialize_policy()) + def my_asset() -> None: ... + + defs = Definitions(assets=[my_asset]) + + asset_graph_amp = defs.get_asset_graph().get(my_asset.key).auto_materialize_policy + assert check.not_none(asset_graph_amp).to_scheduling_condition() == SchedulingCondition.eager()