From 179b6989ad54521b06d1593823c2d25f2da2fff2 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Tue, 7 Apr 2026 17:19:06 -0400 Subject: [PATCH] feat(scheduler) Make schedule changes take effect immediately We had feedback a few months back that changes in schedules should take effect immediately, and these changes implement that with backwards compatibility for existing run state. The scenario that came up was a task was moved from every 3 hours to every 10 minutes, and the developer was surprised when their task had to wait for 2+hrs to see their task run again. Refs STREAM-676 --- .../src/taskbroker_client/scheduler/runner.py | 60 ++++++----- .../taskbroker_client/scheduler/schedules.py | 13 +++ clients/python/tests/scheduler/test_runner.py | 99 ++++++++++++++++--- .../python/tests/scheduler/test_schedules.py | 13 +++ 4 files changed, 149 insertions(+), 36 deletions(-) diff --git a/clients/python/src/taskbroker_client/scheduler/runner.py b/clients/python/src/taskbroker_client/scheduler/runner.py index eadd9d9b..bfbcdfa8 100644 --- a/clients/python/src/taskbroker_client/scheduler/runner.py +++ b/clients/python/src/taskbroker_client/scheduler/runner.py @@ -36,10 +36,10 @@ def __init__( self._redis = redis self._metrics = metrics - def _make_key(self, taskname: str) -> str: - return f"tw:scheduler:{taskname}" + def _make_key(self, key: str) -> str: + return f"tw:scheduler:{key}" - def set(self, taskname: str, next_runtime: datetime) -> bool: + def set(self, key: str, next_runtime: datetime) -> bool: """ Record a spawn time for a task. The next_runtime parameter indicates when the record should expire, @@ -51,37 +51,49 @@ def set(self, taskname: str, next_runtime: datetime) -> bool: # next_runtime & now could be the same second, and redis gets sad if ex=0 duration = max(int((next_runtime - now).total_seconds()), 1) - result = self._redis.set(self._make_key(taskname), now.isoformat(), ex=duration, nx=True) + result = self._redis.set(self._make_key(key), now.isoformat(), ex=duration, nx=True) return bool(result) - def read(self, taskname: str) -> datetime | None: + def read(self, key: str) -> datetime | None: """ Retrieve the last run time of a task Returns None if last run time has expired or is unknown. """ - result = self._redis.get(self._make_key(taskname)) + result = self._redis.get(self._make_key(key)) if result: return datetime.fromisoformat(result) - self._metrics.incr( - "taskworker.scheduler.run_storage.read.miss", tags={"taskname": taskname} - ) + self._metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": key}) return None - def read_many(self, tasknames: list[str]) -> Mapping[str, datetime | None]: + def read_many( + self, + storage_keys: list[str], + ) -> Mapping[str, datetime | None]: """ - Retreive last run times in bulk + Retrieve last run times in bulk. + + storage_keys are the new-format keys including the schedule_id suffix + (e.g. "test:valid:300"). Falls back to the legacy key (derived by + stripping the suffix) when the new key has no data, allowing a seamless + first-deploy transition. + + Returns a mapping keyed by storage_key. """ - values = self._redis.mget([self._make_key(taskname) for taskname in tasknames]) - run_times = { - taskname: datetime.fromisoformat(value) if value else None - for taskname, value in zip(tasknames, values) - } + legacy_keys = [sk.rsplit(":", 1)[0] for sk in storage_keys] + + new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys]) + legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys]) + + run_times: dict[str, datetime | None] = {} + for storage_key, new_val, legacy_val in zip(storage_keys, new_values, legacy_values): + raw = new_val if new_val is not None else legacy_val + run_times[storage_key] = datetime.fromisoformat(raw) if raw else None return run_times - def delete(self, taskname: str) -> None: + def delete(self, key: str) -> None: """remove a task key - mostly for testing.""" - self._redis.delete(self._make_key(taskname)) + self._redis.delete(self._make_key(key)) class ScheduleEntry: @@ -112,6 +124,10 @@ def __repr__(self) -> str: def fullname(self) -> str: return self._task.fullname + @property + def storage_key(self) -> str: + return f"{self.fullname}:{self._schedule.schedule_id()}" + @property def namespace(self) -> str: return self._task.namespace.name @@ -237,7 +253,7 @@ def tick(self) -> float: def _try_spawn(self, entry: ScheduleEntry) -> None: now = datetime.now(tz=UTC) next_runtime = entry.runtime_after(now) - if self._run_storage.set(entry.fullname, next_runtime): + if self._run_storage.set(entry.storage_key, next_runtime): entry.delay_task() entry.set_last_run(now) @@ -252,7 +268,7 @@ def _try_spawn(self, entry: ScheduleEntry) -> None: ) else: # We were not able to set a key, load last run from storage. - run_state = self._run_storage.read(entry.fullname) + run_state = self._run_storage.read(entry.storage_key) entry.set_last_run(run_state) logger.info( @@ -284,9 +300,9 @@ def _load_last_run(self) -> None: We synchronize each time the schedule set is modified and then incrementally as tasks spawn attempts are made. """ - last_run_times = self._run_storage.read_many([item.fullname for item in self._entries]) + last_run_times = self._run_storage.read_many([item.storage_key for item in self._entries]) for item in self._entries: - last_run = last_run_times.get(item.fullname, None) + last_run = last_run_times.get(item.storage_key, None) item.set_last_run(last_run) logger.info( "taskworker.scheduler.load_last_run", diff --git a/clients/python/src/taskbroker_client/scheduler/schedules.py b/clients/python/src/taskbroker_client/scheduler/schedules.py index ecf6edd1..7bb3df30 100644 --- a/clients/python/src/taskbroker_client/scheduler/schedules.py +++ b/clients/python/src/taskbroker_client/scheduler/schedules.py @@ -36,6 +36,13 @@ def runtime_after(self, start: datetime) -> datetime: Get the next scheduled time after `start` """ + @abc.abstractmethod + def schedule_id(self) -> str: + """ + Return a stable identifier for this schedule's interval. + Used as a key suffix in Redis so that changing the schedule rotates the key. + """ + class TimedeltaSchedule(Schedule): """ @@ -91,6 +98,9 @@ def runtime_after(self, start: datetime) -> datetime: """Get the next time a task should run after start""" return start + self._delta + def schedule_id(self) -> str: + return str(int(self._delta.total_seconds())) + class CrontabSchedule(Schedule): """ @@ -193,3 +203,6 @@ def runtime_after(self, start: datetime) -> datetime: """Get the next time a task should be spawned after `start`""" start = start.replace(second=0, microsecond=0) + timedelta(minutes=1) return self._advance(start) + + def schedule_id(self) -> str: + return str(self._crontab).replace(" ", "_") diff --git a/clients/python/tests/scheduler/test_runner.py b/clients/python/tests/scheduler/test_runner.py index a1d45c8f..90511496 100644 --- a/clients/python/tests/scheduler/test_runner.py +++ b/clients/python/tests/scheduler/test_runner.py @@ -110,7 +110,7 @@ def test_schedulerunner_tick_one_task_time_remaining( ) # Last run was two minutes ago. with freeze_time("2025-01-24 14:23:00 UTC"): - run_storage.set("test:valid", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC)) + run_storage.set("test:valid:300", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC)) namespace = task_app.taskregistry.get("test") with freeze_time("2025-01-24 14:25:00 UTC"), patch.object(namespace, "send_task") as mock_send: @@ -118,7 +118,7 @@ def test_schedulerunner_tick_one_task_time_remaining( assert sleep_time == 180 assert mock_send.call_count == 0 - last_run = run_storage.read("test:valid") + last_run = run_storage.read("test:valid:300") assert last_run == datetime(2025, 1, 24, 14, 23, 0, tzinfo=UTC) @@ -137,7 +137,7 @@ def test_schedulerunner_tick_one_task_spawned( # Last run was 5 minutes from the freeze_time below run_storage.read_many.return_value = { - "test:valid": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), + "test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), } run_storage.set.return_value = True @@ -154,7 +154,9 @@ def test_schedulerunner_tick_one_task_spawned( assert run_storage.set.call_count == 1 # set() is called with the correct next_run time - run_storage.set.assert_called_with("test:valid", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) + run_storage.set.assert_called_with( + "test:valid:300", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC) + ) @patch("taskbroker_client.scheduler.runner.capture_checkin") @@ -173,7 +175,7 @@ def test_schedulerunner_tick_create_checkin( # Last run was 5 minutes from the freeze_time below run_storage.read_many.return_value = { - "test:valid": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), + "test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC), } run_storage.set.return_value = True mock_capture_checkin.return_value = "checkin-id" @@ -232,8 +234,8 @@ def test_schedulerunner_tick_key_exists_no_spawn( with freeze_time("2025-01-24 14:30:00 UTC"): # Set a key into run_storage to simulate another scheduler running - run_storage.delete("test:valid") - assert run_storage.set("test:valid", datetime.now(tz=UTC) + timedelta(minutes=2)) + run_storage.delete("test:valid:300") + assert run_storage.set("test:valid:300", datetime.now(tz=UTC) + timedelta(minutes=2)) # Our scheduler would wakeup and tick again. # The key exists in run_storage so we should not spawn a task. @@ -293,7 +295,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks_crontab( assert sleep_time == 60 # Remove key to simulate expiration - run_storage.delete("test:valid") + run_storage.delete("test:valid:*/2_*_*_*_*") with freeze_time("2025-01-24 14:26:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 120 @@ -334,7 +336,7 @@ def test_schedulerunner_tick_multiple_tasks( assert mock_send.call_count == 2 # Remove the redis key, as the ttl in redis doesn't respect freeze_time() - run_storage.delete("test:second") + run_storage.delete("test:second:120") with freeze_time("2025-01-24 14:27:01 UTC"): sleep_time = schedule_set.tick() # two minutes left on the 5 min task @@ -371,7 +373,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid"] - run_storage.delete("test:valid") + run_storage.delete("test:valid:30") with freeze_time("2025-01-24 14:25:30 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ -379,7 +381,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid"] - run_storage.delete("test:valid") + run_storage.delete("test:valid:30") with freeze_time("2025-01-24 14:26:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ -387,7 +389,7 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid", "second", "valid"] - run_storage.delete("test:valid") + run_storage.delete("test:valid:30") with freeze_time("2025-01-24 14:26:30 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 @@ -395,12 +397,12 @@ def test_schedulerunner_tick_fast_and_slow( called = extract_sent_tasks(mock_send) assert called == ["valid", "valid", "second", "valid", "valid"] - run_storage.delete("test:valid") + run_storage.delete("test:valid:30") with freeze_time("2025-01-24 14:27:00 UTC"): sleep_time = schedule_set.tick() assert sleep_time == 30 - assert run_storage.read("test:valid") + assert run_storage.read("test:valid:30") called = extract_sent_tasks(mock_send) assert called == [ "valid", @@ -414,3 +416,72 @@ def test_schedulerunner_tick_fast_and_slow( def extract_sent_tasks(mock: Mock) -> list[str]: return [call[0][0].taskname for call in mock.call_args_list] + + +def test_scheduleentry_storage_key(task_app: TaskbrokerApp) -> None: + run_storage = Mock(spec=RunStorage) + runner = ScheduleRunner(app=task_app, run_storage=run_storage) + runner.add("valid", {"task": "test:valid", "schedule": timedelta(minutes=5)}) + entry = runner._entries[0] + assert entry.storage_key == "test:valid:300" + assert entry.fullname == "test:valid" + + runner2 = ScheduleRunner(app=task_app, run_storage=run_storage) + runner2.add("valid", {"task": "test:valid", "schedule": crontab(minute="*/2")}) + entry2 = runner2._entries[0] + assert entry2.storage_key == "test:valid:*/2_*_*_*_*" + + +def test_schedulerunner_schedule_change_spawns_immediately( + task_app: TaskbrokerApp, run_storage: RunStorage +) -> None: + """ + When a schedule is changed, the old key (with a different schedule_id suffix) + should not block spawning. The task should run immediately on the new schedule. + """ + # Simulate the old scheduler having stored state with a 3-hour schedule + old_storage_key = "test:valid:10800" # 3 hours = 10800 seconds + with freeze_time("2025-01-24 12:00:00 UTC"): + # Old key with a 3h TTL would normally block for up to 3 more hours + run_storage.set(old_storage_key, datetime(2025, 1, 24, 15, 0, 0, tzinfo=UTC)) + + # New scheduler with a changed schedule (10 minutes = 600s) + schedule_set = ScheduleRunner(app=task_app, run_storage=run_storage) + schedule_set.add("valid", {"task": "test:valid", "schedule": timedelta(minutes=10)}) + + namespace = task_app.taskregistry.get("test") + with freeze_time("2025-01-24 14:25:00 UTC"), patch.object(namespace, "send_task") as mock_send: + sleep_time = schedule_set.tick() + # Task should spawn immediately — no last_run found under the new key + assert mock_send.call_count == 1 + assert sleep_time == 600 # 10 minutes + + +def test_runstorage_read_many_backwards_compat(run_storage: RunStorage) -> None: + """ + read_many() should fall back to the legacy key (old format without schedule_id suffix) + when the new-format key has no data. When the new key exists it should take precedence. + """ + with freeze_time("2025-01-24 14:25:00 UTC"): + now = datetime.now(tz=UTC) + # Write state under the old legacy key format (no schedule_id suffix) + run_storage._redis.set( + run_storage._make_key("test:valid"), + now.isoformat(), + ex=300, + ) + + # New-format key doesn't exist yet — should fall back to legacy value + result = run_storage.read_many(["test:valid:300"]) + assert result["test:valid:300"] == now + + # Once a new-format key is written, it wins over the legacy key + with freeze_time("2025-01-24 14:26:00 UTC"): + new_time = datetime.now(tz=UTC) + run_storage._redis.set( + run_storage._make_key("test:valid:300"), + new_time.isoformat(), + ex=300, + ) + result2 = run_storage.read_many(["test:valid:300"]) + assert result2["test:valid:300"] == new_time diff --git a/clients/python/tests/scheduler/test_schedules.py b/clients/python/tests/scheduler/test_schedules.py index 67f6f231..e958e978 100644 --- a/clients/python/tests/scheduler/test_schedules.py +++ b/clients/python/tests/scheduler/test_schedules.py @@ -66,6 +66,13 @@ def test_timedeltaschedule_remaining_seconds() -> None: assert schedule.remaining_seconds(ten_min_ago) == 0 +def test_timedeltaschedule_schedule_id() -> None: + assert TimedeltaSchedule(timedelta(seconds=30)).schedule_id() == "30" + assert TimedeltaSchedule(timedelta(minutes=5)).schedule_id() == "300" + assert TimedeltaSchedule(timedelta(hours=3)).schedule_id() == "10800" + assert TimedeltaSchedule(timedelta(hours=1, minutes=30)).schedule_id() == "5400" + + def test_crontabschedule_invalid() -> None: with pytest.raises(ValueError): CrontabSchedule("test", crontab(hour="99")) @@ -199,3 +206,9 @@ def test_crontabschedule_monitor_value() -> None: schedule = CrontabSchedule("test", crontab(minute="*/10", day_of_week="1")) assert schedule.monitor_value() == "*/10 * * * 1" + + +def test_crontabschedule_schedule_id() -> None: + assert CrontabSchedule("test", crontab(minute="*/2")).schedule_id() == "*/2_*_*_*_*" + assert CrontabSchedule("test", crontab(minute="1", hour="*/6")).schedule_id() == "1_*/6_*_*_*" + assert CrontabSchedule("test", crontab()).schedule_id() == "*_*_*_*_*"