Skip to content

Commit

Permalink
fallback logic for resolvers (#7766)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 11, 2022
1 parent 3125227 commit 15e9eb5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ def __new__(
output_description: Optional[str] = None,
metadata_entries: Optional[Sequence[MetadataEntry]] = None,
):
# backcompat logic to handle ExternalAssetNodes serialized without op_names/graph_name
if not op_names:
op_names = list(filter(None, [op_name]))
if not graph_name:
graph_name = op_name
return super(ExternalAssetNode, cls).__new__(
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
Expand Down Expand Up @@ -879,7 +884,9 @@ def external_asset_graph_from_defs(
depended_by=list(dep_by[asset_key].values()),
compute_kind=node_def.tags.get("kind"),
# backcompat
op_name=graph_name or next(iter(op_names_by_asset_key[asset_key]), None),
op_name=graph_name
or next(iter(op_names_by_asset_key[asset_key]), None)
or node_def.name,
graph_name=graph_name,
op_names=op_names_by_asset_key[asset_key],
op_description=node_def.description,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import pytest

from dagster import AssetKey, AssetsDefinition, GraphOut, Out, graph, op
from dagster import AssetKey, AssetsDefinition, GraphOut, In, Out, graph, job, op
from dagster.core.asset_defs import AssetIn, SourceAsset, asset, build_assets_job, multi_asset
from dagster.core.definitions.metadata import MetadataEntry, MetadataValue
from dagster.core.host_representation.external_data import (
ExternalAssetDependedBy,
ExternalAssetDependency,
Expand Down Expand Up @@ -267,7 +264,6 @@ def assets():
]


@pytest.mark.skip("Temporarily break inter-op dependency in external data")
def test_inter_op_dependency():
@asset
def in1():
Expand All @@ -284,6 +280,7 @@ def downstream(only_in, mixed, only_out): # pylint: disable=unused-argument
@multi_asset(
outs={"only_in": Out(), "mixed": Out(), "only_out": Out()},
internal_asset_deps={
"only_in": {AssetKey("in1"), AssetKey("in2")},
"mixed": {AssetKey("in1"), AssetKey("only_in")},
"only_out": {AssetKey("only_in"), AssetKey("mixed")},
},
Expand Down Expand Up @@ -366,13 +363,6 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="mixed",
metadata_entries=[
MetadataEntry(
label=".dagster/asset_deps",
description=None,
entry_data=MetadataValue.text("[set] (unserializable)"),
)
],
),
ExternalAssetNode(
asset_key=AssetKey(["only_in"]),
Expand All @@ -383,9 +373,7 @@ def assets(in1, in2): # pylint: disable=unused-argument
depended_by=[
ExternalAssetDependedBy(downstream_asset_key=AssetKey(["downstream"])),
ExternalAssetDependedBy(downstream_asset_key=AssetKey(["mixed"])),
ExternalAssetDependedBy(
downstream_asset_key=AssetKey(["only_out"]), output_name="only_in"
),
ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_out"])),
],
op_name="assets",
graph_name=None,
Expand All @@ -410,13 +398,49 @@ def assets(in1, in2): # pylint: disable=unused-argument
op_description=None,
job_names=["assets_job"],
output_name="only_out",
metadata_entries=[
MetadataEntry(
label=".dagster/asset_deps",
description=None,
entry_data=MetadataValue.text("[set] (unserializable)"),
)
],
),
]


def test_explicit_asset_keys():
@op(out={"a": Out(asset_key=AssetKey("a"))})
def a_op():
return 1

@op(
ins={"a": In(asset_key=AssetKey("a"))},
out={"b": Out(asset_key=AssetKey("b"))},
)
def b_op(a):
return a + 1

@job
def assets_job():
b_op(a_op())

external_asset_nodes = external_asset_graph_from_defs([assets_job], source_assets_by_key={})
assert external_asset_nodes == [
ExternalAssetNode(
asset_key=AssetKey("a"),
op_name="a_op",
graph_name=None,
op_names=["a_op"],
op_description=None,
dependencies=[],
depended_by=[ExternalAssetDependedBy(AssetKey("b"))],
job_names=["assets_job"],
output_name="a",
),
ExternalAssetNode(
asset_key=AssetKey("b"),
op_name="b_op",
graph_name=None,
op_names=["b_op"],
op_description=None,
dependencies=[ExternalAssetDependency(AssetKey("a"))],
depended_by=[],
job_names=["assets_job"],
output_name="b",
),
]

Expand Down

0 comments on commit 15e9eb5

Please sign in to comment.