Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions clients/python/src/taskbroker_client/scheduler/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def __init__(
self._redis = redis
self._metrics = metrics

def _make_key(self, taskname: str) -> str:
return f"tw:scheduler:{taskname}"
def _make_key(self, key: str) -> str:
return f"tw:scheduler:{key}"

def set(self, taskname: str, next_runtime: datetime) -> bool:
def set(self, key: str, next_runtime: datetime) -> bool:
"""
Record a spawn time for a task.
The next_runtime parameter indicates when the record should expire,
Expand All @@ -51,37 +51,49 @@ def set(self, taskname: str, next_runtime: datetime) -> bool:
# next_runtime & now could be the same second, and redis gets sad if ex=0
duration = max(int((next_runtime - now).total_seconds()), 1)

result = self._redis.set(self._make_key(taskname), now.isoformat(), ex=duration, nx=True)
result = self._redis.set(self._make_key(key), now.isoformat(), ex=duration, nx=True)
return bool(result)

def read(self, taskname: str) -> datetime | None:
def read(self, key: str) -> datetime | None:
"""
Retrieve the last run time of a task
Returns None if last run time has expired or is unknown.
"""
result = self._redis.get(self._make_key(taskname))
result = self._redis.get(self._make_key(key))
if result:
return datetime.fromisoformat(result)

self._metrics.incr(
"taskworker.scheduler.run_storage.read.miss", tags={"taskname": taskname}
)
self._metrics.incr("taskworker.scheduler.run_storage.read.miss", tags={"taskname": key})
return None

def read_many(self, tasknames: list[str]) -> Mapping[str, datetime | None]:
def read_many(
self,
storage_keys: list[str],
) -> Mapping[str, datetime | None]:
"""
Retrieve last run times in bulk
Retrieve last run times in bulk.

storage_keys are the new-format keys including the schedule_id suffix
(e.g. "test:valid:300"). Falls back to the legacy key (derived by
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a contrived example, but:

storage_keys = [`test:val:300`, `test:val:3000`]
legacy_keys = [`test:val`, `test:val`]

new_values = [123, None] # No value for 3000
legacy_values = [321, 321] # We have a legacy value

run_times[`test:val:300`] = 123
run_times[`test:val:3000`] = 321

Is this scenario possible? And if so, does this result make sense?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this scenario possible? And if so, does this result make sense?

If I understand correctly, a task has both a legacy + new key (test:val and test:val:300) and then the schedule is changed from 300 -> 3000? In that scenario we shouldn't have both test:val:300 and test:val:3000 in storage_keys because test:val is the scheduled task name, and task names would be unique.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But isn't it possible to create such a scenario anyways? For example, if the scheduler command has...

scheduler.add(
    "simple-task", {"task": "test:val", "schedule": timedelta(seconds=300)}
)

...

scheduler.add(
    "simple-task", {"task": "test:val", "schedule": timedelta(seconds=3000)}
)

... wouldn't this recreate the example Evan proposed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that scenario would trigger the problem. Currently having two schedule entries with the same task and different schedules results in only one of the schedules being followed because the storage key is shared.

We currently prevent tasks from having two different schedules in sentry with a test

https://github.com/getsentry/sentry/blob/41423c6cbfd603d2c63a7589d76c4f0dca2e6014/tests/sentry/taskworker/test_config.py#L30-L40

So the problematic scenario has been prevented for now. As we on-board more applications though we should make this more resilient to operator error. Using the schedule name as the storage key would help with that. I can follow up these changes with that improvement.

stripping the suffix) when the new key has no data, allowing a seamless
first-deploy transition.

Returns a mapping keyed by storage_key.
"""
values = self._redis.mget([self._make_key(taskname) for taskname in tasknames])
run_times = {
taskname: datetime.fromisoformat(value) if value else None
for taskname, value in zip(tasknames, values)
}
legacy_keys = [sk.rsplit(":", 1)[0] for sk in storage_keys]

new_values = self._redis.mget([self._make_key(sk) for sk in storage_keys])
legacy_values = self._redis.mget([self._make_key(lk) for lk in legacy_keys])

run_times: dict[str, datetime | None] = {}
for storage_key, new_val, legacy_val in zip(storage_keys, new_values, legacy_values):
raw = new_val if new_val is not None else legacy_val
run_times[storage_key] = datetime.fromisoformat(raw) if raw else None
Comment on lines +83 to +91
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The backwards-compatibility fallback for legacy run state can prevent a task from running immediately after a schedule change, delaying its execution until the new interval has passed.
Severity: MEDIUM

Suggested Fix

The backwards-compatibility logic should be adjusted to ignore the legacy last_run value if the schedule has changed. One approach is to have read_many not perform the fallback at all, forcing an immediate run on the new schedule. Alternatively, the calling function _load_last_run could compare the schedule ID from the new key with the schedule that produced the legacy data (if possible to determine) and ignore the legacy data if they differ.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: clients/python/src/taskbroker_client/scheduler/runner.py#L83-L91

Potential issue: The backwards-compatibility fallback in `read_many()` can prevent a
task from running immediately after a schedule change. If a task ran recently under an
old schedule, its last run time is stored in a legacy-formatted Redis key. When the code
is updated and the schedule is changed simultaneously, the new logic will read the last
run time from the legacy key. This causes the `is_due()` calculation to incorrectly
delay the task's next execution, respecting the old run time instead of triggering
immediately, which contradicts the feature's goal.

Did we get this right? 👍 / 👎 to inform future reviews.

return run_times

def delete(self, taskname: str) -> None:
def delete(self, key: str) -> None:
"""remove a task key - mostly for testing."""
self._redis.delete(self._make_key(taskname))
self._redis.delete(self._make_key(key))


class ScheduleEntry:
Expand Down Expand Up @@ -112,6 +124,10 @@ def __repr__(self) -> str:
def fullname(self) -> str:
return self._task.fullname

@property
def storage_key(self) -> str:
    """Redis key for this entry: the task fullname suffixed with the schedule id."""
    suffix = self._schedule.schedule_id()
    return ":".join((self.fullname, suffix))

@property
def namespace(self) -> str:
return self._task.namespace.name
Expand Down Expand Up @@ -237,7 +253,7 @@ def tick(self) -> float:
def _try_spawn(self, entry: ScheduleEntry) -> None:
now = datetime.now(tz=UTC)
next_runtime = entry.runtime_after(now)
if self._run_storage.set(entry.fullname, next_runtime):
if self._run_storage.set(entry.storage_key, next_runtime):
entry.delay_task()
entry.set_last_run(now)

Expand All @@ -252,7 +268,7 @@ def _try_spawn(self, entry: ScheduleEntry) -> None:
)
else:
# We were not able to set a key, load last run from storage.
run_state = self._run_storage.read(entry.fullname)
run_state = self._run_storage.read(entry.storage_key)
entry.set_last_run(run_state)

logger.info(
Expand Down Expand Up @@ -284,9 +300,9 @@ def _load_last_run(self) -> None:
We synchronize each time the schedule set is modified and
then incrementally as tasks spawn attempts are made.
"""
last_run_times = self._run_storage.read_many([item.fullname for item in self._entries])
last_run_times = self._run_storage.read_many([item.storage_key for item in self._entries])
for item in self._entries:
last_run = last_run_times.get(item.fullname, None)
last_run = last_run_times.get(item.storage_key, None)
item.set_last_run(last_run)
logger.info(
"taskworker.scheduler.load_last_run",
Expand Down
13 changes: 13 additions & 0 deletions clients/python/src/taskbroker_client/scheduler/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def runtime_after(self, start: datetime) -> datetime:
Get the next scheduled time after `start`
"""

@abc.abstractmethod
def schedule_id(self) -> str:
    """
    Return a stable identifier for this schedule's interval.

    The id is appended to the Redis run-storage key, so changing a task's
    schedule rotates the key and lets the old run state expire naturally.
    """


class TimedeltaSchedule(Schedule):
"""
Expand Down Expand Up @@ -91,6 +98,9 @@ def runtime_after(self, start: datetime) -> datetime:
"""Get the next time a task should run after start"""
return start + self._delta

def schedule_id(self) -> str:
return str(int(self._delta.total_seconds()))


class CrontabSchedule(Schedule):
"""
Expand Down Expand Up @@ -193,3 +203,6 @@ def runtime_after(self, start: datetime) -> datetime:
"""Get the next time a task should be spawned after `start`"""
start = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
return self._advance(start)

def schedule_id(self) -> str:
return str(self._crontab).replace(" ", "_")
99 changes: 85 additions & 14 deletions clients/python/tests/scheduler/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ def test_schedulerunner_tick_one_task_time_remaining(
)
# Last run was two minutes ago.
with freeze_time("2025-01-24 14:23:00 UTC"):
run_storage.set("test:valid", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC))
run_storage.set("test:valid:300", datetime(2025, 1, 24, 14, 28, 0, tzinfo=UTC))

namespace = task_app.taskregistry.get("test")
with freeze_time("2025-01-24 14:25:00 UTC"), patch.object(namespace, "send_task") as mock_send:
sleep_time = schedule_set.tick()
assert sleep_time == 180
assert mock_send.call_count == 0

last_run = run_storage.read("test:valid")
last_run = run_storage.read("test:valid:300")
assert last_run == datetime(2025, 1, 24, 14, 23, 0, tzinfo=UTC)


Expand All @@ -137,7 +137,7 @@ def test_schedulerunner_tick_one_task_spawned(

# Last run was 5 minutes from the freeze_time below
run_storage.read_many.return_value = {
"test:valid": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC),
"test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC),
}
run_storage.set.return_value = True

Expand All @@ -154,7 +154,9 @@ def test_schedulerunner_tick_one_task_spawned(

assert run_storage.set.call_count == 1
# set() is called with the correct next_run time
run_storage.set.assert_called_with("test:valid", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC))
run_storage.set.assert_called_with(
"test:valid:300", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)
)


@patch("taskbroker_client.scheduler.runner.capture_checkin")
Expand All @@ -173,7 +175,7 @@ def test_schedulerunner_tick_create_checkin(

# Last run was 5 minutes from the freeze_time below
run_storage.read_many.return_value = {
"test:valid": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC),
"test:valid:300": datetime(2025, 1, 24, 14, 19, 55, tzinfo=UTC),
}
run_storage.set.return_value = True
mock_capture_checkin.return_value = "checkin-id"
Expand Down Expand Up @@ -232,8 +234,8 @@ def test_schedulerunner_tick_key_exists_no_spawn(

with freeze_time("2025-01-24 14:30:00 UTC"):
# Set a key into run_storage to simulate another scheduler running
run_storage.delete("test:valid")
assert run_storage.set("test:valid", datetime.now(tz=UTC) + timedelta(minutes=2))
run_storage.delete("test:valid:300")
assert run_storage.set("test:valid:300", datetime.now(tz=UTC) + timedelta(minutes=2))

# Our scheduler would wakeup and tick again.
# The key exists in run_storage so we should not spawn a task.
Expand Down Expand Up @@ -293,7 +295,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks_crontab(
assert sleep_time == 60

# Remove key to simulate expiration
run_storage.delete("test:valid")
run_storage.delete("test:valid:*/2_*_*_*_*")
with freeze_time("2025-01-24 14:26:00 UTC"):
sleep_time = schedule_set.tick()
assert sleep_time == 120
Expand Down Expand Up @@ -334,7 +336,7 @@ def test_schedulerunner_tick_multiple_tasks(
assert mock_send.call_count == 2

# Remove the redis key, as the ttl in redis doesn't respect freeze_time()
run_storage.delete("test:second")
run_storage.delete("test:second:120")
with freeze_time("2025-01-24 14:27:01 UTC"):
sleep_time = schedule_set.tick()
# two minutes left on the 5 min task
Expand Down Expand Up @@ -371,36 +373,36 @@ def test_schedulerunner_tick_fast_and_slow(
called = extract_sent_tasks(mock_send)
assert called == ["valid"]

run_storage.delete("test:valid")
run_storage.delete("test:valid:30")
with freeze_time("2025-01-24 14:25:30 UTC"):
sleep_time = schedule_set.tick()
assert sleep_time == 30

called = extract_sent_tasks(mock_send)
assert called == ["valid", "valid"]

run_storage.delete("test:valid")
run_storage.delete("test:valid:30")
with freeze_time("2025-01-24 14:26:00 UTC"):
sleep_time = schedule_set.tick()
assert sleep_time == 30

called = extract_sent_tasks(mock_send)
assert called == ["valid", "valid", "second", "valid"]

run_storage.delete("test:valid")
run_storage.delete("test:valid:30")
with freeze_time("2025-01-24 14:26:30 UTC"):
sleep_time = schedule_set.tick()
assert sleep_time == 30

called = extract_sent_tasks(mock_send)
assert called == ["valid", "valid", "second", "valid", "valid"]

run_storage.delete("test:valid")
run_storage.delete("test:valid:30")
with freeze_time("2025-01-24 14:27:00 UTC"):
sleep_time = schedule_set.tick()
assert sleep_time == 30

assert run_storage.read("test:valid")
assert run_storage.read("test:valid:30")
called = extract_sent_tasks(mock_send)
assert called == [
"valid",
Expand All @@ -414,3 +416,72 @@ def test_schedulerunner_tick_fast_and_slow(

def extract_sent_tasks(mock: Mock) -> list[str]:
return [call[0][0].taskname for call in mock.call_args_list]


def test_scheduleentry_storage_key(task_app: TaskbrokerApp) -> None:
    """storage_key carries the schedule_id suffix while fullname stays unsuffixed."""
    storage = Mock(spec=RunStorage)

    # timedelta schedule -> suffix is the interval in seconds
    delta_runner = ScheduleRunner(app=task_app, run_storage=storage)
    delta_runner.add("valid", {"task": "test:valid", "schedule": timedelta(minutes=5)})
    delta_entry = delta_runner._entries[0]
    assert delta_entry.fullname == "test:valid"
    assert delta_entry.storage_key == "test:valid:300"

    # crontab schedule -> suffix is the underscore-joined cron expression
    cron_runner = ScheduleRunner(app=task_app, run_storage=storage)
    cron_runner.add("valid", {"task": "test:valid", "schedule": crontab(minute="*/2")})
    cron_entry = cron_runner._entries[0]
    assert cron_entry.storage_key == "test:valid:*/2_*_*_*_*"


def test_schedulerunner_schedule_change_spawns_immediately(
    task_app: TaskbrokerApp, run_storage: RunStorage
) -> None:
    """
    When a schedule is changed, the old key (with a different schedule_id suffix)
    should not block spawning. The task should run immediately on the new schedule.
    """
    # State left behind by an old scheduler running a 3-hour (10800s) schedule.
    stale_key = "test:valid:10800"
    with freeze_time("2025-01-24 12:00:00 UTC"):
        # Its TTL would normally hold the lock for up to 3 more hours.
        run_storage.set(stale_key, datetime(2025, 1, 24, 15, 0, 0, tzinfo=UTC))

    # New scheduler configured with a changed 10-minute (600s) schedule.
    runner = ScheduleRunner(app=task_app, run_storage=run_storage)
    runner.add("valid", {"task": "test:valid", "schedule": timedelta(minutes=10)})

    registry_namespace = task_app.taskregistry.get("test")
    with freeze_time("2025-01-24 14:25:00 UTC"), patch.object(
        registry_namespace, "send_task"
    ) as mock_send:
        tick_sleep = runner.tick()
        # No last_run exists under the new key, so the task spawns right away.
        assert mock_send.call_count == 1
        assert tick_sleep == 600  # 10 minutes


def test_runstorage_read_many_backwards_compat(run_storage: RunStorage) -> None:
    """
    read_many() should fall back to the legacy key (old format without schedule_id suffix)
    when the new-format key has no data. When the new key exists it should take precedence.
    """
    with freeze_time("2025-01-24 14:25:00 UTC"):
        legacy_time = datetime.now(tz=UTC)
        # Seed state under the legacy key format (no schedule_id suffix).
        run_storage._redis.set(
            run_storage._make_key("test:valid"),
            legacy_time.isoformat(),
            ex=300,
        )

        # With no new-format key present, the legacy value is returned.
        fallback = run_storage.read_many(["test:valid:300"])
        assert fallback["test:valid:300"] == legacy_time

    with freeze_time("2025-01-24 14:26:00 UTC"):
        # Writing the new-format key makes it take precedence over legacy data.
        current_time = datetime.now(tz=UTC)
        run_storage._redis.set(
            run_storage._make_key("test:valid:300"),
            current_time.isoformat(),
            ex=300,
        )
        preferred = run_storage.read_many(["test:valid:300"])
        assert preferred["test:valid:300"] == current_time
13 changes: 13 additions & 0 deletions clients/python/tests/scheduler/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ def test_timedeltaschedule_remaining_seconds() -> None:
assert schedule.remaining_seconds(ten_min_ago) == 0


def test_timedeltaschedule_schedule_id() -> None:
    """schedule_id is the interval rendered as whole seconds."""
    cases = [
        (timedelta(seconds=30), "30"),
        (timedelta(minutes=5), "300"),
        (timedelta(hours=3), "10800"),
        (timedelta(hours=1, minutes=30), "5400"),
    ]
    for delta, expected in cases:
        assert TimedeltaSchedule(delta).schedule_id() == expected


def test_crontabschedule_invalid() -> None:
with pytest.raises(ValueError):
CrontabSchedule("test", crontab(hour="99"))
Expand Down Expand Up @@ -199,3 +206,9 @@ def test_crontabschedule_monitor_value() -> None:

schedule = CrontabSchedule("test", crontab(minute="*/10", day_of_week="1"))
assert schedule.monitor_value() == "*/10 * * * 1"


def test_crontabschedule_schedule_id() -> None:
    """schedule_id is the cron expression with spaces replaced by underscores."""
    cases = [
        (crontab(minute="*/2"), "*/2_*_*_*_*"),
        (crontab(minute="1", hour="*/6"), "1_*/6_*_*_*"),
        (crontab(), "*_*_*_*_*"),
    ]
    for spec, expected in cases:
        assert CrontabSchedule("test", spec).schedule_id() == expected
Loading