Skip to content

Commit

Permalink
Reorganize AssetGraphView methods
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 14, 2024
1 parent 5823a56 commit a049e0a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,32 +366,11 @@ def get_asset_slice_from_asset_partitions(
),
)

def compute_missing_subslice(
self, asset_key: "AssetKey", from_slice: "AssetSlice"
) -> "AssetSlice":
"""Returns a slice which is the subset of the input slice that has never been materialized
(if it is a materializable asset) or observered (if it is an observable asset).
"""
# TODO: this logic should be simplified once we have a unified way of detecting both
# materializations and observations through the parittion status cache. at that point, the
# definition will slightly change to search for materializations and observations regardless
# of the materializability of the asset
if self.asset_graph.get(asset_key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key)
materialized_slice = self.get_asset_slice_from_valid_subset(materialized_subset)
return from_slice.compute_difference(materialized_slice)
else:
# more expensive call
missing_asset_partitions = {
ap
for ap in from_slice.convert_to_valid_asset_subset().asset_partitions
if not self._queryer.asset_partition_has_materialization_or_observation(ap)
}
missing_subset = ValidAssetSubset.from_asset_partitions_set(
asset_key, self._get_partitions_def(asset_key), missing_asset_partitions
)
return self.get_asset_slice_from_valid_subset(missing_subset)
def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice:
return _slice_from_subset(
self,
AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)),
)

def compute_parent_asset_slice(
self, parent_asset_key: AssetKey, asset_slice: AssetSlice
Expand Down Expand Up @@ -419,11 +398,6 @@ def compute_child_asset_slice(
),
)

def compute_in_progress_asset_slice(self, asset_key: "AssetKey") -> "AssetSlice":
return _slice_from_subset(
self, self._queryer.get_in_progress_asset_subset(asset_key=asset_key)
)

def compute_intersection_with_partition_keys(
self, partition_keys: AbstractSet[str], asset_slice: AssetSlice
) -> "AssetSlice":
Expand Down Expand Up @@ -490,9 +464,42 @@ def compute_latest_time_window_slice(
else:
check.failed(f"Unsupported partitions_def: {partitions_def}")

def compute_missing_subslice(
self, asset_key: "AssetKey", from_slice: "AssetSlice"
) -> "AssetSlice":
"""Returns a slice which is the subset of the input slice that has never been materialized
(if it is a materializable asset) or observered (if it is an observable asset).
"""
# TODO: this logic should be simplified once we have a unified way of detecting both
# materializations and observations through the parittion status cache. at that point, the
# definition will slightly change to search for materializations and observations regardless
# of the materializability of the asset
if self.asset_graph.get(asset_key).is_materializable:
# cheap call which takes advantage of the partition status cache
materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key)
materialized_slice = self.get_asset_slice_from_valid_subset(materialized_subset)
return from_slice.compute_difference(materialized_slice)
else:
# more expensive call
missing_asset_partitions = {
ap
for ap in from_slice.convert_to_valid_asset_subset().asset_partitions
if not self._queryer.asset_partition_has_materialization_or_observation(ap)
}
missing_subset = ValidAssetSubset.from_asset_partitions_set(
asset_key, self._get_partitions_def(asset_key), missing_asset_partitions
)
return self.get_asset_slice_from_valid_subset(missing_subset)

@cached_method
def compute_in_progress_asset_slice(self, *, asset_key: "AssetKey") -> "AssetSlice":
return _slice_from_subset(
self, self._queryer.get_in_progress_asset_subset(asset_key=asset_key)
)

@cached_method
def compute_updated_since_cursor_slice(
self, asset_key: AssetKey, cursor: Optional[int]
self, *, asset_key: AssetKey, cursor: Optional[int]
) -> AssetSlice:
subset = self._queryer.get_asset_subset_updated_after_cursor(
asset_key=asset_key, after_cursor=cursor
Expand All @@ -501,7 +508,7 @@ def compute_updated_since_cursor_slice(

@cached_method
def compute_parent_updated_since_cursor_slice(
self, asset_key: AssetKey, cursor: Optional[int]
self, *, asset_key: AssetKey, cursor: Optional[int]
) -> AssetSlice:
result_slice = self.create_empty_slice(asset_key)
for parent_key in self.asset_graph.get(asset_key).parent_keys:
Expand All @@ -512,12 +519,6 @@ def compute_parent_updated_since_cursor_slice(
)
return result_slice

def create_empty_slice(self, asset_key: AssetKey) -> AssetSlice:
return _slice_from_subset(
self,
AssetSubset.empty(asset_key, self._get_partitions_def(asset_key)),
)

class MultiDimInfo(NamedTuple):
tw_dim: PartitionDimensionDefinition
secondary_dim: PartitionDimensionDefinition
Expand Down Expand Up @@ -577,9 +578,3 @@ def _build_multi_partition_slice(
)
}
)


def _required_tw_partitions_def(
partitions_def: Optional["PartitionsDefinition"],
) -> TimeWindowPartitionsDefinition:
return check.inst(partitions_def, TimeWindowPartitionsDefinition, "Must be time windowed.")
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def description(self) -> str:
return "Part of an in-progress run"

def compute_slice(self, context: SchedulingContext) -> AssetSlice:
return context.asset_graph_view.compute_in_progress_asset_slice(context.asset_key)
return context.asset_graph_view.compute_in_progress_asset_slice(asset_key=context.asset_key)


@whitelist_for_serdes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def evaluate(self, context: SchedulingContext) -> SchedulingResult:
# since the previous evaluation
true_slice = context.previous_evaluation_node.true_slice.compute_union(
context.asset_graph_view.compute_updated_since_cursor_slice(
context.asset_key, context.previous_evaluation_max_storage_id
asset_key=context.asset_key, cursor=context.previous_evaluation_max_storage_id
)
)

Expand Down

0 comments on commit a049e0a

Please sign in to comment.