Skip to content

Commit

Permalink
Still write cursors even when a sensor tick fails (#6807)
Browse files Browse the repository at this point in the history
Summary:
Run status sensors still update the cursor even if there's an error in the tick, but we throw that update on the floor in the sensor loop. This PR changes

Test Plan:
Manually, write a run status sensor that raises an Exception, run it
Before: Tick would spam in a loop, never updating the cursor
Now: Tick errors once and starts skipping again
  • Loading branch information
gibsondan committed Feb 25, 2022
1 parent ce7441c commit ee07c9c
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 18 deletions.
58 changes: 40 additions & 18 deletions python_modules/dagster/dagster/daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
from dagster.utils import merge_dicts
from dagster.utils.error import serializable_error_info_from_exc_info

RECORDED_TICK_STATES = [TickStatus.SUCCESS, TickStatus.FAILURE]
FULFILLED_TICK_STATES = [TickStatus.SKIPPED, TickStatus.SUCCESS]

MIN_INTERVAL_LOOP_TIME = 5

FINISHED_TICK_STATES = [TickStatus.SKIPPED, TickStatus.SUCCESS, TickStatus.FAILURE]


class DagsterSensorDaemonError(DagsterError):
"""Error when running the SensorDaemon"""
Expand All @@ -46,6 +45,8 @@ def __init__(self, external_sensor, state, tick, instance, logger):
self._state = state
self._tick = tick

self._should_update_cursor_on_failure = False

@property
def status(self):
return self._tick.status
Expand Down Expand Up @@ -88,31 +89,49 @@ def update_state(self, status, **kwargs):
def add_run(self, run_id, run_key=None):
self._tick = self._tick.with_run(run_id, run_key)

def set_should_update_cursor_on_failure(self, should_update_cursor_on_failure: bool):
self._should_update_cursor_on_failure = should_update_cursor_on_failure

def _write(self):
self._instance.update_tick(self._tick)
if self._tick.status in FULFILLED_TICK_STATES:
last_run_key = (
self._state.instigator_data.last_run_key if self._state.instigator_data else None
)
if self._tick.run_keys:
last_run_key = self._tick.run_keys[-1]
self._instance.update_instigator_state(
self._state.with_data(
SensorInstigatorData(
last_tick_timestamp=self._tick.timestamp,
last_run_key=last_run_key,
min_interval=self._external_sensor.min_interval_seconds,
cursor=self._tick.cursor,
)

if self._tick.status not in FINISHED_TICK_STATES:
return

should_update_cursor_and_last_run_key = (
self._tick.status != TickStatus.FAILURE
) or self._should_update_cursor_on_failure

last_run_key = (
self._state.instigator_data.last_run_key if self._state.instigator_data else None
)
if self._tick.run_keys and should_update_cursor_and_last_run_key:
last_run_key = self._tick.run_keys[-1]

cursor = self._state.instigator_data.cursor if self._state.instigator_data else None
if should_update_cursor_and_last_run_key:
cursor = self._tick.cursor

self._instance.update_instigator_state(
self._state.with_data(
SensorInstigatorData(
last_tick_timestamp=self._tick.timestamp,
last_run_key=last_run_key,
min_interval=self._external_sensor.min_interval_seconds,
cursor=cursor,
)
)
)

def __enter__(self):
return self

def __exit__(self, exception_type, exception_value, traceback):
if exception_type and isinstance(exception_value, KeyboardInterrupt):
return

# Log the error if the failure wasn't an interrupt or the daemon generator stopping
if exception_value and not isinstance(exception_value, (KeyboardInterrupt, GeneratorExit)):
if exception_value and not isinstance(exception_value, GeneratorExit):
error_data = serializable_error_info_from_exc_info(sys.exc_info())
self.update_state(TickStatus.FAILURE, error=error_data)

Expand Down Expand Up @@ -348,6 +367,9 @@ def _evaluate_sensor(
cursor=sensor_runtime_data.cursor,
error=pipeline_run_reaction.error,
)
# Since run status sensors have side effects that we don't want to repeat,
# we still want to update the cursor, even though the tick failed
context.set_should_update_cursor_on_failure(True)
else:
# log to the original pipeline run
message = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def run_key_sensor(_context):

@sensor(pipeline_name="the_pipeline")
def error_sensor(context):
context.update_cursor("the exception below should keep this from being persisted")
raise Exception("womp womp")


Expand Down Expand Up @@ -226,6 +227,11 @@ def my_run_failure_sensor_filtered(context):
assert isinstance(context.instance, DagsterInstance)


@run_failure_sensor()
def my_run_failure_sensor_that_itself_fails(context):
raise Exception("How meta")


@run_status_sensor(pipeline_run_status=PipelineRunStatus.SUCCESS)
def my_pipeline_success_sensor(context):
assert isinstance(context.instance, DagsterInstance)
Expand Down Expand Up @@ -284,6 +290,7 @@ def the_repo():
asset_job_sensor,
my_pipeline_failure_sensor,
my_run_failure_sensor_filtered,
my_run_failure_sensor_that_itself_fails,
my_pipeline_success_sensor,
failure_pipeline,
failure_job,
Expand Down Expand Up @@ -610,6 +617,10 @@ def test_error_sensor(capfd):
InstigatorStatus.RUNNING,
)
)

state = instance.get_instigator_state(external_sensor.get_external_origin_id())
assert state.instigator_data is None

assert instance.get_runs_count() == 0
ticks = instance.get_ticks(external_sensor.get_external_origin_id())
assert len(ticks) == 0
Expand All @@ -633,6 +644,11 @@ def test_error_sensor(capfd):
"Error occurred during the execution of evaluation_fn for sensor error_sensor"
) in captured.out

# Tick updated the sensor's last tick time, but not its cursor (due to the failure)
state = instance.get_instigator_state(external_sensor.get_external_origin_id())
assert state.instigator_data.cursor is None
assert state.instigator_data.last_tick_timestamp == freeze_datetime.timestamp()


def test_wrong_config_sensor(capfd):
freeze_datetime = to_timezone(
Expand Down Expand Up @@ -681,6 +697,8 @@ def test_wrong_config_sensor(capfd):
captured = capfd.readouterr()
assert ("Error in config for pipeline") in captured.out

freeze_datetime = freeze_datetime.add(seconds=60)
with pendulum.test(freeze_datetime):
# Error repeats on subsequent ticks

evaluate_sensors(instance, workspace)
Expand Down Expand Up @@ -1298,6 +1316,77 @@ def test_pipeline_failure_sensor():
)


def test_run_failure_sensor_that_fails():
freeze_datetime = pendulum.now()
with instance_with_sensors() as (
instance,
workspace,
external_repo,
):
with pendulum.test(freeze_datetime):
failure_sensor = external_repo.get_external_sensor(
"my_run_failure_sensor_that_itself_fails"
)
instance.start_sensor(failure_sensor)

evaluate_sensors(instance, workspace)

ticks = instance.get_ticks(failure_sensor.get_external_origin_id())
assert len(ticks) == 1
validate_tick(
ticks[0],
failure_sensor,
freeze_datetime,
TickStatus.SKIPPED,
)

freeze_datetime = freeze_datetime.add(seconds=60)
time.sleep(1)

with pendulum.test(freeze_datetime):
external_pipeline = external_repo.get_full_external_pipeline("failure_pipeline")
run = instance.create_run_for_pipeline(
failure_pipeline,
external_pipeline_origin=external_pipeline.get_external_origin(),
pipeline_code_origin=external_pipeline.get_python_origin(),
)
instance.submit_run(run.run_id, workspace)
wait_for_all_runs_to_finish(instance)
run = instance.get_runs()[0]
assert run.status == PipelineRunStatus.FAILURE
freeze_datetime = freeze_datetime.add(seconds=60)

with pendulum.test(freeze_datetime):

# should fire the failure sensor and fail
evaluate_sensors(instance, workspace)

ticks = instance.get_ticks(failure_sensor.get_external_origin_id())
assert len(ticks) == 2
validate_tick(
ticks[0],
failure_sensor,
freeze_datetime,
TickStatus.FAILURE,
expected_error="How meta",
)

# Next tick skips again
freeze_datetime = freeze_datetime.add(seconds=60)
with pendulum.test(freeze_datetime):
# should fire the failure sensor and fail
evaluate_sensors(instance, workspace)

ticks = instance.get_ticks(failure_sensor.get_external_origin_id())
assert len(ticks) == 3
validate_tick(
ticks[0],
failure_sensor,
freeze_datetime,
TickStatus.SKIPPED,
)


def test_run_failure_sensor_filtered():
freeze_datetime = pendulum.now()
with instance_with_sensors() as (
Expand Down

0 comments on commit ee07c9c

Please sign in to comment.