
[2/n] Interop Stack: Use AssetJobInfo to construct ExternalAssetNodes
OwenKephart committed Apr 25, 2022
1 parent fdb2274 commit 4d44a79
Showing 3 changed files with 83 additions and 191 deletions.
@@ -79,13 +79,15 @@ def _assets_job_info_for_node(
# must be in an op (or solid)
if node_handle is None:
check.failed("Must have node_handle for non-graph NodeDefinition")

input_asset_keys: Set[AssetKey] = set()
for input_def in node_def.input_defs:
input_key = input_def.hardcoded_asset_key
if input_key:
input_asset_keys.add(input_key)
input_handle = NodeInputHandle(node_handle, input_def.name)
asset_key_by_input[input_handle] = input_key

for output_def in node_def.output_defs:
output_key = output_def.hardcoded_asset_key
if output_key:
@@ -107,6 +109,7 @@ def _assets_job_info_for_node(
asset_key_by_input.update(n_asset_key_by_input)
asset_info_by_output.update(n_asset_info_by_output)
asset_deps.update(n_asset_deps)

return asset_key_by_input, asset_info_by_output, asset_deps


@@ -160,7 +163,11 @@ def from_graph(graph_def: GraphDefinition) -> "AssetLayer":
asset_deps=asset_deps,
)

def upstream_assets(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
@property
def asset_info_by_node_output_handle(self) -> Mapping[NodeOutputHandle, AssetOutputInfo]:
return self._asset_info_by_node_output_handle

def upstream_assets_for_asset(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
check.invariant(
asset_key in self._asset_deps,
"AssetKey '{asset_key}' is not produced by this JobDefinition.",
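A minimal usage sketch (not part of this commit) of the two AssetLayer accessors touched above: the new `asset_info_by_node_output_handle` property and `upstream_assets_for_asset`, renamed from `upstream_assets`. It assumes the 0.14-era `@asset`/`build_assets_job` APIs and that an asset job's `asset_layer` is populated the way the snapshot code below expects; the asset names are made up.

# Minimal sketch, not part of this commit: exercising the AssetLayer accessors above.
# Assumes dagster ~0.14.x, where `asset`, `build_assets_job`, and the `asset_layer`
# property on job definitions are available; the asset names are illustrative.
from dagster import AssetKey
from dagster.core.asset_defs import asset, build_assets_job


@asset
def upstream_asset():
    return 1


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + 1


example_job = build_assets_job("example_job", assets=[upstream_asset, downstream_asset])
asset_layer = example_job.asset_layer

# New property: AssetOutputInfo keyed by NodeOutputHandle.
for node_output_handle, asset_info in asset_layer.asset_info_by_node_output_handle.items():
    print(node_output_handle, asset_info.key)

# Renamed from `upstream_assets`: AssetKeys that an asset produced by this job depends on.
print(asset_layer.upstream_assets_for_asset(AssetKey("downstream_asset")))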
@@ -7,25 +7,24 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, Set, Tuple, Union, cast
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, Set, Tuple, Union

from dagster import StaticPartitionsDefinition, check
from dagster.core.asset_defs import SourceAsset
from dagster.core.asset_defs.decorators import ASSET_DEPENDENCY_METADATA_KEY
from dagster.core.definitions import (
JobDefinition,
OutputDefinition,
PartitionSetDefinition,
PartitionsDefinition,
PipelineDefinition,
PresetDefinition,
RepositoryDefinition,
ScheduleDefinition,
)
from dagster.core.definitions.asset_layer import AssetOutputInfo
from dagster.core.definitions.dependency import NodeOutputHandle
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.metadata import MetadataEntry
from dagster.core.definitions.mode import DEFAULT_MODE_NAME
from dagster.core.definitions.node_definition import NodeDefinition
from dagster.core.definitions.partition import PartitionScheduleDefinition, ScheduleType
from dagster.core.definitions.schedule_definition import DefaultScheduleStatus
from dagster.core.definitions.sensor_definition import (
@@ -628,12 +627,6 @@ def __new__(
input_name: Optional[str] = None,
output_name: Optional[str] = None,
):
check.invariant(
(input_name is None) ^ (output_name is None),
"When constructing ExternalAssetDependency, exactly one of `input_name` and "
f"`output_name` should be supplied. AssetKey `{upstream_asset_key}` is associated with "
f"input `{input_name}` and output `{output_name}`.",
)
return super(ExternalAssetDependency, cls).__new__(
cls,
upstream_asset_key=upstream_asset_key,
@@ -665,12 +658,6 @@ def __new__(
input_name: Optional[str] = None,
output_name: Optional[str] = None,
):
check.invariant(
(input_name is None) ^ (output_name is None),
"When constructing ExternalAssetDependedBy, exactly one of `input_name` and "
f"`output_name` should be supplied. AssetKey `{downstream_asset_key}` is associated with "
f"input `{input_name}` and output `{output_name}`.",
)
return super(ExternalAssetDependedBy, cls).__new__(
cls,
downstream_asset_key=downstream_asset_key,
@@ -777,57 +764,30 @@ def external_asset_graph_from_defs(
pipelines: Sequence[PipelineDefinition], source_assets_by_key: Mapping[AssetKey, SourceAsset]
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[
AssetKey, List[Tuple[OutputDefinition, NodeDefinition, PipelineDefinition]]
AssetKey, List[Tuple[NodeOutputHandle, PipelineDefinition]]
] = defaultdict(list)
asset_info_by_asset_key: Dict[AssetKey, AssetOutputInfo] = dict()

deps: Dict[AssetKey, Dict[AssetKey, ExternalAssetDependency]] = defaultdict(dict)
dep_by: Dict[AssetKey, Dict[AssetKey, ExternalAssetDependedBy]] = defaultdict(dict)
all_upstream_asset_keys: Set[AssetKey] = set()

for pipeline in pipelines:
for node_def in pipeline.all_node_defs:
input_name_by_asset_key = {
id.hardcoded_asset_key: id.name
for id in node_def.input_defs
if id.hardcoded_asset_key is not None
}

output_name_by_asset_key = {
od.hardcoded_asset_key: od.name
for od in node_def.output_defs
if od.hardcoded_asset_key is not None
}

node_upstream_asset_keys = set(
filter(None, (id.hardcoded_asset_key for id in node_def.input_defs))
)
all_upstream_asset_keys.update(node_upstream_asset_keys)

for output_def in node_def.output_defs:
output_asset_key = output_def.hardcoded_asset_key
if not output_asset_key:
continue

node_defs_by_asset_key[output_asset_key].append((output_def, node_def, pipeline))

# if no deps specified, assume depends on all inputs and no outputs
asset_deps = cast(
Set[AssetKey], (output_def.metadata or {}).get(ASSET_DEPENDENCY_METADATA_KEY)
for pipeline_def in pipelines:
asset_info_by_node_output = pipeline_def.asset_layer.asset_info_by_node_output_handle
for node_output_handle, asset_info in asset_info_by_node_output.items():
output_key = asset_info.key
upstream_asset_keys = pipeline_def.asset_layer.upstream_assets_for_asset(output_key)
all_upstream_asset_keys.update(upstream_asset_keys)
node_defs_by_asset_key[output_key].append((node_output_handle, pipeline_def))
asset_info_by_asset_key[output_key] = asset_info
for upstream_key in upstream_asset_keys:
deps[output_key][upstream_key] = ExternalAssetDependency(
upstream_asset_key=upstream_key
)
dep_by[upstream_key][output_key] = ExternalAssetDependedBy(
downstream_asset_key=output_key
)
if asset_deps is None:
asset_deps = node_upstream_asset_keys

for upstream_asset_key in asset_deps:
deps[output_asset_key][upstream_asset_key] = ExternalAssetDependency(
upstream_asset_key=upstream_asset_key,
input_name=input_name_by_asset_key.get(upstream_asset_key),
output_name=output_name_by_asset_key.get(upstream_asset_key),
)
dep_by[upstream_asset_key][output_asset_key] = ExternalAssetDependedBy(
downstream_asset_key=output_asset_key,
input_name=input_name_by_asset_key.get(upstream_asset_key),
output_name=output_name_by_asset_key.get(upstream_asset_key),
)

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
).difference(source_assets_by_key.keys())
@@ -865,39 +825,43 @@ def external_asset_graph_from_defs(
)

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
output_def, node_def, _ = node_tuple_list[0]
job_names = [job_def.name for _, _, job_def in node_tuple_list]
node_output_handle, job_def = node_tuple_list[0]

node_def = job_def.graph.get_solid(node_output_handle.node_handle).definition
output_def = node_def.output_def_named(node_output_handle.output_name)

asset_info = asset_info_by_asset_key[asset_key]

job_names = [job_def.name for _, job_def in node_tuple_list]

# temporary workaround to retrieve asset partition definition from job
partitions_def_data: Optional[
Union[
ExternalTimeWindowPartitionsDefinitionData, ExternalStaticPartitionsDefinitionData
ExternalTimeWindowPartitionsDefinitionData,
ExternalStaticPartitionsDefinitionData,
]
] = None

if output_def.asset_partitions_def:
partitions_def = output_def.asset_partitions_def
if partitions_def:
if isinstance(partitions_def, TimeWindowPartitionsDefinition):
partitions_def_data = external_time_window_partitions_definition_from_def(
partitions_def
)
elif isinstance(partitions_def, StaticPartitionsDefinition):
partitions_def_data = external_static_partitions_definition_from_def(
partitions_def
)
else:
raise DagsterInvalidDefinitionError(
"Only static partition and time window partitions are currently supported."
)
partitions_def = asset_info.partitions_def
if partitions_def:
if isinstance(partitions_def, TimeWindowPartitionsDefinition):
partitions_def_data = external_time_window_partitions_definition_from_def(
partitions_def
)
elif isinstance(partitions_def, StaticPartitionsDefinition):
partitions_def_data = external_static_partitions_definition_from_def(partitions_def)
else:
raise DagsterInvalidDefinitionError(
"Only static partition and time window partitions are currently supported."
)

asset_nodes.append(
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
compute_kind=node_def.tags.get("kind"),
op_name=node_def.name,
op_name=str(node_output_handle.node_handle),
op_description=node_def.description,
job_names=job_names,
partitions_def_data=partitions_def_data,
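And a hedged sketch of what the rewritten builder emits for a job like the one above. It assumes `external_asset_graph_from_defs` lives in `dagster.core.host_representation.external_data` (the module this hunk appears to edit) and reuses the illustrative assets from the previous sketch. Note that dependencies now carry only the upstream asset key, since the `input_name`/`output_name` invariants were removed, and `op_name` is the stringified node handle.

# Hedged sketch, not part of this commit: inspecting the ExternalAssetNodes built
# from an asset job's AssetLayer. The import path for the builder is an assumption
# based on where this function appears to live in dagster 0.14.x.
from dagster.core.asset_defs import asset, build_assets_job
from dagster.core.host_representation.external_data import external_asset_graph_from_defs


@asset
def upstream_asset():
    return 1


@asset
def downstream_asset(upstream_asset):
    return upstream_asset + 1


example_job = build_assets_job("example_job", assets=[upstream_asset, downstream_asset])

external_nodes = external_asset_graph_from_defs([example_job], source_assets_by_key={})
for node in external_nodes:
    # Dependencies now carry only the upstream asset key; input_name/output_name may
    # both be None now that the invariant checks were dropped.
    print(
        node.asset_key,
        [dep.upstream_asset_key for dep in node.dependencies],
        node.op_name,  # str(NodeHandle) of the op that materializes this asset
        node.job_names,
    )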
