Skip to content

Commit

Permalink
[refactor] misc core solid -> node renames (#12368)
Browse files Browse the repository at this point in the history
### Summary & Motivation

- GraphDefinition.get_solid -> get_node
- (FanIn)InputPointer.solid_name -> node_name
- OutputPointer.solid_name -> node_name

One per commit

### How I Tested These Changes

BK
  • Loading branch information
smackesey committed Feb 15, 2023
1 parent ec70f8a commit 47dc694
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ def _resolve_output_to_destinations(
if mapping.graph_output_name != output_name:
continue
output_pointer = mapping.maps_from
output_node = node_def.node_named(output_pointer.solid_name)
output_node = node_def.node_named(output_pointer.node_name)

all_destinations.extend(
_resolve_output_to_destinations(
output_pointer.output_name,
output_node.definition,
NodeHandle(output_pointer.solid_name, parent=handle),
NodeHandle(output_pointer.node_name, parent=handle),
)
)

output_def = output_node.definition.output_def_named(output_pointer.output_name)
downstream_input_handles = (
node_def.dependency_structure.output_to_downstream_inputs_for_node(
output_pointer.solid_name
output_pointer.node_name
).get(NodeOutput(output_node, output_def), [])
)
for input_handle in downstream_input_handles:
Expand Down Expand Up @@ -152,7 +152,7 @@ def _build_graph_dependencies(
)
outputs_by_graph_handle[curr_node_handle] = {
mapping.graph_output_name: NodeOutputHandle(
NodeHandle(mapping.maps_from.solid_name, parent=curr_node_handle),
NodeHandle(mapping.maps_from.node_name, parent=curr_node_handle),
mapping.maps_from.output_name,
)
for mapping in sub_node.definition.output_mappings
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ def _validate_graph_def(graph_def: "GraphDefinition", prefix: Optional[Sequence[

# set of nodes that have outputs mapped to a graph output
mapped_output_nodes = {
output_mapping.maps_from.solid_name for output_mapping in graph_def.output_mappings
output_mapping.maps_from.node_name for output_mapping in graph_def.output_mappings
}

# leaf nodes which do not have an associated mapped output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def node_named(self, name: str) -> Node:

return self._node_dict[name]

def get_solid(self, handle: NodeHandle) -> Node:
def get_node(self, handle: NodeHandle) -> Node:
check.inst_param(handle, "handle", NodeHandle)
current = handle
lineage: List[str] = []
Expand Down Expand Up @@ -429,7 +429,7 @@ def resolve_output_to_origin(

mapping = self.get_output_mapping(output_name)
check.invariant(mapping, "Can only resolve outputs for valid output names")
mapped_node = self.node_named(mapping.maps_from.solid_name)
mapped_node = self.node_named(mapping.maps_from.node_name)
return mapped_node.definition.resolve_output_to_origin(
mapping.maps_from.output_name,
NodeHandle(mapped_node.name, handle),
Expand All @@ -439,7 +439,7 @@ def resolve_output_to_origin_op_def(self, output_name: str) -> "OpDefinition":
mapping = self.get_output_mapping(output_name)
check.invariant(mapping, "Can only resolve outputs for valid output names")
return self.node_named(
mapping.maps_from.solid_name
mapping.maps_from.node_name
).definition.resolve_output_to_origin_op_def(output_name)

def default_value_for_input(self, input_name: str) -> object:
Expand All @@ -451,7 +451,7 @@ def default_value_for_input(self, input_name: str) -> object:

mapping = self.get_input_mapping(input_name)
check.invariant(mapping, "Can only resolve inputs for valid input names")
mapped_node = self.node_named(mapping.maps_to.solid_name)
mapped_node = self.node_named(mapping.maps_to.node_name)

return mapped_node.definition.default_value_for_input(mapping.maps_to.input_name)

Expand All @@ -464,7 +464,7 @@ def input_has_default(self, input_name: str) -> bool:

mapping = self.get_input_mapping(input_name)
check.invariant(mapping, "Can only resolve inputs for valid input names")
mapped_node = self.node_named(mapping.maps_to.solid_name)
mapped_node = self.node_named(mapping.maps_to.node_name)

return mapped_node.definition.input_has_default(mapping.maps_to.input_name)

Expand All @@ -484,7 +484,7 @@ def config_schema(self) -> Optional[IDefinitionConfigSchema]:

def input_supports_dynamic_output_dep(self, input_name: str) -> bool:
mapping = self.get_input_mapping(input_name)
target_node = mapping.maps_to.solid_name
target_node = mapping.maps_to.node_name
# check if input mapped to solid which is downstream of another dynamic output within
if self.dependency_structure.is_dynamic_mapped(target_node):
return False
Expand Down Expand Up @@ -760,10 +760,10 @@ def resolve_input_to_destinations(
continue
# recurse into graph structure
all_destinations += self.node_named(
mapping.maps_to.solid_name
mapping.maps_to.node_name
).definition.resolve_input_to_destinations(
NodeInputHandle(
NodeHandle(mapping.maps_to.solid_name, parent=input_handle.node_handle),
NodeHandle(mapping.maps_to.node_name, parent=input_handle.node_handle),
mapping.maps_to.input_name,
),
)
Expand Down Expand Up @@ -944,12 +944,12 @@ def _validate_out_mappings(
output_defs: List[OutputDefinition] = []
for mapping in output_mappings:
if isinstance(mapping, OutputMapping):
target_solid = solid_dict.get(mapping.maps_from.solid_name)
target_solid = solid_dict.get(mapping.maps_from.node_name)
if target_solid is None:
raise DagsterInvalidDefinitionError(
"In {class_name} '{name}' output mapping references node "
"'{solid_name}' which it does not contain.".format(
name=name, solid_name=mapping.maps_from.solid_name, class_name=class_name
name=name, solid_name=mapping.maps_from.node_name, class_name=class_name
)
)
if not target_solid.has_output(mapping.maps_from.output_name):
Expand All @@ -976,7 +976,7 @@ def _validate_out_mappings(
raise DagsterInvalidDefinitionError(
"In {class_name} '{name}' output '{mapping.graph_output_name}' of type"
" {mapping.dagster_type.display_name} maps from"
" {mapping.maps_from.solid_name}.{mapping.maps_from.output_name} of different"
" {mapping.maps_from.node_name}.{mapping.maps_from.output_name} of different"
" type {target_output.dagster_type.display_name}. OutputMapping source and"
" destination must have the same type.".format(
class_name=class_name,
Expand Down
24 changes: 7 additions & 17 deletions python_modules/dagster/dagster/_core/definitions/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,36 +360,28 @@ def _checked_inferred_type(inferred: InferredInputProps) -> DagsterType:
return resolved_type


class InputPointer(NamedTuple("_InputPointer", [("solid_name", str), ("input_name", str)])):
def __new__(cls, solid_name: str, input_name: str):
class InputPointer(NamedTuple("_InputPointer", [("node_name", str), ("input_name", str)])):
def __new__(cls, node_name: str, input_name: str):
return super(InputPointer, cls).__new__(
cls,
check.str_param(solid_name, "solid_name"),
check.str_param(node_name, "node_name"),
check.str_param(input_name, "input_name"),
)

@property
def node_name(self) -> str:
return self.solid_name


class FanInInputPointer(
NamedTuple(
"_FanInInputPointer", [("solid_name", str), ("input_name", str), ("fan_in_index", int)]
"_FanInInputPointer", [("node_name", str), ("input_name", str), ("fan_in_index", int)]
)
):
def __new__(cls, solid_name: str, input_name: str, fan_in_index: int):
def __new__(cls, node_name: str, input_name: str, fan_in_index: int):
return super(FanInInputPointer, cls).__new__(
cls,
check.str_param(solid_name, "solid_name"),
check.str_param(node_name, "node_name"),
check.str_param(input_name, "input_name"),
check.int_param(fan_in_index, "fan_in_index"),
)

@property
def node_name(self) -> str:
return self.solid_name


class InputMapping(NamedTuple):
"""Defines an input mapping for a graph.
Expand Down Expand Up @@ -450,9 +442,7 @@ def maps_to_fan_in(self) -> bool:

def describe(self) -> str:
idx = self.maps_to.fan_in_index if isinstance(self.maps_to, FanInInputPointer) else ""
return (
f"{self.graph_input_name} -> {self.maps_to.solid_name}:{self.maps_to.input_name}{idx}"
)
return f"{self.graph_input_name} -> {self.maps_to.node_name}:{self.maps_to.input_name}{idx}"

def get_definition(self) -> "InputDefinition":
return InputDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,14 +830,14 @@ def get_subselected_graph_definition(
# filter out unselected input/output mapping
new_input_mappings = list(
filter(
lambda input_mapping: input_mapping.maps_to.solid_name
lambda input_mapping: input_mapping.maps_to.node_name
in [name for name, _ in selected_nodes],
graph._input_mappings, # pylint: disable=protected-access
)
)
new_output_mappings = list(
filter(
lambda output_mapping: output_mapping.maps_from.solid_name
lambda output_mapping: output_mapping.maps_from.node_name
in [name for name, _ in selected_nodes],
graph._output_mappings, # pylint: disable=protected-access
)
Expand Down
10 changes: 3 additions & 7 deletions python_modules/dagster/dagster/_core/definitions/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,14 @@ def is_dynamic(self) -> bool:
return True


class OutputPointer(NamedTuple("_OutputPointer", [("solid_name", str), ("output_name", str)])):
def __new__(cls, solid_name: str, output_name: Optional[str] = None):
class OutputPointer(NamedTuple("_OutputPointer", [("node_name", str), ("output_name", str)])):
def __new__(cls, node_name: str, output_name: Optional[str] = None):
return super(OutputPointer, cls).__new__(
cls,
check.str_param(solid_name, "solid_name"),
check.str_param(node_name, "node_name"),
check.opt_str_param(output_name, "output_name", DEFAULT_OUTPUT),
)

@property
def node_name(self):
return self.solid_name


class OutputMapping(NamedTuple):
"""Defines an output mapping for a graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def has_solid_def(self, name: str) -> bool:
return name in self._all_node_defs

def get_solid(self, handle: NodeHandle) -> Node:
return self._graph_def.get_solid(handle)
return self._graph_def.get_node(handle)

def has_solid_named(self, name: str) -> bool:
return self._graph_def.has_node_named(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def core_execute_in_process(

def _check_top_level_inputs(job_def: JobDefinition) -> None:
for input_mapping in job_def.graph.input_mappings:
node = job_def.graph.node_named(input_mapping.maps_to.solid_name)
node = job_def.graph.node_named(input_mapping.maps_to.node_name)
top_level_input_name = input_mapping.graph_input_name
input_name = input_mapping.maps_to.input_name
_check_top_level_inputs_for_node(
Expand All @@ -111,7 +111,7 @@ def _check_top_level_inputs_for_node(
if isinstance(node.definition, GraphDefinition):
graph_def = cast(GraphDefinition, node.definition)
for input_mapping in graph_def.input_mappings:
next_node = graph_def.node_named(input_mapping.maps_to.solid_name)
next_node = graph_def.node_named(input_mapping.maps_to.node_name)
input_name = input_mapping.maps_to.input_name
_check_top_level_inputs_for_node(
next_node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def output_value(self, output_name: str = DEFAULT_OUTPUT) -> object:
)
# Resolve the first layer of mapping
output_mapping = graph_def.get_output_mapping(output_name)
mapped_node = graph_def.node_named(output_mapping.maps_from.solid_name)
mapped_node = graph_def.node_named(output_mapping.maps_from.node_name)
origin_output_def, origin_handle = mapped_node.definition.resolve_output_to_origin(
output_mapping.maps_from.output_name,
NodeHandle(mapped_node.name, None),
Expand All @@ -97,7 +97,7 @@ def output_value(self, output_name: str = DEFAULT_OUTPUT) -> object:
def output_for_node(self, node_str: str, output_name: str = DEFAULT_OUTPUT) -> object:
# resolve handle of node that node_str is referring to
target_handle = NodeHandle.from_string(node_str)
target_node_def = self.job_def.graph.get_solid(target_handle).definition
target_node_def = self.job_def.graph.get_node(target_handle).definition
origin_output_def, origin_handle = target_node_def.resolve_output_to_origin(
output_name, NodeHandle.from_string(node_str)
)
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/execution/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def result_for_handle(
else:
check.inst_param(handle, "handle", NodeHandle)

node = self.container.get_solid(handle)
node = self.container.get_node(handle)

return self._result_for_handle(node, handle)

Expand Down Expand Up @@ -292,8 +292,8 @@ def output_values(self) -> Mapping[str, object]:
output_mapping = self.node.definition.get_output_mapping(output_name)

inner_solid_values = self._result_for_handle(
self.node.definition.node_named(output_mapping.maps_from.solid_name),
NodeHandle(output_mapping.maps_from.solid_name, None),
self.node.definition.node_named(output_mapping.maps_from.node_name),
NodeHandle(output_mapping.maps_from.node_name, None),
).output_values

if inner_solid_values is not None: # may be None if inner solid was skipped
Expand Down Expand Up @@ -323,8 +323,8 @@ def output_value(self, output_name: str = DEFAULT_OUTPUT) -> object:
output_mapping = self.node.definition.get_output_mapping(output_name)

return self._result_for_handle(
self.node.definition.node_named(output_mapping.maps_from.solid_name),
NodeHandle(output_mapping.maps_from.solid_name, None),
self.node.definition.node_named(output_mapping.maps_from.node_name),
NodeHandle(output_mapping.maps_from.node_name, None),
).output_value(output_mapping.maps_from.output_name)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ def external_asset_graph_from_defs(
for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]

node_def = job_def.graph.get_solid(node_output_handle.node_handle).definition
node_def = job_def.graph.get_node(node_output_handle.node_handle).definition
output_def = node_def.output_def_named(node_output_handle.output_name)

asset_info = asset_info_by_asset_key[asset_key]
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/snap/solid.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __new__(

def build_output_mapping_snap(output_mapping: OutputMapping) -> OutputMappingSnap:
return OutputMappingSnap(
mapped_solid_name=output_mapping.maps_from.solid_name,
mapped_solid_name=output_mapping.maps_from.node_name,
mapped_output_name=output_mapping.maps_from.output_name,
external_output_name=output_mapping.graph_output_name,
)
Expand Down Expand Up @@ -154,7 +154,7 @@ def __new__(cls, mapped_solid_name: str, mapped_input_name: str, external_input_

def build_input_mapping_snap(input_mapping: InputMapping) -> InputMappingSnap:
return InputMappingSnap(
mapped_solid_name=input_mapping.maps_to.solid_name,
mapped_solid_name=input_mapping.maps_to.node_name,
mapped_input_name=input_mapping.maps_to.input_name,
external_input_name=input_mapping.graph_input_name,
)
Expand Down

0 comments on commit 47dc694

Please sign in to comment.