Skip to content

Commit

Permalink
[0.15.0] move from warnings to errors for sensor/schedule target dupl…
Browse files Browse the repository at this point in the history
…ication (#7861)

* Error when duplicating job defs between schedules, sensors, and bare

* Reorganize

* Fix attr

* Fix typing

* Fix errors that should have been caught previously in our codebase

* Fix additional naming conflicts

* Convert errors to warnings, fix msgs, remove unnecessary changes

* Fix test capture

* Move from warnings to errors for duplication

* nasty rebase

* nasty rebase pt 2

* Fix toys

* Fix test_sensor_run
  • Loading branch information
dpeng817 committed Jun 8, 2022
1 parent 66ea7c9 commit fc5f07e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def longitudinal():


longitudinal_job = longitudinal.to_job(
name="longitudinal_no_schedule",
description=(
"Demo job that simulates updating tables of users and video views and training a "
"video recommendation model. The growth of execution-time and data-throughput follows"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ def unreliable():


unreliable_job = unreliable.to_job(
description="Demo graph of chained ops that fail with a configurable probability."
name="unreliable_job_no_schedule",
description="Demo graph of chained ops that fail with a configurable probability.",
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
from abc import ABC, abstractmethod
from inspect import isfunction
from types import FunctionType
Expand Down Expand Up @@ -1251,13 +1250,13 @@ def _process_and_validate_target(
# the same definition by reference equality
if target.name in pipelines_or_jobs:
dupe_target_type = pipelines_or_jobs[target.name].target_type
warnings.warn(
raise DagsterInvalidDefinitionError(
_get_error_msg_for_target_conflict(
targeter, "graph", target.name, dupe_target_type
)
)
elif coerced_graphs[target.name].graph != target:
warnings.warn(
raise DagsterInvalidDefinitionError(
_get_error_msg_for_target_conflict(targeter, "graph", target.name, "graph")
)
coerced_job = target.coerce_to_job()
Expand All @@ -1269,13 +1268,13 @@ def _process_and_validate_target(
# be the same definition by reference equality
if target.name in pipelines_or_jobs:
dupe_target_type = pipelines_or_jobs[target.name].target_type
warnings.warn(
raise DagsterInvalidDefinitionError(
_get_error_msg_for_target_conflict(
targeter, "unresolved asset job", target.name, dupe_target_type
)
)
elif unresolved_jobs[target.name].selection != target.selection:
warnings.warn(
raise DagsterInvalidDefinitionError(
_get_error_msg_for_target_conflict(
targeter, "unresolved asset job", target.name, "unresolved asset job"
)
Expand All @@ -1290,7 +1289,7 @@ def _process_and_validate_target(
if target.name in unresolved_jobs
else pipelines_or_jobs[target.name].target_type
)
warnings.warn(
raise DagsterInvalidDefinitionError(
_get_error_msg_for_target_conflict(
targeter, target.target_type, target.name, dupe_target_type
)
Expand All @@ -1299,4 +1298,4 @@ def _process_and_validate_target(


def _get_error_msg_for_target_conflict(targeter, target_type, target_name, dupe_target_type):
return f"{targeter} targets {target_type} '{target_name}', but a different {dupe_target_type} with the same name was provided. The {target_type} provided to {targeter} will override the existing {dupe_target_type}, but in Dagster 0.15.0, this will result in an error. Disambiguate between these by providing a separate name to one of them."
return f"{targeter} targets {target_type} '{target_name}', but a different {dupe_target_type} with the same name was provided. Disambiguate between these by providing a separate name to one of them."
Original file line number Diff line number Diff line change
Expand Up @@ -1062,17 +1062,17 @@ def test_duplicate_graph_target_invalid():
the_graph = _create_graph_with_name("foo")
other_graph = _create_graph_with_name("foo")
# Different reference-equal graph provided to repo with same name, ensure error is thrown.
with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets graph 'foo', but a different graph with the same name was provided.",
):

@repository
def the_repo_dupe_graph_invalid_sensor():
return [the_graph, _create_sensor_from_target(other_graph)]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets graph 'foo', but a different graph with the same name was provided.",
):

Expand Down Expand Up @@ -1108,17 +1108,17 @@ def foo():
foo_group = AssetGroup([foo])

# Different reference-equal jobs provided to repo with same name, ensure error is thrown.
with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets unresolved asset job 'foo', but a different unresolved asset job with the same name was provided.",
):

@repository
def the_repo_dupe_graph_invalid_sensor():
return [foo_group, the_job, _create_sensor_from_target(other_job)]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets unresolved asset job 'foo', but a different unresolved asset job with the same name was provided.",
):

Expand All @@ -1139,17 +1139,17 @@ def test_duplicate_job_target_invalid():
the_job = _create_job_with_name("foo")
other_job = _create_job_with_name("foo")

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets job 'foo', but a different job with the same name was provided.",
):

@repository
def the_repo_dupe_job_invalid_sensor():
return [the_job, _create_sensor_from_target(other_job)]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets job 'foo', but a different job with the same name was provided.",
):

Expand All @@ -1174,17 +1174,17 @@ def test_dupe_pipelines_invalid():
the_pipeline = _create_pipeline_with_name("foo")
other_pipeline = _create_pipeline_with_name("foo")

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets pipeline 'foo', but a different pipeline with the same name was provided.",
):

@repository
def the_repo_dupe_pipelines_invalid_schedule():
return [the_pipeline, _create_schedule_from_target(other_pipeline)]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets pipeline 'foo', but a different pipeline with the same name was provided.",
):

Expand All @@ -1199,17 +1199,17 @@ def test_dupe_jobs_pipelines_invalid():

the_schedule = _create_schedule_from_target(the_pipeline)
the_sensor = _create_sensor_from_target(the_pipeline)
with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets pipeline 'foo', but a different job with the same name was provided.",
):

@repository
def the_repo_dupe_job_pipeline_invalid_schedule_job():
return [the_job, the_schedule]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets pipeline 'foo', but a different job with the same name was provided.",
):

Expand All @@ -1219,17 +1219,17 @@ def the_repo_dupe_job_pipeline_invalid_sensor_job():

the_graph = _create_graph_with_name("foo")

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="sensor '_the_sensor' targets pipeline 'foo', but a different graph with the same name was provided.",
):

@repository
def the_repo_dupe_graph_pipeline_invalid_sensor_graph():
return [the_graph, the_sensor]

with pytest.warns(
UserWarning,
with pytest.raises(
DagsterInvalidDefinitionError,
match="schedule '_the_schedule' targets pipeline 'foo', but a different graph with the same name was provided.",
):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def the_repo():
the_pipeline,
the_job,
config_pipeline,
config_graph,
config_job,
foo_pipeline,
large_sensor,
simple_sensor,
Expand Down

0 comments on commit fc5f07e

Please sign in to comment.