From 7907df9204862a93083bcf7317af39d0da2f48a3 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Tue, 30 Apr 2024 06:06:03 -0700 Subject: [PATCH] Convert SchedulingContext to DagsterModel --- .../_core/definitions/asset_daemon_context.py | 2 +- .../declarative_scheduling/asset_condition.py | 3 +- .../scheduling_context.py | 47 ++++++++++--------- .../asset_condition_scenario.py | 2 +- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 93f589c295d6..d67cdab242d2 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -243,7 +243,7 @@ def evaluate_asset( current_evaluation_state_by_key=evaluation_state_by_key, create_time=pendulum.now("UTC"), logger=self.logger, - _legacy_context=legacy_context, + inner_legacy_context=legacy_context, ) result = asset_condition.evaluate(context) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/asset_condition.py index 1ebea6a923cf..aa0db4d240e6 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/asset_condition.py @@ -4,6 +4,7 @@ from typing import ( TYPE_CHECKING, AbstractSet, + Any, Dict, FrozenSet, List, @@ -199,7 +200,7 @@ class AssetConditionEvaluationState: previous_tick_evaluation_timestamp: Optional[float] max_storage_id: Optional[int] - extra_state_by_unique_id: Mapping[str, PackableValue] + extra_state_by_unique_id: Mapping[str, Any] @property def asset_key(self) -> AssetKey: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_context.py index 9eabfc99125a..92ab4c8e085a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_scheduling/scheduling_context.py @@ -1,9 +1,7 @@ -import dataclasses import datetime import functools import logging -from dataclasses import dataclass -from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional, Tuple +from typing import TYPE_CHECKING, AbstractSet, Any, Mapping, Optional, Tuple import pendulum @@ -21,6 +19,7 @@ ) from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.partition import PartitionsDefinition +from dagster._model import DagsterModel from .asset_condition_evaluation_context import AssetConditionEvaluationContext @@ -28,8 +27,7 @@ from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -@dataclass(frozen=True) -class SchedulingContext: +class SchedulingContext(DagsterModel): # the AssetKey of the currently-evaluated asset asset_key: AssetKey @@ -73,7 +71,9 @@ class SchedulingContext: # which rely on the legact context object. however, this object contains many fields # which are not relevant to the scheduling condition evaluation, and so keeping it # as a reference here makes it easy to remove it in the future. - _legacy_context: AssetConditionEvaluationContext + # + # marked as Any to avoid circular imports in the pydantic validation logic + inner_legacy_context: Any # AssetConditionEvaluationContext @functools.cached_property def candidate_slice(self) -> AssetSlice: @@ -83,10 +83,6 @@ def candidate_slice(self) -> AssetSlice: def partitions_def(self) -> Optional[PartitionsDefinition]: return self.asset_graph_view.asset_graph.get(self.asset_key).partitions_def - @property - def legacy_context(self) -> AssetConditionEvaluationContext: - return self._legacy_context - @property def start_timestamp(self) -> float: return self.create_time.timestamp() @@ -109,6 +105,10 @@ def new_max_storage_id(self) -> Optional[int]: # TODO: this should be pulled from the asset graph view return self._get_updated_parents_and_storage_id()[1] + @property + def legacy_context(self) -> AssetConditionEvaluationContext: + return self.inner_legacy_context + @property def _queryer(self) -> "CachingInstanceQueryer": return self.asset_graph_view._queryer # noqa @@ -150,17 +150,18 @@ def for_child_condition( asset_key: Optional[AssetKey] = None, ): child_unique_id = child_condition.get_unique_id(parent_unique_id=self.condition_unique_id) - return dataclasses.replace( - self, - asset_key=asset_key or self.asset_key, - condition=child_condition, - condition_unique_id=child_unique_id, - candidate_subset=candidate_subset, - previous_evaluation=self.previous_evaluation.for_child(child_unique_id) - if self.previous_evaluation - else None, - create_time=pendulum.now("UTC"), - _legacy_context=self._legacy_context.for_child( - child_condition, child_unique_id, candidate_subset - ), + return self.model_copy( + update={ + "asset_key": asset_key or self.asset_key, + "condition": child_condition, + "condition_unique_id": child_unique_id, + "candidate_subset": candidate_subset, + "previous_evaluation": self.previous_evaluation.for_child(child_unique_id) + if self.previous_evaluation + else None, + "create_time": pendulum.now("UTC"), + "inner_legacy_context": self.legacy_context.for_child( + child_condition, child_unique_id, candidate_subset + ), + } ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py index c46d6fd2c6fe..3165d03f532c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py @@ -101,7 +101,7 @@ def evaluate( current_evaluation_state_by_key={}, create_time=self.current_time, logger=self.logger, - _legacy_context=legacy_context, + inner_legacy_context=legacy_context, ) full_result = asset_condition.evaluate(context)