Skip to content

Commit

Permalink
[refactor] IExecutionStep.solid_handle -> node_handle (#12371)
Browse files Browse the repository at this point in the history
### Summary & Motivation

- Rename IExecutionStep.solid_handle -> node_handle

### How I Tested These Changes

BK
  • Loading branch information
smackesey committed Feb 17, 2023
1 parent 3fd1174 commit 866a100
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 25 deletions.
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def from_step(
event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value,
pipeline_name=step_context.pipeline_name,
step_handle=step_context.step.handle,
solid_handle=step_context.step.solid_handle,
solid_handle=step_context.step.node_handle,
step_kind_value=step_context.step.kind.value,
logging_tags=step_context.logging_tags,
event_specific_data=_validate_event_specific_data(event_type, event_specific_data),
Expand Down Expand Up @@ -1299,7 +1299,7 @@ def hook_completed(
event_type_value=event_type.value,
pipeline_name=step_context.pipeline_name,
step_handle=step_context.step.handle,
solid_handle=step_context.step.solid_handle,
solid_handle=step_context.step.node_handle,
step_kind_value=step_context.step.kind.value,
logging_tags=step_context.logging_tags,
message=(
Expand All @@ -1323,7 +1323,7 @@ def hook_errored(
event_type_value=event_type.value,
pipeline_name=step_context.pipeline_name,
step_handle=step_context.step.handle,
solid_handle=step_context.step.solid_handle,
solid_handle=step_context.step.node_handle,
step_kind_value=step_context.step.kind.value,
logging_tags=step_context.logging_tags,
event_specific_data=_validate_event_specific_data(
Expand All @@ -1348,7 +1348,7 @@ def hook_skipped(
event_type_value=event_type.value,
pipeline_name=step_context.pipeline_name,
step_handle=step_context.step.handle,
solid_handle=step_context.step.solid_handle,
solid_handle=step_context.step.node_handle,
step_kind_value=step_context.step.kind.value,
logging_tags=step_context.logging_tags,
message=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def resources(self) -> "Resources":
@property
def solid_config(self) -> Any:
solid_config = self._step_execution_context.resolved_run_config.solids.get(
str(self._step_execution_context.step.solid_handle)
str(self._step_execution_context.step.node_handle)
)
return solid_config.config if solid_config else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ def get_output_context(
"""
step = execution_plan.get_step_by_key(step_output_handle.step_key)
# get config
op_config = resolved_run_config.solids[step.solid_handle.to_string()]
op_config = resolved_run_config.solids[step.node_handle.to_string()]
outputs_config = op_config.outputs

if outputs_config:
Expand All @@ -739,7 +739,7 @@ def get_output_context(
io_manager_key = output_def.io_manager_key
resource_config = resolved_run_config.resources[io_manager_key].config

node_handle = execution_plan.get_step_by_key(step.key).solid_handle
node_handle = execution_plan.get_step_by_key(step.key).node_handle
asset_info = pipeline_def.asset_layer.asset_info_for_output(
node_handle=node_handle, output_name=step_output.name
)
Expand All @@ -763,7 +763,7 @@ def get_output_context(
metadata=output_def.metadata,
mapping_key=step_output_handle.mapping_key,
config=output_config,
op_def=pipeline_def.get_solid(step.solid_handle).definition, # type: ignore # (should be OpDefinition not NodeDefinition)
op_def=pipeline_def.get_solid(step.node_handle).definition, # type: ignore # (should be OpDefinition not NodeDefinition)
dagster_type=output_def.dagster_type,
log_manager=log_manager,
version=version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def step(self) -> ExecutionStep:

@property
def solid_handle(self) -> "NodeHandle":
return self.step.solid_handle
return self.step.node_handle


class PlanExecutionContext(IPlanContext):
Expand Down Expand Up @@ -496,7 +496,7 @@ def step(self) -> ExecutionStep:

@property
def solid_handle(self) -> "NodeHandle":
return self.step.solid_handle
return self.step.node_handle

@property
def required_resource_keys(self) -> AbstractSet[str]:
Expand Down Expand Up @@ -540,7 +540,7 @@ def mode_def(self) -> ModeDefinition:

@property
def solid(self) -> "Node":
return self.pipeline_def.get_solid(self._step.solid_handle)
return self.pipeline_def.get_solid(self._step.node_handle)

@property
def solid_retry_policy(self) -> Optional[RetryPolicy]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,6 @@ def execute_core_compute(
omitted_outputs = solid_output_names.difference(emitted_result_names)
if omitted_outputs:
step_context.log.info(
f"{step_context.solid_def.node_type_str} '{str(step.solid_handle)}' did not fire "
f"{step_context.solid_def.node_type_str} '{str(step.node_handle)}' did not fire "
f"outputs {repr(omitted_outputs)}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ def _create_type_materializations(
) -> Iterator[DagsterEvent]:
"""If the output has any dagster type materializers, runs them."""
step = step_context.step
current_handle = step.solid_handle
current_handle = step.node_handle

# check for output mappings at every point up the composition hierarchy
while current_handle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def add_step(self, step: IExecutionStep) -> None:
)
)
self._seen_handles.add(step.handle)
self._steps[step.solid_handle.to_string()] = step
self._steps[step.node_handle.to_string()] = step

def get_step_by_node_handle(self, handle: NodeHandle) -> IExecutionStep:
check.inst_param(handle, "handle", NodeHandle)
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/execution/plan/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def key(self) -> str:

@property
@abstractmethod
def solid_handle(self) -> "NodeHandle":
def node_handle(self) -> "NodeHandle":
pass

@property
Expand Down Expand Up @@ -173,12 +173,12 @@ def __new__(
)

@property
def solid_handle(self) -> "NodeHandle":
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle

@property
def solid_name(self) -> str:
return self.solid_handle.name
return self.node_handle.name

@property
def kind(self) -> StepKind:
Expand Down Expand Up @@ -264,7 +264,7 @@ def __new__(
)

@property
def solid_handle(self) -> "NodeHandle":
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle

@property
Expand Down Expand Up @@ -424,7 +424,7 @@ def __new__(
)

@property
def solid_handle(self) -> "NodeHandle":
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def resolve_step_versions(
if not is_executable_step(step): # type: ignore
continue

solid_def = pipeline_def.get_solid(step.solid_handle).definition
solid_def = pipeline_def.get_solid(step.node_handle).definition

input_version_dict = {
input_name: step_input.source.compute_version(
Expand All @@ -105,7 +105,7 @@ def resolve_step_versions(
)
input_versions = [version for version in input_version_dict.values()]

solid_name = str(step.solid_handle)
solid_name = str(step.node_handle)

solid_config = resolved_run_config.solids[solid_name].config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def get_required_resource_keys_to_init(
if step_handle not in execution_plan.step_handles_to_execute:
continue

hook_defs = pipeline_def.get_all_hooks_for_handle(step.solid_handle)
hook_defs = pipeline_def.get_all_hooks_for_handle(step.node_handle)
for hook_def in hook_defs:
resource_keys = resource_keys.union(hook_def.required_resource_keys)

Expand Down Expand Up @@ -407,7 +407,7 @@ def get_required_resource_keys_for_step(
resource_keys: Set[str] = set()

# add all the solid compute resource keys
solid_def = pipeline_def.get_solid(execution_step.solid_handle).definition
solid_def = pipeline_def.get_solid(execution_step.node_handle).definition
resource_keys = resource_keys.union(solid_def.required_resource_keys) # type: ignore # (should be OpDefinition)

# add input type, input loader, and input io manager resource keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def _snapshot_from_execution_step(execution_step: IExecutionStep) -> ExecutionSt
list(map(_snapshot_from_step_output, execution_step.step_outputs)),
key=lambda so: so.name,
),
solid_handle_id=execution_step.solid_handle.to_string(),
solid_handle_id=execution_step.node_handle.to_string(),
kind=execution_step.kind,
metadata_items=sorted(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def test_step_context_to_step_run_ref():
assert rehydrated_step.step_inputs == step.step_inputs
assert rehydrated_step.step_outputs == step.step_outputs
assert rehydrated_step.kind == step.kind
assert rehydrated_step.solid_handle.name == step.solid_handle.name
assert rehydrated_step.node_handle.name == step.node_handle.name
assert rehydrated_step.logging_tags == step.logging_tags
assert rehydrated_step.tags == step.tags

Expand Down

0 comments on commit 866a100

Please sign in to comment.