Skip to content

Commit

Permalink
[assets] UnresolvedAssetJobDefinition (#8207)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jun 8, 2022
1 parent 833c7a1 commit 198053a
Show file tree
Hide file tree
Showing 17 changed files with 944 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import tempfile

import pytest
from dagster_pyspark import pyspark_resource
from hacker_news_assets.core import core_assets_prod
from hacker_news_assets.resources.hn_resource import hn_snapshot_client
Expand All @@ -8,6 +9,7 @@
from dagster import AssetGroup, ResourceDefinition, fs_io_manager, mem_io_manager


@pytest.mark.skip()
def test_download():
with tempfile.TemporaryDirectory() as temp_dir:
test_job = AssetGroup(
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
from dagster.core.definitions.schedule_definition import build_schedule_context
from dagster.core.definitions.sensor_definition import build_sensor_context
from dagster.core.definitions.step_launcher import StepLauncher
from dagster.core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster.core.definitions.utils import (
config_from_files,
config_from_pkg_resources,
Expand Down
84 changes: 47 additions & 37 deletions python_modules/dagster/dagster/core/asset_defs/asset_selection.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import operator
from abc import ABC
from functools import reduce
from typing import AbstractSet, FrozenSet, Optional

from typing_extensions import TypeAlias
from typing import AbstractSet, FrozenSet, Optional, Sequence

import dagster._check as check
from dagster.core.asset_defs.assets import AssetsDefinition
from dagster.core.definitions.events import AssetKey
from dagster.core.selector.subset_selector import (
Direction,
fetch_connected_assets_definitions,
fetch_connected,
generate_asset_dep_graph,
generate_asset_name_to_definition_map,
)

AssetSet: TypeAlias = AbstractSet[AssetsDefinition] # makes sigs more readable


class AssetSelection(ABC):
@staticmethod
Expand All @@ -42,7 +38,7 @@ def __or__(self, other: "AssetSelection") -> "OrAssetSelection":
def __and__(self, other: "AssetSelection") -> "AndAssetSelection":
return AndAssetSelection(self, other)

def resolve(self, all_assets: AssetSet) -> AssetSet:
def resolve(self, all_assets: Sequence[AssetsDefinition]) -> FrozenSet[AssetKey]:
return Resolver(all_assets).resolve(self)


Expand Down Expand Up @@ -88,55 +84,69 @@ def __init__(self, child: AssetSelection, *, depth: Optional[int] = None):


class Resolver:
def __init__(self, all_assets: AssetSet):
def __init__(self, all_assets: Sequence[AssetsDefinition]):
self.all_assets = all_assets
self.asset_dep_graph = generate_asset_dep_graph(list(all_assets))
self.all_assets_by_name = generate_asset_name_to_definition_map(all_assets)

def resolve(self, node: AssetSelection) -> AssetSet:
def resolve(self, root_node: AssetSelection) -> FrozenSet[AssetKey]:
return frozenset(
{AssetKey.from_user_string(asset_name) for asset_name in self._resolve(root_node)}
)

def _resolve(self, node: AssetSelection) -> AbstractSet[str]:
if isinstance(node, AllAssetSelection):
return self.all_assets
return set(self.all_assets_by_name.keys())
elif isinstance(node, AndAssetSelection):
child_1, child_2 = [self.resolve(child) for child in node.children]
child_1, child_2 = [self._resolve(child) for child in node.children]
return child_1 & child_2
elif isinstance(node, DownstreamAssetSelection):
child = self.resolve(node.children[0])
child = self._resolve(node.children[0])
return reduce(
operator.or_,
[self._gather_connected_assets(asset, "downstream", node.depth) for asset in child],
[
{asset_name}
| fetch_connected(
item=asset_name,
graph=self.asset_dep_graph,
direction="downstream",
depth=node.depth,
)
for asset_name in child
],
)
elif isinstance(node, GroupsAssetSelection):
return {
a
for a in self.all_assets
if any(_match_group(a, pattern) for pattern in node.children)
}
return reduce(
operator.or_,
[_match_groups(assets_def, set(node.children)) for assets_def in self.all_assets],
)
elif isinstance(node, KeysAssetSelection):
return {a for a in self.all_assets if any(_match_key(a, key) for key in node.children)}
return set(node.children)
elif isinstance(node, OrAssetSelection):
child_1, child_2 = [self.resolve(child) for child in node.children]
child_1, child_2 = [self._resolve(child) for child in node.children]
return child_1 | child_2
elif isinstance(node, UpstreamAssetSelection):
child = self.resolve(node.children[0])
child = self._resolve(node.children[0])
return reduce(
operator.or_,
[self._gather_connected_assets(asset, "upstream", node.depth) for asset in child],
[
{asset_name}
| fetch_connected(
item=asset_name,
graph=self.asset_dep_graph,
direction="upstream",
depth=node.depth,
)
for asset_name in child
],
)
else:
check.failed(f"Unknown node type: {type(node)}")

def _gather_connected_assets(
self, asset: AssetsDefinition, direction: Direction, depth: Optional[int]
) -> FrozenSet[AssetsDefinition]:
connected = fetch_connected_assets_definitions(
asset, self.asset_dep_graph, self.all_assets_by_name, direction=direction, depth=depth
)
return connected | {asset}


def _match_key(asset: AssetsDefinition, key_str: str) -> bool:
return any(key_str == key.to_user_string() for key in asset.asset_keys)


def _match_group(asset: AssetsDefinition, group_str: str) -> bool:
return any(group_str == group_name for group_name in asset.group_names.values())
def _match_groups(assets_def: AssetsDefinition, groups: AbstractSet[str]) -> AbstractSet[str]:
return {
asset_key.to_user_string()
for asset_key, group in assets_def.group_names.items()
if group in groups
}
29 changes: 20 additions & 9 deletions python_modules/dagster/dagster/core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from dagster.core.selector.subset_selector import AssetSelectionData
from dagster.utils.backcompat import ExperimentalWarning

from ..errors import DagsterInvalidDefinitionError
from ..errors import DagsterInvalidSubsetError
from .dependency import NodeHandle, NodeInputHandle, NodeOutputHandle, SolidOutputHandle
from .executor_definition import ExecutorDefinition
from .graph_definition import GraphDefinition
Expand Down Expand Up @@ -619,13 +619,13 @@ def group_names_by_assets(self) -> Mapping[AssetKey, str]:

def build_asset_selection_job(
name: str,
assets: Iterable["AssetsDefinition"],
source_assets: Iterable["SourceAsset"],
executor_def: ExecutorDefinition,
resource_defs: Mapping[str, ResourceDefinition],
description: str,
tags: Dict[str, Any],
asset_selection: Optional[FrozenSet[AssetKey]],
assets: Sequence["AssetsDefinition"],
source_assets: Sequence["SourceAsset"],
executor_def: Optional[ExecutorDefinition] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
description: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
asset_selection: Optional[FrozenSet[AssetKey]] = None,
asset_selection_data: Optional[AssetSelectionData] = None,
):
from dagster.core.asset_defs import build_assets_job
Expand Down Expand Up @@ -667,9 +667,12 @@ def _subset_assets_defs(
included_assets: Set[AssetsDefinition] = set()
excluded_assets: Set[AssetsDefinition] = set()

included_keys: Set[AssetKey] = set()

for asset in assets:
# intersection
selected_subset = selected_asset_keys & asset.asset_keys
included_keys.update(selected_subset)
# all assets in this def are selected
if selected_subset == asset.asset_keys:
included_assets.add(asset)
Expand All @@ -683,14 +686,22 @@ def _subset_assets_defs(
# subset of the asset that we don't want
excluded_assets.add(asset.subset_for(asset.asset_keys - subset_asset.asset_keys))
else:
raise DagsterInvalidDefinitionError(
raise DagsterInvalidSubsetError(
f"When building job, the AssetsDefinition '{asset.node_def.name}' "
f"contains asset keys {sorted(list(asset.asset_keys))}, but "
f"attempted to select only {sorted(list(selected_subset))}. "
"This AssetsDefinition does not support subsetting. Please select all "
"asset keys produced by this asset."
)

missed_keys = selected_asset_keys - included_keys
if missed_keys:
raise DagsterInvalidSubsetError(
f"When building job, the AssetKey(s) {[key.to_user_string() for key in missed_keys]} "
"were selected, but are not produced by any of the provided AssetsDefinitions. Make "
"sure that keys are spelled correctly and that all of the expected definitions are "
"provided."
)
all_excluded_assets: Sequence[Union["AssetsDefinition", "SourceAsset"]] = [
*excluded_assets,
*source_assets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from ..schedule_definition import ScheduleDefinition
from ..sensor_definition import SensorDefinition
from ..unresolved_asset_job_definition import UnresolvedAssetJobDefinition


class _Repository:
Expand Down Expand Up @@ -45,6 +46,7 @@ def __call__(self, fn: Callable[[], Any]) -> RepositoryDefinition:
or isinstance(definition, AssetGroup)
or isinstance(definition, AssetsDefinition)
or isinstance(definition, SourceAsset)
or isinstance(definition, UnresolvedAssetJobDefinition)
):
bad_definitions.append((i, type(definition)))
if bad_definitions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ def _get_job_def_for_asset_selection(

new_job = build_asset_selection_job(
name=self.name,
assets=self.asset_layer.assets_defs_by_key.values(),
source_assets=self.asset_layer.source_assets_by_key.values(),
assets=list(self.asset_layer.assets_defs_by_key.values()),
source_assets=list(self.asset_layer.source_assets_by_key.values()),
executor_def=self.executor_def,
resource_defs=self.resource_defs,
description=self.description,
Expand Down

0 comments on commit 198053a

Please sign in to comment.