135 changes: 135 additions & 0 deletions docs-next/content/docs/guides/core/execution-model.mdx
@@ -0,0 +1,135 @@
---
title: Execution Models
description: "Thread pool, prefork, native async — pick the right one for your workload."
---

import { Callout } from "fumadocs-ui/components/callout";

Choose how tasks execute: OS threads (default), child processes (prefork), or
native async.

## Decision tree

<Mermaid
  chart={`graph TD
    A[What kind of task?] -->|CPU-bound| B[Prefork Pool]
    A -->|I/O-bound sync| C[Thread Pool]
    A -->|I/O-bound async| D[Native Async]
    A -->|Mixed| B`}
/>

## Comparison

| Mode | Concurrency | GIL | Memory per worker | Startup cost | Best for |
|------|------------|-----|-------------------|--------------|----------|
| **Thread Pool** | `workers` OS threads | Shared | ~1 MB | None | I/O-bound sync tasks |
| **Prefork** | `workers` child processes | Independent | ~30 MB | One app import per child | CPU-bound tasks, mixed workloads |
| **Native Async** | `async_concurrency` coroutines | Shared (event loop) | Negligible per coroutine | None | I/O-bound async tasks |

## Thread pool (default)

Sync task functions run on Rust `std::thread` threads. Each worker acquires
the Python GIL only during task execution; the scheduler and dispatch logic
never touch it.

```python
# Default — thread pool with auto-detected worker count
queue.run_worker()

# Explicit worker count
queue.run_worker(workers=8)
```

```bash
taskito worker --app myapp:queue --workers 8
```

Because threads share a single GIL, CPU-bound tasks block each other. For
Python code that spends most of its time in C extensions (numpy, pandas) that
release the GIL, threads still work well.
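As a standalone illustration (plain Python, no taskito imports): stdlib primitives such as `hashlib` also release the GIL while hashing large buffers, so a task like this stays thread-pool-friendly even though it burns CPU:

```python
import hashlib

def checksum(path: str) -> str:
    # hashlib releases the GIL on large buffers, so several of these can
    # overlap on the thread pool despite being CPU-heavy. In a real app you
    # would register it with @queue.task(); it is shown undecorated here to
    # keep the sketch self-contained.
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()
```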

## Prefork pool

Spawns separate child processes, each with its own Python interpreter and
GIL, so CPU-bound tasks run in parallel across cores.

```python
queue.run_worker(pool="prefork", app="myapp:queue")
```

```bash
taskito worker --app myapp:queue --pool prefork
```

The `app` parameter tells each child process where to import your `Queue`
instance. It must be a module-level name (`"module:attribute"` format) —
tasks defined inside functions or closures cannot be imported by child
processes.
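To see why module-level names matter, here is a rough sketch of how a `"module:attribute"` spec can be resolved in a fresh process (illustrative only; taskito's actual loader may differ), demonstrated against a stdlib module:

```python
import importlib

def resolve_app(spec: str):
    # Split "module:attribute", import the module, fetch the attribute.
    # Only module-level names are reachable this way; objects created
    # inside a function or closure have no importable path.
    module_name, _, attr = spec.partition(":")
    return getattr(importlib.import_module(module_name), attr)

print(resolve_app("math:pi"))  # 3.141592653589793
```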

For more details, see the [Prefork Pool guide](/docs/guides/advanced-execution).

## Native async

`async def` task functions run on a dedicated Python event loop thread. No
`asyncio.run()` wrapping, no thread-per-task overhead.

```python
import httpx

@queue.task()
async def fetch_prices(symbol: str) -> dict:
    async with httpx.AsyncClient() as client:
        r = await client.get(f"https://api.example.com/prices/{symbol}")
        return r.json()
```

Control how many coroutines run at once:

```python
queue = Queue(
    db_path="myapp.db",
    async_concurrency=200,  # default: 100
)
```

For more details, see the [Native Async Tasks guide](/docs/guides/advanced-execution).

## Mixing sync and async

A single queue handles both sync and async tasks. No configuration needed —
the worker inspects each task at registration time and routes it to the
correct pool.

```python
@queue.task()
def resize_image(path: str) -> str:
    # Sync — runs on thread pool
    ...

@queue.task()
async def send_notification(user_id: str) -> None:
    # Async — runs on event loop
    ...
```

Both are enqueued, retried, rate-limited, and monitored identically.
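The registration-time check boils down to asking whether the function is a coroutine function. A minimal sketch of the routing (not taskito's actual dispatcher):

```python
import inspect

def choose_pool(fn) -> str:
    # async def functions go to the event loop; everything else
    # goes to the sync pool (threads or prefork).
    return "event-loop" if inspect.iscoroutinefunction(fn) else "thread-pool"

def resize_image(path: str) -> str: ...
async def send_notification(user_id: str) -> None: ...

print(choose_pool(resize_image))       # thread-pool
print(choose_pool(send_notification))  # event-loop
```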

## workers vs async_concurrency

These two parameters are independent:

```python
queue = Queue(
    workers=4,              # OS threads (or child processes) for sync tasks
    async_concurrency=200,  # concurrent coroutines for async tasks
)
```

`workers=4` means 4 sync tasks can execute at the same time.
`async_concurrency=200` means 200 async tasks can be in-flight concurrently
on the event loop. A queue with both set runs up to `4 + 200` tasks
simultaneously.
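Conceptually, `async_concurrency` acts like a semaphore around the event loop's in-flight coroutines. A standalone sketch of that behavior (not taskito internals):

```python
import asyncio

async def run_limited(coros, limit: int):
    # At most `limit` coroutines run at once; the rest wait their turn.
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def main():
    async def job(i: int) -> int:
        await asyncio.sleep(0.01)
        return i

    # 10 jobs, but never more than 3 in flight at a time.
    return await run_limited([job(i) for i in range(10)], limit=3)

print(asyncio.run(main()))
```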

<Callout type="info">
For mostly-async workloads, keep `workers` small (2–4) and raise
`async_concurrency`. For mostly-sync I/O workloads, raise `workers`. For
CPU-bound workloads, switch to prefork.
</Callout>
15 changes: 10 additions & 5 deletions docs-next/content/docs/guides/core/index.mdx
@@ -1,10 +1,15 @@
---
title: Core
description: "Core taskito features: tasks, queues, results."
description: "The building blocks of every taskito application."
---

import { Callout } from 'fumadocs-ui/components/callout';
The building blocks of every taskito application.

<Callout title="Phase 1 stub" type="info">
Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
</Callout>
| Guide | Description |
|---|---|
| [Tasks](/docs/guides/core/tasks) | Define tasks with `@queue.task()`, configure retries, timeouts, and options |
| [Workers](/docs/guides/core/workers) | Start workers, control concurrency, graceful shutdown |
| [Execution Models](/docs/guides/core/execution-model) | Thread pool, prefork, and native async: picking the right mode for your workload |
| [Queues & Priority](/docs/guides/core/queues) | Named queues, priority levels, and routing |
| [Scheduling](/docs/guides/core/scheduling) | Periodic tasks with cron expressions |
| [Workflows](/docs/guides/core/workflows) | Chains, groups, and chords for multi-step pipelines |
10 changes: 9 additions & 1 deletion docs-next/content/docs/guides/core/meta.json
@@ -1,4 +1,12 @@
{
"title": "Core",
"pages": ["index"]
"pages": [
"index",
"tasks",
"queues",
"workers",
"execution-model",
"scheduling",
"workflows"
]
}
145 changes: 145 additions & 0 deletions docs-next/content/docs/guides/core/queues.mdx
@@ -0,0 +1,145 @@
---
title: Queues & Priority
description: "Named queues, integer priority, queue-level rate limits and concurrency caps."
---

import { Callout } from "fumadocs-ui/components/callout";

## Named queues

Route tasks to different queues for isolation and dedicated processing:

```python
@queue.task(queue="emails")
def send_email(to, subject, body):
    ...

@queue.task(queue="reports")
def generate_report(report_id):
    ...

@queue.task()  # Goes to "default" queue
def process_data(data):
    ...
```

### Worker queue subscription

Workers can listen to specific queues:

```bash
# Process only email tasks
taskito worker --app myapp:queue --queues emails

# Process multiple queues
taskito worker --app myapp:queue --queues emails,reports

# Process all registered queues (default)
taskito worker --app myapp:queue
```

Or programmatically:

```python
queue.run_worker(queues=["emails", "reports"])
```

<Callout type="info" title="Use queues to isolate workloads">
Separate I/O-bound tasks (API calls, emails) from CPU-bound tasks (data
processing, report generation) into different queues. Run them on different
worker processes for optimal resource usage.
</Callout>

## Priority

Within a queue, higher-priority jobs are dequeued first. Priority is an
integer; larger values mean more urgent.

### Default priority

Set at task registration:

```python
@queue.task(priority=10)
def urgent_task(data):
    ...

@queue.task(priority=0)  # Default
def normal_task(data):
    ...
```

### Override at enqueue time

```python
# This specific job is extra urgent
urgent_task.apply_async(args=(data,), priority=100)
```

### How it works

Jobs are dequeued using a compound index: `(queue, status, priority DESC, scheduled_at ASC)`. This means:

1. Higher priority jobs go first
2. Among equal priority, older jobs (earlier `scheduled_at`) go first
3. Each queue is processed independently

```python
# These three jobs are in the same queue
low = task.apply_async(args=(1,), priority=1)
mid = task.apply_async(args=(2,), priority=5)
high = task.apply_async(args=(3,), priority=10)

# Processing order: high (10), mid (5), low (1)
```
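The same ordering can be simulated with a plain sort key mirroring the index (`priority DESC, scheduled_at ASC`); an illustration, not taskito's actual SQL:

```python
jobs = [
    {"id": "low",  "priority": 1,  "scheduled_at": 3.0},
    {"id": "mid",  "priority": 5,  "scheduled_at": 2.0},
    {"id": "high", "priority": 10, "scheduled_at": 1.0},
]

# Negate priority for DESC; scheduled_at stays ASC as the tie-breaker.
order = [j["id"] for j in sorted(jobs, key=lambda j: (-j["priority"], j["scheduled_at"]))]
print(order)  # ['high', 'mid', 'low']
```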

## Queue-level limits

Apply a rate limit or concurrency cap to an entire queue, independently of
per-task settings. These limits are checked in the scheduler before any
per-task limits.

### Rate limiting a queue

```python
queue.set_queue_rate_limit("default", "100/m") # Max 100 jobs per minute
queue.set_queue_rate_limit("emails", "20/s") # Max 20 emails per second
```

The format is the same as `rate_limit` on `@queue.task()`: `"N/s"`, `"N/m"`,
or `"N/h"`.
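For intuition, the three forms normalize to jobs-per-second like this (an illustrative parser, not taskito's implementation):

```python
def jobs_per_second(spec: str) -> float:
    # "N/s", "N/m", "N/h" -> average jobs per second
    count, _, unit = spec.partition("/")
    return int(count) / {"s": 1, "m": 60, "h": 3600}[unit]

print(jobs_per_second("20/s"))   # 20.0
print(jobs_per_second("100/m"))  # about 1.67
```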

### Capping concurrency per queue

```python
queue.set_queue_concurrency("default", 10) # Max 10 jobs running at once
queue.set_queue_concurrency("reports", 2) # Heavy tasks: max 2 at a time
```

`set_queue_concurrency` limits how many jobs from that queue run
simultaneously across all workers.

<Callout type="info" title="Queue limits vs task limits">
Queue-level limits apply to all tasks in the queue regardless of their
individual settings. Per-task `rate_limit` and `max_concurrent` are checked
afterwards and may impose stricter caps. Set queue limits to protect shared
downstream resources (APIs, databases) and per-task limits to manage
individual task capacity.
</Callout>

Both methods can be called at any point before or after `run_worker()` starts.

## Default queue settings

Configure defaults at the Queue level:

```python
queue = Queue(
    db_path="myapp.db",
    default_priority=0,   # Default priority for all tasks
    default_retry=3,      # Default max retries
    default_timeout=300,  # Default timeout in seconds
)
```

Individual `@queue.task()` decorators override these defaults.