Skip to content

Commit

Permalink
define_asset_job accept AssetKey and AssetsDefinition selections
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jan 9, 2023
1 parent 313481f commit 7e559fa
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 17 deletions.
Expand Up @@ -183,7 +183,11 @@ def _selection_from_string(string: str) -> "AssetSelection":

def define_asset_job(
name: str,
selection: Optional[Union[str, Sequence[str], "AssetSelection"]] = None,
selection: Optional[
Union[
str, Sequence[str], Sequence[AssetKey], Sequence["AssetsDefinition"], "AssetSelection"
]
] = None,
config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig[object]"]] = None,
description: Optional[str] = None,
tags: Optional[Mapping[str, Any]] = None,
Expand All @@ -196,13 +200,17 @@ def define_asset_job(
Args:
name (str):
The name for the job.
selection (Union[str, Sequence[str], AssetSelection]):
A selection over the set of Assets available in your code location. This can be a string
such as "my_asset*", a list of such strings (representing a union of these selections),
or an AssetSelection object.
selection (Union[str, Sequence[str], Sequence[AssetKey], Sequence[AssetsDefinition], AssetSelection]):
The assets that will be materialized when the job is run.
This selection will be resolved to a set of Assets once the code location is loaded with a
set of AssetsDefinitions.
The selected assets must all be included in the assets that are passed to the assets
argument of the Definitions object that this job is included on.
The string "my_asset*" selects my_asset and all downstream assets within the code
location. A list of strings represents the union of all assets selected by strings
within the list.
The selection will be resolved to a set of assets once the code location is loaded.
config:
Describes how the Job is parameterized at runtime.
Expand Down Expand Up @@ -250,6 +258,16 @@ def asset1():
jobs=[define_asset_job("all_assets")],
)
# A job that targets a single asset
@asset
def asset1():
...
defs = Definitions(
assets=[asset1],
jobs=[define_asset_job("asset1_job", selection=[asset1])],
)
# A job that targets all the assets in a group:
defs = Definitions(
assets=assets,
Expand All @@ -267,21 +285,33 @@ def asset1():
resources={"slack_client": prod_slack_client},
)
"""
from dagster._core.definitions import AssetSelection
from dagster._core.definitions import AssetsDefinition, AssetSelection

selection = check.opt_inst_param(
selection, "selection", (str, list, AssetSelection), default=AssetSelection.all()
)
# convert string-based selections to AssetSelection objects
if isinstance(selection, str):
selection = _selection_from_string(selection)
elif isinstance(selection, list):
check.list_param(selection, "selection", of_type=str)
selection = reduce(operator.or_, [_selection_from_string(s) for s in selection])
resolved_selection: AssetSelection
if selection is None:
resolved_selection = AssetSelection.all()
elif isinstance(selection, str):
resolved_selection = _selection_from_string(selection)
elif isinstance(selection, AssetSelection):
resolved_selection = selection
elif isinstance(selection, list) and all(isinstance(el, str) for el in selection):
resolved_selection = reduce(
operator.or_, [_selection_from_string(cast(str, s)) for s in selection]
)
elif isinstance(selection, list) and all(isinstance(el, AssetsDefinition) for el in selection):
resolved_selection = AssetSelection.assets(*cast(Sequence[AssetsDefinition], selection))
elif isinstance(selection, list) and all(isinstance(el, AssetKey) for el in selection):
resolved_selection = AssetSelection.keys(*cast(Sequence[AssetKey], selection))
else:
check.failed(
"selection argument must be one of str, Sequence[str], Sequence[AssetKey],"
f" Sequence[AssetsDefinition], AssetSelection. Was {type(selection)}."
)

return UnresolvedAssetJobDefinition(
name=name,
selection=cast(AssetSelection, selection),
selection=resolved_selection,
config=config,
description=description,
tags=tags,
Expand Down
Expand Up @@ -332,6 +332,14 @@ def c(b):
"start,a,b,c,d",
["core", "models"],
),
(
[
AssetKey.from_user_string("core/models/a"),
AssetKey.from_user_string("core/models/b"),
],
"a,b",
["core", "models"],
),
],
)
def test_define_selection_job(job_selection, expected_assets, use_multi, prefixes):
Expand Down Expand Up @@ -408,6 +416,30 @@ def test_define_selection_job(job_selection, expected_assets, use_multi, prefixe
assert result.output_for_node(output, "result") == value


def test_define_selection_job_assets_definition_selection():
    """Selecting by AssetsDefinition objects resolves to exactly those assets."""

    @asset
    def asset1():
        ...

    @asset
    def asset2():
        ...

    @asset
    def asset3():
        ...

    # asset3 is offered to the job at resolve time but is deliberately left
    # out of the selection, so it must not appear in the resolved job.
    unresolved_job = define_asset_job("job1", selection=[asset1, asset2])
    resolved_job = unresolved_job.resolve([asset1, asset2, asset3], source_assets=[])

    selected_keys = list(resolved_job.asset_layer.asset_keys)
    assert len(selected_keys) == 2
    assert set(selected_keys) == {asset1.key, asset2.key}

    resolved_job.execute_in_process()


def test_source_asset_selection():
@asset
def a(source):
Expand Down

0 comments on commit 7e559fa

Please sign in to comment.