Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[DS][27/n] Make candidate_slice and true_slice Optional #21648

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,13 @@ def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
),
)

def get_asset_slice_from_subset(self, subset: AssetSubset) -> "AssetSlice":
def get_asset_slice_from_subset(self, subset: AssetSubset) -> Optional["AssetSlice"]:
    """Convert ``subset`` into an ``AssetSlice`` when it is compatible.

    The subset is checked against the partitions definition that
    ``_get_partitions_def`` returns for the subset's asset key.

    Returns:
        The slice built by ``_slice_from_subset``, or ``None`` when the
        subset is not compatible with that partitions definition.
    """
    partitions_def = self._get_partitions_def(subset.asset_key)
    # Guard clause: an incompatible subset cannot be turned into a slice.
    if not subset.is_compatible_with_partitions_def(partitions_def):
        return None
    return _slice_from_subset(self, subset)

def get_asset_slice_from_valid_subset(self, subset: ValidAssetSubset) -> "AssetSlice":
    """Convert an already-valid subset into an ``AssetSlice``.

    No compatibility check is performed here; the subset is handed straight
    to ``_slice_from_subset``, so this always returns a slice.

    NOTE(review): presumably ``ValidAssetSubset`` guarantees compatibility with
    the asset's partitions definition -- confirm against its definition.
    """
    return _slice_from_subset(self, subset)

def compute_missing_subslice(
Expand All @@ -353,7 +359,7 @@ def compute_missing_subslice(
if self.asset_graph.get(asset_key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key)
materialized_slice = self.get_asset_slice_from_subset(materialized_subset)
materialized_slice = self.get_asset_slice_from_valid_subset(materialized_subset)
return from_slice.compute_difference(materialized_slice)
else:
# more expensive call
Expand All @@ -365,7 +371,7 @@ def compute_missing_subslice(
missing_subset = ValidAssetSubset.from_asset_partitions_set(
asset_key, self._get_partitions_def(asset_key), missing_asset_partitions
)
return self.get_asset_slice_from_subset(missing_subset)
return self.get_asset_slice_from_valid_subset(missing_subset)

def compute_parent_asset_slice(
self, parent_asset_key: AssetKey, asset_slice: AssetSlice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key(
context.legacy_context.root_context
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -217,7 +217,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
- context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset
)

true_slice = context.asset_graph_view.get_asset_slice_from_subset(asset_subset_to_request)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(
asset_subset_to_request
)
return SchedulingResult.create(context, true_slice=true_slice)


Expand Down Expand Up @@ -435,7 +437,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
ignore_subset=context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset,
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -534,7 +536,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(unhandled_candidates),
true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset(
unhandled_candidates
),
# we keep track of the handled subset instead of the unhandled subset because new
# partitions may spontaneously jump into existence at any time
extra_state=handled_subset,
Expand Down Expand Up @@ -588,7 +592,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -645,7 +649,7 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -739,7 +743,7 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -965,7 +969,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(
true_slice=context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.candidate_subset - all_parents_updated_subset
),
extra_state=list(updated_subsets_by_key.values()),
Expand Down Expand Up @@ -1016,7 +1020,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


Expand Down Expand Up @@ -1056,7 +1060,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
else:
true_subset = context.legacy_context.candidate_subset & backfilling_subset

true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_valid_subset(true_subset)
return SchedulingResult.create(context, true_slice)


Expand Down Expand Up @@ -1085,7 +1089,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
AssetSubset.from_asset_partitions_set(
context.legacy_context.asset_key,
context.legacy_context.partitions_def,
Expand Down Expand Up @@ -1123,13 +1127,13 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
if dagster_run and dagster_run.status in IN_PROGRESS_RUN_STATUSES:
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.candidate_subset
),
)
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.asset_graph_view.get_asset_slice_from_valid_subset(
context.legacy_context.empty_subset()
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
previous_cron_tick = self._get_previous_cron_tick(context)

if (
# never evaluated
# never evaluated or evaluation info is no longer valid
context.previous_evaluation_info is None
or context.previous_evaluation_node is None
or context.previous_evaluation_node.true_slice is None
# not evaluated since latest schedule tick
or context.previous_evaluation_info.temporal_context.effective_dt < previous_cron_tick
# has new set of candidates
Expand All @@ -46,7 +47,7 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
)
# TODO: implement this on the AssetGraphView
true_slice = context.candidate_slice.compute_intersection(
context.asset_graph_view.get_asset_slice_from_subset(updated_subset)
context.asset_graph_view.get_asset_slice_from_valid_subset(updated_subset)
)
else:
true_slice = context.previous_evaluation_node.true_slice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class SchedulingEvaluationResultNode(DagsterModel):
asset_key: AssetKey
condition_unique_id: str

true_slice: AssetSlice
candidate_slice: AssetSlice
true_slice: Optional[AssetSlice]
candidate_slice: Optional[AssetSlice]
subsets_with_metadata: Sequence[AssetSubsetWithMetadata]

extra_state: Any
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dagster._check as check
import pytest
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
Expand Down Expand Up @@ -35,9 +36,11 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
).partitions_def
return SchedulingResult.create(
context,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(
AssetSubset.from_asset_partitions_set(
context.asset_key, partitions_def, true_candidates
true_slice=check.not_none(
context.asset_graph_view.get_asset_slice_from_subset(
AssetSubset.from_asset_partitions_set(
context.asset_key, partitions_def, true_candidates
)
)
),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import datetime

from dagster import AutoMaterializePolicy, Definitions, asset
from dagster._core.definitions.declarative_scheduling.legacy.asset_condition import (
AssetCondition,
)
from dagster._core.definitions.declarative_scheduling.serialized_objects import (
AssetConditionEvaluation,
)
from dagster._core.remote_representation.external_data import external_repository_data_from_def
from dagster._serdes import serialize_value
from dagster._serdes.serdes import deserialize_value

from ..base_scenario import run_request
from ..scenario_specs import (
daily_partitions_def,
day_partition_key,
one_asset,
time_partitions_start_datetime,
)
from .asset_condition_scenario import AssetConditionScenarioState


def test_missing_unpartitioned() -> None:
    """Exercise ``AssetCondition.missing()`` on the ``one_asset`` scenario.

    Covers repeated evaluation, a serialization round trip of the evaluation,
    a run of asset ``A``, and re-evaluation without previous evaluation state.
    """
    scenario = AssetConditionScenarioState(one_asset, asset_condition=AssetCondition.missing())

    # First evaluation: the condition is true, with a true subset of size 1.
    scenario, first_result = scenario.evaluate("A")
    assert first_result.true_subset.size == 1

    # Round-trip the evaluation through serdes so that the comparison below is
    # made against a serialized-then-deserialized copy.
    serialized_evaluation = serialize_value(AssetConditionEvaluation.from_result(first_result))
    stored_evaluation = deserialize_value(serialized_evaluation, AssetConditionEvaluation)

    # Second evaluation: the condition is still true...
    scenario, second_result = scenario.evaluate("A")
    assert second_result.true_subset.size == 1

    # ...and the new evaluation is equivalent to the stored one.
    fresh_evaluation = AssetConditionEvaluation.from_result(second_result)
    assert fresh_evaluation.equivalent_to_stored_evaluation(stored_evaluation)

    # After a run of A the condition is now false.
    scenario = scenario.with_runs(run_request("A"))
    scenario, after_run_result = scenario.evaluate("A")
    assert after_run_result.true_subset.size == 0

    # Evaluating from scratch (no previous evaluation state) is false as well.
    _, from_scratch_result = scenario.without_previous_evaluation_state().evaluate("A")
    assert from_scratch_result.true_subset.size == 0


def test_missing_time_partitioned() -> None:
    """Exercise ``AssetCondition.missing()`` on a daily-partitioned asset.

    Covers repeated evaluation, runs of two partitions, a change of the
    partitions definition's start, and re-evaluation without previous
    evaluation state.
    """
    base_scenario = AssetConditionScenarioState(
        one_asset, asset_condition=AssetCondition.missing()
    )
    scenario = (
        base_scenario.with_asset_properties(partitions_def=daily_partitions_def)
        .with_current_time(time_partitions_start_datetime)
        .with_current_time_advanced(days=6, minutes=1)
    )

    # Initially the condition is true for six partitions.
    scenario, initial_result = scenario.evaluate("A")
    assert initial_result.true_subset.size == 6

    # Re-evaluating without any change gives the same answer.
    scenario, repeat_result = scenario.evaluate("A")
    assert repeat_result.true_subset.size == 6

    # After runs of two partitions of A, those partitions are no longer true.
    scenario = scenario.with_runs(
        *(
            run_request("A", day_partition_key(time_partitions_start_datetime, day_offset))
            for day_offset in (2, 3)
        )
    )
    scenario, after_runs_result = scenario.evaluate("A")
    assert after_runs_result.true_subset.size == 4

    # The result is stable across another evaluation.
    scenario, stable_result = scenario.evaluate("A")
    assert stable_result.true_subset.size == 4

    # Moving the partitions definition's start one day later leaves one fewer
    # missing partition.
    later_start = time_partitions_start_datetime + datetime.timedelta(days=1)
    scenario = scenario.with_asset_properties(
        partitions_def=daily_partitions_def._replace(start=later_start)
    )
    scenario, shifted_result = scenario.evaluate("A")
    assert shifted_result.true_subset.size == 3

    # Evaluating from scratch produces the same answer.
    _, from_scratch_result = scenario.without_previous_evaluation_state().evaluate("A")
    assert from_scratch_result.true_subset.size == 3


def test_serialize_definitions_with_asset_condition() -> None:
    """Check that definitions using an asset-condition policy serialize to a string.

    An ``AutoMaterializePolicy`` is built from a composite ``AssetCondition``,
    attached to an asset, and the external repository data derived from the
    resulting ``Definitions`` is passed through ``serialize_value``.
    """
    condition = AssetCondition.parent_newer() & ~AssetCondition.updated_since_cron("0 * * * *")
    amp = AutoMaterializePolicy.from_asset_condition(condition)

    # The inner asset is deliberately left unannotated, exactly as before.
    @asset(auto_materialize_policy=amp)
    def my_asset():
        return 0

    repository_def = Definitions(assets=[my_asset]).get_repository_def()
    serialized = serialize_value(external_repository_data_from_def(repository_def))
    assert isinstance(serialized, str)