A queue-agnostic worker for Django's task framework.
- Durability – We recover from any failures, even poorly written tasks.
- Consistency – We never lose data, even if someone unplugs the power or network.
- Utilization – We keep the CPU saturated with tasks, not with idle time or waiting for locks.
Warning
Threadmill requires a development version of Django and is in a preview stage.
You need to have [Django's Task framework][django-tasks] set up properly.
    uv add threadmill

Add threadmill to your INSTALLED_APPS in settings.py:

    # settings.py
    INSTALLED_APPS = [
        "threadmill",
        # ...
    ]

Finally, you launch the worker pool:

    uv run manage.py threadmill

The workers are inspired by Gunicorn, and the CLI is very similar.
Depending on your workload, you can tweak the number of processes and threads. Processes allow for parallel compute (no GIL) while threads are great for low-memory concurrent IO.
    uv run manage.py threadmill --processes 4 --threads 2

If your tasks leak memory, you can recycle (restart) the workers after a certain number of tasks have been processed:

    uv run manage.py threadmill --max-tasks 1000 --max-tasks-jitter 100

This will restart the workers after 1000 tasks have been processed, with a random jitter of up to 100 tasks to avoid all workers restarting at the same time.
Should a worker crash or be killed, the pool will automatically restart it.
A graceful shutdown is possible with SIGTERM or a keyboard interrupt.
All workers will finish the tasks they acquired and publish their results.
You can use --exit-empty to exit immediately after all tasks have been processed,
which might be useful for draining a one-off queue.
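The shutdown behaviour described above can be sketched roughly as follows. This is not Threadmill's implementation: `shutdown`, `request_shutdown`, `install_handlers`, and `run_worker` are hypothetical names, and the loop returns on an empty queue only to keep the demo finite.

```python
import queue
import signal
import threading

# Set once a shutdown has been requested; checked between tasks only.
shutdown = threading.Event()


def request_shutdown(signum=None, frame=None) -> None:
    """Signal handler: stop acquiring new tasks, let the running one finish."""
    shutdown.set()


def install_handlers() -> None:
    """Route SIGTERM and Ctrl+C to the same graceful path (main thread only)."""
    signal.signal(signal.SIGTERM, request_shutdown)
    signal.signal(signal.SIGINT, request_shutdown)


def run_worker(tasks: queue.Queue, results: list) -> None:
    """Run tasks until a shutdown is requested or the queue is empty."""
    while not shutdown.is_set():
        try:
            task = tasks.get_nowait()
        except queue.Empty:
            return
        # An acquired task always runs to completion and is published.
        results.append(task())


def interrupted_task() -> str:
    request_shutdown()  # simulate SIGTERM arriving while the task runs
    return "finished"


tasks: queue.Queue = queue.Queue()
results: list = []
tasks.put(interrupted_task)
tasks.put(lambda: "never started")
run_worker(tasks, results)
print(results, tasks.qsize())
```

The task that was already running completes and its result is recorded, while the second task stays in the queue for another worker. In a real process, `install_handlers()` would be called once at startup.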
You can prefetch tasks from a queue to avoid IO latency bottlenecks. However, this will increase the memory usage of the worker pool.
    uv run manage.py threadmill --prefetch 100

Warning
Work in progress: this feature is not yet stable.
Task timeouts are important to ensure the long-term health of your pool. However, they need to be aligned with your queueing system's timeout settings. The message queue needs to requeue a task that hasn't been acknowledged within the timeout.
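One way to read "aligned" is that a worker should give up on a task before the queue hands it to someone else. The check below is a minimal sketch under that assumption; `check_timeouts`, `task_timeout`, and `requeue_timeout` are hypothetical names and not Threadmill settings.

```python
import datetime


def check_timeouts(
    task_timeout: datetime.timedelta, requeue_timeout: datetime.timedelta
) -> None:
    """Raise if the queue could redeliver a task that a worker is still running.

    Both parameter names are hypothetical and not Threadmill settings.
    """
    if task_timeout >= requeue_timeout:
        raise ValueError(
            f"task timeout ({task_timeout}) must be shorter than the queue's "
            f"requeue timeout ({requeue_timeout})"
        )


# A 30 second task limit fits inside a 60 second requeue window.
check_timeouts(datetime.timedelta(seconds=30), datetime.timedelta(seconds=60))
```

If the order were reversed, the queue could requeue a task while the first worker is still processing it, and the task might run twice.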
Note
This section is for people who want to integrate Threadmill into their queueing system.
Threadmill is designed to be durable and requires a queueing system that supports late acknowledgement.
To use Threadmill, your backend will need to inherit from threadmill.backends.AcknowledgeableTaskBackend and implement the following methods:
    class AcknowledgeableTaskBackend(BaseTaskBackend, ABC):
        """Provide an interface for task queues to be processed by the executor."""

        def acquire(
            self, *queue_names: str, timeout: datetime.timedelta | None = None
        ) -> TaskResult:
            """
            Return and lock the next task to be processed without removing it from the queue.

            Args:
                queue_names: The names of the queues to acquire tasks from.
                timeout: The maximum time to wait for a task. If None, wait indefinitely.

            Raises:
                TimeoutError: If no task is available within the specified timeout.
            """
            raise NotImplementedError

        def acknowledge(self, task_result: TaskResult) -> None:
            """Remove the task from the queue and publish the result."""
            raise NotImplementedError
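To make the late-acknowledgement contract more concrete, here is a toy in-memory queue with the same two method names. It is a standalone sketch and deliberately does not inherit from `AcknowledgeableTaskBackend` or use Django's `TaskResult`: `Message`, `InMemoryBackend`, `enqueue`, and `visibility_timeout` are hypothetical names chosen for illustration.

```python
import datetime
import time
import uuid
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    """Stand-in for Django's TaskResult (hypothetical, illustration only)."""

    payload: str
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


class InMemoryBackend:
    """Toy late-acknowledgement queue; not a real Threadmill backend."""

    def __init__(self, visibility_timeout: datetime.timedelta):
        self.visibility_timeout = visibility_timeout
        self.queues: dict[str, deque[Message]] = {}
        # message id -> (queue name, message, monotonic deadline)
        self.in_flight: dict[str, tuple[str, Message, float]] = {}

    def enqueue(self, queue_name: str, payload: str) -> Message:
        message = Message(payload)
        self.queues.setdefault(queue_name, deque()).append(message)
        return message

    def _requeue_expired(self) -> None:
        """Make locked but unacknowledged messages available again."""
        now = time.monotonic()
        for message_id, (name, message, deadline) in list(self.in_flight.items()):
            if deadline <= now:
                del self.in_flight[message_id]
                self.queues[name].appendleft(message)

    def acquire(self, *queue_names, timeout=None) -> Message:
        give_up = None
        if timeout is not None:
            give_up = time.monotonic() + timeout.total_seconds()
        while True:
            self._requeue_expired()
            for name in queue_names:
                pending = self.queues.get(name)
                if pending:
                    message = pending.popleft()
                    lock_for = self.visibility_timeout.total_seconds()
                    # Locked, not deleted: the backend keeps the message
                    # until it is acknowledged or the lock expires.
                    self.in_flight[message.id] = (
                        name, message, time.monotonic() + lock_for
                    )
                    return message
            if give_up is not None and time.monotonic() >= give_up:
                raise TimeoutError("no task available")
            time.sleep(0.01)

    def acknowledge(self, message: Message) -> None:
        # Only now is the message gone for good.
        self.in_flight.pop(message.id, None)


backend = InMemoryBackend(visibility_timeout=datetime.timedelta(seconds=0.05))
backend.enqueue("default", "send-email")

first = backend.acquire("default")  # locked, but never acknowledged
time.sleep(0.1)  # the "worker" dies and the lock expires
second = backend.acquire("default", timeout=datetime.timedelta(seconds=1))
backend.acknowledge(second)
```

Because the first acquisition was never acknowledged, the same message is handed out again after the lock expires. That redelivery is what lets a pool recover from a crashed worker without losing the task.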