Skip to content

Commit

Permalink
enable saving run key for ticks that skip runs because of run key ide…
Browse files Browse the repository at this point in the history
…mpotence (#7130)

* enable saving run key info for ticks that skip runs due to run key idempotence

* comments
  • Loading branch information
prha committed Mar 22, 2022
1 parent 3ee5790 commit b17183e
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 18 deletions.
11 changes: 6 additions & 5 deletions python_modules/dagster/dagster/core/scheduler/instigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ def with_reason(self, skip_reason):
check.opt_str_param(skip_reason, "skip_reason")
return self._replace(tick_data=self.tick_data.with_reason(skip_reason))

def with_run(self, run_id, run_key=None):
return self._replace(tick_data=self.tick_data.with_run(run_id, run_key))
def with_run_info(self, run_id=None, run_key=None):
return self._replace(tick_data=self.tick_data.with_run_info(run_id, run_key))

def with_cursor(self, cursor):
return self._replace(tick_data=self.tick_data.with_cursor(cursor))
Expand Down Expand Up @@ -497,13 +497,14 @@ def with_status(self, status, error=None, timestamp=None, failure_count=None):
)
)

def with_run(self, run_id, run_key=None):
check.str_param(run_id, "run_id")
def with_run_info(self, run_id=None, run_key=None):
check.opt_str_param(run_id, "run_id")
check.opt_str_param(run_key, "run_key")
return TickData(
**merge_dicts(
self._asdict(),
{
"run_ids": [*self.run_ids, run_id],
"run_ids": [*self.run_ids, run_id] if run_id else self.run_ids,
"run_keys": [*self.run_keys, run_key] if run_key else self.run_keys,
},
)
Expand Down
13 changes: 6 additions & 7 deletions python_modules/dagster/dagster/daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def update_state(self, status, **kwargs):
if origin_run_id:
self._tick = self._tick.with_origin_run(origin_run_id)

def add_run(self, run_id, run_key=None):
self._tick = self._tick.with_run(run_id, run_key)
def add_run_info(self, run_id=None, run_key=None):
self._tick = self._tick.with_run_info(run_id, run_key)

def set_should_update_cursor_on_failure(self, should_update_cursor_on_failure: bool):
self._should_update_cursor_on_failure = should_update_cursor_on_failure
Expand Down Expand Up @@ -442,6 +442,7 @@ def _evaluate_sensor(

if isinstance(run, SkippedSensorRun):
skipped_runs.append(run)
context.add_run_info(run_id=None, run_key=run_request.run_key)
yield
continue

Expand Down Expand Up @@ -469,7 +470,7 @@ def _evaluate_sensor(

_check_for_debug_crash(sensor_debug_crash_flags, "RUN_LAUNCHED")

context.add_run(run_id=run.run_id, run_key=run_request.run_key)
context.add_run_info(run_id=run.run_id, run_key=run_request.run_key)

if skipped_runs:
run_keys = [skipped.run_key for skipped in skipped_runs]
Expand Down Expand Up @@ -562,10 +563,8 @@ def _get_or_create_sensor_run(

if run:
if run.status != PipelineRunStatus.NOT_STARTED:
# A run already exists and was launched for this time period,
# but the daemon must have crashed before the tick could be put
# into a SUCCESS state
context.logger.info(f"Skipping run for {run_request.run_key}, found {run.run_id}.")
# A run already exists and was launched for this run key, but the daemon must have
# crashed before the tick could be updated
return SkippedSensorRun(run_key=run_request.run_key, existing_run=run)
else:
context.logger.info(
Expand Down
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def update_state(self, status, error=None, **kwargs):
if skip_reason:
self._tick = self._tick.with_reason(skip_reason=skip_reason)

def add_run(self, run_id, run_key=None):
self._tick = self._tick.with_run(run_id, run_key)
def add_run_info(self, run_id=None, run_key=None):
self._tick = self._tick.with_run_info(run_id, run_key)

def _write(self):
self._instance.update_tick(self._tick)
Expand Down Expand Up @@ -459,7 +459,7 @@ def _schedule_runs_at_time(
logger.info(
f"Run {run.run_id} already completed for this execution of {external_schedule.name}"
)
tick_context.add_run(run_id=run.run_id, run_key=run_request.run_key)
tick_context.add_run_info(run_id=run.run_id, run_key=run_request.run_key)
yield
continue
else:
Expand Down Expand Up @@ -490,7 +490,7 @@ def _schedule_runs_at_time(
yield error_info

_check_for_debug_crash(debug_crash_flags, "RUN_LAUNCHED")
tick_context.add_run(run_id=run.run_id, run_key=run_request.run_key)
tick_context.add_run_info(run_id=run.run_id, run_key=run_request.run_key)
_check_for_debug_crash(debug_crash_flags, "RUN_ADDED")
yield

Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/utils/test/schedule_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def test_update_tick_to_success(self, storage):
current_time = time.time()
tick = storage.create_tick(self.build_schedule_tick(current_time))

updated_tick = tick.with_status(TickStatus.SUCCESS).with_run(run_id="1234")
updated_tick = tick.with_status(TickStatus.SUCCESS).with_run_info(run_id="1234")
assert updated_tick.status == TickStatus.SUCCESS

storage.update_tick(updated_tick)
Expand Down Expand Up @@ -507,7 +507,7 @@ def test_update_sensor_tick_to_success(self, storage):
current_time = time.time()
tick = storage.create_tick(self.build_sensor_tick(current_time))

updated_tick = tick.with_status(TickStatus.SUCCESS).with_run(run_id="1234")
updated_tick = tick.with_status(TickStatus.SUCCESS).with_run_info(run_id="1234")
assert updated_tick.status == TickStatus.SUCCESS

storage.update_tick(updated_tick)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,10 @@ def test_launch_once(capfd):
freeze_datetime,
TickStatus.SKIPPED,
)
assert ticks[0].run_keys
assert len(ticks[0].run_keys) == 1
assert not ticks[0].run_ids

captured = capfd.readouterr()
assert (
'Skipping 1 run for sensor run_key_sensor already completed with run keys: ["only_once"]'
Expand Down

0 comments on commit b17183e

Please sign in to comment.