Skip to content

Commit

Permalink
Allow repositories to contain asset definitions and source assets for…
Browse files Browse the repository at this point in the history
… the same asset key (#7781)

* validate collisions

* do not error on source asset collision
  • Loading branch information
sryza committed May 7, 2022
1 parent 68a99d6 commit cba9144
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ def __add__(self, other: "AssetGroup") -> "AssetGroup":

if self.resource_defs != other.resource_defs:
raise DagsterInvalidDefinitionError(
"Can't add asset groups together with different resource definition dictionarys"
"Can't add asset groups together with different resource definition dictionaries"
)

if self.executor_def != other.executor_def:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
SensorDefinition,
)
from dagster.core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.snap import PipelineSnapshot
from dagster.serdes import DefaultNamedTupleSerializer, whitelist_for_serdes
from dagster.utils.error import SerializableErrorInfo
Expand Down Expand Up @@ -817,26 +817,21 @@ def external_asset_graph_from_defs(
]

for source_asset in source_assets_by_key.values():
if source_asset.key in node_defs_by_asset_key:
raise DagsterInvariantViolationError(
f"Asset with key {source_asset.key.to_string()} is defined both as a source asset"
" and as a non-source asset"
)

# TODO: For now we are dropping partition metadata entries
metadata_entries = [
entry for entry in source_asset.metadata_entries if isinstance(entry, MetadataEntry)
]
asset_nodes.append(
ExternalAssetNode(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
job_names=[],
op_description=source_asset.description,
metadata_entries=metadata_entries,
if source_asset.key not in node_defs_by_asset_key:
# TODO: For now we are dropping partition metadata entries
metadata_entries = [
entry for entry in source_asset.metadata_entries if isinstance(entry, MetadataEntry)
]
asset_nodes.append(
ExternalAssetNode(
asset_key=source_asset.key,
dependencies=list(deps[source_asset.key].values()),
depended_by=list(dep_by[source_asset.key].values()),
job_names=[],
op_description=source_asset.description,
metadata_entries=metadata_entries,
)
)
)

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import pytest

from dagster import (
AssetKey,
AssetsDefinition,
DagsterInvariantViolationError,
GraphOut,
Out,
graph,
op,
)
from dagster import AssetKey, AssetsDefinition, GraphOut, Out, graph, op
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, build_assets_job, multi_asset
from dagster.core.definitions.metadata import MetadataEntry, MetadataValue
from dagster.core.host_representation.external_data import (
Expand Down Expand Up @@ -522,21 +514,6 @@ def foo(bar):
]


def test_source_asset_conflicts_with_asset():
bar_source_asset = SourceAsset(key=AssetKey("bar"), description="def")

@asset
def bar():
pass

job1 = build_assets_job("job1", [bar])

with pytest.raises(DagsterInvariantViolationError):
external_asset_graph_from_defs(
[job1], source_assets_by_key={AssetKey("bar"): bar_source_asset}
)


def test_nasty_nested_graph_asset():
@op
def add_one(i):
Expand Down

0 comments on commit cba9144

Please sign in to comment.