Skip to content

Commit

Permalink
Convert SchedulingContext to DagsterModel
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Apr 30, 2024
1 parent 49f9edb commit b130055
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def evaluate_asset(
current_evaluation_state_by_key=evaluation_state_by_key,
create_time=pendulum.now("UTC"),
logger=self.logger,
_legacy_context=legacy_context,
inner_legacy_context=legacy_context,
)

result = asset_condition.evaluate(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
List,
Expand Down Expand Up @@ -199,7 +200,7 @@ class AssetConditionEvaluationState:
previous_tick_evaluation_timestamp: Optional[float]

max_storage_id: Optional[int]
extra_state_by_unique_id: Mapping[str, PackableValue]
extra_state_by_unique_id: Mapping[str, Any]

@property
def asset_key(self) -> AssetKey:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import dataclasses
import datetime
import functools
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional, Tuple
from typing import TYPE_CHECKING, AbstractSet, Any, Mapping, Optional, Tuple

import pendulum

Expand All @@ -21,15 +19,15 @@
)
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._model import DagsterModel

from .asset_condition_evaluation_context import AssetConditionEvaluationContext

if TYPE_CHECKING:
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


@dataclass(frozen=True)
class SchedulingContext:
class SchedulingContext(DagsterModel):
asset_key: AssetKey

condition: AssetCondition
Expand All @@ -49,7 +47,9 @@ class SchedulingContext:
# which rely on the legact context object. however, this object contains many fields
# which are not relevant to the scheduling condition evaluation, and so keeping it
# as a reference here makes it easy to remove it in the future.
_legacy_context: AssetConditionEvaluationContext
#
# marked as Any to avoid circular imports in the pydantic validation logic
inner_legacy_context: Any # AssetConditionEvaluationContext

@functools.cached_property
def candidate_slice(self) -> AssetSlice:
Expand All @@ -59,10 +59,6 @@ def candidate_slice(self) -> AssetSlice:
def partitions_def(self) -> Optional[PartitionsDefinition]:
return self.asset_graph_view.asset_graph.get(self.asset_key).partitions_def

@property
def legacy_context(self) -> AssetConditionEvaluationContext:
return self._legacy_context

@property
def start_timestamp(self) -> float:
return self.create_time.timestamp()
Expand All @@ -85,6 +81,10 @@ def new_max_storage_id(self) -> Optional[int]:
# TODO: this should be pulled from the asset graph view
return self._get_updated_parents_and_storage_id()[1]

@property
def legacy_context(self) -> AssetConditionEvaluationContext:
return self.inner_legacy_context

@property
def _queryer(self) -> "CachingInstanceQueryer":
return self.asset_graph_view._queryer # noqa
Expand Down Expand Up @@ -126,17 +126,18 @@ def for_child_condition(
asset_key: Optional[AssetKey] = None,
):
child_unique_id = child_condition.get_unique_id(parent_unique_id=self.condition_unique_id)
return dataclasses.replace(
self,
asset_key=asset_key or self.asset_key,
condition=child_condition,
condition_unique_id=child_unique_id,
candidate_subset=candidate_subset,
previous_evaluation=self.previous_evaluation.for_child(child_unique_id)
if self.previous_evaluation
else None,
create_time=pendulum.now("UTC"),
_legacy_context=self._legacy_context.for_child(
child_condition, child_unique_id, candidate_subset
),
return self.model_copy(
update={
"asset_key": asset_key or self.asset_key,
"condition": child_condition,
"condition_unique_id": child_unique_id,
"candidate_subset": candidate_subset,
"previous_evaluation": self.previous_evaluation.for_child(child_unique_id)
if self.previous_evaluation
else None,
"create_time": pendulum.now("UTC"),
"inner_legacy_context": self.legacy_context.for_child(
child_condition, child_unique_id, candidate_subset
),
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def evaluate(
current_evaluation_state_by_key={},
create_time=self.current_time,
logger=self.logger,
_legacy_context=legacy_context,
inner_legacy_context=legacy_context,
)

full_result = asset_condition.evaluate(context)
Expand Down

0 comments on commit b130055

Please sign in to comment.