Skip to content

Commit

Permalink
[refactor] execution pipeline_run -> dagster_run (#12388)
Browse files Browse the repository at this point in the history
### Summary & Motivation

Many renames of `pipeline_run` -> `dagster_run` in execution APIs on
arguments and in local vars.

### How I Tested These Changes

BK
  • Loading branch information
smackesey committed Feb 17, 2023
1 parent 2e2eca1 commit 79f9ecf
Show file tree
Hide file tree
Showing 26 changed files with 194 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class StepRunRef(
"_StepRunRef",
[
("run_config", Mapping[str, object]),
("pipeline_run", DagsterRun),
("dagster_run", DagsterRun),
("run_id", str),
("retry_mode", RetryMode),
("step_key", str),
Expand All @@ -34,7 +34,7 @@ class StepRunRef(
def __new__(
cls,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
run_id: str,
retry_mode: RetryMode,
step_key: str,
Expand All @@ -46,7 +46,7 @@ def __new__(
return super(StepRunRef, cls).__new__(
cls,
check.mapping_param(run_config, "run_config", key_type=str),
check.inst_param(pipeline_run, "pipeline_run", DagsterRun),
check.inst_param(dagster_run, "dagster_run", DagsterRun),
check.str_param(run_id, "run_id"),
check.inst_param(retry_mode, "retry_mode", RetryMode),
check.str_param(step_key, "step_key"),
Expand Down
162 changes: 79 additions & 83 deletions python_modules/dagster/dagster/_core/execution/api.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ def build_resources(
resources: Mapping[str, Any],
instance: Optional[DagsterInstance] = None,
resource_config: Optional[Mapping[str, Any]] = None,
pipeline_run: Optional[DagsterRun] = None,
dagster_run: Optional[DagsterRun] = None,
log_manager: Optional[DagsterLogManager] = None,
pipeline_run: Optional[DagsterRun] = None,
) -> Generator[Resources, None, None]:
"""Context manager that yields resources using provided resource definitions and run config.
This API allows for using resources in an independent context. Resources will be initialized
with the provided run config, and optionally, pipeline_run. The resulting resources will be
with the provided run config, and optionally, dagster_run. The resulting resources will be
yielded on a dictionary keyed identically to that provided for `resource_defs`. Upon exiting the
context, resources will also be torn down safely.
Expand All @@ -61,8 +62,8 @@ def build_resources(
resources on.
resource_config (Optional[Mapping[str, Any]]): A dict representing the config to be
provided to each resource during initialization and teardown.
pipeline_run (Optional[PipelineRun]): The pipeline run to provide during resource
initialization and teardown. If the provided resources require either the `pipeline_run`
dagster_run (Optional[PipelineRun]): The pipeline run to provide during resource
initialization and teardown. If the provided resources require either the `dagster_run`
or `run_id` attributes of the provided context during resource initialization and/or
teardown, this must be provided, or initialization will fail.
log_manager (Optional[DagsterLogManager]): Log Manager to use during resource
Expand All @@ -82,6 +83,7 @@ def the_resource():
assert resources.from_val == "bar"
"""
dagster_run = dagster_run or pipeline_run
resources = check.mapping_param(resources, "resource_defs", key_type=str)
instance = check.opt_inst_param(instance, "instance", DagsterInstance)
resource_config = check.opt_mapping_param(resource_config, "resource_config", key_type=str)
Expand All @@ -93,9 +95,9 @@ def the_resource():
resources_manager = resource_initialization_manager(
resource_defs=resource_defs,
resource_configs=mapped_resource_config,
log_manager=log_manager if log_manager else initialize_console_manager(pipeline_run),
log_manager=log_manager if log_manager else initialize_console_manager(dagster_run),
execution_plan=None,
pipeline_run=pipeline_run,
dagster_run=dagster_run,
resource_keys_to_init=set(resource_defs.keys()),
instance=dagster_instance,
emit_persistent_events=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ class InitResourceContext:
log_manager (DagsterLogManager): The log manager for this run of the job or pipeline
resources (ScopedResources): The resources that are available to the resource that we are
initalizing.
dagster_run (Optional[PipelineRun]): The dagster run to use. When initializing resources
dagster_run (Optional[DagsterRun]): The dagster run to use. When initializing resources
outside of execution context, this will be None.
run_id (Optional[str]): The id for this run of the job or pipeline. When initializing resources
outside of execution context, this will be None.
pipeline_run (Optional[PipelineRun]): (legacy) The dagster run to use. When initializing resources
outside of execution context, this will be None.
Example:
Expand Down Expand Up @@ -205,10 +203,6 @@ def resources(self) -> Resources:
def instance(self) -> Optional[DagsterInstance]:
return self._instance

@property
def pipeline_run(self) -> Optional[DagsterRun]:
return None

@property
def log(self) -> Optional[DagsterLogManager]:
return self._log_manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@


def initialize_console_manager(
pipeline_run: Optional[DagsterRun], instance: Optional[DagsterInstance] = None
dagster_run: Optional[DagsterRun], instance: Optional[DagsterInstance] = None
) -> DagsterLogManager:
# initialize default colored console logger
loggers = []
for logger_def, logger_config in default_system_loggers(instance):
loggers.append(
logger_def.logger_fn(
InitLoggerContext(
logger_config, logger_def, run_id=pipeline_run.run_id if pipeline_run else None
logger_config, logger_def, run_id=dagster_run.run_id if dagster_run else None
)
)
)
return DagsterLogManager.create(loggers=loggers, pipeline_run=pipeline_run, instance=instance)
return DagsterLogManager.create(loggers=loggers, dagster_run=dagster_run, instance=instance)


def executor_def_from_config(
Expand Down Expand Up @@ -107,7 +107,7 @@ def executor_def_from_config(
class ContextCreationData(NamedTuple):
pipeline: IPipeline
resolved_run_config: ResolvedRunConfig
pipeline_run: DagsterRun
dagster_run: DagsterRun
mode_def: ModeDefinition
executor_def: ExecutorDefinition
instance: DagsterInstance
Expand All @@ -123,19 +123,19 @@ def create_context_creation_data(
pipeline: IPipeline,
execution_plan: ExecutionPlan,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
) -> "ContextCreationData":
pipeline_def = pipeline.get_definition()
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=pipeline_run.mode)
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=dagster_run.mode)

mode_def = pipeline_def.get_mode_definition(pipeline_run.mode)
mode_def = pipeline_def.get_mode_definition(dagster_run.mode)
executor_def = executor_def_from_config(mode_def, resolved_run_config)

return ContextCreationData(
pipeline=pipeline,
resolved_run_config=resolved_run_config,
pipeline_run=pipeline_run,
dagster_run=dagster_run,
mode_def=mode_def,
executor_def=executor_def,
instance=instance,
Expand All @@ -151,7 +151,7 @@ def create_plan_data(
) -> PlanData:
return PlanData(
pipeline=context_creation_data.pipeline,
dagster_run=context_creation_data.pipeline_run,
dagster_run=context_creation_data.dagster_run,
instance=context_creation_data.instance,
execution_plan=context_creation_data.execution_plan,
raise_on_error=raise_on_error,
Expand Down Expand Up @@ -208,7 +208,7 @@ def execution_context_event_generator(
pipeline: IPipeline,
execution_plan: ExecutionPlan,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
retry_mode: RetryMode,
scoped_resources_builder_cm: Optional[
Expand All @@ -230,7 +230,7 @@ def execution_context_event_generator(
pipeline_def = pipeline.get_definition()

run_config = check.mapping_param(run_config, "run_config", key_type=str)
pipeline_run = check.inst_param(pipeline_run, "pipeline_run", DagsterRun)
dagster_run = check.inst_param(dagster_run, "dagster_run", DagsterRun)
instance = check.inst_param(instance, "instance", DagsterInstance)

raise_on_error = check.bool_param(raise_on_error, "raise_on_error")
Expand All @@ -239,7 +239,7 @@ def execution_context_event_generator(
pipeline,
execution_plan,
run_config,
pipeline_run,
dagster_run,
instance,
)

Expand All @@ -253,7 +253,7 @@ def execution_context_event_generator(
resource_configs=context_creation_data.resolved_run_config.resources,
log_manager=log_manager,
execution_plan=execution_plan,
pipeline_run=context_creation_data.pipeline_run,
dagster_run=context_creation_data.dagster_run,
resource_keys_to_init=context_creation_data.resource_keys_to_init,
instance=instance,
emit_persistent_events=True,
Expand Down Expand Up @@ -284,7 +284,7 @@ def __init__(
pipeline: IPipeline,
execution_plan: ExecutionPlan,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
raise_on_error: Optional[bool] = False,
output_capture: Optional[Dict["StepOutputHandle", Any]] = None,
Expand All @@ -295,7 +295,7 @@ def __init__(
pipeline,
execution_plan,
run_config,
pipeline_run,
dagster_run,
instance,
raise_on_error,
executor_defs,
Expand All @@ -313,7 +313,7 @@ def orchestration_context_event_generator(
pipeline: IPipeline,
execution_plan: ExecutionPlan,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
raise_on_error: bool,
executor_defs: Optional[Sequence[ExecutorDefinition]],
Expand All @@ -325,7 +325,7 @@ def orchestration_context_event_generator(
pipeline,
execution_plan,
run_config,
pipeline_run,
dagster_run,
instance,
)

Expand Down Expand Up @@ -357,10 +357,10 @@ def orchestration_context_event_generator(
error_info = serializable_error_info_from_exc_info(user_facing_exc_info)

event = DagsterEvent.pipeline_failure(
pipeline_context_or_name=pipeline_run.pipeline_name,
pipeline_context_or_name=dagster_run.pipeline_name,
context_msg=(
"Pipeline failure during initialization for pipeline"
f' "{pipeline_run.pipeline_name}". This may be due to a failure in initializing the'
f' "{dagster_run.pipeline_name}". This may be due to a failure in initializing the'
" executor or one of the loggers."
),
error_info=error_info,
Expand All @@ -380,7 +380,7 @@ def __init__(
pipeline: IPipeline,
execution_plan: ExecutionPlan,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
retry_mode: RetryMode,
scoped_resources_builder_cm: Optional[
Expand All @@ -394,7 +394,7 @@ def __init__(
pipeline,
execution_plan,
run_config,
pipeline_run,
dagster_run,
instance,
retry_mode,
scoped_resources_builder_cm,
Expand Down Expand Up @@ -433,7 +433,7 @@ def scoped_pipeline_context(
execution_plan: ExecutionPlan,
pipeline: IPipeline,
run_config: Mapping[str, object],
pipeline_run: DagsterRun,
dagster_run: DagsterRun,
instance: DagsterInstance,
scoped_resources_builder_cm: Callable[
..., EventGenerationManager[ScopedResourcesBuilder]
Expand All @@ -450,15 +450,15 @@ def scoped_pipeline_context(
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
check.inst_param(pipeline, "pipeline", IPipeline)
check.mapping_param(run_config, "run_config", key_type=str)
check.inst_param(pipeline_run, "pipeline_run", DagsterRun)
check.inst_param(dagster_run, "dagster_run", DagsterRun)
check.inst_param(instance, "instance", DagsterInstance)
check.callable_param(scoped_resources_builder_cm, "scoped_resources_builder_cm")

initialization_manager = PlanExecutionContextManager(
pipeline,
execution_plan,
run_config,
pipeline_run,
dagster_run,
instance,
RetryMode.DISABLED,
scoped_resources_builder_cm=scoped_resources_builder_cm,
Expand All @@ -479,11 +479,11 @@ def create_log_manager(
) -> DagsterLogManager:
check.inst_param(context_creation_data, "context_creation_data", ContextCreationData)

pipeline_def, mode_def, resolved_run_config, pipeline_run = (
pipeline_def, mode_def, resolved_run_config, dagster_run = (
context_creation_data.pipeline_def,
context_creation_data.mode_def,
context_creation_data.resolved_run_config,
context_creation_data.pipeline_run,
context_creation_data.dagster_run,
)

# The following logic is tightly coupled to the processing of logger config in
Expand All @@ -500,7 +500,7 @@ def create_log_manager(
resolved_run_config.loggers.get(logger_key, {}).get("config"),
logger_def,
pipeline_def=pipeline_def,
run_id=pipeline_run.run_id,
run_id=dagster_run.run_id,
)
)
)
Expand All @@ -513,28 +513,28 @@ def create_log_manager(
logger_config,
logger_def,
pipeline_def=pipeline_def,
run_id=pipeline_run.run_id,
run_id=dagster_run.run_id,
)
)
)

return DagsterLogManager.create(
loggers=loggers, pipeline_run=pipeline_run, instance=context_creation_data.instance
loggers=loggers, dagster_run=dagster_run, instance=context_creation_data.instance
)


def create_context_free_log_manager(
instance: DagsterInstance, pipeline_run: DagsterRun
instance: DagsterInstance, dagster_run: DagsterRun
) -> DagsterLogManager:
"""In the event of pipeline initialization failure, we want to be able to log the failure
without a dependency on the PlanExecutionContext to initialize DagsterLogManager.
Args:
pipeline_run (PipelineRun)
dagster_run (PipelineRun)
pipeline_def (PipelineDefinition)
"""
check.inst_param(instance, "instance", DagsterInstance)
check.inst_param(pipeline_run, "pipeline_run", DagsterRun)
check.inst_param(dagster_run, "dagster_run", DagsterRun)

loggers = []
# Use the default logger
Expand All @@ -545,9 +545,9 @@ def create_context_free_log_manager(
logger_config,
logger_def,
pipeline_def=None,
run_id=pipeline_run.run_id,
run_id=dagster_run.run_id,
)
)
]

return DagsterLogManager.create(loggers=loggers, instance=instance, pipeline_run=pipeline_run)
return DagsterLogManager.create(loggers=loggers, instance=instance, dagster_run=dagster_run)
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def core_execute_in_process(
context_event_generator=orchestration_context_event_generator,
pipeline=pipeline,
execution_plan=execution_plan,
pipeline_run=run,
dagster_run=run,
instance=execute_instance,
run_config=run_config,
executor_defs=None,
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/execution/host_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def host_mode_execution_context_event_generator(
)

log_manager = DagsterLogManager.create(
loggers=loggers, pipeline_run=pipeline_run, instance=instance
loggers=loggers, dagster_run=pipeline_run, instance=instance
)

try:
Expand Down Expand Up @@ -214,7 +214,7 @@ def execute_run_host_mode(
pipeline=pipeline,
execution_plan=execution_plan,
run_config=pipeline_run.run_config,
pipeline_run=pipeline_run,
dagster_run=pipeline_run,
instance=instance,
raise_on_error=raise_on_error,
executor_defs=executor_defs,
Expand Down

0 comments on commit 79f9ecf

Please sign in to comment.