Skip to content

Commit

Permalink
populate op names and graph name for ExternalAssetNode (#7721)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed May 4, 2022
1 parent fbb1abb commit 59a9915
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {AssetKey} from './types';
type AssetMinimal = {
assetKey: {path: string[]};
opName: string | null;
opNames: string[];
jobNames: string[];
partitionDefinition: string | null;
repository: {name: string; location: {name: string}};
Expand Down Expand Up @@ -102,12 +103,20 @@ export const LaunchAssetExecutionButton: React.FC<{
tags: [
{
key: DagsterTag.StepSelection,
value: assets.map((o) => o.opName!).join(','),
value: ([] as string[]).concat
.apply(
[],
assets.map((o) => o.opNames),
)
.join(','),
},
],
},
runConfigData: {},
stepKeys: assets.map((o) => o.opName!),
stepKeys: ([] as string[]).concat.apply(
[],
assets.map((o) => o.opNames),
),
selector: {
repositoryLocationName: repoAddress.location,
repositoryName: repoAddress.name,
Expand Down
26 changes: 12 additions & 14 deletions js_modules/dagit/packages/core/src/pipelines/GraphExplorer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,18 @@ export const GraphExplorer: React.FC<GraphExplorerProps> = (props) => {
/>
)}
{explodeCompositesEnabled && (
<OptionsOverlay>
<Checkbox
format="switch"
label="Explode graphs"
checked={options.explodeComposites}
onChange={() => {
handleQueryChange('');
setOptions({
...options,
explodeComposites: !options.explodeComposites,
});
}}
/>
</OptionsOverlay>
<Checkbox
format="switch"
label="Explode graphs"
checked={options.explodeComposites}
onChange={() => {
handleQueryChange('');
setOptions({
...options,
explodeComposites: !options.explodeComposites,
});
}}
/>
)}
</OptionsOverlay>
)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def __init__(
super().__init__(
id=external_asset_node.asset_key.to_string(),
assetKey=external_asset_node.asset_key,
opName=external_asset_node.op_name,
description=external_asset_node.op_description,
opName=external_asset_node.op_name,
)

@property
Expand Down Expand Up @@ -296,13 +296,6 @@ def resolve_dependencies(self, graphene_info) -> Sequence[GrapheneAssetDependenc
for dep in self._external_asset_node.dependencies
]

def resolve_graphName(self, _graphene_info) -> Optional[str]:
# todo OwenKephart - return the correct graph name here
if self._external_asset_node.op_name:
return self._external_asset_node.op_name
else:
return None

def resolve_jobNames(self, _graphene_info) -> Sequence[str]:
return self._external_asset_node.job_names

Expand Down Expand Up @@ -378,12 +371,11 @@ def resolve_op(
else:
return None

def resolve_opNames(self, _graphene_info) -> List[str]:
# todo OwenKephart: Return the correct list of op names.
if self._external_asset_node.op_name:
return [self._external_asset_node.op_name]
else:
return []
def resolve_opNames(self, _graphene_info) -> Sequence[str]:
return self._external_asset_node.op_names or []

def resolve_graphName(self, _graphene_info) -> Optional[str]:
return self._external_asset_node.graph_name

def resolve_partitionDefinition(self, _graphene_info) -> Optional[str]:
partitions_def_data = self._external_asset_node.partitions_def_data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,9 @@ def _validate_resource_reqs_for_asset_group(
):
present_resource_keys = set(resource_defs.keys())
for asset_def in asset_list:
resource_keys = set(asset_def.op.required_resource_keys or {})
resource_keys: Set[str] = set()
for op_def in asset_def.node_def.iterate_solid_defs():
resource_keys.update(set(op_def.required_resource_keys or {}))
missing_resource_keys = list(set(resource_keys) - present_resource_keys)
if missing_resource_keys:
raise DagsterInvalidDefinitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ class ExternalAssetNode(
("depended_by", Sequence[ExternalAssetDependedBy]),
("compute_kind", Optional[str]),
("op_name", Optional[str]),
("op_names", Optional[Sequence[str]]),
("graph_name", Optional[str]),
("op_description", Optional[str]),
("job_names", Sequence[str]),
("partitions_def_data", Optional[ExternalPartitionsDefinitionData]),
Expand All @@ -697,6 +699,8 @@ def __new__(
depended_by: Sequence[ExternalAssetDependedBy],
compute_kind: Optional[str] = None,
op_name: Optional[str] = None,
op_names: Optional[Sequence[str]] = None,
graph_name: Optional[str] = None,
op_description: Optional[str] = None,
job_names: Optional[Sequence[str]] = None,
partitions_def_data: Optional[ExternalPartitionsDefinitionData] = None,
Expand All @@ -715,6 +719,8 @@ def __new__(
),
compute_kind=check.opt_str_param(compute_kind, "compute_kind"),
op_name=check.opt_str_param(op_name, "op_name"),
op_names=check.opt_list_param(op_names, "op_names"),
graph_name=check.opt_str_param(graph_name, "graph_name"),
op_description=check.opt_str_param(
op_description or output_description, "op_description"
),
Expand Down Expand Up @@ -771,11 +777,19 @@ def external_asset_graph_from_defs(
deps: Dict[AssetKey, Dict[AssetKey, ExternalAssetDependency]] = defaultdict(dict)
dep_by: Dict[AssetKey, Dict[AssetKey, ExternalAssetDependedBy]] = defaultdict(dict)
all_upstream_asset_keys: Set[AssetKey] = set()
op_names_by_asset_key: Dict[AssetKey, Sequence[str]] = {}

for pipeline_def in pipelines:
asset_info_by_node_output = pipeline_def.asset_layer.asset_info_by_node_output_handle
for node_output_handle, asset_info in asset_info_by_node_output.items():
output_key = asset_info.key
if output_key not in op_names_by_asset_key:
op_names_by_asset_key[output_key] = [
str(handle)
for handle in pipeline_def.asset_layer.dependency_node_handles_by_asset_key.get(
output_key, []
)
]
upstream_asset_keys = pipeline_def.asset_layer.upstream_assets_for_asset(output_key)
all_upstream_asset_keys.update(upstream_asset_keys)
node_defs_by_asset_key[output_key].append((node_output_handle, pipeline_def))
Expand Down Expand Up @@ -855,13 +869,23 @@ def external_asset_graph_from_defs(
"Only static partition and time window partitions are currently supported."
)

# if the asset is produced by an op at the top level of the graph, graph_name should be None
graph_name = None
node_handle = node_output_handle.node_handle
while node_handle.parent:
node_handle = node_handle.parent
graph_name = node_handle.name

asset_nodes.append(
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
compute_kind=node_def.tags.get("kind"),
op_name=str(node_output_handle.node_handle),
# backcompat
op_name=graph_name or next(iter(op_names_by_asset_key[asset_key]), None),
graph_name=graph_name,
op_names=op_names_by_asset_key[asset_key],
op_description=node_def.description,
job_names=job_names,
partitions_def_data=partitions_def_data,
Expand Down

0 comments on commit 59a9915

Please sign in to comment.