Skip to content

Commit

Permalink
Rename SchedulingContext object
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 2, 2024
1 parent 0961765 commit ddbe7a3
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.definitions.declarative_scheduling.scheduling_condition_evaluation_context import (
SchedulingConditionEvaluationContext,
from dagster._core.definitions.declarative_scheduling.scheduling_context import (
SchedulingContext,
)
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.run_request import RunRequest
Expand Down Expand Up @@ -230,7 +230,7 @@ def evaluate_asset(
evaluation_state_by_key=evaluation_state_by_key,
expected_data_time_mapping=expected_data_time_mapping,
)
context = SchedulingConditionEvaluationContext(
context = SchedulingContext(
asset_key=asset_key,
condition=asset_condition,
condition_unique_id=asset_condition.get_unique_id(parent_unique_id=None),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
AssetConditionResult,
)

from .declarative_scheduling.scheduling_condition_evaluation_context import (
SchedulingConditionEvaluationContext,
from .declarative_scheduling.scheduling_context import (
SchedulingContext,
)


Expand Down Expand Up @@ -67,9 +67,7 @@ def to_asset_condition(self) -> "AssetCondition":
return RuleCondition(rule=self)

@abstractmethod
def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
"""The core evaluation function for the rule. This function takes in a context object and
returns a mapping from evaluated rules to the set of asset partitions that the rule applies
to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
AssetConditionResult,
)

from .declarative_scheduling.scheduling_condition_evaluation_context import (
SchedulingConditionEvaluationContext,
from .declarative_scheduling.scheduling_context import (
SchedulingContext,
)


Expand All @@ -74,9 +74,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return "required to meet this or downstream asset's freshness policy"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key(
Expand All @@ -101,9 +99,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def missed_cron_ticks(
self, context: "SchedulingConditionEvaluationContext"
) -> Sequence[datetime.datetime]:
def missed_cron_ticks(self, context: "SchedulingContext") -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
# if it's the first time evaluating this rule, then just count the latest tick as missed
if (
Expand Down Expand Up @@ -131,7 +127,7 @@ def missed_cron_ticks(

def get_new_candidate_asset_partitions(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
missed_ticks: Sequence[datetime.datetime],
) -> AbstractSet[AssetKeyPartitionKey]:
if not missed_ticks:
Expand Down Expand Up @@ -192,9 +188,7 @@ def get_new_candidate_asset_partitions(
for time_partition_key in missed_time_partition_keys
}

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

missed_ticks = self.missed_cron_ticks(context)
Expand Down Expand Up @@ -248,7 +242,7 @@ def description(self) -> str:

def passes(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
asset_partitions: Iterable[AssetKeyPartitionKey],
) -> Iterable[AssetKeyPartitionKey]:
if self.latest_run_required_tags is None:
Expand Down Expand Up @@ -347,9 +341,7 @@ def description(self) -> str:
else:
return base

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
"""Evaluates the set of asset partitions of this asset whose parents have been updated,
or will update on this tick.
"""
Expand Down Expand Up @@ -454,7 +446,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return "materialization is missing"

def get_handled_subset(self, context: "SchedulingConditionEvaluationContext") -> AssetSubset:
def get_handled_subset(self, context: "SchedulingContext") -> AssetSubset:
"""Returns the AssetSubset which has been handled (materialized, requested, or discarded).
Accounts for cases in which the partitions definition may have changed between ticks.
"""
Expand All @@ -474,9 +466,7 @@ def get_handled_subset(self, context: "SchedulingConditionEvaluationContext") ->
| previous_handled_subset
)

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
"""Evaluates the set of asset partitions for this asset which are missing and were not
previously discarded.
"""
Expand Down Expand Up @@ -558,9 +548,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return "waiting on upstream data to be up to date"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

asset_partitions_by_evaluation_data = defaultdict(set)
Expand Down Expand Up @@ -612,7 +600,7 @@ def description(self) -> str:

def evaluate_for_asset(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
) -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

Expand Down Expand Up @@ -688,7 +676,7 @@ def description(self) -> str:

def evaluate_for_asset(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
) -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

Expand Down Expand Up @@ -765,7 +753,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"waiting until all upstream assets have updated since the last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def passed_time_window(self, context: "SchedulingConditionEvaluationContext") -> TimeWindow:
def passed_time_window(self, context: "SchedulingContext") -> TimeWindow:
"""Returns the window of time that has passed between the previous two cron ticks. All
parent assets must contain all data from this time window in order for this asset to be
materialized.
Expand All @@ -782,7 +770,7 @@ def passed_time_window(self, context: "SchedulingConditionEvaluationContext") ->

def get_parent_subset_updated_since_cron(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
parent_asset_key: AssetKey,
passed_time_window: TimeWindow,
) -> ValidAssetSubset:
Expand Down Expand Up @@ -834,7 +822,7 @@ def get_parent_subset_updated_since_cron(
return new_parent_subset | previous_parent_subset

def get_parent_subsets_updated_since_cron_by_key(
self, context: "SchedulingConditionEvaluationContext", passed_time_window: TimeWindow
self, context: "SchedulingContext", passed_time_window: TimeWindow
) -> Mapping[AssetKey, ValidAssetSubset]:
"""Returns a mapping of parent asset keys to the AssetSubset of each parent that has been
updated since the end of the previous cron tick. Does not compute this value for time-window
Expand All @@ -858,7 +846,7 @@ def get_parent_subsets_updated_since_cron_by_key(

def parent_updated_since_cron(
self,
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
passed_time_window: TimeWindow,
parent_asset_key: AssetKey,
child_asset_partition: AssetKeyPartitionKey,
Expand Down Expand Up @@ -914,9 +902,7 @@ def parent_updated_since_cron(
for p in non_updated_parent_asset_partitions
)

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

passed_time_window = self.passed_time_window(context)
Expand Down Expand Up @@ -990,9 +976,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return "required parent partitions do not exist"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

asset_partitions_by_evaluation_data = defaultdict(set)
Expand Down Expand Up @@ -1043,9 +1027,7 @@ def description(self) -> str:
else:
return "targeted by an in-progress backfill"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

backfilling_subset = (
Expand Down Expand Up @@ -1080,9 +1062,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"exceeds {self.limit} materialization(s) per minute"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

# the set of asset partitions which exceed the limit
Expand Down Expand Up @@ -1113,9 +1093,7 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return "in-progress run for asset"

def evaluate_for_asset(
self, context: "SchedulingConditionEvaluationContext"
) -> "AssetConditionResult":
def evaluate_for_asset(self, context: "SchedulingContext") -> "AssetConditionResult":
from .declarative_scheduling.asset_condition import AssetConditionResult

if context.legacy_context.partitions_def is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ..auto_materialize_rule import AutoMaterializeRule

if TYPE_CHECKING:
from .scheduling_condition_evaluation_context import SchedulingConditionEvaluationContext
from .scheduling_context import SchedulingContext


T = TypeVar("T")
Expand Down Expand Up @@ -211,7 +211,7 @@ def true_subset(self) -> AssetSubset:

@staticmethod
def create(
context: "SchedulingConditionEvaluationContext", root_result: "AssetConditionResult"
context: "SchedulingContext", root_result: "AssetConditionResult"
) -> "AssetConditionEvaluationState":
"""Convenience constructor to generate an AssetConditionEvaluationState from the root result
and the context in which it was evaluated.
Expand Down Expand Up @@ -285,7 +285,7 @@ def get_unique_id(self, parent_unique_id: Optional[str]) -> str:
return non_secure_md5_hash_str("".join(parts).encode())

@abstractmethod
def evaluate(self, context: "SchedulingConditionEvaluationContext") -> "AssetConditionResult":
def evaluate(self, context: "SchedulingContext") -> "AssetConditionResult":
raise NotImplementedError()

@property
Expand Down Expand Up @@ -463,7 +463,7 @@ def get_unique_id(self, parent_unique_id: Optional[str]) -> str:
def description(self) -> str:
return self.rule.description

def evaluate(self, context: "SchedulingConditionEvaluationContext") -> "AssetConditionResult":
def evaluate(self, context: "SchedulingContext") -> "AssetConditionResult":
context.legacy_context.root_context.daemon_context.logger.debug(
f"Evaluating rule: {self.rule.to_snapshot()}"
)
Expand All @@ -490,7 +490,7 @@ def children(self) -> Sequence[AssetCondition]:
def description(self) -> str:
return "All of"

def evaluate(self, context: "SchedulingConditionEvaluationContext") -> "AssetConditionResult":
def evaluate(self, context: "SchedulingContext") -> "AssetConditionResult":
child_results: List[AssetConditionResult] = []
true_subset = context.candidate_subset
for child in self.children:
Expand Down Expand Up @@ -518,7 +518,7 @@ def children(self) -> Sequence[AssetCondition]:
def description(self) -> str:
return "Any of"

def evaluate(self, context: "SchedulingConditionEvaluationContext") -> "AssetConditionResult":
def evaluate(self, context: "SchedulingContext") -> "AssetConditionResult":
child_results: List[AssetConditionResult] = []
true_subset = context.asset_graph_view.create_empty_slice(
context.asset_key
Expand Down Expand Up @@ -548,7 +548,7 @@ def description(self) -> str:
def children(self) -> Sequence[AssetCondition]:
return [self.operand]

def evaluate(self, context: "SchedulingConditionEvaluationContext") -> "AssetConditionResult":
def evaluate(self, context: "SchedulingContext") -> "AssetConditionResult":
child_context = context.for_child_condition(
child_condition=self.operand, candidate_subset=context.candidate_subset
)
Expand Down Expand Up @@ -578,7 +578,7 @@ def true_subset(self) -> AssetSubset:

@staticmethod
def create_from_children(
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
true_subset: ValidAssetSubset,
child_results: Sequence["AssetConditionResult"],
) -> "AssetConditionResult":
Expand All @@ -597,7 +597,7 @@ def create_from_children(

@staticmethod
def create(
context: "SchedulingConditionEvaluationContext",
context: "SchedulingContext",
true_subset: ValidAssetSubset,
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [],
extra_state: PackableValue = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dagster._serdes.serdes import whitelist_for_serdes

from .asset_condition import AssetCondition, AssetConditionResult
from .scheduling_condition_evaluation_context import SchedulingConditionEvaluationContext
from .scheduling_context import SchedulingContext


class DepConditionWrapperCondition(AssetCondition):
Expand All @@ -18,7 +18,7 @@ class DepConditionWrapperCondition(AssetCondition):
def description(self) -> str:
return f"{self.dep_key.to_user_string()}"

def evaluate(self, context: SchedulingConditionEvaluationContext) -> AssetConditionResult:
def evaluate(self, context: SchedulingContext) -> AssetConditionResult:
# only evaluate parents of the current candidate subset
dep_candidate_subset = context.candidate_slice.compute_parent_slice(
self.dep_key
Expand Down Expand Up @@ -51,7 +51,7 @@ class AnyDepsCondition(AssetCondition):
def description(self) -> str:
return "Any deps"

def evaluate(self, context: SchedulingConditionEvaluationContext) -> AssetConditionResult:
def evaluate(self, context: SchedulingContext) -> AssetConditionResult:
dep_results = []
true_subset = context.asset_graph_view.create_empty_slice(
context.asset_key
Expand Down Expand Up @@ -81,7 +81,7 @@ class AllDepsCondition(AssetCondition):
def description(self) -> str:
return "All deps"

def evaluate(self, context: SchedulingConditionEvaluationContext) -> AssetConditionResult:
def evaluate(self, context: SchedulingContext) -> AssetConditionResult:
dep_results = []
true_subset = context.candidate_subset

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


@dataclass(frozen=True)
class SchedulingConditionEvaluationContext:
class SchedulingContext:
# the AssetKey of the currently-evaluated asset
asset_key: AssetKey

Expand Down
Loading

0 comments on commit ddbe7a3

Please sign in to comment.