diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index eb5fd2e7d758..0b0246c1b6e9 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -176,15 +176,21 @@ def skip_rules(self) -> AbstractSet["AutoMaterializeRule"]: @staticmethod def from_asset_condition(asset_condition: SchedulingCondition) -> "AutoMaterializePolicy": + return AutoMaterializePolicy.from_scheduling_condition(asset_condition) + + @staticmethod + def from_scheduling_condition( + scheduling_condition: SchedulingCondition, + ) -> "AutoMaterializePolicy": """Constructs an AutoMaterializePolicy which will materialize an asset partition whenever - the provided asset_condition evaluates to True. + the provided scheduling_condition evaluates to True. Args: - asset_condition (AssetCondition): The condition which determines whether an asset + scheduling_condition (SchedulingCondition): The condition which determines whether an asset partition should be materialized. """ return AutoMaterializePolicy( - rules=set(), max_materializations_per_minute=None, asset_condition=asset_condition + rules=set(), max_materializations_per_minute=None, asset_condition=scheduling_condition ) @public diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py index d037787ab33c..ea4f09d70c7a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_condition.py @@ -18,6 +18,8 @@ from dagster._utils.security import non_secure_md5_hash_str if TYPE_CHECKING: + from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + from .operands import ( FailedSchedulingCondition, InLatestTimeWindowCondition, @@ -62,6 +64,12 @@ def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int] parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description] return non_secure_md5_hash_str("".join(parts).encode()) + def as_auto_materialize_policy(self) -> "AutoMaterializePolicy": + """Returns an AutoMaterializePolicy which contains this condition.""" + from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy + + return AutoMaterializePolicy.from_scheduling_condition(self) + @abstractmethod def evaluate(self, context: "SchedulingContext") -> "SchedulingResult": raise NotImplementedError() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py new file mode 100644 index 000000000000..cd8606d7c855 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_auto_materialize_policy_conversion.py @@ -0,0 +1,32 @@ +import dagster._check as check +from dagster import ( + AutoMaterializePolicy, + Definitions, + SchedulingCondition, + asset, + deserialize_value, + serialize_value, +) + + +def test_round_trip_conversion() -> None: + policy = SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy() + serialized_policy = serialize_value(policy) + deserialized_policy = deserialize_value(serialized_policy, AutoMaterializePolicy) + assert policy == deserialized_policy + assert deserialized_policy.asset_condition == SchedulingCondition.eager_with_rate_limit() + + +def test_defs() -> None: + @asset( + auto_materialize_policy=SchedulingCondition.eager_with_rate_limit().as_auto_materialize_policy() + ) + def my_asset() -> None: ... + + defs = Definitions(assets=[my_asset]) + + asset_graph_amp = defs.get_asset_graph().get(my_asset.key).auto_materialize_policy + assert ( + check.not_none(asset_graph_amp).to_scheduling_condition() + == SchedulingCondition.eager_with_rate_limit() + )