Prefork: queue.cancel(job_id) does not propagate to running children #82

@pratyush618

Summary

Calling queue.cancel(job_id) on a job that is currently executing inside a prefork child has no effect on the running child process. The cancel flag is set in storage, but the child never observes it and runs the task to completion. This breaks the cooperative cancellation contract that already works in the default pool="thread" mode.

Reproduction

import time

from taskito import current_job

# Assumes `queue` is an existing taskito queue served by a worker in prefork mode.
@queue.task()
def slow():
    for _ in range(60):
        current_job.check_cancelled()  # cooperative checkpoint
        time.sleep(1)

job = slow.delay()
time.sleep(2)  # give the worker time to pick up the job
queue.cancel(job.id)
# Expected: job stops within 1 second, at the next checkpoint.
# Actual (prefork): job runs the full 60 seconds, ignoring cancel.

Root cause

  • crates/taskito-python/src/prefork/protocol.rs:15-27: the ParentMessage enum has only Job and Shutdown. There is no Cancel(job_id) variant for the parent to signal a running child.
  • py_src/taskito/prefork/child.py: the child main loop reads from stdin, executes _execute_job, and writes the result. It never polls storage for the cancel flag and never receives a cancel signal from the parent.
  • py_src/taskito/prefork/child.py:96: TaskCancelledError is only caught if the task itself raises it, which only happens when current_job.check_cancelled() reads a True flag, and the child has no live access to that flag for the running job.

In the thread pool, _wrap_task runs in the same Python process as the storage handle, so check_cancelled() reads the up-to-date flag via the existing connection. In prefork, the child is a separate process without that live view, so the parent must tell it explicitly.

Proposed fix

Two reasonable directions; they can be combined:

  1. Side-channel signal: parent sends SIGUSR1 (or writes to a dedicated cancel pipe) when cancel(job_id) is called and the job is currently routed to that child. The child has a signal handler or pipe reader that flips a per-process cancel flag (a thread-local flag set by a reader thread would not be visible to the thread running the task), which current_job.check_cancelled() consumes.
  2. In-band protocol message: extend ParentMessage with Cancel { job_id: String }. Child's stdin reader (currently single-threaded with the executor) would need a small async/select indirection to consume cancel messages while a job is running — one option is a dedicated stdin reader thread that pushes cancels into a threading.Event keyed by job id.
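
As a rough illustration of direction 2, the sketch below shows a dedicated reader thread that routes messages while the executor stays free to run a job. It is not taskito's actual protocol: the newline-delimited JSON wire format, the message fields ("type", "job_id"), and the names CancelRegistry, reader_loop, and start_reader are all assumptions made for this example.

```python
import json
import queue
import threading


class CancelRegistry:
    """Hypothetical helper: maps job ids to threading.Event objects."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = {}

    def event_for(self, job_id):
        # Create the event on first use so a cancel may arrive before the job starts.
        with self._lock:
            return self._events.setdefault(job_id, threading.Event())

    def discard(self, job_id):
        # Call once a job has finished so the registry does not grow without bound.
        with self._lock:
            self._events.pop(job_id, None)


def reader_loop(stream, jobs, cancels):
    """Read one JSON message per line (assumed format) and route it."""
    for line in stream:
        line = line.strip()
        if not line:
            continue
        msg = json.loads(line)
        kind = msg["type"]
        if kind == "job":
            jobs.put(msg)  # the executor thread picks this up
        elif kind == "cancel":
            cancels.event_for(msg["job_id"]).set()
        elif kind == "shutdown":
            break
    jobs.put(None)  # sentinel: tells the executor to stop


def start_reader(stream, jobs, cancels):
    """Start the reader as a daemon thread and return it."""
    thread = threading.Thread(
        target=reader_loop, args=(stream, jobs, cancels), daemon=True
    )
    thread.start()
    return thread
```

In a child process this would be started with something like start_reader(sys.stdin, jobs, cancels), and a check_cancelled() implementation would test cancels.event_for(current_job_id).is_set(). Keying events by job id means a late cancel for a finished job cannot interrupt the next one.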

The signal-based approach (1) is the smaller change and reuses POSIX semantics that are easy to test.
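
A minimal sketch of the signal-based direction, assuming a POSIX platform (SIGUSR1 does not exist on Windows) and a child that runs one job at a time. The names send_cancel, install_cancel_handler, check_cancelled, and run_job are hypothetical, and TaskCancelledError is defined locally as a stand-in for taskito's exception.

```python
import os
import signal
import threading
import time


class TaskCancelledError(Exception):
    """Local stand-in for taskito's TaskCancelledError (assumption)."""


# Process-wide flag: one job runs per child at a time, so one flag is enough.
_cancel_requested = threading.Event()


def _on_sigusr1(signum, frame):
    # Python runs signal handlers in the main thread; only flip the flag here.
    _cancel_requested.set()


def install_cancel_handler():
    """Child side: must be called from the main thread of the child process."""
    signal.signal(signal.SIGUSR1, _on_sigusr1)


def send_cancel(child_pid):
    """Parent side: signal the child that currently runs the job."""
    os.kill(child_pid, signal.SIGUSR1)


def check_cancelled():
    """Cooperative checkpoint: raise if a cancel was requested, then reset."""
    if _cancel_requested.is_set():
        _cancel_requested.clear()
        raise TaskCancelledError()


def run_job(steps, step_seconds):
    """Fake task loop; returns (status, steps completed)."""
    _cancel_requested.clear()  # drop any stale cancel left by a previous job
    done = 0
    try:
        for _ in range(steps):
            check_cancelled()
            time.sleep(step_seconds)
            done += 1
    except TaskCancelledError:
        return ("cancelled", done)
    return ("complete", done)
```

The process itself survives the signal, so the same child can run the next job. One trade-off of this sketch: clearing the flag at job start avoids a stale cancel leaking into the next job, but it can also drop a cancel that arrives between dispatch and start, so a real implementation would likely need to tie the signal to a job id.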

Acceptance criteria

  • After queue.cancel(job_id), the cancel becomes visible to the running prefork child within ~100 ms, so the task raises TaskCancelledError on its next check_cancelled() call.
  • Job status transitions to cancelled; on_cancel middleware fires and JOB_CANCELLED is emitted, matching thread-pool behavior.
  • The child process is not killed by cancel — only the task is interrupted; the child continues serving subsequent jobs.
  • Cancel of a job that has already completed or is queued (not yet running) behaves like the thread pool: storage flag is set; if not yet picked up, it never starts.
  • Regression test under tests/python/test_prefork_*.py.
