Skip to content

Commit

Permalink
Add job/op equivalents to execution context (#7734)
Browse files Browse the repository at this point in the history
Closes #6979
  • Loading branch information
johannkm committed May 5, 2022
1 parent 0260b0f commit da15d8d
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
61 changes: 59 additions & 2 deletions python_modules/dagster/dagster/core/execution/context/compute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List, Mapping, Optional
from typing import Any, Dict, Iterator, List, Mapping, Optional, cast

from dagster import check
from dagster.core.definitions.dependency import Node, NodeHandle
Expand All @@ -10,7 +10,9 @@
Materialization,
UserEvent,
)
from dagster.core.definitions.job_definition import JobDefinition
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.op_definition import OpDefinition
from dagster.core.definitions.pipeline_definition import PipelineDefinition
from dagster.core.definitions.solid_definition import SolidDefinition
from dagster.core.definitions.step_launcher import StepLauncher
Expand All @@ -19,7 +21,7 @@
from dagster.core.events import DagsterEvent
from dagster.core.instance import DagsterInstance
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.storage.pipeline_run import DagsterRun, PipelineRun
from dagster.utils.forked_pdb import ForkedPdb

from .system import StepExecutionContext
Expand Down Expand Up @@ -114,11 +116,20 @@ def __init__(self, step_execution_context: StepExecutionContext):
def solid_config(self) -> Any:
return self._step_execution_context.op_config

@property
def op_config(self) -> Any:
return self.solid_config

@property
def pipeline_run(self) -> PipelineRun:
"""PipelineRun: The current pipeline run"""
return self._step_execution_context.pipeline_run

@property
def run(self) -> DagsterRun:
"""DagsterRun: The current run"""
return cast(DagsterRun, self.pipeline_run)

@property
def instance(self) -> DagsterInstance:
"""DagsterInstance: The current Dagster instance"""
Expand Down Expand Up @@ -178,11 +189,28 @@ def pipeline_def(self) -> PipelineDefinition:
"""PipelineDefinition: The currently executing pipeline."""
return self._step_execution_context.pipeline_def

@property
def job_def(self) -> JobDefinition:
"""JobDefinition: The currently executing job."""
return cast(
JobDefinition,
check.inst(
self.pipeline_def,
JobDefinition,
"Accessing job_def inside a legacy pipeline. Use pipeline_def instead.",
),
)

@property
def pipeline_name(self) -> str:
"""str: The name of the currently executing pipeline."""
return self._step_execution_context.pipeline_name

@property
def job_name(self) -> str:
"""str: The name of the currently executing job."""
return self.pipeline_name

@property
def mode_def(self) -> ModeDefinition:
"""ModeDefinition: The mode of the current execution."""
Expand All @@ -201,6 +229,14 @@ def solid_handle(self) -> NodeHandle:
"""
return self._step_execution_context.solid_handle

@property
def op_handle(self) -> NodeHandle:
"""NodeHandle: The current op's handle.
:meta private:
"""
return self.solid_handle

@property
def solid(self) -> Node:
"""Solid: The current solid object.
Expand All @@ -210,11 +246,32 @@ def solid(self) -> Node:
"""
return self._step_execution_context.pipeline_def.get_solid(self.solid_handle)

@property
def op(self) -> Node:
"""Solid: The current op object.
:meta private:
"""
return self.solid

@property
def solid_def(self) -> SolidDefinition:
"""SolidDefinition: The current solid definition."""
return self._step_execution_context.pipeline_def.get_solid(self.solid_handle).definition

@property
def op_def(self) -> OpDefinition:
"""OpDefinition: The current op definition."""
return cast(
OpDefinition,
check.inst(
self.solid_def,
OpDefinition,
"Called op_def on a legacy solid. Use solid_def instead.",
),
)

@property
def has_partition_key(self) -> bool:
"""Whether the current run is a partitioned run"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import pytest

from dagster import OpExecutionContext, check, execute_pipeline, job, op, pipeline, solid
from dagster.core.definitions.job_definition import JobDefinition
from dagster.core.definitions.op_definition import OpDefinition
from dagster.core.definitions.pipeline_definition import PipelineDefinition
from dagster.core.definitions.solid_definition import SolidDefinition
from dagster.core.execution.context.compute import SolidExecutionContext
from dagster.core.storage.pipeline_run import DagsterRun, PipelineRun


def test_op_execution_context():
@op
def ctx_op(context: OpExecutionContext):
check.inst(context.run, DagsterRun)
assert context.job_name == "foo"
assert context.job_def.name == "foo"
check.inst(context.job_def, JobDefinition)
assert context.op_config is None
check.inst(context.op_def, OpDefinition)

check.inst(context.pipeline_run, PipelineRun)
assert context.pipeline_name == "foo"
assert context.pipeline_def.name == "foo"
check.inst(context.pipeline_def, PipelineDefinition)
assert context.solid_config is None
check.inst(context.solid_def, SolidDefinition)

@job
def foo():
ctx_op()

assert foo.execute_in_process().success


def test_solid_execution_context():
@solid
def ctx_solid(context: SolidExecutionContext):
check.inst(context.run, DagsterRun)
assert context.job_name == "foo"

with pytest.raises(Exception):
context.job_def

assert context.op_config is None

with pytest.raises(Exception):
context.op_def

check.inst(context.pipeline_run, PipelineRun)
assert context.pipeline_name == "foo"
assert context.pipeline_def.name == "foo"
check.inst(context.pipeline_def, PipelineDefinition)
assert context.solid_config is None
check.inst(context.solid_def, SolidDefinition)

@pipeline
def foo():
ctx_solid()

assert execute_pipeline(foo).success

0 comments on commit da15d8d

Please sign in to comment.