Commit
[resources on repos 1/n] Reorganize job/pipeline/graph duplication error logic in repository (#7817)

* Error when duplicating job defs between schedules, sensors, and bare

* Reorganize

* Fix attr

* Fix error

* Fix typing

* Fix errors that should have been caught previously in our codebase

* Fix additional naming conflicts

* Convert errors to warnings, fix msgs, remove unnecessary changes

* Fix test capture
dpeng817 committed May 13, 2022
1 parent 6732a87 commit 993a3aa
Showing 5 changed files with 280 additions and 32 deletions.
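To make the change concrete, here is a minimal sketch of the kind of name collision the reorganized logic now reports as a warning, assuming the public Dagster APIs of this era (@op, @graph, @job, ScheduleDefinition, @repository); the names do_stuff, do_stuff_job, and collision_repo are invented for illustration.

from dagster import ScheduleDefinition, graph, job, op, repository

@op
def do_nothing():
    pass

@graph
def do_stuff():
    do_nothing()

# A job that happens to share the graph's name.
@job(name="do_stuff")
def do_stuff_job():
    do_nothing()

# A schedule that targets the graph directly.
do_stuff_schedule = ScheduleDefinition(job=do_stuff, cron_schedule="0 0 * * *")

@repository
def collision_repo():
    # With this commit, constructing the repository emits a UserWarning about
    # the conflicting names instead of raising; per the warning text, Dagster
    # 0.15.0 will turn this into an error.
    return [do_stuff_job, do_stuff_schedule]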
@@ -1,3 +1,4 @@
import warnings
from abc import ABC, abstractmethod
from inspect import isfunction
from types import FunctionType
@@ -624,6 +625,7 @@ def from_list(
from dagster.core.asset_defs import AssetGroup

pipelines_or_jobs: Dict[str, Union[PipelineDefinition, JobDefinition]] = {}
coerced_graphs: Dict[str, JobDefinition] = {}
partition_sets: Dict[str, PartitionSetDefinition] = {}
schedules: Dict[str, ScheduleDefinition] = {}
sensors: Dict[str, SensorDefinition] = {}
@@ -659,19 +661,12 @@ def from_list(
f"Duplicate definition found for {definition.name}"
)
sensors[definition.name] = definition
if definition.has_loadable_targets():
targets = definition.load_targets()
for target in targets:
pipelines_or_jobs[target.name] = target
elif isinstance(definition, ScheduleDefinition):
if definition.name in sensors or definition.name in schedules:
raise DagsterInvalidDefinitionError(
f"Duplicate definition found for {definition.name}"
)
schedules[definition.name] = definition
if definition.has_loadable_target():
target = definition.load_target()
pipelines_or_jobs[target.name] = target
if isinstance(definition, PartitionScheduleDefinition):
partition_set_def = definition.get_partition_set()
if (
@@ -692,6 +687,7 @@ def from_list(
)
)
pipelines_or_jobs[coerced.name] = coerced
coerced_graphs[coerced.name] = coerced

elif isinstance(definition, AssetGroup):
if combined_asset_group:
@@ -710,6 +706,21 @@ def from_list(
for source_asset in combined_asset_group.source_assets
}

for name, sensor_def in sensors.items():
if sensor_def.has_loadable_targets():
targets = sensor_def.load_targets()
for target in targets:
_process_and_validate_target(
sensor_def, coerced_graphs, pipelines_or_jobs, target
)

for name, schedule_def in schedules.items():
if schedule_def.has_loadable_target():
target = schedule_def.load_target()
_process_and_validate_target(
schedule_def, coerced_graphs, pipelines_or_jobs, target
)

pipelines: Dict[str, PipelineDefinition] = {}
jobs: Dict[str, JobDefinition] = {}
for name, pipeline_or_job in pipelines_or_jobs.items():
@@ -1161,3 +1172,52 @@ def source_assets_by_key(self) -> Dict[AssetKey, SourceAsset]:
# overwritten. Therefore, we want to maintain the call-ability of repository definitions.
def __call__(self, *args, **kwargs):
return self


def _process_and_validate_target(
schedule_or_sensor_def: Union[SensorDefinition, ScheduleDefinition],
coerced_graphs: Dict[str, JobDefinition],
pipelines_or_jobs: Dict[str, PipelineDefinition],
target: Union[GraphDefinition, PipelineDefinition],
):
# This function modifies the state of coerced_graphs and pipelines_or_jobs.
targeter = (
f"schedule '{schedule_or_sensor_def.name}'"
if isinstance(schedule_or_sensor_def, ScheduleDefinition)
else f"sensor '{schedule_or_sensor_def.name}'"
)
if isinstance(target, GraphDefinition):
if target.name not in coerced_graphs:
# Since this is a graph we still have to coerce, it cannot be the same
# definition (by reference equality) as one that is already registered
if target.name in pipelines_or_jobs:
dupe_target_type = pipelines_or_jobs[target.name].target_type
warnings.warn(
_get_error_msg_for_target_conflict(
targeter, "graph", target.name, dupe_target_type
)
)
elif coerced_graphs[target.name].graph != target:
warnings.warn(
_get_error_msg_for_target_conflict(targeter, "graph", target.name, "graph")
)
coerced_job = target.coerce_to_job()
coerced_graphs[target.name] = coerced_job
pipelines_or_jobs[target.name] = coerced_job
else:
if target.name in pipelines_or_jobs and pipelines_or_jobs[target.name] != target:
dupe_target_type = (
pipelines_or_jobs[target.name].target_type
if target.name not in coerced_graphs
else "graph"
)
warnings.warn(
_get_error_msg_for_target_conflict(
targeter, target.target_type, target.name, dupe_target_type
)
)
pipelines_or_jobs[target.name] = target


def _get_error_msg_for_target_conflict(targeter, target_type, target_name, dupe_target_type):
return f"{targeter} targets {target_type} '{target_name}', but a different {dupe_target_type} with the same name was provided. The {target_type} provided to {targeter} will override the existing {dupe_target_type}, but in Dagster 0.15.0, this will result in an error. Disambiguate between these by providing a separate name to one of them."
@@ -449,9 +449,9 @@ def execution_timezone(self) -> Optional[str]:
return self._execution_timezone

@property
def job(self) -> PipelineDefinition:
def job(self) -> Union[GraphDefinition, PipelineDefinition]:
if isinstance(self._target, DirectTarget):
return self._target.pipeline
return self._target.target
raise DagsterInvalidDefinitionError("No job was provided to ScheduleDefinition.")

def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecutionData:
@@ -518,7 +518,7 @@ def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecutionData:
def has_loadable_target(self):
return isinstance(self._target, DirectTarget)

def load_target(self):
def load_target(self) -> Union[GraphDefinition, PipelineDefinition]:
if isinstance(self._target, DirectTarget):
return self._target.load()

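A short sketch of what the widened annotations mean in practice: a schedule built from a graph now hands back the GraphDefinition itself from job and load_target(), rather than an eagerly coerced pipeline. The names nightly_graph and emit are invented for illustration.

from dagster import ScheduleDefinition, graph, op

@op
def emit():
    pass

@graph
def nightly_graph():
    emit()

nightly = ScheduleDefinition(job=nightly_graph, cron_schedule="0 2 * * *")

assert nightly.has_loadable_target()
assert nightly.load_target() is nightly_graph  # a GraphDefinition, no longer coerced
assert nightly.job is nightly_graph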
@@ -319,10 +319,10 @@ def targets(self) -> List[Union[DirectTarget, RepoRelativeTarget]]:
return self._targets

@property
def job(self) -> PipelineDefinition:
def job(self) -> Union[PipelineDefinition, GraphDefinition]:
if self._targets:
if len(self._targets) == 1 and isinstance(self._targets[0], DirectTarget):
return self._targets[0].pipeline
return self._targets[0].target
elif len(self._targets) > 1:
raise DagsterInvalidDefinitionError(
"Job property not available when SensorDefinition has multiple jobs."
@@ -402,7 +402,7 @@ def has_loadable_targets(self) -> bool:
return True
return False

def load_targets(self) -> List[PipelineDefinition]:
def load_targets(self) -> List[Union[PipelineDefinition, GraphDefinition]]:
targets = []
for target in self._targets:
if isinstance(target, DirectTarget):
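The sensor side is analogous; a hypothetical sensor targeting a graph (watched_graph and my_sensor are invented names) would now get the graph back from load_targets():

from dagster import graph, op, sensor

@op
def ping():
    pass

@graph
def watched_graph():
    ping()

@sensor(job=watched_graph)
def my_sensor(_context):
    # Evaluation logic is irrelevant here; only target loading is illustrated.
    return None

assert my_sensor.has_loadable_targets()
assert my_sensor.load_targets()[0] is watched_graph  # GraphDefinition, not a coerced job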
41 changes: 22 additions & 19 deletions python_modules/dagster/dagster/core/definitions/target.py
@@ -3,6 +3,7 @@
import dagster._check as check

from .graph_definition import GraphDefinition
from .mode import DEFAULT_MODE_NAME
from .pipeline_definition import PipelineDefinition


@@ -16,44 +17,46 @@ class RepoRelativeTarget(NamedTuple):
solid_selection: Optional[List[str]]


class DirectTarget(NamedTuple("_DirectTarget", [("pipeline", PipelineDefinition)])):
class DirectTarget(
NamedTuple("_DirectTarget", [("target", Union[GraphDefinition, PipelineDefinition])])
):
"""
The thing to be executed by a schedule or sensor, referenced directly and loaded
into any repository the container is included in.
"""

def __new__(cls, graph: Union[GraphDefinition, PipelineDefinition]):
check.inst_param(graph, "graph", (GraphDefinition, PipelineDefinition))
def __new__(cls, target: Union[GraphDefinition, PipelineDefinition]):
check.inst_param(target, "target", (GraphDefinition, PipelineDefinition))

# pipeline will become job / execution target
if isinstance(graph, PipelineDefinition):
pipeline = graph
else:
pipeline = graph.to_job(resource_defs={})

check.invariant(
len(pipeline.mode_definitions) == 1,
f"Pipeline {pipeline.name} has more than one mode which makes it an invalid "
"execution target.",
)
if isinstance(target, PipelineDefinition) and not len(target.mode_definitions) == 1:
check.failed(
"Only graphs, jobs, and single-mode pipelines are valid "
"execution targets from a schedule or sensor. Please see the "
f"following guide to migrate your pipeline '{target.name}': "
"https://docs.dagster.io/guides/dagster/graph_job_op#migrating-to-ops-jobs-and-graphs"
)

return super().__new__(
cls,
pipeline,
target,
)

@property
def pipeline_name(self) -> str:
return self.pipeline.name
return self.target.name

@property
def mode(self) -> str:
return self.pipeline.mode_definitions[0].name
return (
self.target.mode_definitions[0].name
if isinstance(self.target, PipelineDefinition)
else DEFAULT_MODE_NAME
)

@property
def solid_selection(self):
# open question on how to directly target a subset pipeline
return None

def load(self) -> PipelineDefinition:
return self.pipeline
def load(self) -> Union[PipelineDefinition, GraphDefinition]:
return self.target
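Finally, a small hypothetical check of how DirectTarget behaves after this change, assuming the internal import path shown in the diff header: a graph target is stored as-is, reports the default mode name, and load() returns the original graph.

from dagster import graph, op
from dagster.core.definitions.target import DirectTarget  # internal module, per the diff

@op
def noop():
    pass

@graph
def my_graph():
    noop()

target = DirectTarget(my_graph)
assert target.pipeline_name == "my_graph"
assert target.mode == "default"    # DEFAULT_MODE_NAME is used for graph targets
assert target.load() is my_graph   # no eager to_job() coercion anymore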
