Skip to content

Commit

Permalink
Make candidate_slice and true_slice Optional
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 3, 2024
1 parent a226325 commit e138ccc
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,13 @@ def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
),
)

def get_asset_slice_from_subset(self, subset: AssetSubset) -> "AssetSlice":
def get_asset_slice_from_subset(self, subset: AssetSubset) -> Optional["AssetSlice"]:
if subset.is_compatible_with_partitions_def(self._get_partitions_def(subset.asset_key)):
return _slice_from_subset(self, subset)
else:
return None

def get_asset_slice_from_valid_subset(self, subset: ValidAssetSubset) -> "AssetSlice":
return _slice_from_subset(self, subset)

def compute_missing_subslice(
Expand All @@ -353,7 +359,7 @@ def compute_missing_subslice(
if self.asset_graph.get(asset_key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key)
materialized_slice = self.get_asset_slice_from_subset(materialized_subset)
materialized_slice = self.get_asset_slice_from_valid_subset(materialized_subset)
return from_slice.compute_difference(materialized_slice)
else:
# more expensive call
Expand All @@ -365,7 +371,7 @@ def compute_missing_subslice(
missing_subset = ValidAssetSubset.from_asset_partitions_set(
asset_key, self._get_partitions_def(asset_key), missing_asset_partitions
)
return self.get_asset_slice_from_subset(missing_subset)
return self.get_asset_slice_from_valid_subset(missing_subset)

def compute_parent_asset_slice(
self, parent_asset_key: AssetKey, asset_slice: AssetSlice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key(
context.legacy_context.root_context
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -217,7 +217,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
- context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset
)

true_slice = context.asset_graph_view.get_asset_slice_from_subset(asset_subset_to_request)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(
asset_subset_to_request
)
return SchedulingResult.create(context, true_slice=true_slice)


Expand Down Expand Up @@ -435,7 +437,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
ignore_subset=context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset,
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -534,7 +536,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(unhandled_candidates),
true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset(
unhandled_candidates
),
# we keep track of the handled subset instead of the unhandled subset because new
# partitions may spontaneously jump into existence at any time
extra_state=handled_subset,
Expand Down Expand Up @@ -588,7 +592,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -645,7 +649,7 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -739,7 +743,7 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -965,7 +969,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(
true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.candidate_subset - all_parents_updated_subset
),
extra_state=list(updated_subsets_by_key.values()),
Expand Down Expand Up @@ -1016,7 +1020,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -1056,7 +1060,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
else:
true_subset = context.legacy_context.candidate_subset & backfilling_subset

true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice)


Expand Down Expand Up @@ -1085,7 +1089,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
AssetSubset.from_asset_partitions_set(
context.legacy_context.asset_key,
context.legacy_context.partitions_def,
Expand Down Expand Up @@ -1123,13 +1127,13 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
if dagster_run and dagster_run.status in IN_PROGRESS_RUN_STATUSES:
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.candidate_subset
),
)
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.empty_subset()
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
previous_cron_tick = self._get_previous_cron_tick(context)

if (
# never evaluated
# never evaluated or evaluation info is no longer valid
context.previous_evaluation_info is None
or context.previous_evaluation_node is None
or context.previous_evaluation_node.true_slice is None
# not evaluated since latest schedule tick
or context.previous_evaluation_info.temporal_context.effective_dt < previous_cron_tick
# has new set of candidates
Expand All @@ -46,7 +47,7 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
)
# TODO: implement this on the AssetGraphView
true_slice = context.candidate_slice.compute_intersection(
context.asset_graph_view.get_asset_slice_from_subset(updated_subset)
context.asset_graph_view.get_asset_slice_from_valid_subset(updated_subset)
)
else:
true_slice = context.previous_evaluation_node.true_slice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class SchedulingEvaluationResultNode(DagsterModel):
asset_key: AssetKey
condition_unique_id: str

true_slice: AssetSlice
candidate_slice: AssetSlice
true_slice: Optional[AssetSlice]
candidate_slice: Optional[AssetSlice]
subsets_with_metadata: Sequence[AssetSubsetWithMetadata]

extra_state: Any
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import datetime

from dagster import AutoMaterializePolicy, Definitions, asset
from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import (
AssetCondition,
)
from dagster._core.definitions.declarative_scheduling.serialized_objects import (
AssetConditionEvaluation,
)
from dagster._core.remote_representation.external_data import external_repository_data_from_def
from dagster._serdes import serialize_value
from dagster._serdes.serdes import deserialize_value

from ..base_scenario import run_request
from ..scenario_specs import (
daily_partitions_def,
day_partition_key,
one_asset,
time_partitions_start_datetime,
)
from .asset_condition_scenario import AssetConditionScenarioState


def test_missing_unpartitioned() -> None:
    """Check the `missing` condition on an unpartitioned asset across ticks.

    The asset starts out missing, stays missing on re-evaluation (and the
    resulting evaluation is equivalent to one that went through a
    serialization round trip), and stops being missing once it has been run.
    """
    state = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing())

    # first tick: the single asset partition is missing
    state, result = state.evaluate("A")
    assert result.true_subset.size == 1

    # round-trip the first evaluation through serdes, as if it had been stored
    serialized_evaluation = serialize_value(AssetConditionEvaluation.from_result(result))
    stored_evaluation = deserialize_value(serialized_evaluation, AssetConditionEvaluation)

    # second tick: nothing changed, so the asset is still missing
    state, result = state.evaluate("A")
    assert result.true_subset.size == 1

    fresh_evaluation = AssetConditionEvaluation.from_result(result)
    assert fresh_evaluation.equivalent_to_stored_evaluation(stored_evaluation)

    # once A has been run, it is no longer missing
    state_after_run = state.with_runs(run_request("A"))
    state, result = state_after_run.evaluate("A")
    assert result.true_subset.size == 0

    # evaluating without any previous evaluation state gives the same answer
    clean_state = state.without_previous_evaluation_state()
    _, result = clean_state.evaluate("A")
    assert result.true_subset.size == 0


def test_missing_time_partitioned() -> None:
    """Check the `missing` condition on a daily-partitioned asset.

    Covers the initial set of missing partitions, stability across ticks,
    partitions being materialized, and a change to the partitions definition
    between ticks (which invalidates the previously stored evaluation).
    """
    state = (
        AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing())
        .with_asset_properties(partitions_def=daily_partitions_def)
        .with_current_time(time_partitions_start_datetime)
        .with_current_time_advanced(days=6, minutes=1)
    )

    state, result = state.evaluate("A")
    assert result.true_subset.size == 6

    # still true
    state, result = state.evaluate("A")
    assert result.true_subset.size == 6

    # after two runs of A those partitions are now False
    state, result = state.with_runs(
        run_request("A", day_partition_key(time_partitions_start_datetime, 2)),
        run_request("A", day_partition_key(time_partitions_start_datetime, 3)),
    ).evaluate("A")
    assert result.true_subset.size == 4

    # result is stable
    state, result = state.evaluate("A")
    assert result.true_subset.size == 4

    # if the partitions definition changes, then we have 1 fewer missing partition
    state = state.with_asset_properties(
        partitions_def=daily_partitions_def._replace(
            start=time_partitions_start_datetime + datetime.timedelta(days=1)
        )
    )
    state, result = state.evaluate("A")
    assert result.true_subset.size == 3
    # NOTE: a stray `assert False` used to sit here; it made the test fail
    # unconditionally and left the from-scratch check below unreachable.

    # if we evaluate from scratch, get the same answer
    _, result = state.without_previous_evaluation_state().evaluate("A")
    assert result.true_subset.size == 3


def test_serialize_definitions_with_asset_condition():
    """Check that a repository whose asset uses a composite asset condition serializes to a string."""
    # parent is newer AND the asset has not been updated since the hourly cron tick
    condition = AssetCondition.parent_newer() & ~AssetCondition.updated_since_cron("0 * * * *")
    amp = AutoMaterializePolicy.from_asset_condition(condition)

    @asset(auto_materialize_policy=amp)
    def my_asset():
        return 0

    repository_def = Definitions(assets=[my_asset]).get_repository_def()
    external_data = external_repository_data_from_def(repository_def)
    result = serialize_value(external_data)
    assert isinstance(result, str)

0 comments on commit e138ccc

Please sign in to comment.