rename PipelineRun -> DagsterRun (#10847)
smackesey committed Dec 6, 2022
1 parent 397efda commit d074ac1
Showing 72 changed files with 272 additions and 281 deletions.
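Nearly every hunk below follows the same mechanical pattern: the class previously imported as `PipelineRun` is now imported and referenced as `DagsterRun` from the same module. A minimal before/after sketch of what the rename means for code built on these internals (the `describe` helper is hypothetical; only the import path and class name come from the hunks below):

```python
# Before this commit, code imported the run record under its old name:
#   from dagster._core.storage.pipeline_run import PipelineRun
# After this commit, the same class is imported as DagsterRun:
from dagster._core.storage.pipeline_run import DagsterRun


def describe(run: DagsterRun) -> str:
    # The class is unchanged apart from the name, so attributes such as
    # run_id and status are used exactly as before.
    return f"{run.run_id}: {run.status}"
```

The hunks that follow apply this same substitution across the docs, integration tests, the GraphQL layer, the CLI, and core definitions.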
6 changes: 3 additions & 3 deletions docs/content/guides/dagster/run-attribution.mdx
@@ -21,11 +21,11 @@ In this use case, we'd like to add a hook to customize submitted runs while still

```python file=/guides/dagster/run_attribution/custom_run_coordinator_skeleton.py startafter=start_custom_run_coordinator_marker endbefore=end_custom_run_coordinator_marker
from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun


class CustomRunCoordinator(QueuedRunCoordinator):
def submit_run(self, context: SubmitRunContext) -> PipelineRun:
def submit_run(self, context: SubmitRunContext) -> DagsterRun:
desired_header = context.get_request_header(CUSTOM_HEADER_NAME)
```

@@ -53,7 +53,7 @@ The above is just an example - you can write any hook which would be useful to you
Putting this all together, we can use these hooks to dynamically attach tags to submitted pipeline runs. In the following example, we'd read the user's email from the `X-Amzn-Oidc-Data` header by using the `get_email` hook defined above, and then attach the email as a tag to the pipeline run.

```python file=/guides/dagster/run_attribution/custom_run_coordinator.py startafter=start_submit_marker endbefore=end_submit_marker dedent=4
def submit_run(self, context: SubmitRunContext) -> PipelineRun:
def submit_run(self, context: SubmitRunContext) -> DagsterRun:
pipeline_run = context.pipeline_run
jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
email = self.get_email(jwt_claims_header)
```
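The snippet above is cut off by the diff view before the decoded email is actually attached to the run. A minimal sketch of how such a coordinator might finish the job, assuming the coordinator can reach the instance as `self._instance` and that `DagsterInstance.add_run_tags` is available (the `get_email` stub stands in for the JWT-decoding hook collapsed out of the hunk above):

```python
from typing import Optional

from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.pipeline_run import DagsterRun


class CustomRunCoordinator(QueuedRunCoordinator):
    def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]:
        # Stand-in for the JWT-decoding hook defined earlier in this guide.
        raise NotImplementedError

    def submit_run(self, context: SubmitRunContext) -> DagsterRun:
        pipeline_run = context.pipeline_run
        jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
        email = self.get_email(jwt_claims_header)
        if email:
            # Attach the submitting user's email as a tag before queueing the run.
            self._instance.add_run_tags(pipeline_run.run_id, {"user": email})
        # Defer to the stock QueuedRunCoordinator behavior for actual submission.
        return super().submit_run(context)
```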
@@ -4,7 +4,7 @@
from typing import Optional

from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun


class CustomRunCoordinator(QueuedRunCoordinator):
@@ -28,7 +28,7 @@ def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]:
# end_email_marker

# start_submit_marker
def submit_run(self, context: SubmitRunContext) -> PipelineRun:
def submit_run(self, context: SubmitRunContext) -> DagsterRun:
pipeline_run = context.pipeline_run
jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
email = self.get_email(jwt_claims_header)
@@ -4,11 +4,11 @@
# start_custom_run_coordinator_marker

from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun


class CustomRunCoordinator(QueuedRunCoordinator):
def submit_run(self, context: SubmitRunContext) -> PipelineRun:
def submit_run(self, context: SubmitRunContext) -> DagsterRun:
desired_header = context.get_request_header(CUSTOM_HEADER_NAME)


@@ -7,7 +7,7 @@
)
from marks import mark_daemon

from dagster._core.storage.pipeline_run import PipelineRun, RunsFilter
from dagster._core.storage.pipeline_run import DagsterRun, RunsFilter
from dagster._core.test_utils import poll_for_finished_run


@@ -24,7 +24,7 @@ def test_execute_schedule_on_celery_k8s( # pylint: disable=redefined-outer-name
dagster_instance_for_daemon.start_schedule(reoriginated_schedule)

scheduler_runs = dagster_instance_for_daemon.get_runs(
RunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
RunsFilter(tags=DagsterRun.tags_for_schedule(reoriginated_schedule))
)

assert len(scheduler_runs) == 0
@@ -35,7 +35,7 @@ def test_execute_schedule_on_celery_k8s( # pylint: disable=redefined-outer-name

while True:
schedule_runs = dagster_instance_for_daemon.get_runs(
RunsFilter(tags=PipelineRun.tags_for_schedule(reoriginated_schedule))
RunsFilter(tags=DagsterRun.tags_for_schedule(reoriginated_schedule))
)

if len(schedule_runs) > 0:
@@ -1,6 +1,6 @@
from utils import start_daemon

from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun
from dagster._core.test_utils import create_run_for_test, poll_for_finished_run
from dagster._utils import file_relative_path, merge_dicts

@@ -38,10 +38,10 @@ def test_queue_from_schedule_and_sensor(instance, foo_example_workspace, foo_exa

runs = [
poll_for_finished_run(instance, run.run_id),
poll_for_finished_run(instance, run_tags=PipelineRun.tags_for_sensor(external_sensor)),
poll_for_finished_run(instance, run_tags=DagsterRun.tags_for_sensor(external_sensor)),
poll_for_finished_run(
instance,
run_tags=PipelineRun.tags_for_schedule(external_schedule),
run_tags=DagsterRun.tags_for_schedule(external_schedule),
timeout=90,
),
]
@@ -8,7 +8,7 @@
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.host_representation.selector import PipelineSelector
from dagster._core.instance import DagsterInstance
from dagster._core.storage.pipeline_run import DagsterRun, PipelineRun, RunsFilter
from dagster._core.storage.pipeline_run import DagsterRun, RunsFilter

from ..external import get_external_pipeline_or_raise
from ..utils import ExecutionMetadata, ExecutionParams, capture_error
@@ -31,7 +31,7 @@ def launch_pipeline_execution(

def do_launch(
graphene_info: HasContext, execution_params: ExecutionParams, is_reexecuted: bool = False
) -> PipelineRun:
) -> DagsterRun:
check.inst_param(graphene_info, "graphene_info", ResolveInfo)
check.inst_param(execution_params, "execution_params", ExecutionParams)
check.bool_param(is_reexecuted, "is_reexecuted")
@@ -1,7 +1,7 @@
from graphene import ResolveInfo

import dagster._check as check
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun

from .external import get_external_pipeline_or_raise, get_full_external_pipeline_or_raise
from .utils import PipelineSelector, UserFacingGraphQLError, capture_error
@@ -58,7 +58,7 @@ def get_pipeline_reference_or_raise(graphene_info, pipeline_run):
InvalidSubsetError."""
from ..schema.pipelines.pipeline_ref import GrapheneUnknownPipeline

check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
check.inst_param(pipeline_run, "pipeline_run", DagsterRun)
solid_selection = (
list(pipeline_run.solids_to_execute) if pipeline_run.solids_to_execute else None
)
@@ -11,7 +11,7 @@
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._core.test_utils import create_run_for_test
from dagster._legacy import DagsterRunStatus, PipelineRun
from dagster._legacy import DagsterRun, DagsterRunStatus
from dagster._seven import get_system_temp_directory

from .graphql_context_test_suite import ExecutingGraphQLContextTestMatrix
@@ -97,7 +97,7 @@ def _seed_runs(graphql_context, partition_runs: List[Tuple[str, DagsterRunStatus
instance=graphql_context.instance,
status=status,
tags={
**PipelineRun.tags_for_backfill_id(backfill_id),
**DagsterRun.tags_for_backfill_id(backfill_id),
PARTITION_NAME_TAG: partition,
},
)
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_cli/api.py
@@ -27,7 +27,7 @@
PipelinePythonOrigin,
get_python_environment_entry_point,
)
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
from dagster._core.utils import coerce_valid_log_level
from dagster._grpc import DagsterGrpcClient, DagsterGrpcServer
@@ -95,7 +95,7 @@ def _execute_run_command_body(
instance, pipeline_run_id
)

pipeline_run: PipelineRun = check.not_none(
pipeline_run: DagsterRun = check.not_none(
instance.get_run_by_id(pipeline_run_id),
"Pipeline run with id '{}' not found for run execution.".format(pipeline_run_id),
)
@@ -359,7 +359,7 @@ def execute_step_command(input_json, compressed_input_json):


def _execute_step_command_body(
args: ExecuteStepArgs, instance: DagsterInstance, pipeline_run: PipelineRun
args: ExecuteStepArgs, instance: DagsterInstance, pipeline_run: DagsterRun
):
single_step_key = (
args.step_keys_to_execute[0]
@@ -369,7 +369,7 @@ def _execute_step_command_body(
try:
check.inst(
pipeline_run,
PipelineRun,
DagsterRun,
"Pipeline run with id '{}' not found for step execution".format(args.pipeline_run_id),
)
check.inst(
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/debug.py
@@ -3,7 +3,7 @@
import dagster._check as check
from dagster._core.events.log import EventLogEntry
from dagster._core.snap import ExecutionPlanSnapshot, PipelineSnapshot
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun
from dagster._serdes import serialize_dagster_namedtuple, whitelist_for_serdes


@@ -13,7 +13,7 @@ class DebugRunPayload(
"_DebugRunPayload",
[
("version", str),
("pipeline_run", PipelineRun),
("pipeline_run", DagsterRun),
("event_list", Sequence[EventLogEntry]),
("pipeline_snapshot", PipelineSnapshot),
("execution_plan_snapshot", ExecutionPlanSnapshot),
@@ -23,15 +23,15 @@ def __new__(
def __new__(
cls,
version: str,
pipeline_run: PipelineRun,
pipeline_run: DagsterRun,
event_list: Sequence[EventLogEntry],
pipeline_snapshot: PipelineSnapshot,
execution_plan_snapshot: ExecutionPlanSnapshot,
):
return super(DebugRunPayload, cls).__new__(
cls,
version=check.str_param(version, "version"),
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", DagsterRun),
event_list=check.sequence_param(event_list, "event_list", EventLogEntry),
pipeline_snapshot=check.inst_param(
pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/partition.py
@@ -41,7 +41,7 @@
ScheduleExecutionError,
user_code_error_boundary,
)
from ..storage.pipeline_run import PipelineRun
from ..storage.pipeline_run import DagsterRun
from .config import ConfigMapping
from .mode import DEFAULT_MODE_NAME
from .run_request import RunRequest, SkipReason
@@ -592,7 +592,7 @@ def tags_for_partition(self, partition: Partition[T]) -> Mapping[str, str]:
user_tags = validate_tags(
self._user_defined_tags_fn_for_partition(partition), allow_reserved_tags=False # type: ignore
)
tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition))
tags = merge_dicts(user_tags, DagsterRun.tags_for_partition_set(self, partition))

return tags

@@ -4,7 +4,7 @@
import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.events import AssetKey
from dagster._core.storage.pipeline_run import DagsterRunStatus, PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun, DagsterRunStatus
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._serdes.serdes import register_serdes_enum_fallbacks, whitelist_for_serdes
from dagster._utils.error import SerializableErrorInfo
@@ -114,7 +114,7 @@ class PipelineRunReaction(
NamedTuple(
"_PipelineRunReaction",
[
("pipeline_run", Optional[PipelineRun]),
("pipeline_run", Optional[DagsterRun]),
("error", Optional[SerializableErrorInfo]),
("run_status", Optional[DagsterRunStatus]),
],
@@ -132,13 +132,13 @@ class PipelineRunReaction(

def __new__(
cls,
pipeline_run: Optional[PipelineRun],
pipeline_run: Optional[DagsterRun],
error: Optional[SerializableErrorInfo] = None,
run_status: Optional[DagsterRunStatus] = None,
):
return super(PipelineRunReaction, cls).__new__(
cls,
pipeline_run=check.opt_inst_param(pipeline_run, "pipeline_run", PipelineRun),
pipeline_run=check.opt_inst_param(pipeline_run, "pipeline_run", DagsterRun),
error=check.opt_inst_param(error, "error", SerializableErrorInfo),
run_status=check.opt_inst_param(run_status, "run_status", DagsterRunStatus),
)
@@ -14,7 +14,7 @@
)
from dagster._core.events import PIPELINE_RUN_STATUS_TO_EVENT_TYPE, DagsterEvent
from dagster._core.instance import DagsterInstance
from dagster._core.storage.pipeline_run import DagsterRun, DagsterRunStatus, PipelineRun, RunsFilter
from dagster._core.storage.pipeline_run import DagsterRun, DagsterRunStatus, RunsFilter
from dagster._serdes import (
deserialize_json_to_dagster_namedtuple,
serialize_dagster_namedtuple,
@@ -122,7 +122,7 @@ def for_run_failure(self):
)

@property
def pipeline_run(self) -> PipelineRun:
def pipeline_run(self) -> DagsterRun:
warnings.warn(
"`RunStatusSensorContext.pipeline_run` is deprecated as of 0.13.0; use "
"`RunStatusSensorContext.dagster_run` instead."
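The property above is truncated right after the warning text. As a sketch of how a deprecation shim of this shape typically ends, assuming it simply forwards to the renamed `dagster_run` attribute named in the warning (a simplified stand-in, not the literal body of this file):

```python
import warnings

from dagster._core.storage.pipeline_run import DagsterRun


class RunStatusSensorContext:
    """Simplified stand-in for the real sensor context class."""

    def __init__(self, dagster_run: DagsterRun):
        self.dagster_run = dagster_run

    @property
    def pipeline_run(self) -> DagsterRun:
        # Deprecated alias retained for code written against the old name.
        warnings.warn(
            "`RunStatusSensorContext.pipeline_run` is deprecated as of 0.13.0; use "
            "`RunStatusSensorContext.dagster_run` instead."
        )
        return self.dagster_run
```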
@@ -37,7 +37,7 @@
)
from ..instance import DagsterInstance
from ..instance.ref import InstanceRef
from ..storage.pipeline_run import PipelineRun
from ..storage.pipeline_run import DagsterRun
from .graph_definition import GraphDefinition
from .mode import DEFAULT_MODE_NAME
from .pipeline_definition import PipelineDefinition
@@ -619,7 +619,7 @@ def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecuti
RunRequest(
run_key=request.run_key,
run_config=request.run_config,
tags=merge_dicts(request.tags, PipelineRun.tags_for_schedule(self)),
tags=merge_dicts(request.tags, DagsterRun.tags_for_schedule(self)),
asset_selection=request.asset_selection,
)
for request in run_requests
@@ -4,7 +4,7 @@
import dagster._check as check
from dagster._core.definitions.reconstruct import ReconstructablePipeline
from dagster._core.execution.retries import RetryMode
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._core.storage.pipeline_run import DagsterRun

if TYPE_CHECKING:
from dagster._core.execution.plan.state import KnownExecutionState
@@ -15,7 +15,7 @@ class StepRunRef(
"_StepRunRef",
[
("run_config", Mapping[str, object]),
("pipeline_run", PipelineRun),
("pipeline_run", DagsterRun),
("run_id", str),
("retry_mode", RetryMode),
("step_key", str),
@@ -34,7 +34,7 @@ class StepRunRef(
def __new__(
cls,
run_config: Mapping[str, object],
pipeline_run: PipelineRun,
pipeline_run: DagsterRun,
run_id: str,
retry_mode: RetryMode,
step_key: str,
@@ -46,7 +46,7 @@ def __new__(
return super(StepRunRef, cls).__new__(
cls,
check.mapping_param(run_config, "run_config", key_type=str),
check.inst_param(pipeline_run, "pipeline_run", PipelineRun),
check.inst_param(pipeline_run, "pipeline_run", DagsterRun),
check.str_param(run_id, "run_id"),
check.inst_param(retry_mode, "retry_mode", RetryMode),
check.str_param(step_key, "step_key"),
