From 36da88080552a990a28abb2ae68ba0e13ab3a72a Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 3 May 2026 02:37:57 +0530 Subject: [PATCH] docs(docs-next): port guides/resources (phase 4g) --- .../docs/guides/resources/configuration.mdx | 182 +++++++++++ .../guides/resources/dependency-injection.mdx | 282 ++++++++++++++++++ .../content/docs/guides/resources/index.mdx | 97 +++++- .../docs/guides/resources/interception.mdx | 196 ++++++++++++ .../content/docs/guides/resources/meta.json | 10 +- .../docs/guides/resources/observability.mdx | 218 ++++++++++++++ .../content/docs/guides/resources/proxies.mdx | 185 ++++++++++++ .../content/docs/guides/resources/testing.mdx | 140 +++++++++ 8 files changed, 1303 insertions(+), 7 deletions(-) create mode 100644 docs-next/content/docs/guides/resources/configuration.mdx create mode 100644 docs-next/content/docs/guides/resources/dependency-injection.mdx create mode 100644 docs-next/content/docs/guides/resources/interception.mdx create mode 100644 docs-next/content/docs/guides/resources/observability.mdx create mode 100644 docs-next/content/docs/guides/resources/proxies.mdx create mode 100644 docs-next/content/docs/guides/resources/testing.mdx diff --git a/docs-next/content/docs/guides/resources/configuration.mdx b/docs-next/content/docs/guides/resources/configuration.mdx new file mode 100644 index 0000000..971d6d0 --- /dev/null +++ b/docs-next/content/docs/guides/resources/configuration.mdx @@ -0,0 +1,182 @@ +--- +title: Configuration +description: "TOML resource files, pool tuning, frozen and reloadable resources, hot reload." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Resources can be declared in code with `@queue.worker_resource()` or loaded +from a TOML file. Both approaches support the same options. + +## TOML configuration file + +Define resources in a TOML file and load them at startup: + +```toml +# resources.toml + +[resources.config] +factory = "myapp.resources:load_config" +scope = "worker" + +[resources.db] +factory = "myapp.resources:create_engine" +teardown = "myapp.resources:close_engine" +health_check = "myapp.resources:check_db" +health_check_interval = 30.0 +max_recreation_attempts = 3 +scope = "worker" +depends_on = ["config"] + +[resources.session] +factory = "myapp.resources:create_session" +scope = "task" +pool_size = 20 +pool_min = 5 +acquire_timeout = 5.0 +depends_on = ["db"] +``` + +Load before starting the worker: + +```python +queue.load_resources("resources.toml") +``` + +The `factory`, `teardown`, and `health_check` values are import paths. +Both formats are accepted: + +- `"myapp.resources:create_engine"` — colon separator (preferred) +- `"myapp.resources.create_engine"` — dot separator + +On Python 3.11+ the TOML parser is built-in. On earlier versions, install +`tomli`: + +```bash +pip install tomli +``` + +## TOML resource options + +| Key | Type | Default | Description | +|---|---|---|---| +| `factory` | string | required | Import path to the factory callable. | +| `teardown` | string | — | Import path to the teardown callable. | +| `health_check` | string | — | Import path to the health check callable. | +| `health_check_interval` | float | `0.0` | Seconds between health checks. `0` disables. | +| `max_recreation_attempts` | int | `3` | Max recreation attempts on health failure. | +| `scope` | string | `"worker"` | `"worker"`, `"task"`, `"thread"`, or `"request"`. | +| `depends_on` | list[string] | `[]` | Resource names this one depends on. 
|
+| `pool_size` | int | `4` | Task scope: max concurrent instances. |
+| `pool_min` | int | `0` | Task scope: pre-warmed instances at startup. |
+| `acquire_timeout` | float | `10.0` | Task scope: seconds to wait for a pool instance. |
+| `max_lifetime` | float | `3600.0` | Task scope: max seconds an instance lives. |
+| `idle_timeout` | float | `300.0` | Task scope: max idle seconds before eviction. |
+| `reloadable` | bool | `false` | Allow hot reload via SIGHUP or CLI. |
+| `frozen` | bool | `false` | Wrap instance in a read-only proxy. |
+
+## Pool configuration
+
+Pool parameters apply only to task-scoped resources (`scope = "task"`).
+They control the bounded pool that manages concurrent instances.
+
+| Parameter | Default | Description |
+|---|---|---|
+| `pool_size` | `4` | Max concurrent instances. Tasks block if the pool is exhausted. |
+| `pool_min` | `0` | Instances pre-warmed at startup. `0` means lazy creation. |
+| `acquire_timeout` | `10.0` | Seconds to wait for an available instance before raising `ResourceUnavailableError`. |
+| `max_lifetime` | `3600.0` | Max seconds a pooled instance can live before it is replaced. |
+| `idle_timeout` | `300.0` | Max seconds an instance can sit idle in the pool. |
+
+```python
+@queue.worker_resource(
+    "session",
+    scope="task",
+    pool_size=20,
+    pool_min=5,
+    acquire_timeout=5.0,
+    max_lifetime=1800.0,
+)
+def create_session(db):
+    return db()
+```
+
+Setting `pool_min > 0` causes the pool to prewarm instances at worker
+startup. This avoids cold-start latency on the first burst of tasks.
+
+## Frozen resources
+
+Wrap a resource in a read-only proxy to prevent accidental mutation:
+
+```python
+@queue.worker_resource("config", frozen=True)
+def load_config():
+    return AppConfig.from_env()
+```
+
+Attempts to set attributes on a frozen resource raise `AttributeError`.
+This is useful for configuration objects that should be treated as
+immutable after initialization.
+
+## Hot reload
+
+Mark resources as reloadable to update them without restarting the worker:
+
+```python
+@queue.worker_resource("feature_flags", reloadable=True)
+def load_flags():
+    return FeatureFlags.from_remote()
+```
+
+Trigger a reload by sending `SIGHUP` to the worker process:
+
+```bash
+kill -HUP <worker-pid>
+```
+
+Or via the CLI:
+
+```bash
+taskito reload --pid <worker-pid>
+taskito reload --pid <worker-pid> --resource feature_flags  # reload one resource
+```
+
+Or programmatically from application code:
+
+```python
+results = queue._resource_runtime.reload()
+# {"feature_flags": True}
+```
+
+Only resources declared with `reloadable=True` are affected. Non-reloadable
+resources are left running — no teardown or reconnection.
+
+Resources are reloaded in the same topological order as initialization. If
+a reloadable resource depends on another reloadable resource, both are
+reloaded in dependency order.
+
+<Callout type="warn">
+  SIGHUP is not available on Windows. Use the programmatic API instead.
+</Callout>
+
+## Programmatic resource registration
+
+`load_resources()` and `@worker_resource()` both call
+`register_resource()` internally. You can call it directly for full control:
+
+```python
+from taskito.resources.definition import ResourceDefinition, ResourceScope
+
+queue.register_resource(ResourceDefinition(
+    name="db",
+    factory=create_db,
+    teardown=close_db,
+    health_check=check_db,
+    health_check_interval=30.0,
+    scope=ResourceScope.WORKER,
+    depends_on=["config"],
+    reloadable=True,
+))
+```
+
+`register_resource()` must be called before `run_worker()`.
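+
+A minimal startup sketch tying registration and worker startup together —
+the bare `queue.run_worker()` call is illustrative; its actual parameters
+are not covered here:
+
+```python
+# register everything first: TOML file, decorators, or register_resource()
+queue.load_resources("resources.toml")
+
+# then start the worker — resources initialize in dependency order
+# before the first task runs
+queue.run_worker()
+```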
diff --git a/docs-next/content/docs/guides/resources/dependency-injection.mdx b/docs-next/content/docs/guides/resources/dependency-injection.mdx
new file mode 100644
index 0000000..dafeaf6
--- /dev/null
+++ b/docs-next/content/docs/guides/resources/dependency-injection.mdx
@@ -0,0 +1,282 @@
+---
+title: Dependency Injection
+description: "worker_resource decorator, scopes (worker/task/thread/request), dependencies, teardown, health checks."
+---
+
+import { Tab, Tabs } from "fumadocs-ui/components/tabs";
+
+Worker resources are long-lived objects initialized once at worker startup
+and injected into tasks by name. No serialization is involved — they live
+entirely in the worker process and are never put in the queue.
+
+## Declaring resources
+
+```python
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+@queue.worker_resource("db")
+def create_db():
+    engine = create_engine("postgresql://localhost/myapp")
+    return sessionmaker(engine)
+```
+
+The factory runs once when the worker starts. The return value is the
+resource instance.
+
+The factory can be async:
+
+```python
+@queue.worker_resource("redis")
+async def create_redis():
+    import redis.asyncio as aioredis
+    return await aioredis.from_url("redis://localhost")
+```
+
+Taskito runs the async factory on the worker's event loop before accepting
+tasks.
+
+## `worker_resource()` parameters
+
+| Parameter | Default | Description |
+|---|---|---|
+| `name` | required | Resource name used in `inject=["name"]` and `Inject["name"]`. |
+| `depends_on` | `[]` | Names of resources this factory receives as arguments. |
+| `teardown` | `None` | Callable invoked with the instance on graceful shutdown. |
+| `health_check` | `None` | Callable invoked periodically; returns truthy if healthy. |
+| `health_check_interval` | `0.0` | Seconds between health checks. `0` disables checking. |
+| `max_recreation_attempts` | `3` | Max times to recreate after consecutive health failures. |
+| `scope` | `"worker"` | Lifetime scope — see Resource scopes below. |
+| `pool_size` | `None` | Task scope: max concurrent instances (default: 4). |
+| `pool_min` | `0` | Task scope: pre-warmed instances at startup. |
+| `acquire_timeout` | `10.0` | Task scope: seconds to wait for an available instance. |
+| `max_lifetime` | `3600.0` | Task scope: max seconds an instance can live. |
+| `idle_timeout` | `300.0` | Task scope: max idle seconds before eviction. |
+| `reloadable` | `False` | Allow hot reload via SIGHUP or CLI. |
+| `frozen` | `False` | Wrap instance in a read-only proxy. |
+
+## Injecting resources into tasks
+
+<Tabs items={["inject list", "Inject annotation"]}>
+<Tab value="inject list">
+
+```python
+@queue.task(inject=["db"])
+def process_order(order_id: int, db):
+    session = db()
+    order = session.get(Order, order_id)
+    ...
+```
+
+</Tab>
+<Tab value="Inject annotation">
+
+```python
+from taskito import Inject
+
+@queue.task()
+def process_order(order_id: int, db: Inject["db"]):
+    session = db()
+    order = session.get(Order, order_id)
+    ...
+```
+
+</Tab>
+</Tabs>
+
+Both syntaxes are equivalent. `Inject["name"]` is a type annotation — it
+works with any type checker and makes the dependency explicit in the
+function signature. The worker reads the annotation at task registration
+time and injects the resource automatically.
+
+If a caller explicitly passes a `db` kwarg to `.delay()`, that value wins
+over injection.
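+
+For example, a sketch of overriding the `db` injection at one call site —
+the in-memory engine here is purely illustrative:
+
+```python
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+# the explicit kwarg wins over the registered "db" resource for this call
+fake_sessions = sessionmaker(create_engine("sqlite:///:memory:"))
+process_order.delay(42, db=fake_sessions)
+```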
+ +## Resource scopes + +| Scope | Lifetime | Use case | +|---|---|---| +| `"worker"` (default) | Entire worker process | Database connection pools, shared caches | +| `"task"` | Acquired per-task from a pool, returned after | Short-lived connections with limited concurrency | +| `"thread"` | One instance per worker thread, created lazily | Thread-unsafe objects that must not be shared | +| `"request"` | Fresh instance per task, torn down after | Stateful per-request objects | + +```python +# Task scope: each task gets its own session from a pool of up to 10 +@queue.worker_resource("db_session", scope="task", pool_size=10) +def create_session(db): + return db() # db must be a worker-scoped resource + +# Thread scope: one cache per worker thread +@queue.worker_resource("local_cache", scope="thread") +def create_cache(): + return {} +``` + +Pool configuration parameters (`pool_size`, `pool_min`, `acquire_timeout`, +`max_lifetime`, `idle_timeout`) only apply to task-scoped resources. See +[Configuration](/docs/guides/resources/configuration) for details. + +## Dependencies + +Resources can declare other resources they depend on. Taskito resolves the +dependency graph and initializes in topological order, injecting +dependencies as keyword arguments to the factory: + +```python +@queue.worker_resource("config") +def load_config(): + return Config.from_env() + + +@queue.worker_resource("db", depends_on=["config"]) +def create_db(config): + return create_engine(config.db_url, pool_size=10) + + +@queue.worker_resource("cache", depends_on=["config"]) +def create_cache(config): + return Redis.from_url(config.redis_url) +``` + +On shutdown, resources are torn down in reverse initialization order — +`cache` and `db` before `config`. + +Cycles are detected eagerly at registration time and raise +`CircularDependencyError`. + +## Teardown + +Supply a teardown callable to clean up the resource on graceful shutdown: + +```python +@queue.worker_resource( + "db", + teardown=lambda engine: engine.dispose(), +) +def create_db(): + return create_engine("postgresql://localhost/myapp") +``` + +Or use `register_resource()` for the programmatic API: + +```python +from taskito.resources.definition import ResourceDefinition + +queue.register_resource(ResourceDefinition( + name="db", + factory=create_db, + teardown=close_db, + depends_on=["config"], +)) +``` + +Teardown callables can be async — Taskito awaits them if they return a +coroutine. + +## Health checking + +Resources can declare a health check function that runs on a background +thread. If the check returns falsy, the worker attempts to recreate the +resource: + +```python +def check_db(engine): + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + return True + + +@queue.worker_resource( + "db", + health_check=check_db, + health_check_interval=30.0, # check every 30 seconds + max_recreation_attempts=3, # mark permanently unhealthy after 3 failures +) +def create_db(): + return create_engine("postgresql://localhost/myapp") +``` + +The health checker runs in a single daemon thread. Each resource with a +non-zero `health_check_interval` is checked independently on its own +schedule. + +Run a health check manually from application code: + +```python +is_healthy = queue.health_check("db") +``` + +If a resource fails all recreation attempts, it is marked permanently +unhealthy. Subsequent tasks that depend on it raise +`ResourceUnavailableError`. 
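+
+A small sketch of acting on that state from application code, using the
+manual `health_check()` call shown above (the logging is illustrative):
+
+```python
+import logging
+
+logger = logging.getLogger(__name__)
+
+if queue.health_check("db"):
+    process_order.delay(42)
+else:
+    # the background checker will try to recreate the resource;
+    # alert or retry later instead of enqueueing into failure
+    logger.warning("db resource unhealthy; deferring enqueue")
+```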
+
+## Resource status
+
+```python
+status = queue.resource_status()
+# [
+#   {
+#     "name": "config",
+#     "scope": "worker",
+#     "health": "healthy",
+#     "init_duration_ms": 12.4,
+#     "recreations": 0,
+#     "depends_on": [],
+#   },
+#   {
+#     "name": "db",
+#     "scope": "worker",
+#     "health": "healthy",
+#     "init_duration_ms": 45.2,
+#     "recreations": 0,
+#     "depends_on": ["config"],
+#   },
+# ]
+```
+
+Task-scoped resources include a `"pool"` key with pool statistics. See
+[Observability](/docs/guides/resources/observability) for details.
+
+## Full example
+
+```python
+from taskito import Queue, Inject
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import sessionmaker, Session
+
+queue = Queue(db_path="tasks.db", interception="strict")
+
+
+@queue.worker_resource("config")
+def load_config():
+    return Config.from_env()
+
+
+def check_db(engine):
+    with engine.connect() as conn:
+        conn.execute(text("SELECT 1"))
+    return True
+
+
+@queue.worker_resource(
+    "db",
+    depends_on=["config"],
+    teardown=lambda engine: engine.dispose(),
+    health_check=check_db,
+    health_check_interval=60.0,
+)
+def create_db(config):
+    return create_engine(config.database_url, pool_size=10)
+
+
+@queue.task()
+def process_order(order_id: int, db: Inject["db"]):
+    session: Session = db()
+    try:
+        order = session.get(Order, order_id)
+        order.status = "processed"
+        session.commit()
+    finally:
+        session.close()
+```
diff --git a/docs-next/content/docs/guides/resources/index.mdx b/docs-next/content/docs/guides/resources/index.mdx
index a311cf8..a533a78 100644
--- a/docs-next/content/docs/guides/resources/index.mdx
+++ b/docs-next/content/docs/guides/resources/index.mdx
@@ -1,10 +1,95 @@
 ---
-title: Resources
-description: "Worker DI: scopes, pools, hot reload, TOML config."
+title: Resource System
+description: "The three-layer pipeline that lets tasks consume non-serializable dependencies safely."
 ---
 
-import { Callout } from 'fumadocs-ui/components/callout';
-
-<Callout>
-  Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
-</Callout>
+The resource system gives tasks clean access to external dependencies —
+database connections, HTTP clients, cloud clients — without passing live
+objects through the queue. It operates in three layers that together solve
+a fundamental distributed systems problem: task arguments must be
+serializable, but most real-world dependencies are not.
+
+```mermaid
+flowchart TD
+  B["Layer 1: Argument Interception"]
+  B -->|"REDIRECT"| C["DI marker"]
+  B -->|"PROXY"| D["Serializable recipe"]
+  B -->|"PASS / CONVERT"| E["Serialized payload"]
+  C --> F["Queue"]
+  D --> F
+  E --> F
+  F --> G["Worker"]
+  G --> H["Layer 2: Resource Runtime"]
+  G --> I["Layer 3: Proxy Reconstruction"]
+  H --> J["Inject 'db' resource"]
+  I --> K["Rebuild file handle"]
+  J --> L["task(order_id, db, file)"]
+  K --> L
+```
+
+**Layer 1 — Argument Interception** classifies each value passed to
+`.delay()` before serialization. Database sessions become DI markers, file
+handles become recipes, safe primitives pass through unchanged, and
+non-serializable types like locks are rejected with a helpful error.
+
+**Layer 2 — Worker Resource Runtime** manages long-lived objects
+initialized once at worker startup. Resources are injected into tasks by
+name — no serialization needed, no connection per task.
+
+**Layer 3 — Resource Proxies** handles objects that have capturable state:
+file handles, HTTP sessions, cloud clients.
The interceptor extracts a +recipe; the worker rebuilds the live object before the task runs. + +## Minimal end-to-end example + +```python +from taskito import Queue, Inject +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +queue = Queue(db_path="tasks.db", interception="strict") + + +@queue.worker_resource("db") +def create_db(): + engine = create_engine("postgresql://localhost/myapp") + return sessionmaker(engine) + + +@queue.task() +def process_order(order_id: int, db: Inject["db"]): + session = db() + try: + order = session.get(Order, order_id) + order.status = "processed" + session.commit() + finally: + session.close() +``` + +Enqueue from anywhere in your application: + +```python +process_order.delay(42) +# The integer 42 passes through serialization normally. +# 'db' is injected by the worker — no session is ever put in the queue. +``` + +Start the worker: + +```bash +taskito worker --app myapp.tasks:queue +# [taskito] Initialized 1 resource(s): db +# [taskito] Worker started with 8 threads +``` + +## Section overview + +| Page | What it covers | +|---|---| +| [Argument Interception](/docs/guides/resources/interception) | Modes, strategies, custom types, `analyze()`, metrics | +| [Dependency Injection](/docs/guides/resources/dependency-injection) | `worker_resource()`, scopes, dependencies, teardown, health checks | +| [Resource Proxies](/docs/guides/resources/proxies) | Built-in handlers, HMAC signing, security, `NoProxy`, cloud handlers | +| [Configuration](/docs/guides/resources/configuration) | TOML config, pool tuning, frozen and reloadable resources, hot reload | +| [Testing](/docs/guides/resources/testing) | `test_mode(resources=)`, `MockResource`, pytest fixtures | +| [Observability](/docs/guides/resources/observability) | Prometheus metrics, dashboard endpoints, CLI commands | diff --git a/docs-next/content/docs/guides/resources/interception.mdx b/docs-next/content/docs/guides/resources/interception.mdx new file mode 100644 index 0000000..70a795f --- /dev/null +++ b/docs-next/content/docs/guides/resources/interception.mdx @@ -0,0 +1,196 @@ +--- +title: Argument Interception +description: "Classification modes and strategies — PASS / CONVERT / REDIRECT / PROXY / REJECT." +--- + +Argument interception classifies every value passed to `.delay()` or +`.apply_async()` before serialization. Enable it on the `Queue` constructor: + +```python +queue = Queue(db_path="tasks.db", interception="strict") +``` + +Without interception, values are passed directly to the serializer. A +SQLAlchemy session or file handle would either raise a serialization error +at enqueue time or produce a broken payload that fails on the worker. + +## Modes + +| Mode | Behavior | +|---|---| +| `"off"` | Disabled (default). All arguments pass through to the serializer unchanged. | +| `"strict"` | Raises `InterceptionError` immediately when a rejected type is detected. | +| `"lenient"` | Logs a warning and drops the rejected argument instead of raising. | + +`"strict"` is recommended for production — it surfaces problems at call +time rather than causing silent task failures. 
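+
+A sketch of what strict mode surfaces at enqueue time — the example
+catches broadly because the exception's import path isn't shown here:
+
+```python
+import threading
+
+@queue.task()
+def bad_task(lock):
+    ...
+
+try:
+    # threading.Lock is a built-in REJECT type (see below)
+    bad_task.delay(threading.Lock())
+except Exception as exc:  # InterceptionError in strict mode
+    print(f"rejected at enqueue time: {exc}")
+```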
+
+## Classification strategies
+
+Every argument gets one of five strategies:
+
+| Strategy | What happens | Examples |
+|---|---|---|
+| `PASS` | Sent as-is to the serializer | `int`, `str`, `bool`, `bytes` |
+| `CONVERT` | Transformed to a serializable form, reconstructed on the worker | `UUID`, `datetime`, `Decimal`, `Path`, `Enum`, Pydantic models, dataclasses |
+| `REDIRECT` | Replaced with a DI marker; the worker injects the named resource | SQLAlchemy sessions, Redis clients, MongoDB clients |
+| `PROXY` | Deconstructed to a recipe; reconstructed as a live object on the worker | File handles, loggers, `requests.Session`, `httpx.Client`, boto3 clients |
+| `REJECT` | Raises `InterceptionError` in strict mode, dropped in lenient mode | Thread locks, generators, coroutines, sockets |
+
+## Built-in CONVERT types
+
+These are converted automatically when interception is enabled:
+
+| Type | Notes |
+|---|---|
+| `uuid.UUID` | Stored as a `"uuid:"`-prefixed string |
+| `datetime.datetime` / `date` / `time` / `timedelta` | ISO format |
+| `decimal.Decimal` | Stored as string to preserve precision |
+| `pathlib.Path` / `PurePath` | Stored as POSIX string |
+| `re.Pattern` | Pattern string + flags |
+| `collections.OrderedDict` | Preserves insertion order |
+| `pydantic.BaseModel` | Via `.model_dump()` (if pydantic is installed) |
+| `enum.Enum` subclasses | Class path + value |
+| Dataclasses | Auto-detected via `dataclasses.is_dataclass()` |
+| `NamedTuple` subclasses | Auto-detected |
+
+## Built-in REDIRECT types
+
+These connectors are automatically detected and replaced with a resource
+injection marker. The worker injects the named resource instead of
+attempting to deserialize a live connection object:
+
+| Type | Default resource name |
+|---|---|
+| `sqlalchemy.orm.Session` | `"db"` |
+| `sqlalchemy.ext.asyncio.AsyncSession` | `"db"` |
+| `sqlalchemy.engine.Engine` | `"db"` |
+| `sqlalchemy.ext.asyncio.AsyncEngine` | `"db"` |
+| `redis.Redis` | `"redis"` |
+| `redis.asyncio.Redis` | `"redis"` |
+| `pymongo.MongoClient` | `"mongo"` |
+| `motor.motor_asyncio.AsyncIOMotorClient` | `"mongo"` |
+| `psycopg2.extensions.connection` | `"db"` |
+| `asyncpg.connection.Connection` | `"db"` |
+| `django.db.backends.base.base.BaseDatabaseWrapper` | `"db"` |
+| `aiohttp.ClientSession` | `"aiohttp_session"` |
+
+The resource name is the key you use in `@queue.worker_resource("name")`.
+If your resource has a different name, register a custom redirect with
+`register_type()`.
+
+## Built-in PROXY types
+
+These objects are deconstructed to a recipe dict and rebuilt by the worker:
+
+| Type | Handler name |
+|---|---|
+| `io.TextIOWrapper`, `io.BufferedReader`, `io.BufferedWriter`, `io.FileIO` | `"file"` |
+| `logging.Logger` | `"logger"` |
+| `requests.Session` | `"requests_session"` |
+| `httpx.Client` / `httpx.AsyncClient` | `"httpx_client"` |
+| boto3 clients (via `botocore.client.BaseClient`) | `"boto3_client"` |
+| `google.cloud.storage.Client` / `Bucket` / `Blob` | `"gcs_client"` |
+
+See [Resource Proxies](/docs/guides/resources/proxies) for security options
+and handler details.
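+
+A sketch of the `"file"` handler's round trip — the task name and path are
+illustrative:
+
+```python
+@queue.task()
+def summarize_report(report):
+    # on the worker, `report` is a freshly reopened file object
+    return report.read()[:200]
+
+with open("/data/reports/latest.txt", "r") as f:
+    summarize_report.delay(f)
+# the recipe captured path + mode at enqueue time; the worker reopens
+# the file before the task runs, so closing f here is fine
+```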
+ +## Built-in REJECT types + +These are always rejected because they cannot cross process or +serialization boundaries: + +- Thread synchronization primitives (`Lock`, `RLock`, `Semaphore`, `Event`) +- `socket.socket` +- Generator objects +- Coroutine objects +- `subprocess.Popen` +- `asyncio.Task` / `asyncio.Future` +- `contextvars.Context` +- Multiprocessing `Lock` and `Queue` + +Each rejection includes a message explaining why and suggests alternatives. + +## Registering custom types + +Add custom rules for types not covered by the built-ins: + +```python +from myapp import MyDBClient, MoneyAmount, APIConnection + +# Treat a custom DB client as a worker resource (worker must have "my_db" registered) +queue.register_type(MyDBClient, "redirect", resource="my_db") + +# Convert a custom value type to something serializable +queue.register_type( + MoneyAmount, + "convert", + converter=lambda m: {"__type__": "money", "value": str(m.value), "currency": m.currency}, + type_key="money", +) + +# Reject with a helpful message +queue.register_type( + APIConnection, + "reject", + message="API connections are process-local. Register it as a worker resource instead.", +) +``` + +`register_type()` requires interception to be enabled (`"strict"` or +`"lenient"`). Calling it when interception is `"off"` raises `RuntimeError`. + +| Parameter | Description | +|---|---| +| `python_type` | The type to register. | +| `strategy` | `"pass"`, `"convert"`, `"redirect"`, `"reject"`, or `"proxy"`. | +| `resource` | Resource name for `"redirect"`. | +| `message` | Rejection reason for `"reject"`. | +| `converter` | Converter callable for `"convert"`. | +| `type_key` | Dispatch key for the converter reconstructor. | +| `proxy_handler` | Handler name for `"proxy"`. | + +## Constructor parameters + +| Parameter | Default | Description | +|---|---|---| +| `interception` | `"off"` | Interception mode: `"strict"`, `"lenient"`, or `"off"`. | +| `max_intercept_depth` | `10` | Maximum depth the walker recurses into nested containers. | + +## Analyzing arguments + +Inspect how interception would classify arguments without actually +transforming them: + +```python +from myapp.tasks import queue + +report = queue._interceptor.analyze( + args=(user_session, "Hello"), + kwargs={"attachment": open("file.pdf", "rb")}, +) +print(report) +# Argument Analysis: +# args[0] (Session) → REDIRECT (redirect to worker resource 'db') +# args[1] (str) → PASS +# kwargs.attachment (BufferedReader) → PROXY (handler=file) +``` + +`analyze()` is a development and debugging tool. It reads the registry but +makes no changes to arguments. + +## Interception metrics + +```python +stats = queue.interception_stats() +# { +# "total_intercepts": 1200, +# "total_duration_ms": 216.0, +# "avg_duration_ms": 0.18, +# "strategy_counts": {"pass": 2800, "convert": 450, "redirect": 200, "proxy": 30, "reject": 0}, +# "max_depth_reached": 3, +# } +``` + +See [Observability](/docs/guides/resources/observability) for Prometheus +metrics and dashboard endpoints. 
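+
+For instance, a small polling helper — not part of the API, just a sketch
+built on `interception_stats()`:
+
+```python
+import logging
+import threading
+import time
+
+logger = logging.getLogger("taskito.intercept")
+
+def watch_interception(queue, every=60.0):
+    """Log interception stats periodically from a daemon thread."""
+    def loop():
+        while True:
+            time.sleep(every)
+            stats = queue.interception_stats()
+            if stats:  # empty dict when interception is "off"
+                logger.info(
+                    "intercepts=%s avg=%.2fms counts=%s",
+                    stats["total_intercepts"],
+                    stats["avg_duration_ms"],
+                    stats["strategy_counts"],
+                )
+    threading.Thread(target=loop, daemon=True).start()
+```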
diff --git a/docs-next/content/docs/guides/resources/meta.json b/docs-next/content/docs/guides/resources/meta.json index 8ff5a66..dbc9e63 100644 --- a/docs-next/content/docs/guides/resources/meta.json +++ b/docs-next/content/docs/guides/resources/meta.json @@ -1,4 +1,12 @@ { "title": "Resources", - "pages": ["index"] + "pages": [ + "index", + "interception", + "dependency-injection", + "proxies", + "configuration", + "testing", + "observability" + ] } diff --git a/docs-next/content/docs/guides/resources/observability.mdx b/docs-next/content/docs/guides/resources/observability.mdx new file mode 100644 index 0000000..7dcc12b --- /dev/null +++ b/docs-next/content/docs/guides/resources/observability.mdx @@ -0,0 +1,218 @@ +--- +title: Observability +description: "resource_status / interception_stats / proxy_stats, dashboard endpoints, CLI, Prometheus metrics." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +The resource system exposes metrics through three channels: the `Queue` +API, the built-in dashboard, and Prometheus. + +## Queue API + +### `resource_status()` + +Returns a snapshot of every registered resource: + +```python +status = queue.resource_status() +# [ +# { +# "name": "config", +# "scope": "worker", +# "health": "healthy", +# "init_duration_ms": 12.4, +# "recreations": 0, +# "depends_on": [], +# }, +# { +# "name": "db", +# "scope": "worker", +# "health": "healthy", +# "init_duration_ms": 45.2, +# "recreations": 1, +# "depends_on": ["config"], +# }, +# { +# "name": "session", +# "scope": "task", +# "health": "healthy", +# "init_duration_ms": 0.0, +# "recreations": 0, +# "depends_on": ["db"], +# "pool": { +# "size": 20, +# "active": 3, +# "idle": 5, +# "total_acquisitions": 1542, +# "total_timeouts": 0, +# "avg_acquire_ms": 0.4, +# }, +# }, +# ] +``` + +Task-scoped resources include a `"pool"` key with pool statistics. The +`"health"` field is `"healthy"`, `"unhealthy"`, or `"unknown"` (resource +not yet initialized). + +### `interception_stats()` + +Returns aggregate metrics from the argument interceptor: + +```python +stats = queue.interception_stats() +# { +# "total_intercepts": 1200, +# "total_duration_ms": 216.0, +# "avg_duration_ms": 0.18, +# "strategy_counts": { +# "pass": 2800, +# "convert": 450, +# "redirect": 200, +# "proxy": 30, +# "reject": 0, +# }, +# "max_depth_reached": 3, +# } +``` + +Returns an empty dict if interception is disabled (`"off"`). + +### `proxy_stats()` + +Returns per-handler reconstruction metrics: + +```python +stats = queue.proxy_stats() +# [ +# { +# "handler": "file", +# "total_reconstructions": 42, +# "total_errors": 0, +# "total_cleanup_errors": 0, +# "total_checksum_failures": 0, +# "total_duration_ms": 50.4, +# "avg_duration_ms": 1.2, +# "max_duration_ms": 8.1, +# "p95_duration_ms": 3.4, +# }, +# { +# "handler": "boto3_client", +# "total_reconstructions": 310, +# "total_errors": 2, +# ... +# }, +# ] +``` + +## Dashboard endpoints + +The built-in dashboard exposes three JSON endpoints for the resource system: + +| Endpoint | Description | +|---|---| +| `GET /api/resources` | Same data as `resource_status()` | +| `GET /api/proxy-stats` | Same data as `proxy_stats()` | +| `GET /api/interception-stats` | Same data as `interception_stats()` | + +Start the dashboard: + +```bash +taskito dashboard --app myapp.tasks:queue +``` + +See the [Web Dashboard](/docs/guides/observability/dashboard) guide for +full dashboard documentation. 
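+
+For example, a sketch that scrapes the resources endpoint from a script —
+the host and port are assumptions; substitute wherever your dashboard
+listens:
+
+```python
+import json
+import urllib.request
+
+with urllib.request.urlopen("http://localhost:8000/api/resources") as resp:
+    for resource in json.load(resp):
+        print(resource["name"], resource["health"])
+```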
+
+## CLI commands
+
+### `taskito resources`
+
+Print a formatted table of all registered resources and their current
+status:
+
+```bash
+taskito resources --app myapp.tasks:queue
+# RESOURCE   SCOPE    HEALTH    INIT (ms)   RECREATIONS   DEPENDS ON
+# -----------------------------------------------------------------------
+# config     worker   healthy   12.40       0             -
+# db         worker   healthy   45.21       1             config
+# session    task     healthy   0.00        0             db
+#   pool: active=3 idle=5 max=20 timeouts=0
+```
+
+### `taskito reload`
+
+Send `SIGHUP` to a running worker to reload all reloadable resources:
+
+```bash
+taskito reload --pid 12345
+# Sent SIGHUP to worker (PID 12345)
+
+# Reload a specific resource only:
+taskito reload --pid 12345 --resource feature_flags
+```
+
+<Callout type="warn">
+  `taskito reload` sends `SIGHUP` — it does not wait for the reload to
+  complete. Check `taskito resources` or logs to confirm the reload
+  succeeded.
+</Callout>
+
+## Prometheus metrics
+
+Install the Prometheus integration:
+
+```bash
+pip install taskito[prometheus]
+```
+
+```python
+from taskito.contrib.prometheus import PrometheusMiddleware, PrometheusStatsCollector
+
+queue = Queue(db_path="tasks.db", middleware=[PrometheusMiddleware()])
+
+# Poll resource, proxy, and interception stats periodically
+collector = PrometheusStatsCollector(queue, interval=10.0)
+collector.start()
+```
+
+### Resource metrics
+
+| Metric | Type | Labels | Description |
+|---|---|---|---|
+| `taskito_resource_health_status` | Gauge | `resource` | `1` if healthy, `0` if unhealthy |
+| `taskito_resource_recreation_total` | Gauge | `resource` | Total recreation count |
+| `taskito_resource_init_duration_seconds` | Gauge | `resource` | Initialization duration |
+| `taskito_resource_pool_size` | Gauge | `resource` | Pool max size (task scope) |
+| `taskito_resource_pool_active` | Gauge | `resource` | Active pool instances |
+| `taskito_resource_pool_idle` | Gauge | `resource` | Idle pool instances |
+| `taskito_resource_pool_timeout_total` | Counter | `resource` | Pool acquisition timeouts |
+
+### Proxy metrics
+
+| Metric | Type | Labels | Description |
+|---|---|---|---|
+| `taskito_proxy_reconstruct_total` | Counter | `handler` | Total reconstructions |
+| `taskito_proxy_reconstruct_errors_total` | Counter | `handler` | Reconstruction errors |
+| `taskito_proxy_reconstruct_duration_seconds` | Histogram | `handler` | Reconstruction duration |
+
+### Interception metrics
+
+| Metric | Type | Labels | Description |
+|---|---|---|---|
+| `taskito_intercept_strategy_total` | Counter | `strategy` | Count per strategy (`pass`, `convert`, `redirect`, `proxy`, `reject`) |
+| `taskito_intercept_duration_seconds` | Histogram | — | Interception pass duration |
+
+Metrics are exposed at `/metrics` on the dashboard server or via a
+standalone metrics server:
+
+```python
+from taskito.contrib.prometheus import start_metrics_server
+
+start_metrics_server(port=9090)
+```
+
+See the [Prometheus integration](/docs/guides/integrations) page for full
+setup instructions.
diff --git a/docs-next/content/docs/guides/resources/proxies.mdx b/docs-next/content/docs/guides/resources/proxies.mdx
new file mode 100644
index 0000000..235cb7c
--- /dev/null
+++ b/docs-next/content/docs/guides/resources/proxies.mdx
@@ -0,0 +1,185 @@
+---
+title: Resource Proxies
+description: "File / logger / HTTP session / cloud client proxies — HMAC signing, allowlists, NoProxy."
+---
+
+import { Callout } from "fumadocs-ui/components/callout";
+
+Proxies handle objects that are neither serializable primitives nor
+DI-injectable — things like file handles, HTTP sessions, and cloud clients
+that have capturable state.
+
+When interception detects a proxy-able argument, the handler's
+`deconstruct()` method extracts a JSON-serializable recipe. The worker
+calls `reconstruct()` to rebuild the live object before invoking the task.
+After the task completes, `cleanup()` is called on the reconstructed
+object.
+
+Proxies require interception to be enabled:
+
+```python
+queue = Queue(db_path="tasks.db", interception="strict")
+```
+
+## Built-in handlers
+
+| Handler name | Handled types | Notes |
+|---|---|---|
+| `"file"` | `io.TextIOWrapper`, `io.BufferedReader`, `io.BufferedWriter`, `io.FileIO` | Stores path + mode; worker reopens the file |
+| `"logger"` | `logging.Logger` | Stores logger name; worker resolves `logging.getLogger(name)` |
+| `"requests_session"` | `requests.Session` | Stores headers, auth, timeout, verify; worker creates a new session |
+| `"httpx_client"` | `httpx.Client`, `httpx.AsyncClient` | Stores base_url, headers, timeout, verify |
+| `"boto3_client"` | boto3 clients (`botocore.client.BaseClient`) | Stores service name, region, endpoint_url; credentials are NOT included |
+| `"gcs_client"` | `google.cloud.storage.Client`, `Bucket`, `Blob` | Stores project and resource identifiers; credentials are NOT included |
+
+`requests`, `httpx`, `boto3`, and `google-cloud-storage` are optional.
+Their handlers register automatically when the library is installed.
+
+## HMAC signing
+
+Proxy recipes are signed with HMAC-SHA256 to prevent recipe tampering
+between enqueue and execution:
+
+```python
+queue = Queue(
+    db_path="tasks.db",
+    interception="strict",
+    recipe_signing_key="your-secret-key",
+)
+```
+
+If `recipe_signing_key` is not set on the constructor, it falls back to
+the `TASKITO_RECIPE_SECRET` environment variable. Signed recipes are
+verified at reconstruction time — a modified or forged recipe raises
+`ProxyReconstructionError`.
+
+<Callout type="warn">
+  Omitting a signing key means recipes are not verified. Use a signing key
+  in production.
+</Callout>
+
+## Security options
+
+### Reconstruction timeout
+
+Limit how long reconstruction can take before raising
+`ProxyReconstructionError`:
+
+```python
+queue = Queue(
+    db_path="tasks.db",
+    max_reconstruction_timeout=5.0,  # seconds, default 5.0
+)
+```
+
+### File path allowlist
+
+Restrict which file paths the file proxy handler is allowed to reconstruct:
+
+```python
+queue = Queue(
+    db_path="tasks.db",
+    file_path_allowlist=["/data/uploads/", "/tmp/taskito/"],
+)
+```
+
+Paths outside the allowlist raise `ProxyReconstructionError` during
+reconstruction. Without an allowlist, any path is permitted.
+
+### Disabling specific handlers
+
+Disable individual handlers by name:
+
+```python
+queue = Queue(
+    db_path="tasks.db",
+    disabled_proxies=["requests_session", "gcs_client"],
+)
+```
+
+Disabled handlers are not registered. Arguments of those types fall
+through to the serializer — or are rejected if they would otherwise be
+PROXY-classified and interception is strict.
+
+## Cloud handlers
+
+### AWS (boto3)
+
+```bash
+pip install taskito[aws]  # adds boto3>=1.20
+```
+
+The `boto3_client` handler stores the service name, region, and optional
+endpoint URL. **Credentials are not stored in the recipe.** The worker
+uses its own ambient credentials — IAM role, environment variables, or
+`~/.aws/credentials`.
+ +```python +import boto3 + +s3 = boto3.client("s3", region_name="us-east-1") +process_upload.delay(s3, "my-bucket/key") +# Recipe: {"service_name": "s3", "region_name": "us-east-1", "endpoint_url": null} +# Worker recreates: boto3.client("s3", region_name="us-east-1") +``` + +### Google Cloud Storage + +```bash +pip install taskito[gcs] # adds google-cloud-storage>=2.0 +``` + +The `gcs_client` handler stores the project and resource identifiers for +`Client`, `Bucket`, and `Blob` objects. **Credentials are not stored.** The +worker uses Application Default Credentials. + +```python +from google.cloud import storage + +client = storage.Client(project="my-project") +blob = client.bucket("my-bucket").blob("file.parquet") +process_file.delay(blob) +# Recipe: {"type": "blob", "project": "my-project", "bucket_name": "my-bucket", "blob_name": "file.parquet"} +``` + +## `NoProxy` wrapper + +Opt out of proxy handling for a specific argument. The value is passed +through to the serializer as-is: + +```python +from taskito import NoProxy + +session = requests.Session() +session.headers["Authorization"] = "Bearer token" + +# Pass to cloudpickle instead of the proxy system +process.delay(NoProxy(session)) +``` + +Use `NoProxy` when the serializer can handle the value directly (e.g., +with cloudpickle) or when you want to suppress proxy handling for a +specific call without disabling the handler globally. + +## Proxy metrics + +```python +stats = queue.proxy_stats() +# [ +# { +# "handler": "file", +# "total_reconstructions": 42, +# "total_errors": 0, +# "total_cleanup_errors": 0, +# "total_checksum_failures": 0, +# "total_duration_ms": 50.4, +# "avg_duration_ms": 1.2, +# "max_duration_ms": 8.1, +# "p95_duration_ms": 3.4, +# }, +# ... +# ] +``` + +See [Observability](/docs/guides/resources/observability) for Prometheus +metrics and dashboard endpoints. diff --git a/docs-next/content/docs/guides/resources/testing.mdx b/docs-next/content/docs/guides/resources/testing.mdx new file mode 100644 index 0000000..0e4f62a --- /dev/null +++ b/docs-next/content/docs/guides/resources/testing.mdx @@ -0,0 +1,140 @@ +--- +title: Testing with Resources +description: "test_mode(resources=...), MockResource, pytest fixtures, propagating errors." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +The `test_mode()` context manager runs tasks synchronously in the calling +thread without starting a worker. Pass mock resources to override real +factories during tests. + +## Injecting mock resources + +```python +from unittest.mock import MagicMock +from myapp.tasks import queue, process_order + +def test_process_order(): + mock_db = MagicMock() + mock_db.return_value.get.return_value = Order(id=42, total=99.0) + + with queue.test_mode(resources={"db": mock_db}) as results: + process_order.delay(42) + + assert results[0].succeeded + mock_db.return_value.get.assert_called_once_with(Order, 42) +``` + +The `resources=` dict maps resource names to mock values. Any plain Python +object works — `MagicMock`, a real instance, a simple dict, whatever your +test needs. + +When `test_mode(resources=...)` is active: + +- Resources are taken directly from the dict — no factories are called. +- Proxy reconstruction is bypassed. Proxy markers in arguments are passed through unchanged, so tests don't fail because of missing files or network connections. +- The previous resource runtime is restored on context exit. 
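+
+For instance, a sketch overriding several resources at once — the task and
+resource names are illustrative:
+
+```python
+from unittest.mock import MagicMock
+
+def test_report_pipeline():
+    mocks = {"db": MagicMock(), "cache": MagicMock(), "config": {"debug": True}}
+
+    with queue.test_mode(resources=mocks) as results:
+        generate_report.delay(7)
+
+    assert results[0].succeeded
+    mocks["cache"].set.assert_called()  # spy on the injected cache
+```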
+
+## `MockResource`
+
+`MockResource` adds call tracking on top of a plain mock value:
+
+```python
+from taskito import MockResource
+
+def test_with_spy():
+    spy_db = MockResource("db", wraps=real_session_factory, track_calls=True)
+
+    with queue.test_mode(resources={"db": spy_db}) as results:
+        process_order.delay(42)
+
+    assert spy_db.call_count == 1
+    assert results[0].succeeded
+```
+
+| Parameter | Description |
+|---|---|
+| `name` | Resource name (informational, used in repr). |
+| `return_value` | Value returned when the resource is accessed. |
+| `wraps` | Wrap a real object — it is returned as-is when the resource is accessed. |
+| `track_calls` | If `True`, increment `call_count` each time the resource is accessed. |
+
+`MockResource` attributes:
+
+| Attribute | Description |
+|---|---|
+| `call_count` | Number of times the resource was accessed during the test. |
+| `calls` | List of call tuples (currently `[]` — tracking is count-only). |
+
+<Callout type="warn">
+  `MockResource` wraps a value — it is not callable by default. If your
+  task calls `db()` to obtain a session, your `return_value` or `wraps`
+  must be callable (e.g., a `MagicMock` or a real session factory).
+</Callout>
+
+## Explicit kwargs override injection
+
+If a test calls `.delay()` with an explicit kwarg that matches an injected
+resource name, the explicit value wins:
+
+```python
+@queue.task()
+def my_task(db: Inject["db"]):
+    db.do_something()
+
+# Override injection for this one call:
+mock_db = MagicMock()
+my_task.delay(db=mock_db)
+```
+
+This also works inside `test_mode()` — the resource dict is the default,
+but explicit call-site kwargs always take precedence.
+
+## pytest fixture pattern
+
+```python
+# conftest.py
+import pytest
+from unittest.mock import MagicMock
+from myapp.tasks import queue
+
+
+@pytest.fixture
+def mock_db():
+    session = MagicMock()
+    session.return_value.get.return_value = None
+    return session
+
+
+@pytest.fixture
+def task_results(mock_db):
+    with queue.test_mode(resources={"db": mock_db}) as results:
+        yield results, mock_db
+```
+
+```python
+# test_orders.py
+def test_order_processed(task_results):
+    results, mock_db = task_results
+    mock_db.return_value.get.return_value = Order(id=1, status="pending")
+
+    process_order.delay(1)
+
+    assert results[0].succeeded
+    order = mock_db.return_value.get.return_value
+    assert order.status == "processed"
+```
+
+## Propagating errors
+
+By default, task exceptions are captured in `TestResult.error`. To re-raise
+them immediately:
+
+```python
+with queue.test_mode(propagate_errors=True) as results:
+    process_order.delay(999)  # raises if the task raises
+```
+
+Use `propagate_errors=False` (the default) when you want to test error
+handling — check `results[0].failed` and `results[0].error`.
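+
+A sketch of the propagating style under pytest — `OrderNotFound` is a
+hypothetical application exception:
+
+```python
+import pytest
+from unittest.mock import MagicMock
+
+def test_missing_order_raises():
+    with queue.test_mode(propagate_errors=True, resources={"db": MagicMock()}):
+        with pytest.raises(OrderNotFound):
+            process_order.delay(999)  # exception re-raised in the calling thread
+```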