Skip to content

Commit

Permalink
Error when amultiple asset groups are used in one repo, when a job is…
Browse files Browse the repository at this point in the history
… passed in with the asset group reserved name (#6891)
  • Loading branch information
dpeng817 committed Mar 3, 2022
1 parent e3e2143 commit 919fb31
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ def __new__(
executor_def=executor_def,
)

@property
def all_assets_job_name(self) -> str:
@staticmethod
def all_assets_job_name() -> str:
"""The name of the mega-job that the provided list of assets is coerced into."""
return "__ASSET_GROUP"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ def from_list(cls, repository_definitions):
target_type=definition.target_type, target=definition.describe_target()
)
)
if definition.name == AssetGroup.all_assets_job_name():
raise DagsterInvalidDefinitionError(
f"Attempted to provide job called {AssetGroup.all_assets_job_name()} to repository, which is a reserved name. Please rename the job."
)
pipelines_or_jobs[definition.name] = definition
elif isinstance(definition, PartitionSetDefinition):
if definition.name in partition_sets:
Expand Down Expand Up @@ -627,8 +631,12 @@ def from_list(cls, repository_definitions):

elif isinstance(definition, AssetGroup):
asset_group = definition
pipelines_or_jobs[asset_group.all_assets_job_name] = build_assets_job(
asset_group.all_assets_job_name,
if asset_group.all_assets_job_name() in pipelines_or_jobs:
raise DagsterInvalidDefinitionError(
"When constructing repository, attempted to pass multiple AssetGroups. There can only be one AssetGroup per repository."
)
pipelines_or_jobs[asset_group.all_assets_job_name()] = build_assets_job(
asset_group.all_assets_job_name(),
assets=asset_group.assets,
source_assets=asset_group.source_assets,
resource_defs=asset_group.resource_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
IOManager,
Out,
fs_asset_io_manager,
graph,
in_process_executor,
io_manager,
job,
mem_io_manager,
repository,
resource,
Expand Down Expand Up @@ -36,7 +38,7 @@ def the_repo():

assert len(the_repo.get_all_jobs()) == 1
asset_group_underlying_job = the_repo.get_all_jobs()[0]
assert asset_group_underlying_job.name == group.all_assets_job_name
assert asset_group_underlying_job.name == group.all_assets_job_name()

result = asset_group_underlying_job.execute_in_process()
assert result.success
Expand Down Expand Up @@ -71,7 +73,7 @@ def the_repo():
return [group]

asset_group_underlying_job = the_repo.get_all_jobs()[0]
assert asset_group_underlying_job.name == group.all_assets_job_name
assert asset_group_underlying_job.name == group.all_assets_job_name()

result = asset_group_underlying_job.execute_in_process()
assert result.success
Expand All @@ -93,7 +95,7 @@ def the_repo():
return [group]

asset_group_underlying_job = the_repo.get_all_jobs()[0]
assert asset_group_underlying_job.name == group.all_assets_job_name
assert asset_group_underlying_job.name == group.all_assets_job_name()

result = asset_group_underlying_job.execute_in_process()
assert result.success
Expand Down Expand Up @@ -375,3 +377,32 @@ def asset_foo():
group.resource_defs["io_manager"] # pylint: disable=comparison-with-callable
== fs_asset_io_manager
)
assert group.resource_defs["io_manager"] == fs_asset_io_manager


def test_repo_with_multiple_asset_groups():
with pytest.raises(
DagsterInvalidDefinitionError,
match="When constructing repository, attempted to pass multiple "
"AssetGroups. There can only be one AssetGroup per repository.",
):

@repository
def the_repo():
return [AssetGroup(assets=[]), AssetGroup(assets=[])]


def test_job_with_reserved_name():
@graph
def the_graph():
pass

the_job = the_graph.to_job(name=AssetGroup.all_assets_job_name())
with pytest.raises(
DagsterInvalidDefinitionError,
match=f"Attempted to provide job called {AssetGroup.all_assets_job_name()} to repository, which is a reserved name.",
):

@repository
def the_repo():
return [the_job]

0 comments on commit 919fb31

Please sign in to comment.