Skip to content

Commit

Permalink
assorted sensor and schedule fixes (#6759)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Mar 3, 2022
1 parent 919fb31 commit 498b261
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ def inner(
]
) -> SensorDefinition:
check.callable_param(fn, "fn")
sensor_name = name or fn.__name__

sensor_def = SensorDefinition(
name=sensor_name,
name=name,
pipeline_name=pipeline_name,
evaluation_fn=fn,
solid_selection=solid_selection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ def environment_vars(self) -> Dict[str, str]:
def execution_timezone(self) -> Optional[str]:
return self._execution_timezone

@property
def job(self) -> PipelineDefinition:
if isinstance(self._target, DirectTarget):
return self._target.pipeline
raise DagsterInvalidDefinitionError("No job was provided to ScheduleDefinition.")

def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecutionData:
"""Evaluate schedule using the provided context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .events import AssetKey
from .graph_definition import GraphDefinition
from .job_definition import JobDefinition
from .pipeline_definition import PipelineDefinition
from .mode import DEFAULT_MODE_NAME
from .run_request import PipelineRunReaction, RunRequest, SkipReason
from .target import DirectTarget, RepoRelativeTarget
Expand Down Expand Up @@ -147,13 +148,13 @@ class SensorDefinition:
"""Define a sensor that initiates a set of runs based on some external state
Args:
name (str): The name of the sensor to create.
evaluation_fn (Callable[[SensorEvaluationContext]]): The core evaluation function for the
sensor, which is run at an interval to determine whether a run should be launched or
not. Takes a :py:class:`~dagster.SensorEvaluationContext`.
This function must return a generator, which must yield either a single SkipReason
or one or more RunRequest objects.
name (Optional[str]): The name of the sensor to create. Defaults to name of evaluation_fn
pipeline_name (Optional[str]): (legacy) The name of the pipeline to execute when the sensor
fires. Cannot be used in conjunction with `job` or `jobs` parameters.
solid_selection (Optional[List[str]]): (legacy) A list of solid subselection (including single
Expand All @@ -173,11 +174,13 @@ class SensorDefinition:

def __init__(
self,
name: str,
evaluation_fn: Callable[
["SensorEvaluationContext"],
Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
],
name: Optional[str] = None,
evaluation_fn: Optional[
Callable[
["SensorEvaluationContext"],
Union[Generator[Union[RunRequest, SkipReason], None, None], RunRequest, SkipReason],
]
] = None,
pipeline_name: Optional[str] = None,
solid_selection: Optional[List[Any]] = None,
mode: Optional[str] = None,
Expand All @@ -187,6 +190,8 @@ def __init__(
jobs: Optional[Sequence[Union[GraphDefinition, JobDefinition]]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
):
if evaluation_fn is None:
raise DagsterInvalidDefinitionError("Must provide evaluation_fn to SensorDefinition.")

if job and jobs:
raise DagsterInvalidDefinitionError(
Expand Down Expand Up @@ -229,11 +234,15 @@ def __init__(
elif jobs:
targets = [DirectTarget(job) for job in jobs]

self._name = check_valid_name(name)
if name:
self._name = check_valid_name(name)
else:
self._name = evaluation_fn.__name__

self._raw_fn = check.callable_param(evaluation_fn, "evaluation_fn")
self._evaluation_fn: Callable[
[SensorEvaluationContext], Generator[Union[RunRequest, SkipReason], None, None]
] = wrap_sensor_evaluation(name, evaluation_fn)
] = wrap_sensor_evaluation(self._name, evaluation_fn)
self._min_interval = check.opt_int_param(
minimum_interval_seconds, "minimum_interval_seconds", DEFAULT_SENSOR_DAEMON_INTERVAL
)
Expand Down Expand Up @@ -300,6 +309,17 @@ def minimum_interval_seconds(self) -> Optional[int]:
def targets(self) -> Optional[List[Union[DirectTarget, RepoRelativeTarget]]]:
return self._targets

@property
def job(self) -> PipelineDefinition:
if self._targets:
if len(self._targets) == 1 and isinstance(self._targets[0], DirectTarget):
return self._targets[0].pipeline
elif len(self._targets) > 1:
raise DagsterInvalidDefinitionError(
"Job property not available when SensorDefinition has multiple jobs."
)
raise DagsterInvalidDefinitionError("No job was provided to SensorDefinition.")

def evaluate_tick(self, context: "SensorEvaluationContext") -> "SensorExecutionData":
"""Evaluate sensor using the provided context.
Expand Down Expand Up @@ -387,8 +407,9 @@ def check_valid_run_requests(self, run_requests: List[RunRequest]):
if run_requests and not self._targets:
raise Exception(
f"Error in sensor {self._name}: Sensor evaluation function returned a RunRequest "
"for a sensor without a specified target. Targets can be specified by providing "
"a job or pipeline_name."
"for a sensor lacking a specified target (pipeline_name, job, or jobs). Targets "
"can be specified by providing job, jobs, or pipeline_name to the @sensor "
"decorator."
)

for run_request in run_requests:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import pytest

from dagster import ScheduleDefinition, graph
from dagster.core.errors import DagsterInvalidDefinitionError


def test_default_name():
Expand All @@ -22,3 +25,18 @@ def my_graph():

schedule = ScheduleDefinition(job=my_graph.to_job(name="my_job"), cron_schedule="0 0 * * *")
assert schedule.name == "my_job_schedule"


def test_jobs_attr():
@graph
def my_graph():
pass

schedule = ScheduleDefinition(job=my_graph, cron_schedule="0 0 * * *")
assert schedule.job.name == my_graph.name

schedule = ScheduleDefinition(pipeline_name="my_pipeline", cron_schedule="0 0 * * *")
with pytest.raises(
DagsterInvalidDefinitionError, match="No job was provided to ScheduleDefinition."
):
schedule.job
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

from dagster import SensorDefinition, graph
from dagster.core.errors import DagsterInvalidDefinitionError


def test_jobs_attr():
def eval_fn():
pass

@graph
def my_graph():
pass

sensor = SensorDefinition(evaluation_fn=eval_fn, job=my_graph)
assert sensor.job.name == my_graph.name

sensor = SensorDefinition(evaluation_fn=eval_fn, pipeline_name="my_pipeline")
with pytest.raises(
DagsterInvalidDefinitionError, match="No job was provided to SensorDefinition."
):
sensor.job

@graph
def my_second_graph():
pass

sensor = SensorDefinition(evaluation_fn=eval_fn, jobs=[my_graph, my_second_graph])
with pytest.raises(
DagsterInvalidDefinitionError,
match="Job property not available when SensorDefinition has multiple jobs.",
):
sensor.job


def test_direct_sensor_definition_instantiation():
with pytest.raises(
DagsterInvalidDefinitionError, match="Must provide evaluation_fn to SensorDefinition."
):
SensorDefinition()
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
DagsterInvariantViolationError,
DagsterRunStatus,
RunRequest,
SensorDefinition,
SensorEvaluationContext,
SensorExecutionContext,
build_run_status_sensor_context,
Expand All @@ -17,7 +18,7 @@
run_status_sensor,
sensor,
)
from dagster.core.errors import DagsterInvalidInvocationError
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster.core.test_utils import instance_for_test


Expand Down Expand Up @@ -101,6 +102,27 @@ def test_instance_access_with_mock():
assert build_sensor_context(instance=mock_instance).instance == mock_instance


def test_sensor_w_no_job():
@sensor()
def no_job_sensor():
pass

with pytest.raises(
Exception,
match=r".* Sensor evaluation function returned a RunRequest for a sensor lacking a "
r"specified target .*",
):
no_job_sensor.check_valid_run_requests(
[
RunRequest(
run_key=None,
run_config=None,
tags=None,
)
]
)


def test_run_status_sensor():
@run_status_sensor(pipeline_run_status=DagsterRunStatus.SUCCESS)
def status_sensor(context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,8 @@ def test_bad_run_request_untargeted():
None,
(
"Error in sensor bad_request_untargeted: Sensor evaluation function returned a "
"RunRequest for a sensor without a specified target."
"RunRequest for a sensor lacking a specified target (pipeline_name, job, or "
"jobs)."
),
)

Expand Down

0 comments on commit 498b261

Please sign in to comment.