feat(python): Move scheduler files and tests from the prototype#540
feat(python): Move scheduler files and tests from the prototype#540
Conversation
Add the scheduler and its tests to the client libraries. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
| return checkin_config | ||
|
|
||
|
|
||
| class ScheduleRunner: |
There was a problem hiding this comment.
Like the worker, the client libraries will provide most of the component, but the application still needs to build an entrypoint that loads config, and sets up the TaskbrokerApp that tasks are run from, and define all the schedules.
| try: | ||
| self._try_spawn(entry) | ||
| except Exception as e: | ||
| # Trap errors from spawning/update state so that the heap stays consistent. | ||
| capture_exception(e) | ||
| heapq.heappush(self._heap, (entry.remaining_seconds(), entry)) |
There was a problem hiding this comment.
Bug: An exception in delay_task() prevents _last_run from updating, causing an infinite loop in ScheduleRunner.tick() as the same task is processed repeatedly.
Severity: HIGH
Suggested Fix
Move the heapq.heappush call that re-schedules the entry into the try block in ScheduleRunner.tick(). This ensures that the task is only re-queued for its next run if the current attempt to spawn it succeeds without raising an exception. Failed tasks will be logged but not immediately retried in a tight loop.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: clients/python/src/taskbroker_client/scheduler/runner.py#L224-L229
Potential issue: In `ScheduleRunner.tick()`, if `entry.delay_task()` raises an exception
(e.g., from a Kafka producer error), the exception is caught, but the task's `_last_run`
timestamp is not updated because `entry.set_last_run(now)` is skipped. The task is then
immediately pushed back onto the heap with a remaining time near zero. This causes the
`while` loop to process the same failed task repeatedly, leading to a tight infinite
loop that consumes 100% CPU, logs excessive errors, and prevents any other scheduled
tasks from running.
Did we get this right? 👍 / 👎 to inform future reviews.
Add the scheduler and its tests to the client libraries.
Refs STREAM-605