From e138ccc8d4a6d32b122ec2135f06fd57eb8330a9 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 3 May 2024 15:58:08 -0700 Subject: [PATCH] Make candidate_slice and true_slice Optional --- .../asset_graph_view/asset_graph_view.py | 12 +- .../auto_materialize_rule_impls.py | 30 ++--- .../operands/updated_since_cron_condition.py | 5 +- .../scheduling_evaluation_info.py | 4 +- .../test_legacy_missing_condition.py | 104 ++++++++++++++++++ 5 files changed, 135 insertions(+), 20 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_legacy_missing_condition.py diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index df73abd0a6610..0e8001ce52d77 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -337,7 +337,13 @@ def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice": ), ) - def get_asset_slice_from_subset(self, subset: AssetSubset) -> "AssetSlice": + def get_asset_slice_from_subset(self, subset: AssetSubset) -> Optional["AssetSlice"]: + if subset.is_compatible_with_partitions_def(self._get_partitions_def(subset.asset_key)): + return _slice_from_subset(self, subset) + else: + return None + + def get_asset_slice_from_valid_subset(self, subset: ValidAssetSubset) -> "AssetSlice": return _slice_from_subset(self, subset) def compute_missing_subslice( @@ -353,7 +359,7 @@ def compute_missing_subslice( if self.asset_graph.get(asset_key).is_materializable: # cheap call which takes advantage of the partition status cache materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key) - materialized_slice = self.get_asset_slice_from_subset(materialized_subset) + materialized_slice = 
self.get_asset_slice_from_valid_subset(materialized_subset) return from_slice.compute_difference(materialized_slice) else: # more expensive call @@ -365,7 +371,7 @@ def compute_missing_subslice( missing_subset = ValidAssetSubset.from_asset_partitions_set( asset_key, self._get_partitions_def(asset_key), missing_asset_partitions ) - return self.get_asset_slice_from_subset(missing_subset) + return self.get_asset_slice_from_valid_subset(missing_subset) def compute_parent_asset_slice( self, parent_asset_key: AssetKey, asset_slice: AssetSlice diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_impls.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_impls.py index 5ad1fe27ab5f1..e4baa2e0669c6 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_impls.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule_impls.py @@ -80,7 +80,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key( context.legacy_context.root_context ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ -217,7 +217,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" - context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(asset_subset_to_request) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset( + asset_subset_to_request + ) return SchedulingResult.create(context, true_slice=true_slice) @@ -435,7 +437,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" 
ignore_subset=context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset, ) ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ -534,7 +536,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" return SchedulingResult.create( context, - true_slice=context.asset_graph_view.get_asset_slice_from_subset(unhandled_candidates), + true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset( + unhandled_candidates + ), # we keep track of the handled subset instead of the unhandled subset because new # partitions may spontaneously jump into existence at any time extra_state=handled_subset, @@ -588,7 +592,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ -645,7 +649,7 @@ def evaluate_for_asset( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ -739,7 +743,7 @@ def evaluate_for_asset( asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ 
-965,7 +969,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" return SchedulingResult.create( context, - true_slice=context.asset_graph_view.get_asset_slice_from_subset( + true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset( context.legacy_context.candidate_subset - all_parents_updated_subset ), extra_state=list(updated_subsets_by_key.values()), @@ -1016,7 +1020,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate ) ) - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice, subsets_with_metadata) @@ -1056,7 +1060,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" else: true_subset = context.legacy_context.candidate_subset & backfilling_subset - true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset) + true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset) return SchedulingResult.create(context, true_slice) @@ -1085,7 +1089,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" return SchedulingResult.create( context, - context.asset_graph_view.get_asset_slice_from_subset( + context.asset_graph_view.get_asset_slice_from_valid_subset( AssetSubset.from_asset_partitions_set( context.legacy_context.asset_key, context.legacy_context.partitions_def, @@ -1123,13 +1127,13 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult" if dagster_run and dagster_run.status in IN_PROGRESS_RUN_STATUSES: return SchedulingResult.create( context, - context.asset_graph_view.get_asset_slice_from_subset( + context.asset_graph_view.get_asset_slice_from_valid_subset( context.legacy_context.candidate_subset ), ) return 
SchedulingResult.create( context, - context.asset_graph_view.get_asset_slice_from_subset( + context.asset_graph_view.get_asset_slice_from_valid_subset( context.legacy_context.empty_subset() ), ) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py index a91875da3aaa6..730472d014a21 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/operands/updated_since_cron_condition.py @@ -27,9 +27,10 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult: previous_cron_tick = self._get_previous_cron_tick(context) if ( - # never evaluated + # never evaluated or evaluation info is no longer valid context.previous_evaluation_info is None or context.previous_evaluation_node is None + or context.previous_evaluation_node.true_slice is None # not evaluated since latest schedule tick or context.previous_evaluation_info.temporal_context.effective_dt < previous_cron_tick # has new set of candidates @@ -46,7 +47,7 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult: ) # TODO: implement this on the AssetGraphView true_slice = context.candidate_slice.compute_intersection( - context.asset_graph_view.get_asset_slice_from_subset(updated_subset) + context.asset_graph_view.get_asset_slice_from_valid_subset(updated_subset) ) else: true_slice = context.previous_evaluation_node.true_slice diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_evaluation_info.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_evaluation_info.py index 060f4ae322b3f..1218c5210d530 100644 --- 
a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_evaluation_info.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_evaluation_info.py @@ -23,8 +23,8 @@ class SchedulingEvaluationResultNode(DagsterModel): asset_key: AssetKey condition_unique_id: str - true_slice: AssetSlice - candidate_slice: AssetSlice + true_slice: Optional[AssetSlice] + candidate_slice: Optional[AssetSlice] subsets_with_metadata: Sequence[AssetSubsetWithMetadata] extra_state: Any diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_legacy_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_legacy_missing_condition.py new file mode 100644 index 0000000000000..4857a721ddd1b --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_legacy_missing_condition.py @@ -0,0 +1,104 @@ +import datetime + +from dagster import AutoMaterializePolicy, Definitions, asset +from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import ( + AssetCondition, +) +from dagster._core.definitions.declarative_scheduling.serialized_objects import ( + AssetConditionEvaluation, +) +from dagster._core.remote_representation.external_data import external_repository_data_from_def +from dagster._serdes import serialize_value +from dagster._serdes.serdes import deserialize_value + +from ..base_scenario import run_request +from ..scenario_specs import ( + daily_partitions_def, + day_partition_key, + one_asset, + time_partitions_start_datetime, +) +from .asset_condition_scenario import AssetConditionScenarioState + + +def test_missing_unpartitioned() -> None: + state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing()) + + state, result = state.evaluate("A") + assert result.true_subset.size == 1 + + 
evaluation1 = deserialize_value( + serialize_value(AssetConditionEvaluation.from_result(result)), AssetConditionEvaluation + ) + + # still true + state, result = state.evaluate("A") + assert result.true_subset.size == 1 + + evaluation2 = AssetConditionEvaluation.from_result(result) + + assert evaluation2.equivalent_to_stored_evaluation(evaluation1) + + # after a run of A it's now False + state, result = state.with_runs(run_request("A")).evaluate("A") + assert result.true_subset.size == 0 + + # if we evaluate from scratch, it's also False + _, result = state.without_previous_evaluation_state().evaluate("A") + assert result.true_subset.size == 0 + + +def test_missing_time_partitioned() -> None: + state = ( + AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing()) + .with_asset_properties(partitions_def=daily_partitions_def) + .with_current_time(time_partitions_start_datetime) + .with_current_time_advanced(days=6, minutes=1) + ) + + state, result = state.evaluate("A") + assert result.true_subset.size == 6 + + # still true + state, result = state.evaluate("A") + assert result.true_subset.size == 6 + + # after two runs of A those partitions are now False + state, result = state.with_runs( + run_request("A", day_partition_key(time_partitions_start_datetime, 2)), + run_request("A", day_partition_key(time_partitions_start_datetime, 3)), + ).evaluate("A") + assert result.true_subset.size == 4 + + # result is stable + state, result = state.evaluate("A") + assert result.true_subset.size == 4 + + # if the partitions definition changes, then we have 1 fewer missing partition + state = state.with_asset_properties( + partitions_def=daily_partitions_def._replace( + start=time_partitions_start_datetime + datetime.timedelta(days=1) + ) + ) + state, result = state.evaluate("A") + assert result.true_subset.size == 3 + # shifting the start by a day removes the earliest (still missing) partition + + # if we evaluate from scratch, get the same answer + _, result = state.without_previous_evaluation_state().evaluate("A") +
assert result.true_subset.size == 3 + + +def test_serialize_definitions_with_asset_condition(): + amp = AutoMaterializePolicy.from_asset_condition( + AssetCondition.parent_newer() & ~AssetCondition.updated_since_cron("0 * * * *") + ) + + @asset(auto_materialize_policy=amp) + def my_asset(): + return 0 + + result = serialize_value( + external_repository_data_from_def(Definitions(assets=[my_asset]).get_repository_def()) + ) + assert isinstance(result, str)