Skip to content

Commit

Permalink
Raise error upon incomplete graph-backed asset subset (#8041)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed May 25, 2022
1 parent 407e698 commit 84ccc32
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21825,7 +21825,7 @@
}
]
},
"description": null,
"description": "",
"graph_def_name": "foo_job",
"lineage_snapshot": null,
"mode_def_snaps": [
Expand Down Expand Up @@ -23175,7 +23175,7 @@
"tags": {}
}'''

snapshots['test_all_snapshot_ids 30'] = '046bc61b12af12c19ec9f2f0e9bbf886b727a1ea'
snapshots['test_all_snapshot_ids 30'] = '8d28524540c0b45084583be03e12091ad9ab1b5a'

snapshots['test_all_snapshot_ids 31'] = '''{
"__class__": "PipelineSnapshot",
Expand Down Expand Up @@ -24478,7 +24478,7 @@
}
]
},
"description": null,
"description": "",
"graph_def_name": "hanging_graph_asset_job",
"lineage_snapshot": null,
"mode_def_snaps": [
Expand Down Expand Up @@ -24773,7 +24773,7 @@
"tags": {}
}'''

snapshots['test_all_snapshot_ids 32'] = 'af958a0fa4e0ec4c58474480fd769f01d9706833'
snapshots['test_all_snapshot_ids 32'] = '049a7c852520e5ba22784c40172825640d0b2ef2'

snapshots['test_all_snapshot_ids 33'] = '''{
"__class__": "PipelineSnapshot",
Expand Down
96 changes: 26 additions & 70 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@
from importlib import import_module
from types import ModuleType
from typing import (
AbstractSet,
Any,
Dict,
FrozenSet,
Generator,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)

import dagster._check as check
Expand All @@ -27,10 +25,12 @@
from dagster.core.definitions.executor_definition import in_process_executor
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.utils import merge_dicts
from dagster.utils.backcompat import ExperimentalWarning

from ..definitions.asset_layer import build_asset_selection_job
from ..definitions.executor_definition import ExecutorDefinition
from ..definitions.job_definition import JobDefinition
from ..definitions.partition import PartitionsDefinition
Expand Down Expand Up @@ -163,6 +163,7 @@ def build_job(
executor_def: Optional[ExecutorDefinition] = None,
tags: Optional[Dict[str, Any]] = None,
description: Optional[str] = None,
_asset_selection_data: Optional[AssetSelectionData] = None,
) -> JobDefinition:
"""Defines an executable job from the provided assets, resources, and executor.
Expand Down Expand Up @@ -204,72 +205,34 @@ def build_job(
from dagster.core.selector.subset_selector import parse_asset_selection

check.str_param(name, "name")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)

if not isinstance(selection, str):
selected_asset_keys: FrozenSet[AssetKey] = frozenset()
if isinstance(selection, str):
selected_asset_keys = parse_asset_selection(self.assets, [selection])
elif isinstance(selection, list):
selection = check.opt_list_param(selection, "selection", of_type=str)
else:
selection = [selection]
selected_asset_keys = parse_asset_selection(self.assets, selection)
elif isinstance(selection, FrozenSet):
check.opt_set_param(selection, "selection", of_type=AssetKey)
selected_asset_keys = selection

executor_def = check.opt_inst_param(
executor_def, "executor_def", ExecutorDefinition, self.executor_def
)
description = check.opt_str_param(description, "description")
resource_defs = build_resource_defs(self.resource_defs, self.source_assets)
description = check.opt_str_param(description, "description", "")
tags = check.opt_dict_param(tags, "tags", key_type=str)

if selection:
selected_asset_keys = parse_asset_selection(self.assets, selection)
included_assets, excluded_assets = self._subset_assets_defs(selected_asset_keys)
else:
included_assets = cast(List[AssetsDefinition], self.assets)
# Call to list(...) serves as a copy constructor, so that we don't
# accidentally add to the original list
excluded_assets = list(self.source_assets)

with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
asset_job = build_assets_job(
name=name,
assets=included_assets,
source_assets=excluded_assets,
resource_defs=resource_defs,
executor_def=executor_def,
description=description,
tags=tags,
)
return asset_job

def _subset_assets_defs(
    self, selected_asset_keys: AbstractSet[AssetKey]
) -> Tuple[Sequence[AssetsDefinition], Sequence[AssetsDefinition]]:
    """Partition this group's assets into selected and unselected definitions.

    Each AssetsDefinition in ``self.assets`` is compared against
    ``selected_asset_keys``:

    * every key of the definition is selected -> included unchanged;
    * none of its keys are selected -> excluded unchanged;
    * only some keys are selected and ``can_subset`` is truthy -> the
      definition is split with ``subset_for`` into an included part and an
      excluded part.

    Returns:
        An ``(included, excluded)`` pair of lists. Both are materialized from
        sets, so their ordering is unspecified.

    Raises:
        DagsterInvalidDefinitionError: only part of a definition was selected
            and that definition does not support subsetting.
    """
    kept: Set[AssetsDefinition] = set()
    dropped: Set[AssetsDefinition] = set()

    for asset in self.assets:
        # Keys of this definition that the caller asked for.
        selected_subset = selected_asset_keys & asset.asset_keys

        # Whole definition requested.
        if selected_subset == asset.asset_keys:
            kept.add(asset)
            continue

        # Nothing from this definition requested.
        if len(selected_subset) == 0:
            dropped.add(asset)
            continue

        # Partial selection is only legal for subsettable definitions.
        if not asset.can_subset:
            raise DagsterInvalidDefinitionError(
                f"When building job, the AssetsDefinition '{asset.node_def.name}' "
                f"contains asset keys {sorted(list(asset.asset_keys))}, but "
                f"attempted to select only {sorted(list(selected_subset))}. "
                "This AssetsDefinition does not support subsetting. Please select all "
                "asset keys produced by this asset."
            )

        # Split the definition: the wanted part is kept, the remainder dropped.
        wanted_part = asset.subset_for(selected_asset_keys)
        kept.add(wanted_part)
        remainder_keys = asset.asset_keys - wanted_part.asset_keys
        dropped.add(asset.subset_for(remainder_keys))

    return list(kept), list(dropped)
return build_asset_selection_job(
name=name,
assets=self.assets,
source_assets=self.source_assets,
executor_def=executor_def,
resource_defs=self.resource_defs,
description=description,
tags=tags,
asset_selection=selected_asset_keys,
)

def to_source_assets(self) -> Sequence[SourceAsset]:
"""
Expand Down Expand Up @@ -618,13 +581,6 @@ def __eq__(self, other: object) -> bool:
)


def build_resource_defs(resource_defs, source_assets):
    """Return a copy of ``resource_defs`` with a ``"root_manager"`` entry added.

    The ``"root_manager"`` value is whatever ``build_root_manager`` returns for
    ``build_source_assets_by_key(source_assets)``. The input mapping is not
    mutated; an existing ``"root_manager"`` key is overridden in the result.
    """
    merged = {**resource_defs}
    merged["root_manager"] = build_root_manager(build_source_assets_by_key(source_assets))
    return merged


def _find_assets_in_module(
module: ModuleType,
) -> Generator[Union[AssetsDefinition, SourceAsset], None, None]:
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def asset2(asset1):
Returns:
JobDefinition: A job that materializes the given assets.
"""

check.str_param(name, "name")
check.sequence_param(assets, "assets", of_type=AssetsDefinition)
check.opt_sequence_param(
Expand Down Expand Up @@ -136,8 +137,7 @@ def asset2(asset1):
tags=tags,
executor_def=executor_def,
asset_layer=AssetLayer.from_graph_and_assets_node_mapping(
graph,
assets_defs_by_node_handle,
graph, assets_defs_by_node_handle, source_assets
),
_asset_selection_data=_asset_selection_data,
)
Expand Down
132 changes: 102 additions & 30 deletions python_modules/dagster/dagster/core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import warnings
from collections import defaultdict
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
FrozenSet,
Expand All @@ -14,18 +16,23 @@
Set,
Tuple,
Union,
cast,
)

import dagster._check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.utils.backcompat import ExperimentalWarning

from ..errors import DagsterInvalidDefinitionError
from .dependency import NodeHandle, NodeInputHandle, NodeOutputHandle
from .executor_definition import ExecutorDefinition
from .graph_definition import GraphDefinition
from .node_definition import NodeDefinition
from .resource_definition import ResourceDefinition

if TYPE_CHECKING:
from dagster.core.asset_defs import AssetGroup, AssetsDefinition
from dagster.core.asset_defs import AssetGroup, AssetsDefinition, SourceAsset
from dagster.core.execution.context.output import OutputContext

from .job_definition import JobDefinition
Expand Down Expand Up @@ -351,7 +358,10 @@ def __init__(
asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
dependency_node_handles_by_asset_key: Optional[Mapping[AssetKey, Set[NodeHandle]]] = None,
assets_defs: Optional[List["AssetsDefinition"]] = None,
source_asset_defs: Optional[Sequence[Union["SourceAsset", "AssetsDefinition"]]] = None,
):
from dagster.core.asset_defs import AssetsDefinition, SourceAsset

self._asset_keys_by_node_input_handle = check.opt_dict_param(
asset_keys_by_node_input_handle,
"asset_keys_by_node_input_handle",
Expand All @@ -374,6 +384,9 @@ def __init__(
value_type=Set,
)
self._assets_defs = check.opt_list_param(assets_defs, "assets_defs")
self._source_asset_defs = check.opt_list_param(
source_asset_defs, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)

# keep an index from node handle to all keys expected to be generated in that node
self._asset_keys_by_node_handle: Dict[NodeHandle, Set[AssetKey]] = defaultdict(set)
Expand All @@ -396,6 +409,7 @@ def from_graph(graph_def: GraphDefinition) -> "AssetLayer":
def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
source_assets: Optional[Sequence[Union["SourceAsset", "AssetsDefinition"]]] = None,
) -> "AssetLayer":
"""
Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the
Expand Down Expand Up @@ -447,6 +461,7 @@ def from_graph_and_assets_node_mapping(
graph_def, assets_defs_by_node_handle
),
assets_defs=[assets_def for assets_def in assets_defs_by_node_handle.values()],
source_asset_defs=source_assets,
)

@property
Expand Down Expand Up @@ -483,34 +498,91 @@ def asset_info_for_output(


def build_asset_selection_job(
    name: str,
    assets: Sequence["AssetsDefinition"],
    source_assets: Sequence[Union["AssetsDefinition", "SourceAsset"]],
    executor_def: ExecutorDefinition,
    resource_defs: Mapping[str, ResourceDefinition],
    description: str,
    tags: Dict[str, Any],
    asset_selection: Optional[FrozenSet[AssetKey]],
    asset_selection_data: Optional[AssetSelectionData] = None,
) -> "JobDefinition":
    """Build an assets job, optionally restricted to a selection of asset keys.

    Args:
        name: Name given to the resulting job.
        assets: Candidate asset definitions for the job.
        source_assets: Definitions treated as sources (not materialized by the job).
        executor_def: Executor forwarded to ``build_assets_job``.
        resource_defs: Resources forwarded to ``build_assets_job``. When a
            selection is given, a ``"root_manager"`` entry built from the
            excluded assets is added first.
        description: Job description forwarded to ``build_assets_job``.
        tags: Job tags forwarded to ``build_assets_job``.
        asset_selection: Asset keys to include. A falsy value (``None`` or an
            empty set) means "no subsetting": every entry of ``assets`` is
            included and ``source_assets`` is used as-is.
        asset_selection_data: Forwarded to ``build_assets_job`` as
            ``_asset_selection_data``.

    Returns:
        The job produced by ``build_assets_job``.

    Raises:
        DagsterInvalidDefinitionError: propagated from ``_subset_assets_defs``
            when the selection covers only part of a non-subsettable definition.
    """
    # Imported lazily; asset_defs is only imported at module level under TYPE_CHECKING.
    from dagster.core.asset_defs import build_assets_job

    if asset_selection:
        included_assets, excluded_assets = _subset_assets_defs(
            assets, source_assets, asset_selection
        )
        # Unselected assets act as sources for the subset job, so the root
        # manager is rebuilt from them.
        resource_defs = _build_resource_defs(resource_defs, excluded_assets)
    else:
        included_assets = cast(List["AssetsDefinition"], assets)
        # Slice [:] serves as a copy constructor, so that we don't
        # accidentally add to the original list
        excluded_assets = source_assets[:]

    # build_assets_job may emit ExperimentalWarning; silence it for this call only.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=ExperimentalWarning)
        asset_job = build_assets_job(
            name=name,
            assets=included_assets,
            source_assets=excluded_assets,
            resource_defs=resource_defs,
            executor_def=executor_def,
            description=description,
            tags=tags,
            _asset_selection_data=asset_selection_data,
        )

    return asset_job


def _subset_assets_defs(
    assets: Sequence["AssetsDefinition"],
    source_assets: Sequence[Union["AssetsDefinition", "SourceAsset"]],
    selected_asset_keys: AbstractSet[AssetKey],
) -> Tuple[Sequence["AssetsDefinition"], Sequence[Union["AssetsDefinition", "SourceAsset"]]]:
    """Split ``assets`` into included and excluded definitions for a key selection.

    Each definition in ``assets`` is compared against ``selected_asset_keys``:

    * every key of the definition is selected -> included unchanged;
    * none of its keys are selected -> excluded unchanged;
    * only some keys are selected and ``can_subset`` is truthy -> the
      definition is split with ``subset_for`` into an included part and an
      excluded part.

    Args:
        assets: Definitions to partition.
        source_assets: Appended, unchanged, after the excluded definitions.
        selected_asset_keys: Asset keys that should end up in the job.

    Returns:
        ``(included, excluded)`` where ``included`` is a list of definitions to
        materialize and ``excluded`` is the excluded definitions followed by
        ``source_assets``. The definitions are collected in sets, so their
        relative order is unspecified.

    Raises:
        DagsterInvalidDefinitionError: only part of a definition was selected
            and that definition does not support subsetting.
    """
    # Imported lazily; only needed for the local annotations below.
    from dagster.core.asset_defs import AssetsDefinition

    included_assets: Set[AssetsDefinition] = set()
    excluded_assets: Set[AssetsDefinition] = set()

    for asset in assets:
        # intersection
        selected_subset = selected_asset_keys & asset.asset_keys
        # all assets in this def are selected
        if selected_subset == asset.asset_keys:
            included_assets.add(asset)
        # no assets in this def are selected
        elif len(selected_subset) == 0:
            excluded_assets.add(asset)
        elif asset.can_subset:
            # subset of the asset that we want
            subset_asset = asset.subset_for(selected_asset_keys)
            included_assets.add(subset_asset)
            # subset of the asset that we don't want
            excluded_assets.add(asset.subset_for(asset.asset_keys - subset_asset.asset_keys))
        else:
            # Incomplete selection of a non-subsettable definition is an error.
            raise DagsterInvalidDefinitionError(
                f"When building job, the AssetsDefinition '{asset.node_def.name}' "
                f"contains asset keys {sorted(asset.asset_keys)}, but "
                f"attempted to select only {sorted(selected_subset)}. "
                "This AssetsDefinition does not support subsetting. Please select all "
                "asset keys produced by this asset."
            )

    all_excluded_assets = [*excluded_assets, *source_assets]

    return list(included_assets), all_excluded_assets


def _build_resource_defs(resource_defs, source_assets):
    """Return a copy of ``resource_defs`` with a ``"root_manager"`` entry added.

    The ``"root_manager"`` value is whatever ``build_root_manager`` returns for
    ``build_source_assets_by_key(source_assets)``. The input mapping is not
    mutated; an existing ``"root_manager"`` key is overridden in the result.
    """
    # Imported inside the function, as in the original definition.
    from dagster.core.asset_defs.assets_job import build_root_manager, build_source_assets_by_key

    merged = {**resource_defs}
    merged["root_manager"] = build_root_manager(build_source_assets_by_key(source_assets))
    return merged

0 comments on commit 84ccc32

Please sign in to comment.