Skip to content

Commit

Permalink
[1/n] Subsetting Stack: use subset_selector for AssetGroup.build_job (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 17, 2022
1 parent 6e15dfb commit 252f285
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 224 deletions.
147 changes: 32 additions & 115 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import inspect
import os
import pkgutil
import re
import warnings
from collections import defaultdict
from importlib import import_module
from types import ModuleType
from typing import (
AbstractSet,
Any,
Dict,
Generator,
Expand All @@ -16,6 +16,7 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand All @@ -32,7 +33,6 @@

from ..definitions.executor_definition import ExecutorDefinition
from ..definitions.job_definition import JobDefinition
from ..definitions.op_definition import OpDefinition
from ..definitions.partition import PartitionsDefinition
from ..definitions.resource_definition import ResourceDefinition
from ..errors import DagsterInvalidDefinitionError
Expand Down Expand Up @@ -201,12 +201,14 @@ def build_job(
job_with_multiple_selections = the_asset_group.build_job(selection=["*some_asset", "other_asset++"])
"""

from dagster.core.selector.subset_selector import parse_op_selection
from dagster.core.selector.subset_selector import parse_asset_selection

check.str_param(name, "name")

if not isinstance(selection, str):
selection = check.opt_list_param(selection, "selection", of_type=str)
else:
selection = [selection]
executor_def = check.opt_inst_param(
executor_def, "executor_def", ExecutorDefinition, self.executor_def
)
Expand All @@ -216,34 +218,9 @@ def build_job(
**{"root_manager": build_root_manager(build_source_assets_by_key(self.source_assets))},
}

with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
mega_job_def = build_assets_job(
name=name,
assets=self.assets,
source_assets=self.source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
)

if selection:
op_selection = self._parse_asset_selection(selection, job_name=name)
# We currently re-use the logic from op selection to parse the
# asset key selection, but this has disadvantages. Eventually we
# will want to decouple these implementations.
# https://github.com/dagster-io/dagster/issues/6647.
resolved_op_selection_dict = parse_op_selection(mega_job_def, op_selection)

included_assets = []
excluded_assets: List[Union[AssetsDefinition, SourceAsset]] = list(self.source_assets)

op_names = set(list(resolved_op_selection_dict.keys()))

for asset in self.assets:
if asset.op.name in op_names:
included_assets.append(asset)
else:
excluded_assets.append(asset)
selected_asset_keys = parse_asset_selection(self.assets, selection)
included_assets, excluded_assets = self._subset_assets_defs(selected_asset_keys)
else:
included_assets = cast(List[AssetsDefinition], self.assets)
# Call to list(...) serves as a copy constructor, so that we don't
Expand All @@ -263,94 +240,33 @@ def build_job(
)
return asset_job

def _parse_asset_selection(self, selection: Union[str, List[str]], job_name: str) -> List[str]:
"""Convert selection over asset keys to selection over ops"""

asset_keys_to_ops: Dict[str, List[OpDefinition]] = {}
op_names_to_asset_keys: Dict[str, Set[str]] = {}
seen_asset_keys: Set[str] = set()

if isinstance(selection, str):
selection = [selection]

if len(selection) == 1 and selection[0] == "*":
return selection

source_asset_keys = set()
def _subset_assets_defs(
self, selected_asset_keys: AbstractSet[AssetKey]
) -> Tuple[Sequence[AssetsDefinition], Sequence[AssetsDefinition]]:
"""Given a list of asset key selection queries, generate a set of AssetsDefinition objects
representing the included/excluded definitions.
"""
included_assets: Set[AssetsDefinition] = set()
excluded_assets: Set[AssetsDefinition] = set()

for asset in self.assets:
if asset.op.name not in op_names_to_asset_keys:
op_names_to_asset_keys[asset.op.name] = set()
for asset_key in asset.asset_keys:
asset_key_as_str = ">".join([piece for piece in asset_key.path])
op_names_to_asset_keys[asset.op.name].add(asset_key_as_str)
if not asset_key_as_str in asset_keys_to_ops:
asset_keys_to_ops[asset_key_as_str] = []
asset_keys_to_ops[asset_key_as_str].append(asset.op)

for asset in self.source_assets:
asset_key_as_str = ">".join([piece for piece in asset.key.path])
source_asset_keys.add(asset_key_as_str)

op_selection = []

for clause in selection:
token_matching = re.compile(r"^(\*?\+*)?([>.\w\d\[\]?_-]+)(\+*\*?)?$").search(
clause.strip()
)
parts = token_matching.groups() if token_matching is not None else None
if parts is None:
raise DagsterInvalidDefinitionError(
f"When attempting to create job '{job_name}', the clause "
f"{clause} within the asset key selection was invalid. Please "
"review the selection syntax here: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#op-selection-syntax."
)
upstream_part, key_str, downstream_part = parts

# Error if you express a clause in terms of a source asset key.
# Eventually we will want to support selection over source asset
# keys as a means of running downstream ops.
# https://github.com/dagster-io/dagster/issues/6647
if key_str in source_asset_keys:
raise DagsterInvalidDefinitionError(
f"When attempting to create job '{job_name}', the clause '"
f"{clause}' selects asset_key '{key_str}', which comes from "
"a source asset. Source assets can't be materialized, and "
"therefore can't be subsetted into a job. Please choose a "
"subset on asset keys that are materializable - that is, "
f"included on assets within the group. Valid assets: {list(asset_keys_to_ops.keys())}"
)
if key_str not in asset_keys_to_ops:
raise DagsterInvalidDefinitionError(
f"When attempting to create job '{job_name}', the clause "
f"'{clause}' within the asset key selection did not match "
f"any asset keys. Present asset keys: {list(asset_keys_to_ops.keys())}"
)

seen_asset_keys.add(key_str)

for op in asset_keys_to_ops[key_str]:

op_clause = f"{upstream_part}{op.name}{downstream_part}"
op_selection.append(op_clause)

# Verify that for each selected asset key, the corresponding op had all
# asset keys selected. Eventually, we will want to have specific syntax
# that allows for selecting all asset keys for a given multi-asset
# https://github.com/dagster-io/dagster/issues/6647.
for op_name, asset_key_set in op_names_to_asset_keys.items():
are_keys_in_set = [key in seen_asset_keys for key in asset_key_set]
if any(are_keys_in_set) and not all(are_keys_in_set):
# intersection
selected_subset = selected_asset_keys & asset.asset_keys
# all assets in this def are selected
if selected_subset == asset.asset_keys:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0:
excluded_assets.add(asset)
else:
raise DagsterInvalidDefinitionError(
f"When building job '{job_name}', the asset '{op_name}' "
f"contains asset keys {sorted(list(asset_key_set))}, but "
f"attempted to select only {sorted(list(asset_key_set.intersection(seen_asset_keys)))}. "
"Selecting only some of the asset keys for a particular "
"asset is not yet supported behavior. Please select all "
"asset keys produced by a given asset when subsetting."
f"When building job, the AssetsDefinition '{asset.node_def.name}' "
f"contains asset keys {sorted(list(asset.asset_keys))}, but "
f"attempted to select only {sorted(list(selected_subset))}. "
"This AssetsDefinition does not support subsetting. Please select all "
"asset keys produced by this asset."
)
return op_selection
return list(included_assets), list(excluded_assets)

def to_source_assets(self) -> Sequence[SourceAsset]:
"""
Expand Down Expand Up @@ -744,7 +660,8 @@ def _validate_resource_reqs_for_asset_group(
missing_resource_keys = list(set(resource_keys) - present_resource_keys)
if missing_resource_keys:
raise DagsterInvalidDefinitionError(
f"AssetGroup is missing required resource keys for asset '{asset_def.op.name}'. Missing resource keys: {missing_resource_keys}"
f"AssetGroup is missing required resource keys for asset '{asset_def.node_def.name}'. "
f"Missing resource keys: {missing_resource_keys}"
)

for output_name, asset_key in asset_def.asset_keys_by_output_name.items():
Expand Down
12 changes: 11 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(
partitions_def: Optional[PartitionsDefinition] = None,
partition_mappings: Optional[Mapping[AssetKey, PartitionMapping]] = None,
asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
# if adding new fields, make sure to handle them in the with_replaced_asset_keys method
# if adding new fields, make sure to handle them in both with_replaced_asset_keys
):
self._node_def = node_def
self._asset_keys_by_input_name = check.dict_param(
Expand Down Expand Up @@ -196,6 +196,16 @@ def with_replaced_asset_keys(
node_def=self.node_def,
partitions_def=self.partitions_def,
partition_mappings=self._partition_mappings,
asset_deps={
output_asset_key_replacements.get(key, key): {
input_asset_key_replacements.get(
upstream_key,
output_asset_key_replacements.get(upstream_key, upstream_key),
)
for upstream_key in value
}
for key, value in self.asset_deps.items()
},
)

def to_source_assets(self) -> Sequence[SourceAsset]:
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/core/definitions/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ def to_string(self, legacy: Optional[bool] = False) -> Optional[str]:
return ASSET_KEY_STRUCTURED_DELIMITER.join(self.path)
return seven.json.dumps(self.path)

def to_user_string(self) -> str:
    """Render this key's path as a single ">"-delimited string.

    Each element of ``self.path`` becomes one segment of the result,
    e.g. a path of ``["first_component", "second_component"]`` yields
    ``"first_component>second_component"``.

    Returns:
        str: The path components joined with ">".
    """
    delimiter = ">"
    return delimiter.join(self.path)

@staticmethod
def from_user_string(asset_key_string: str) -> "AssetKey":
    """Build an AssetKey from a ">"-delimited string.

    The string is split on every ">" and the resulting list of components
    is passed to the AssetKey constructor, e.g. ``"first>second"`` is split
    into ``["first", "second"]``.

    Args:
        asset_key_string (str): The ">"-delimited representation of the key.

    Returns:
        AssetKey: The key constructed from the split components.
    """
    path_components = asset_key_string.split(">")
    return AssetKey(path_components)

@staticmethod
def from_db_string(asset_key_string: Optional[str]) -> Optional["AssetKey"]:
if not asset_key_string:
Expand Down
61 changes: 59 additions & 2 deletions python_modules/dagster/dagster/core/selector/subset_selector.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import re
import sys
from collections import defaultdict, deque
from typing import TYPE_CHECKING, AbstractSet, Dict, List, NamedTuple
from typing import TYPE_CHECKING, AbstractSet, Any, Dict, FrozenSet, List, NamedTuple, Sequence, Set

from dagster.core.definitions.dependency import DependencyStructure
from dagster.core.errors import DagsterExecutionStepNotFoundError, DagsterInvalidSubsetError
from dagster.utils import check

if TYPE_CHECKING:
from dagster.core.asset_defs import AssetsDefinition
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.job_definition import JobDefinition

MAX_NUM = sys.maxsize
Expand Down Expand Up @@ -45,6 +47,23 @@ def __new__(cls, op_selection, resolved_op_selection, parent_job_def):
)


def generate_asset_dep_graph(assets_defs: Sequence["AssetsDefinition"]) -> Dict[str, Any]:
    """Build an upstream/downstream adjacency mapping over asset keys.

    Every asset key is identified by its ``to_user_string()`` form.

    Args:
        assets_defs (Sequence[AssetsDefinition]): Definitions whose ``asset_keys`` and
            ``asset_deps`` describe the assets and their direct dependencies.

    Returns:
        Dict[str, Any]: A dict with two entries:
            * ``"upstream"``: maps each asset key found in ``assets_defs`` to the set of
              keys it directly depends on (an empty set when it has none).
            * ``"downstream"``: maps each key that something depends on to the set of
              keys that directly depend on it. Keys with no dependents have no entry.
    """
    upstream: Dict[str, Any] = {}
    downstream: Dict[str, Any] = {}

    for assets_def in assets_defs:
        for asset_key in assets_def.asset_keys:
            child_name = asset_key.to_user_string()
            parent_names = {
                parent_key.to_user_string() for parent_key in assets_def.asset_deps[asset_key]
            }
            upstream[child_name] = parent_names
            # mirror each upstream edge as a downstream edge on the parent
            for parent_name in parent_names:
                downstream.setdefault(parent_name, set()).add(child_name)

    return {"upstream": upstream, "downstream": downstream}


def generate_dep_graph(pipeline_def):
"""'pipeline to dependency graph. It currently only supports top-level solids.
Expand Down Expand Up @@ -136,7 +155,7 @@ def _get_depth(part):
return len(part)
return None

token_matching = re.compile(r"^(\*?\+*)?([.\w\d\[\]?_-]+)(\+*\*?)?$").search(clause.strip())
token_matching = re.compile(r"^(\*?\+*)?([.>\w\d\[\]?_-]+)(\+*\*?)?$").search(clause.strip())
# return None if query is invalid
parts = token_matching.groups() if token_matching is not None else []
if len(parts) != 3:
Expand Down Expand Up @@ -334,3 +353,41 @@ def parse_step_selection(step_deps, step_selection):
steps_set.update(subset)

return frozenset(steps_set)


def parse_asset_selection(
    assets_defs: Sequence["AssetsDefinition"], asset_selection: Sequence[str]
) -> FrozenSet["AssetKey"]:
    """Resolve asset selection queries into the set of asset keys they match.

    Args:
        assets_defs (Sequence[AssetsDefinition]): The AssetsDefinition objects to select over.
        asset_selection (List[str]): The asset key selection queries to evaluate. Must be a
            list of strings. A selection consisting of the single clause ``"*"`` selects every
            asset key of every definition in ``assets_defs``.

    Returns:
        FrozenSet[AssetKey]: The deduplicated asset keys matched by at least one clause.

    Raises:
        DagsterInvalidSubsetError: If any clause resolves to an empty subset.
    """
    from dagster.core.definitions.events import AssetKey

    check.list_param(asset_selection, "asset_selection", of_type=str)

    # A lone "*" short-circuits graph traversal: everything is selected.
    if len(asset_selection) == 1 and asset_selection[0] == "*":
        every_key: Set[Any] = set()
        for assets_def in assets_defs:
            every_key.update(assets_def.asset_keys)
        return frozenset(every_key)

    dep_graph = generate_asset_dep_graph(assets_defs)

    # Accumulate the user-string form of every asset matched by any clause.
    matched_names: Set[str] = set()
    for clause in asset_selection:
        clause_matches = clause_to_subset(dep_graph, clause)
        if len(clause_matches) == 0:
            raise DagsterInvalidSubsetError(
                f"No qualified assets to execute found for clause='{clause}'"
            )
        matched_names.update(clause_matches)

    # Convert the matched user strings back into AssetKey objects.
    return frozenset(map(AssetKey.from_user_string, matched_names))

0 comments on commit 252f285

Please sign in to comment.