Skip to content

Commit

Permalink
Remove all AssetSubset references from the context and result objects
Browse files: browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 2, 2024
1 parent 8bc6892 commit e5e6ed8
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ def compute_difference(self, other: "AssetSlice") -> "AssetSlice":
self._asset_graph_view, self._compatible_subset - other.convert_to_valid_asset_subset()
)

# Union of two slices: applies `|` to this slice's compatible subset and to
# `other` converted to a valid asset subset, then wraps the result back into
# an AssetSlice via `_slice_from_subset`, bound to this slice's asset graph view.
# NOTE(review): presumably both slices refer to the same asset; confirm against callers.
def compute_union(self, other: "AssetSlice") -> "AssetSlice":
return _slice_from_subset(
self._asset_graph_view, self._compatible_subset | other.convert_to_valid_asset_subset()
)

# Intersection of two slices: applies `&` to this slice's compatible subset and
# to `other` converted to a valid asset subset, then wraps the result back into
# an AssetSlice via `_slice_from_subset`, bound to this slice's asset graph view.
# NOTE(review): presumably both slices refer to the same asset; confirm against callers.
def compute_intersection(self, other: "AssetSlice") -> "AssetSlice":
return _slice_from_subset(
self._asset_graph_view, self._compatible_subset & other.convert_to_valid_asset_subset()
)

def compute_intersection_with_partition_keys(
self, partition_keys: AbstractSet[str]
) -> "AssetSlice":
Expand Down Expand Up @@ -327,7 +337,7 @@ def get_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
),
)

def get_asset_slice_from_subset(self, subset: ValidAssetSubset) -> "AssetSlice":
def get_asset_slice_from_subset(self, subset: AssetSubset) -> "AssetSlice":
return _slice_from_subset(self, subset)

def compute_parent_asset_slice(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
true_subset, subsets_with_metadata = freshness_evaluation_results_for_asset_key(
context.legacy_context.root_context
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -216,7 +217,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
- context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset
)

return SchedulingResult.create(context, true_subset=asset_subset_to_request)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(asset_subset_to_request)
return SchedulingResult.create(context, true_slice=true_slice)


@whitelist_for_serdes
Expand Down Expand Up @@ -433,7 +435,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
ignore_subset=context.legacy_context.materialized_requested_or_discarded_since_previous_tick_subset,
)
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -531,7 +534,7 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_subset=unhandled_candidates,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(unhandled_candidates),
# we keep track of the handled subset instead of the unhandled subset because new
# partitions may spontaneously jump into existence at any time
extra_state=handled_subset,
Expand Down Expand Up @@ -585,7 +588,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -641,7 +645,8 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -734,7 +739,8 @@ def evaluate_for_asset(
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -959,7 +965,9 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
true_subset=context.legacy_context.candidate_subset - all_parents_updated_subset,
true_slice=context.asset_graph_view.get_asset_slice_from_subset(
context.legacy_context.candidate_subset - all_parents_updated_subset
),
extra_state=list(updated_subsets_by_key.values()),
)

Expand Down Expand Up @@ -1008,7 +1016,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
asset_partitions_by_evaluation_data, ignore_subset=subset_to_evaluate
)
)
return SchedulingResult.create(context, true_subset, subsets_with_metadata)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice, subsets_with_metadata)


@whitelist_for_serdes
Expand Down Expand Up @@ -1047,7 +1056,8 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
else:
true_subset = context.legacy_context.candidate_subset & backfilling_subset

return SchedulingResult.create(context, true_subset)
true_slice = context.asset_graph_view.get_asset_slice_from_subset(true_subset)
return SchedulingResult.create(context, true_slice)


@whitelist_for_serdes
Expand Down Expand Up @@ -1075,10 +1085,12 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"

return SchedulingResult.create(
context,
AssetSubset.from_asset_partitions_set(
context.legacy_context.asset_key,
context.legacy_context.partitions_def,
rate_limited_asset_partitions,
context.asset_graph_view.get_asset_slice_from_subset(
AssetSubset.from_asset_partitions_set(
context.legacy_context.asset_key,
context.legacy_context.partitions_def,
rate_limited_asset_partitions,
)
),
)

Expand Down Expand Up @@ -1109,5 +1121,15 @@ def evaluate_for_asset(self, context: "SchedulingContext") -> "SchedulingResult"
if planned_materialization_info:
dagster_run = instance.get_run_by_id(planned_materialization_info.run_id)
if dagster_run and dagster_run.status in IN_PROGRESS_RUN_STATUSES:
return SchedulingResult.create(context, context.legacy_context.candidate_subset)
return SchedulingResult.create(context, context.legacy_context.empty_subset())
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.legacy_context.candidate_subset
),
)
return SchedulingResult.create(
context,
context.asset_graph_view.get_asset_slice_from_subset(
context.legacy_context.empty_subset()
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
else:
true_slice = self.compute_slice(context)

return SchedulingResult.create(context, true_slice.convert_to_valid_asset_subset())
return SchedulingResult.create(context, true_slice)


@whitelist_for_serdes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,25 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
# never evaluated
context.previous_evaluation_info is None
or context.previous_evaluation_node is None
# partitions def has changed
or not context.previous_evaluation_node.true_subset.is_compatible_with_partitions_def(
context.partitions_def
)
# not evaluated since latest schedule tick
or context.previous_evaluation_info.temporal_context.effective_dt < previous_cron_tick
# has new set of candidates
or context.has_new_candidate_subset()
or context.previous_evaluation_node.candidate_slice != context.candidate_slice
# asset updated since latest evaluation
or context.asset_updated_since_previous_tick()
):
# do a full recomputation
true_subset = (
context.candidate_subset
& context._queryer.get_asset_subset_updated_after_time( # noqa
updated_subset = (
context.legacy_context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=context.asset_key,
after_time=previous_cron_tick,
)
)
else:
true_subset = context.previous_evaluation_node.true_subset.as_valid(
context.partitions_def
# TODO: implement this on the AssetGraphView
true_slice = context.candidate_slice.compute_intersection(
context.asset_graph_view.get_asset_slice_from_subset(updated_subset)
)
else:
true_slice = context.previous_evaluation_node.true_slice

return SchedulingResult.create(context, true_subset)
return SchedulingResult.create(context, true_slice)
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ def description(self) -> str:

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
child_results: List[SchedulingResult] = []
true_subset = context.candidate_subset
true_slice = context.candidate_slice
for child in self.children:
child_context = context.for_child_condition(
child_condition=child, candidate_subset=true_subset
child_condition=child, candidate_slice=true_slice
)
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset &= child_result.true_subset
return SchedulingResult.create_from_children(context, true_subset, child_results)
true_slice = true_slice.compute_intersection(child_result.true_slice)
return SchedulingResult.create_from_children(context, true_slice, child_results)


@experimental
Expand All @@ -52,17 +52,16 @@ def description(self) -> str:

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
child_results: List[SchedulingResult] = []
true_subset = context.asset_graph_view.create_empty_slice(
context.asset_key
).convert_to_valid_asset_subset()
true_slice = context.asset_graph_view.create_empty_slice(context.asset_key)
for child in self.children:
child_context = context.for_child_condition(
child_condition=child, candidate_subset=context.candidate_subset
child_condition=child, candidate_slice=context.candidate_slice
)
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset |= child_result.true_subset
return SchedulingResult.create_from_children(context, true_subset, child_results)
true_slice = true_slice.compute_union(child_result.true_slice)

return SchedulingResult.create_from_children(context, true_slice, child_results)


@experimental
Expand All @@ -82,9 +81,9 @@ def children(self) -> Sequence[SchedulingCondition]:

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
child_context = context.for_child_condition(
child_condition=self.operand, candidate_subset=context.candidate_subset
child_condition=self.operand, candidate_slice=context.candidate_slice
)
child_result = self.operand.evaluate(child_context)
true_subset = context.candidate_subset - child_result.true_subset
true_slice = context.candidate_slice.compute_difference(child_result.true_slice)

return SchedulingResult.create_from_children(context, true_subset, [child_result])
return SchedulingResult.create_from_children(context, true_slice, [child_result])
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,19 @@ def description(self) -> str:
return f"{self.dep_key.to_user_string()}"

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
# only evaluate parents of the current candidate subset
dep_candidate_subset = context.candidate_slice.compute_parent_slice(
self.dep_key
).convert_to_valid_asset_subset()
# only evaluate parents of the current candidates
dep_candidate_slice = context.candidate_slice.compute_parent_slice(self.dep_key)
dep_context = context.for_child_condition(
child_condition=self.operand,
candidate_subset=dep_candidate_subset,
child_condition=self.operand, candidate_slice=dep_candidate_slice
)

# evaluate condition against the dependency
dep_result = self.operand.evaluate(dep_context)

# find all children of the true dep slice
true_subset = dep_result.true_slice.compute_child_slice(
context.asset_key
).convert_to_valid_asset_subset()
true_slice = dep_result.true_slice.compute_child_slice(context.asset_key)
return SchedulingResult.create_from_children(
context=context,
true_subset=true_subset,
child_results=[dep_result],
context=context, true_slice=true_slice, child_results=[dep_result]
)


Expand All @@ -53,23 +46,22 @@ def description(self) -> str:

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
dep_results = []
true_subset = context.asset_graph_view.create_empty_slice(
context.asset_key
).convert_to_valid_asset_subset()
true_slice = context.asset_graph_view.create_empty_slice(context.asset_key)

dep_keys = context.asset_graph_view.asset_graph.get(context.asset_key).parent_keys
for dep_key in dep_keys:
dep_condition = DepConditionWrapperCondition(dep_key=dep_key, operand=self.operand)
dep_result = dep_condition.evaluate(
context.for_child_condition(
child_condition=dep_condition, candidate_subset=context.candidate_subset
child_condition=dep_condition, candidate_slice=context.candidate_slice
)
)
dep_results.append(dep_result)
true_subset |= dep_result.true_subset
true_slice = true_slice.compute_union(dep_result.true_slice)

true_slice = context.candidate_slice.compute_intersection(true_slice)
return SchedulingResult.create_from_children(
context, true_subset=context.candidate_subset & true_subset, child_results=dep_results
context, true_slice=true_slice, child_results=dep_results
)


Expand All @@ -83,19 +75,19 @@ def description(self) -> str:

def evaluate(self, context: SchedulingContext) -> SchedulingResult:
dep_results = []
true_subset = context.candidate_subset
true_slice = context.candidate_slice

dep_keys = context.asset_graph_view.asset_graph.get(context.asset_key).parent_keys
for dep_key in dep_keys:
dep_condition = DepConditionWrapperCondition(dep_key=dep_key, operand=self.operand)
dep_result = dep_condition.evaluate(
context.for_child_condition(
child_condition=dep_condition, candidate_subset=context.candidate_subset
child_condition=dep_condition, candidate_slice=context.candidate_slice
)
)
dep_results.append(dep_result)
true_subset &= dep_result.true_subset
true_slice = true_slice.compute_intersection(dep_result.true_slice)

return SchedulingResult.create_from_children(
context, true_subset=true_subset, child_results=dep_results
context, true_slice=true_slice, child_results=dep_results
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pendulum

from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_scheduling.serialized_objects import (
AssetConditionSnapshot,
AssetSubsetWithMetadata,
Expand Down Expand Up @@ -152,7 +152,7 @@ def true_subset(self) -> AssetSubset:
@staticmethod
def create_from_children(
context: "SchedulingContext",
true_subset: ValidAssetSubset,
true_slice: AssetSlice,
child_results: Sequence["SchedulingResult"],
) -> "SchedulingResult":
"""Returns a new AssetConditionEvaluation from the given child results."""
Expand All @@ -161,8 +161,8 @@ def create_from_children(
condition_unique_id=context.condition_unique_id,
start_timestamp=context.create_time.timestamp(),
end_timestamp=pendulum.now("UTC").timestamp(),
true_slice=context.asset_graph_view.get_asset_slice_from_subset(true_subset),
candidate_subset=context.candidate_subset,
true_slice=true_slice,
candidate_subset=context.candidate_slice.convert_to_valid_asset_subset(),
subsets_with_metadata=[],
child_results=child_results,
extra_state=None,
Expand All @@ -171,7 +171,7 @@ def create_from_children(
@staticmethod
def create(
context: "SchedulingContext",
true_subset: ValidAssetSubset,
true_slice: AssetSlice,
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [],
extra_state: PackableValue = None,
) -> "SchedulingResult":
Expand All @@ -181,8 +181,8 @@ def create(
condition_unique_id=context.condition_unique_id,
start_timestamp=context.create_time.timestamp(),
end_timestamp=pendulum.now("UTC").timestamp(),
true_slice=context.asset_graph_view.get_asset_slice_from_subset(true_subset),
candidate_subset=context.candidate_subset,
true_slice=true_slice,
candidate_subset=context.candidate_slice.convert_to_valid_asset_subset(),
subsets_with_metadata=subsets_with_metadata,
child_results=[],
extra_state=extra_state,
Expand Down
Loading

0 comments on commit e5e6ed8

Please sign in to comment.