Skip to content

Commit

Permalink
allow multiple asset groups on a repository (#7649)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Apr 29, 2022
1 parent 8fbd524 commit 596515d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
AbstractSet,
Callable,
Dict,
Iterable,
List,
Mapping,
NamedTuple,
Expand Down Expand Up @@ -446,6 +447,10 @@ def upstream_assets_for_asset(self, asset_key: AssetKey) -> AbstractSet[AssetKey
def dependency_node_handles_by_asset_key(self) -> Mapping[AssetKey, Sequence[NodeOutputHandle]]:
return self._dependency_node_handles_by_asset_key

@property
def asset_keys(self) -> Iterable[AssetKey]:
return self._dependency_node_handles_by_asset_key.keys()

def asset_key_for_input(self, node_handle: NodeHandle, input_name: str) -> Optional[AssetKey]:
return self._asset_keys_by_node_input_handle.get(NodeInputHandle(node_handle, input_name))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ def from_list(
schedules: Dict[str, ScheduleDefinition] = {}
sensors: Dict[str, SensorDefinition] = {}
source_assets: Dict[AssetKey, SourceAsset] = {}
encountered_asset_group = False
combined_asset_group = None
for definition in repository_definitions:
if isinstance(definition, PipelineDefinition):
if (
Expand Down Expand Up @@ -694,25 +694,22 @@ def from_list(
pipelines_or_jobs[coerced.name] = coerced

elif isinstance(definition, AssetGroup):
if encountered_asset_group:
raise DagsterInvalidDefinitionError(
"When constructing repository, attempted to pass multiple AssetGroups. "
"There can only be one AssetGroup per repository."
)

encountered_asset_group = True
asset_group = definition

for job_def in asset_group.get_base_jobs():
pipelines_or_jobs[job_def.name] = job_def

source_assets = {
source_asset.key: source_asset for source_asset in asset_group.source_assets
}

if combined_asset_group:
combined_asset_group += definition
else:
combined_asset_group = definition
else:
check.failed(f"Unexpected repository entry {definition}")

if combined_asset_group:
for job_def in combined_asset_group.get_base_jobs():
pipelines_or_jobs[job_def.name] = job_def

source_assets = {
source_asset.key: source_asset
for source_asset in combined_asset_group.source_assets
}

pipelines: Dict[str, PipelineDefinition] = {}
jobs: Dict[str, JobDefinition] = {}
for name, pipeline_or_job in pipelines_or_jobs.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,6 @@ def asset_foo():
)


def test_repo_with_multiple_asset_groups():
with pytest.raises(
DagsterInvalidDefinitionError,
match="When constructing repository, attempted to pass multiple "
"AssetGroups. There can only be one AssetGroup per repository.",
):

@repository
def the_repo(): # pylint: disable=unused-variable
return [AssetGroup(assets=[]), AssetGroup(assets=[])]


def test_job_with_reserved_name():
@graph
def the_graph():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
SensorDefinition,
SolidDefinition,
SourceAsset,
asset,
build_schedule_from_partitioned_job,
daily_partitioned_config,
daily_schedule,
Expand Down Expand Up @@ -619,3 +620,27 @@ def my_repo():
return [AssetGroup(assets=[], source_assets=[foo, bar])]

assert my_repo.source_assets_by_key == {AssetKey("foo"): foo, AssetKey("bar"): bar}


def test_multiple_asset_groups_one_repo():
@asset
def asset1():
...

@asset
def asset2():
...

group1 = AssetGroup(assets=[asset1], source_assets=[SourceAsset(key=AssetKey("foo"))])
group2 = AssetGroup(assets=[asset2], source_assets=[SourceAsset(key=AssetKey("bar"))])

@repository
def my_repo():
return [group1, group2]

assert my_repo.source_assets_by_key.keys() == {AssetKey("foo"), AssetKey("bar")}
assert len(my_repo.get_all_jobs()) == 1
assert set(my_repo.get_all_jobs()[0].asset_layer.asset_keys) == {
AssetKey(["asset1"]),
AssetKey(["asset2"]),
}

0 comments on commit 596515d

Please sign in to comment.