From a4c99d7c992135d62e493cb1e0191b5f400a7d37 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 7 May 2024 10:58:13 -0700 Subject: [PATCH] Remove some AssetCondition references --- .../operands/slice_conditions.py | 5 ++--- .../operands/updated_since_cron_condition.py | 2 ++ .../test_asset_condition.py | 8 ++++---- .../test_dep_condition.py | 18 +++++++----------- .../test_in_progress_condition.py | 8 +++++--- .../test_latest_time_window_condition.py | 16 +++++++--------- 6 files changed, 27 insertions(+), 30 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/slice_conditions.py index 86b85a434e9b8..53648ab54bce1 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/slice_conditions.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/slice_conditions.py @@ -5,12 +5,11 @@ from dagster._core.asset_graph_view.asset_graph_view import AssetSlice from dagster._serdes.serdes import whitelist_for_serdes -from ..legacy.asset_condition import AssetCondition -from ..scheduling_condition import SchedulingResult +from ..scheduling_condition import SchedulingCondition, SchedulingResult from ..scheduling_context import SchedulingContext -class SliceSchedulingCondition(AssetCondition): +class SliceSchedulingCondition(SchedulingCondition): """Base class for simple conditions which compute a simple slice of the asset graph.""" @abstractmethod diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py index 7c2d3164c7e79..d9a6ea691f8ff 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py @@ -1,11 +1,13 @@ import datetime +from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.schedules import reverse_cron_string_iterator from ..scheduling_condition import SchedulingCondition, SchedulingResult from ..scheduling_context import SchedulingContext +@whitelist_for_serdes class UpdatedSinceCronCondition(SchedulingCondition): cron_schedule: str cron_timezone: str diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_asset_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_asset_condition.py index 33caa548313fa..3afa5d2781c02 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_asset_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_asset_condition.py @@ -1,4 +1,4 @@ -from dagster import AssetCondition, AutoMaterializePolicy, Definitions, asset +from dagster import AutoMaterializePolicy, Definitions, SchedulingCondition, asset from dagster._core.definitions.declarative_scheduling.serialized_objects import ( AssetConditionEvaluation, ) @@ -17,7 +17,7 @@ def test_missing_unpartitioned() -> None: - state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing()) + state = AssetConditionScenarioState(one_asset, asset_condition=SchedulingCondition.missing()) state, result = state.evaluate("A") assert result.true_subset.size == 1 @@ -45,7 +45,7 @@ def test_missing_unpartitioned() -> None: def test_missing_time_partitioned() -> None: state = ( - AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing()) + AssetConditionScenarioState(one_asset, asset_condition=SchedulingCondition.missing()) .with_asset_properties(partitions_def=daily_partitions_def) .with_current_time(time_partitions_start_datetime) .with_current_time_advanced(days=6, minutes=1) @@ -72,7 +72,7 @@ def test_missing_time_partitioned() -> None: def test_serialize_definitions_with_asset_condition() -> None: amp = AutoMaterializePolicy.from_asset_condition( - AssetCondition.parent_newer() & ~AssetCondition.updated_since_cron("0 * * * *") + SchedulingCondition.parent_newer() & ~SchedulingCondition.updated_since_cron("0 * * * *") ) @asset(auto_materialize_policy=amp) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_dep_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_dep_condition.py index 2533c7deb6519..e448f88afd9cc 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_dep_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_dep_condition.py @@ -1,13 +1,9 @@ import dagster._check as check import pytest +from dagster import SchedulingCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_subset import AssetSubset -from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import ( - AssetCondition, -) -from dagster._core.definitions.declarative_scheduling.scheduling_condition import ( - SchedulingResult, -) +from dagster._core.definitions.declarative_scheduling.scheduling_condition import SchedulingResult from dagster._core.definitions.declarative_scheduling.scheduling_context import ( SchedulingContext, ) @@ -20,7 +16,7 @@ def get_hardcoded_condition(): true_set = set() - class HardcodedCondition(AssetCondition): + class HardcodedCondition(SchedulingCondition): @property def description(self) -> str: return "..." @@ -52,9 +48,9 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult: def test_dep_missing_unpartitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( - AssetCondition.any_deps_match(inner_condition) + SchedulingCondition.any_deps_match(inner_condition) if is_any - else AssetCondition.all_deps_match(inner_condition) + else SchedulingCondition.all_deps_match(inner_condition) ) state = AssetConditionScenarioState(one_asset_depends_on_two, asset_condition=condition) @@ -80,9 +76,9 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None: def test_dep_missing_partitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( - AssetCondition.any_deps_match(inner_condition) + SchedulingCondition.any_deps_match(inner_condition) if is_any - else AssetCondition.all_deps_match(inner_condition) + else SchedulingCondition.all_deps_match(inner_condition) ) state = AssetConditionScenarioState( one_asset_depends_on_two, asset_condition=condition diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_in_progress_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_in_progress_condition.py index 74eb53ca5446f..6838133144323 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_in_progress_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_in_progress_condition.py @@ -1,5 +1,5 @@ +from dagster import SchedulingCondition from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import AssetCondition from dagster._core.definitions.events import AssetKeyPartitionKey from ..scenario_specs import one_asset, two_partitions_def @@ -7,7 +7,9 @@ def test_in_progress_unpartitioned() -> None: - state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.in_progress()) + state = AssetConditionScenarioState( + one_asset, asset_condition=SchedulingCondition.in_progress() + ) # no run in progress state, result = state.evaluate("A") @@ -26,7 +28,7 @@ def test_in_progress_unpartitioned() -> None: def test_in_progress_static_partitioned() -> None: state = AssetConditionScenarioState( - one_asset, asset_condition=AssetCondition.in_progress() + one_asset, asset_condition=SchedulingCondition.in_progress() ).with_asset_properties(partitions_def=two_partitions_def) # no run in progress diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_latest_time_window_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_latest_time_window_condition.py index bb2a0728f5e17..034ecc57b5e1f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_latest_time_window_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_latest_time_window_condition.py @@ -1,9 +1,7 @@ import datetime +from dagster import SchedulingCondition from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import ( - AssetCondition, -) from dagster._core.definitions.events import AssetKeyPartitionKey from ..scenario_specs import ( @@ -17,7 +15,7 @@ def test_in_latest_time_window_unpartitioned() -> None: state = AssetConditionScenarioState( - one_asset, asset_condition=AssetCondition.in_latest_time_window() + one_asset, asset_condition=SchedulingCondition.in_latest_time_window() ) state, result = state.evaluate("A") @@ -27,7 +25,7 @@ def test_in_latest_time_window_unpartitioned() -> None: def test_in_latest_time_window_unpartitioned_lookback() -> None: state = AssetConditionScenarioState( one_asset, - asset_condition=AssetCondition.in_latest_time_window( + asset_condition=SchedulingCondition.in_latest_time_window( lookback_delta=datetime.timedelta(days=3) ), ) @@ -38,7 +36,7 @@ def test_in_latest_time_window_unpartitioned_lookback() -> None: def test_in_latest_time_window_static_partitioned() -> None: state = AssetConditionScenarioState( - one_asset, asset_condition=AssetCondition.in_latest_time_window() + one_asset, asset_condition=SchedulingCondition.in_latest_time_window() ).with_asset_properties(partitions_def=two_partitions_def) state, result = state.evaluate("A") @@ -48,7 +46,7 @@ def test_in_latest_time_window_static_partitioned() -> None: def test_in_latest_time_window_static_partitioned_lookback() -> None: state = AssetConditionScenarioState( one_asset, - asset_condition=AssetCondition.in_latest_time_window( + asset_condition=SchedulingCondition.in_latest_time_window( lookback_delta=datetime.timedelta(days=3) ), ).with_asset_properties(partitions_def=two_partitions_def) @@ -59,7 +57,7 @@ def test_in_latest_time_window_static_partitioned_lookback() -> None: def test_in_latest_time_window_time_partitioned() -> None: state = AssetConditionScenarioState( - one_asset, asset_condition=AssetCondition.in_latest_time_window() + one_asset, asset_condition=SchedulingCondition.in_latest_time_window() ).with_asset_properties(partitions_def=daily_partitions_def) # no partitions exist yet @@ -85,7 +83,7 @@ def test_in_latest_time_window_time_partitioned() -> None: def test_in_latest_time_window_time_partitioned_lookback() -> None: state = AssetConditionScenarioState( one_asset, - asset_condition=AssetCondition.in_latest_time_window( + asset_condition=SchedulingCondition.in_latest_time_window( lookback_delta=datetime.timedelta(days=3) ), ).with_asset_properties(partitions_def=daily_partitions_def)