Skip to content

Commit

Permalink
[refactor] Assorted pipeline_run -> dagster_run (#12383)
Browse files Browse the repository at this point in the history
### Summary & Motivation

Renames:

- IPlanContext.pipeline_run -> dagster_run
- PlanData.pipeline_run -> dagster_run
- StepHandlerContext pipeline_run -> dagster_run
- LaunchRunContext pipeline_run -> dagster_run
- ResumeRunContext pipeline_run -> dagster_run

### How I Tested These Changes

BK
  • Loading branch information
smackesey committed Feb 17, 2023
1 parent ded407f commit a090efa
Show file tree
Hide file tree
Showing 38 changed files with 73 additions and 74 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# ##### NOTES ON IMPORT FORMAT
# ########################
#
# ok
# This file defines dagster's public API. Imports need to be structured/formatted so as to to ensure
# that the broadest possible set of static analyzers understand Dagster's
# public API as intended. The below guidelines ensure this is the case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def op_config(self) -> Any:
@property
def pipeline_run(self) -> DagsterRun:
"""PipelineRun: The current pipeline run."""
return self._step_execution_context.pipeline_run
return self._step_execution_context.dagster_run

@property
def run(self) -> DagsterRun:
Expand Down
30 changes: 15 additions & 15 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,20 @@ def pipeline(self) -> IPipeline:
return self.plan_data.pipeline

@property
def pipeline_run(self) -> DagsterRun:
return self.plan_data.pipeline_run
def dagster_run(self) -> DagsterRun:
return self.plan_data.dagster_run

@property
def run_id(self) -> str:
return self.pipeline_run.run_id
return self.dagster_run.run_id

@property
def run_config(self) -> Mapping[str, object]:
return self.pipeline_run.run_config
return self.dagster_run.run_config

@property
def pipeline_name(self) -> str:
return self.pipeline_run.pipeline_name
return self.dagster_run.pipeline_name

@property
def job_name(self) -> str:
Expand Down Expand Up @@ -165,7 +165,7 @@ class PlanData(NamedTuple):
"""

pipeline: IPipeline
pipeline_run: DagsterRun
dagster_run: DagsterRun
instance: "DagsterInstance"
execution_plan: "ExecutionPlan"
raise_on_error: bool = False
Expand Down Expand Up @@ -349,7 +349,7 @@ def partition_key(self) -> str:
get_multipartition_key_from_tags,
)

tags = self._plan_data.pipeline_run.tags
tags = self._plan_data.dagster_run.tags

is_multipartitioned = any(
[tag.startswith(MULTIDIMENSIONAL_PARTITION_PREFIX) for tag in tags.keys()]
Expand All @@ -370,7 +370,7 @@ def asset_partition_key_range(self) -> PartitionKeyRange:
get_multipartition_key_from_tags,
)

tags = self._plan_data.pipeline_run.tags
tags = self._plan_data.dagster_run.tags
if any([tag.startswith(MULTIDIMENSIONAL_PARTITION_PREFIX) for tag in tags.keys()]):
multipartition_key = get_multipartition_key_from_tags(tags)
return PartitionKeyRange(multipartition_key, multipartition_key)
Expand Down Expand Up @@ -410,11 +410,11 @@ def partition_time_window(self) -> str:

@property
def has_partition_key(self) -> bool:
return PARTITION_NAME_TAG in self._plan_data.pipeline_run.tags
return PARTITION_NAME_TAG in self._plan_data.dagster_run.tags

@property
def has_partition_key_range(self) -> bool:
return ASSET_PARTITION_RANGE_START_TAG in self._plan_data.pipeline_run.tags
return ASSET_PARTITION_RANGE_START_TAG in self._plan_data.dagster_run.tags

def for_type(self, dagster_type: DagsterType) -> "TypeCheckContext":
return TypeCheckContext(
Expand Down Expand Up @@ -768,28 +768,28 @@ def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) ->

def _should_load_from_previous_runs(self, step_output_handle: StepOutputHandle) -> bool:
# should not load if not a re-execution
if self.pipeline_run.parent_run_id is None:
if self.dagster_run.parent_run_id is None:
return False
# should not load if re-executing the entire pipeline
if self.pipeline_run.step_keys_to_execute is None:
if self.dagster_run.step_keys_to_execute is None:
return False

# should not load if the entire dynamic step is being executed in the current run
handle = StepHandle.parse_from_key(step_output_handle.step_key)
if (
isinstance(handle, ResolvedFromDynamicStepHandle)
and handle.unresolved_form.to_key() in self.pipeline_run.step_keys_to_execute
and handle.unresolved_form.to_key() in self.dagster_run.step_keys_to_execute
):
return False

# should not load if this step is being executed in the current run
return step_output_handle.step_key not in self.pipeline_run.step_keys_to_execute
return step_output_handle.step_key not in self.dagster_run.step_keys_to_execute

def _get_source_run_id(self, step_output_handle: StepOutputHandle) -> Optional[str]:
if self._should_load_from_previous_runs(step_output_handle):
return self._get_source_run_id_from_logs(step_output_handle)
else:
return self.pipeline_run.run_id
return self.dagster_run.run_id

def capture_step_exception(self, exception: BaseException):
self._step_exception = check.inst_param(exception, "exception", BaseException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def create_plan_data(
) -> PlanData:
return PlanData(
pipeline=context_creation_data.pipeline,
pipeline_run=context_creation_data.pipeline_run,
dagster_run=context_creation_data.pipeline_run,
instance=context_creation_data.instance,
execution_plan=context_creation_data.execution_plan,
raise_on_error=raise_on_error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def host_mode_execution_context_event_generator(
execution_context = PlanOrchestrationContext(
plan_data=PlanData(
pipeline=pipeline,
pipeline_run=pipeline_run,
dagster_run=pipeline_run,
instance=instance,
execution_plan=execution_plan,
raise_on_error=raise_on_error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
def validate_reexecution_memoization(
plan_context: IPlanContext, execution_plan: ExecutionPlan
) -> None:
parent_run_id = plan_context.pipeline_run.parent_run_id
parent_run_id = plan_context.dagster_run.parent_run_id
check.opt_str_param(parent_run_id, "parent_run_id")

if parent_run_id is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def inner_plan_execution_iterator(
try:
step_stack.enter_context(
pipeline_context.instance.compute_log_manager.watch(
step_context.pipeline_run, step_context.step.key
step_context.dagster_run, step_context.step.key
)
)
yield DagsterEvent.legacy_compute_log_step_event(step_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def _build_logical_version_tags(
asset_key: AssetKey, step_context: StepExecutionContext
) -> Dict[str, str]:
asset_layer = step_context.pipeline_def.asset_layer
code_version = asset_layer.code_version_for_asset(asset_key) or step_context.pipeline_run.run_id
code_version = asset_layer.code_version_for_asset(asset_key) or step_context.dagster_run.run_id
input_logical_versions: Dict[AssetKey, LogicalVersion] = {}
tags: Dict[str, str] = {}
tags[CODE_VERSION_TAG_KEY] = code_version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def launch_step(
step_context: StepExecutionContext,
) -> Iterator[DagsterEvent]:
step_run_ref = step_context_to_step_run_ref(step_context)
run_id = step_context.pipeline_run.run_id
run_id = step_context.dagster_run.run_id

step_run_dir = os.path.join(self.scratch_dir, run_id, step_run_ref.step_key)
if os.path.exists(step_run_dir):
Expand Down Expand Up @@ -151,8 +151,8 @@ def step_context_to_step_run_ref(

return StepRunRef(
run_config=step_context.run_config,
pipeline_run=step_context.pipeline_run,
run_id=step_context.pipeline_run.run_id,
pipeline_run=step_context.dagster_run,
run_id=step_context.dagster_run.run_id,
step_key=step_context.step.key,
retry_mode=retry_mode,
recon_pipeline=recon_pipeline, # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def execute(
retry_mode=plan_context.retry_mode,
execution_plan=plan_context.execution_plan,
run_config=plan_context.run_config,
pipeline_run=plan_context.pipeline_run,
pipeline_run=plan_context.dagster_run,
instance=plan_context.instance,
raise_on_error=plan_context.raise_on_error,
output_capture=plan_context.output_capture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def execute_step_out_of_process(
) -> Iterator[Optional[DagsterEvent]]:
command = MultiprocessExecutorChildProcessCommand(
run_config=step_context.run_config,
pipeline_run=step_context.pipeline_run,
pipeline_run=step_context.dagster_run,
step_key=step.key,
instance_ref=step_context.instance.get_ref(),
term_event=term_events[step.key],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ def _get_step_handler_context(
steps=steps,
execute_step_args=ExecuteStepArgs(
pipeline_origin=plan_context.reconstructable_pipeline.get_python_origin(),
pipeline_run_id=plan_context.pipeline_run.run_id,
pipeline_run_id=plan_context.dagster_run.run_id,
step_keys_to_execute=[step.key for step in steps],
instance_ref=plan_context.plan_data.instance.get_ref(),
retry_mode=self.retries.for_inner_plan(),
known_state=active_execution.get_known_state(),
should_verify_step=self._should_verify_step,
),
pipeline_run=plan_context.pipeline_run,
dagster_run=plan_context.dagster_run,
)

def execute(self, plan_context: PlanOrchestrationContext, execution_plan: ExecutionPlan):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ def __init__(
plan_context: PlanOrchestrationContext,
steps: Sequence[ExecutionStep],
execute_step_args: ExecuteStepArgs,
pipeline_run: Optional[DagsterRun] = None,
dagster_run: Optional[DagsterRun] = None,
) -> None:
self._instance = instance
self._plan_context = plan_context
self._dagster_run = plan_context
self._steps_by_key = {step.key: step for step in steps}
self._execute_step_args = execute_step_args
self._pipeline_run = pipeline_run
self._pipeline_run = dagster_run

@property
def execute_step_args(self) -> ExecuteStepArgs:
return self._execute_step_args

@property
def pipeline_run(self) -> DagsterRun:
def dagster_run(self) -> DagsterRun:
# lazy load
if not self._pipeline_run:
run_id = self.execute_step_args.pipeline_run_id
Expand All @@ -52,7 +52,7 @@ def instance(self) -> DagsterInstance:
return self._instance

def get_step_context(self, step_key: str) -> IStepContext:
return self._plan_context.for_step(self._steps_by_key[step_key])
return self._dagster_run.for_step(self._steps_by_key[step_key])


class CheckStepHealthResult(
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,7 @@ def launch_run(self, run_id: str, workspace: "IWorkspace"):
check.failed(f"Failed to reload run {run_id}")

try:
self.run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
self.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
except:
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
Expand Down Expand Up @@ -2073,7 +2073,7 @@ def resume_run(self, run_id: str, workspace: "IWorkspace", attempt_number: int):
try:
self.run_launcher.resume_run(
ResumeRunContext(
pipeline_run=run,
dagster_run=run,
workspace=workspace,
resume_attempt_number=attempt_number,
)
Expand Down
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/launcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,26 @@ class LaunchRunContext(NamedTuple):
Context available within a run launcher's launch_run call.
"""

pipeline_run: DagsterRun
dagster_run: DagsterRun
workspace: Optional[IWorkspace]

@property
def pipeline_code_origin(self) -> Optional[PipelinePythonOrigin]:
return self.pipeline_run.pipeline_code_origin
return self.dagster_run.pipeline_code_origin


class ResumeRunContext(NamedTuple):
"""
Context available within a run launcher's resume_run call.
"""

pipeline_run: DagsterRun
dagster_run: DagsterRun
workspace: Optional[IWorkspace]
resume_attempt_number: Optional[int] = None

@property
def pipeline_code_origin(self) -> Optional[PipelinePythonOrigin]:
return self.pipeline_run.pipeline_code_origin
return self.dagster_run.pipeline_code_origin


@whitelist_for_serdes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def launch_run(self, context: LaunchRunContext) -> None:
GrpcServerRepositoryLocation,
)

run = context.pipeline_run
run = context.dagster_run

check.inst_param(run, "run", DagsterRun)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def from_config_value(inst_data, config_value):

def launch_run(self, context: LaunchRunContext) -> None:
recon_pipeline = recon_pipeline_from_origin(context.pipeline_code_origin) # type: ignore
execute_run(recon_pipeline, context.pipeline_run, self._instance)
execute_run(recon_pipeline, context.dagster_run, self._instance)

def terminate(self, run_id):
check.not_implemented("Termination not supported.")
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def __init__(self, inst_data=None, bad_run_ids=None, bad_user_code_run_ids=None)
super().__init__()

def launch_run(self, context):
run = context.pipeline_run
run = context.dagster_run
check.inst_param(run, "run", DagsterRun)
check.invariant(run.status == DagsterRunStatus.STARTING)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,7 @@ def _dequeue_run(
run = check.not_none(instance.get_run_by_id(run.run_id))

try:
instance.run_launcher.launch_run(
LaunchRunContext(pipeline_run=run, workspace=workspace)
)
instance.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
except Exception as e:
error = serializable_error_info_from_exc_info(sys.exc_info())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_step_handler_context():
plan_context = PlanOrchestrationContext(
plan_data=PlanData(
pipeline=recon_pipeline,
pipeline_run=run,
dagster_run=run,
instance=instance,
execution_plan=execution_plan,
raise_on_error=True,
Expand All @@ -63,8 +63,8 @@ def test_step_handler_context():
plan_context=plan_context,
steps=execution_plan.steps,
execute_step_args=args,
pipeline_run=run,
dagster_run=run,
)

assert ctx.execute_step_args == args
assert ctx.pipeline_run == run
assert ctx.dagster_run == run
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ def test_step_context_to_step_run_ref():
step_context = initialize_step_context("", instance)
step = step_context.step
step_run_ref = step_context_to_step_run_ref(step_context)
assert step_run_ref.run_config == step_context.pipeline_run.run_config
assert step_run_ref.run_id == step_context.pipeline_run.run_id
assert step_run_ref.run_config == step_context.dagster_run.run_config
assert step_run_ref.run_id == step_context.dagster_run.run_id

rehydrated_step_context = step_run_ref_to_step_context(step_run_ref, instance)
rehydrated_step = rehydrated_step_context.step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def test_partitions_key():
@op
def my_op(context):
assert (
context._step_execution_context.plan_data.pipeline_run.tags[ # pylint: disable=protected-access
context._step_execution_context.plan_data.dagster_run.tags[ # pylint: disable=protected-access
"dagster/partition"
]
== "2020-01-01"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def launch_run(self, context: LaunchRunContext) -> None:
"""
Launch a run in an ECS task.
"""
run = context.pipeline_run
run = context.dagster_run
container_context = EcsContainerContext.create_for_run(run, self)

pipeline_origin = check.not_none(context.pipeline_code_origin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def _upload_file_to_s3(local_path, s3_filename):
def launch_step(self, step_context):
step_run_ref = step_context_to_step_run_ref(step_context, self.local_job_package_path)

run_id = step_context.pipeline_run.run_id
run_id = step_context.dagster_run.run_id
log = step_context.log

step_key = step_run_ref.step_key
Expand Down

0 comments on commit a090efa

Please sign in to comment.