Skip to content

Commit

Permalink
For those who detest wordwrap in vscode (#6887)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Mar 3, 2022
1 parent e4fabe9 commit f8319ea
Showing 1 changed file with 72 additions and 23 deletions.
95 changes: 72 additions & 23 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,30 @@ class AssetGroup(
],
)
):
"""Defines a group of assets, along with environment information in the form of resources and an executor.
"""Defines a group of assets, along with environment information in the
form of resources and an executor.
An AssetGroup can be provided to a :py:class:`RepositoryDefinition`. When provided to a repository, the constituent assets can be materialized from Dagit. The AssetGroup also provides an interface for creating jobs from subselections of assets, which can then be provided to a :py:class:`ScheduleDefinition` or :py:class:`SensorDefinition`.
An AssetGroup can be provided to a :py:class:`RepositoryDefinition`. When
provided to a repository, the constituent assets can be materialized from
Dagit. The AssetGroup also provides an interface for creating jobs from
subselections of assets, which can then be provided to a
:py:class:`ScheduleDefinition` or :py:class:`SensorDefinition`.
There can only be one AssetGroup per repository.
Args:
assets (Sequence[AssetsDefinition]): The set of software-defined assets to group.
source_assets (Optional[Sequence[SourceAsset]]): The set of source assets that the software-defined may depend on.
resource_defs (Optional[Mapping[str, ResourceDefinition]]): A dictionary of resource definitions. When the AssetGroup is constructed, if there are any unsatisfied resource requirements from the assets, it will result in an error. Note that the `root_manager` key is a reserved resource key, and will result in an error if provided by the user.
executor_def (Optional[ExecutorDefinition]): The executor definition to use when re-materializing assets in this group.
assets (Sequence[AssetsDefinition]): The set of software-defined assets
to group.
source_assets (Optional[Sequence[SourceAsset]]): The set of source
assets that the software-defined assets may depend on.
resource_defs (Optional[Mapping[str, ResourceDefinition]]): A
dictionary of resource definitions. When the AssetGroup is
constructed, if there are any unsatisfied resource requirements
from the assets, it will result in an error. Note that the
`root_manager` key is a reserved resource key, and will result in
an error if provided by the user.
executor_def (Optional[ExecutorDefinition]): The executor definition to
use when re-materializing assets in this group.
Examples:
Expand All @@ -75,7 +88,11 @@ def next_asset(start_asset):
def foo_resource():
...
asset_group = AssetGroup(assets=[start_asset, next_asset], source_assets=[source_asset], resource_defs={"foo": foo_resource})
asset_group = AssetGroup(
assets=[start_asset, next_asset],
source_assets=[source_asset],
resource_defs={"foo": foo_resource},
)
...
"""
Expand Down Expand Up @@ -104,7 +121,9 @@ def __new__(
"this key, and then change all places that require this key to "
"a new value."
)
# In the case of collisions, merge_dicts takes values from the dictionary latest in the list, so we place the user provided resource defs after the defaults.
# In the case of collisions, merge_dicts takes values from the
# dictionary latest in the list, so we place the user provided resource
# defs after the defaults.
resource_defs = merge_dicts(
{"root_manager": root_manager, "io_manager": fs_asset_io_manager},
resource_defs,
Expand Down Expand Up @@ -141,15 +160,28 @@ def build_job(
name (str): The name to give the job.
selection (Union[str, List[str]]): A single selection query or list of selection queries to execute. For example:
* ``['some_asset_key']``: selects ``some_asset_key`` itself.
* ``['*some_asset_key']``: select ``some_asset_key`` and all its ancestors (upstream dependencies).
* ``['*some_asset_key+++']``: select ``some_asset_key``, all its ancestors, and its descendants
(downstream dependencies) within 3 levels down.
* ``['*some_asset_key', 'other_asset_key_a', 'other_asset_key_b+']``: select ``some_asset_key`` and all its
ancestors, ``other_asset_key_a`` itself, and ``other_asset_key_b`` and its direct child asset keys. When subselecting into a multi-asset, all of the asset keys in that multi-asset must be selected.
executor_def (Optional[ExecutorDefinition]): The executor definition to use when executing the job. Defaults to the executor on the AssetGroup. If no executor was provided on the AssetGroup, then it defaults to :py:class:`multi_or_in_process_executor`.
tags (Optional[Dict[str, Any]]): Arbitrary metadata for any execution of the Job.
Values that are not strings will be json encoded and must meet the criteria that
`json.loads(json.dumps(value)) == value`. These tag values may be overwritten by tag
* ``['*some_asset_key']``: select ``some_asset_key`` and all
its ancestors (upstream dependencies).
* ``['*some_asset_key+++']``: select ``some_asset_key``, all
its ancestors, and its descendants
(downstream dependencies) within 3 levels down.
* ``['*some_asset_key', 'other_asset_key_a', 'other_asset_key_b+']``:
select ``some_asset_key`` and all its
ancestors, ``other_asset_key_a`` itself, and
``other_asset_key_b`` and its direct child asset keys. When
subselecting into a multi-asset, all of the asset keys in
that multi-asset must be selected.
executor_def (Optional[ExecutorDefinition]): The executor
definition to use when executing the job. Defaults to the
executor on the AssetGroup. If no executor was provided on the
AssetGroup, then it defaults to
:py:class:`multi_or_in_process_executor`.
tags (Optional[Dict[str, Any]]): Arbitrary metadata for any
execution of the Job.
Values that are not strings will be json encoded and must meet
the criteria that
`json.loads(json.dumps(value)) == value`. These tag values may
be overwritten by tag
values provided at invocation time.
description (Optional[str]): A description of the job.
Expand Down Expand Up @@ -275,11 +307,18 @@ def _parse_asset_selection(self, selection: Union[str, List[str]], job_name: str
# https://github.com/dagster-io/dagster/issues/6647
if key_str in source_asset_keys:
raise DagsterInvalidDefinitionError(
f"When attempting to create job '{job_name}', the clause '{clause}' selects asset_key '{key_str}', which comes from a source asset. Source assets can't be materialized, and therefore can't be subsetted into a job. Please choose a subset on asset keys that are materializable - that is, included on assets within the group. Valid assets: {list(asset_keys_to_ops.keys())}"
f"When attempting to create job '{job_name}', the clause '"
f"{clause}' selects asset_key '{key_str}', which comes from "
"a source asset. Source assets can't be materialized, and "
"therefore can't be subsetted into a job. Please choose a "
"subset on asset keys that are materializable - that is, "
f"included on assets within the group. Valid assets: {list(asset_keys_to_ops.keys())}"
)
if key_str not in asset_keys_to_ops:
raise DagsterInvalidDefinitionError(
f"When attempting to create job '{job_name}', the clause '{clause}' within the asset key selection did not match any asset keys. Present asset keys: {list(asset_keys_to_ops.keys())}"
f"When attempting to create job '{job_name}', the clause "
f"'{clause}' within the asset key selection did not match "
f"any asset keys. Present asset keys: {list(asset_keys_to_ops.keys())}"
)

seen_asset_keys.add(key_str)
Expand All @@ -297,7 +336,12 @@ def _parse_asset_selection(self, selection: Union[str, List[str]], job_name: str
are_keys_in_set = [key in seen_asset_keys for key in asset_key_set]
if any(are_keys_in_set) and not all(are_keys_in_set):
raise DagsterInvalidDefinitionError(
f"When building job '{job_name}', the asset '{op_name}' contains asset keys {sorted(list(asset_key_set))}, but attempted to select only {sorted(list(asset_key_set.intersection(seen_asset_keys)))}. Selecting only some of the asset keys for a particular asset is not yet supported behavior. Please select all asset keys produced by a given asset when subsetting."
f"When building job '{job_name}', the asset '{op_name}' "
f"contains asset keys {sorted(list(asset_key_set))}, but "
f"attempted to select only {sorted(list(asset_key_set.intersection(seen_asset_keys)))}. "
"Selecting only some of the asset keys for a particular "
"asset is not yet supported behavior. Please select all "
"asset keys produced by a given asset when subsetting."
)
return op_selection

Expand Down Expand Up @@ -449,19 +493,24 @@ def _validate_resource_reqs_for_asset_group(
for asset_key, output_def in asset_def.output_defs_by_asset_key.items():
if output_def.io_manager_key and output_def.io_manager_key not in present_resource_keys:
raise DagsterInvalidDefinitionError(
f"Output '{output_def.name}' with AssetKey '{asset_key}' requires io manager '{output_def.io_manager_key}' but was not provided on asset group. Provided resources: {sorted(list(present_resource_keys))}"
f"Output '{output_def.name}' with AssetKey '{asset_key}' "
f"requires io manager '{output_def.io_manager_key}' but was "
f"not provided on asset group. Provided resources: {sorted(list(present_resource_keys))}"
)

for source_asset in source_assets:
if source_asset.io_manager_key and source_asset.io_manager_key not in present_resource_keys:
raise DagsterInvalidDefinitionError(
f"SourceAsset with key {source_asset.key} requires io manager with key '{source_asset.io_manager_key}', which was not provided on AssetGroup. Provided keys: {sorted(list(present_resource_keys))}"
f"SourceAsset with key {source_asset.key} requires io manager "
f"with key '{source_asset.io_manager_key}', which was not "
f"provided on AssetGroup. Provided keys: {sorted(list(present_resource_keys))}"
)

for resource_key, resource_def in resource_defs.items():
resource_keys = set(resource_def.required_resource_keys)
missing_resource_keys = sorted(list(set(resource_keys) - present_resource_keys))
if missing_resource_keys:
raise DagsterInvalidDefinitionError(
f"AssetGroup is missing required resource keys for resource '{resource_key}'. Missing resource keys: {missing_resource_keys}"
"AssetGroup is missing required resource keys for resource '"
f"{resource_key}'. Missing resource keys: {missing_resource_keys}"
)

0 comments on commit f8319ea

Please sign in to comment.