From 7b8ca8510adc9ca26c93f07e54bd5ec33c7117b6 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:04:42 +0530 Subject: [PATCH 1/6] fix: bugs and code quality improvements across Rust and Python MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace all eprintln! with log crate macros (15 occurrences across taskito-python, taskito-async, and taskito-core) - Fix queue_name="unknown" bug in middleware context for retry/DLQ/cancel hooks — now passes actual queue name from ResultOutcome - Replace Redis KEYS * with SCAN cursor in reap_expired_locks (O(N) block) - Add 24h TTL to Redis execution claims (SET NX PX) so orphaned claims from dead workers auto-expire - Use typed exceptions in JobResult._poll_once(): TaskCancelledError, MaxRetriesExceededError, TaskFailedError, SerializationError instead of generic RuntimeError - Add TaskFailedError to exception hierarchy - Declare _taskito_is_async and _taskito_async_fn as explicit fields on TaskWrapper instead of dynamic monkey-patch - Fix ruff target-version py39 -> py310 to match requires-python - Fix UP035 (Callable import from collections.abc) and B905 (zip strict) lint warnings surfaced by the target-version change --- crates/taskito-async/Cargo.toml | 1 + crates/taskito-async/src/pool.rs | 5 ++- crates/taskito-async/src/task_executor.rs | 6 +-- crates/taskito-core/src/scheduler/mod.rs | 8 +++- .../src/scheduler/result_handler.rs | 23 ++++++++++- .../src/storage/redis_backend/locks.rs | 39 +++++++++++++------ .../src/storage/sqlite/dead_letter.rs | 4 +- crates/taskito-python/Cargo.toml | 1 + crates/taskito-python/src/async_worker.rs | 6 +-- crates/taskito-python/src/py_queue/worker.rs | 29 ++++++++------ crates/taskito-python/src/py_worker.rs | 6 +-- py_src/taskito/__init__.py | 2 + py_src/taskito/app.py | 17 +++++--- py_src/taskito/async_support/mixins.py | 3 +- py_src/taskito/canvas.py | 2 +- py_src/taskito/contrib/fastapi.py | 4 +- py_src/taskito/contrib/otel.py | 3 +- py_src/taskito/contrib/prometheus.py | 3 +- py_src/taskito/contrib/sentry.py | 3 +- py_src/taskito/events.py | 3 +- py_src/taskito/exceptions.py | 4 ++ py_src/taskito/interception/registry.py | 3 +- py_src/taskito/resources/definition.py | 3 +- py_src/taskito/resources/pool.py | 3 +- py_src/taskito/resources/runtime.py | 3 +- py_src/taskito/resources/thread_local.py | 3 +- py_src/taskito/resources/toml_config.py | 3 +- py_src/taskito/result.py | 23 +++++++++-- py_src/taskito/task.py | 5 ++- pyproject.toml | 2 +- tests/python/test_native_async.py | 2 +- 31 files changed, 157 insertions(+), 65 deletions(-) diff --git a/crates/taskito-async/Cargo.toml b/crates/taskito-async/Cargo.toml index e6782b0..27abd60 100644 --- a/crates/taskito-async/Cargo.toml +++ b/crates/taskito-async/Cargo.toml @@ -9,3 +9,4 @@ pyo3 = { workspace = true } tokio = { workspace = true } crossbeam-channel = { workspace = true } async-trait = { workspace = true } +log = { workspace = true } diff --git a/crates/taskito-async/src/pool.rs b/crates/taskito-async/src/pool.rs index 7eb3aa5..7aad3d8 100644 --- a/crates/taskito-async/src/pool.rs +++ b/crates/taskito-async/src/pool.rs @@ -84,9 +84,10 @@ impl WorkerDispatcher for NativeAsyncPool { &job.queue, ), ) { - eprintln!( + log::error!( "[taskito] Failed to submit async task {}[{}]: {e}", - job.task_name, job.id + job.task_name, + job.id ); let _ = result_tx.send(JobResult::Failure { job_id: job.id.clone(), diff --git a/crates/taskito-async/src/task_executor.rs b/crates/taskito-async/src/task_executor.rs index b07fac6..427be3b 100644 --- a/crates/taskito-async/src/task_executor.rs +++ b/crates/taskito-async/src/task_executor.rs @@ -22,7 +22,7 @@ pub fn execute_sync_task( let max_retries = job.max_retries; let start = std::time::Instant::now(); - eprintln!("[taskito] Task {task_name}[{job_id}] received"); + log::info!("[taskito] Task {task_name}[{job_id}] received"); let result = Python::with_gil(|py| -> PyResult>> { run_task(py, task_registry, job) }); @@ -32,7 +32,7 @@ pub fn execute_sync_task( let job_result = match result { Ok(result_bytes) => { let secs = start.elapsed().as_secs_f64(); - eprintln!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); + log::info!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); JobResult::Success { job_id, result: result_bytes, @@ -59,7 +59,7 @@ pub fn execute_sync_task( check_should_retry(py, retry_filters, &task_name, &exc_class_name, &e) }); - eprintln!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); + log::error!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); JobResult::Failure { job_id, error: error_msg, diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index 9a5dfed..535fbe5 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -79,6 +79,7 @@ pub enum ResultOutcome { Retry { job_id: String, task_name: String, + queue: String, error: String, retry_count: i32, timed_out: bool, @@ -87,11 +88,16 @@ pub enum ResultOutcome { DeadLettered { job_id: String, task_name: String, + queue: String, error: String, timed_out: bool, }, /// Task was cancelled during execution. - Cancelled { job_id: String, task_name: String }, + Cancelled { + job_id: String, + task_name: String, + queue: String, + }, } /// Per-task configuration for retry, rate limiting, and circuit breaker. diff --git a/crates/taskito-core/src/scheduler/result_handler.rs b/crates/taskito-core/src/scheduler/result_handler.rs index daad119..8f9ebd5 100644 --- a/crates/taskito-core/src/scheduler/result_handler.rs +++ b/crates/taskito-core/src/scheduler/result_handler.rs @@ -71,6 +71,14 @@ impl Scheduler { log::error!("circuit breaker error for {task_name}: {e}"); } + // Look up the job to get the queue name for middleware context + let queue = self + .storage + .get_job(&job_id)? + .as_ref() + .map(|j| j.queue.clone()) + .unwrap_or_default(); + // If should_retry is false (exception filtering), skip straight to DLQ if !should_retry { match self.storage.get_job(&job_id)? { @@ -80,6 +88,7 @@ impl Scheduler { return Ok(ResultOutcome::DeadLettered { job_id, task_name, + queue, error, timed_out, }); @@ -103,6 +112,7 @@ impl Scheduler { Ok(ResultOutcome::Retry { job_id, task_name, + queue, error, retry_count, timed_out, @@ -116,6 +126,7 @@ impl Scheduler { Ok(ResultOutcome::DeadLettered { job_id, task_name, + queue, error, timed_out, }) @@ -140,7 +151,17 @@ impl Scheduler { { error!("failed to record metric for cancelled job {job_id}: {e}"); } - Ok(ResultOutcome::Cancelled { job_id, task_name }) + let queue = self + .storage + .get_job(&job_id)? + .as_ref() + .map(|j| j.queue.clone()) + .unwrap_or_default(); + Ok(ResultOutcome::Cancelled { + job_id, + task_name, + queue, + }) } } } diff --git a/crates/taskito-core/src/storage/redis_backend/locks.rs b/crates/taskito-core/src/storage/redis_backend/locks.rs index 735094b..577b43f 100644 --- a/crates/taskito-core/src/storage/redis_backend/locks.rs +++ b/crates/taskito-core/src/storage/redis_backend/locks.rs @@ -127,20 +127,32 @@ impl RedisStorage { let mut conn = self.conn()?; let pattern = self.key(&["lock", "*"]); - let keys: Vec = redis::cmd("KEYS") - .arg(&pattern) - .query(&mut conn) - .map_err(map_err)?; - let mut count = 0u64; - for key in keys { - let expires_at: Option = conn.hget(&key, "expires_at").map_err(map_err)?; - if let Some(exp) = expires_at { - if exp <= now { - conn.del::<_, ()>(&key).map_err(map_err)?; - count += 1; + let mut cursor: u64 = 0; + loop { + let (next_cursor, keys): (u64, Vec) = redis::cmd("SCAN") + .arg(cursor) + .arg("MATCH") + .arg(&pattern) + .arg("COUNT") + .arg(100) + .query(&mut conn) + .map_err(map_err)?; + + for key in keys { + let expires_at: Option = conn.hget(&key, "expires_at").map_err(map_err)?; + if let Some(exp) = expires_at { + if exp <= now { + conn.del::<_, ()>(&key).map_err(map_err)?; + count += 1; + } } } + + cursor = next_cursor; + if cursor == 0 { + break; + } } Ok(count) @@ -151,11 +163,14 @@ impl RedisStorage { let now = now_millis(); let ckey = self.key(&["exec_claim", job_id]); - // NX: set only if not exists + // NX: set only if not exists. PX: auto-expire after 24 hours so + // orphaned claims from dead workers don't block re-execution forever. let result: bool = redis::cmd("SET") .arg(&ckey) .arg(format!("{worker_id}:{now}")) .arg("NX") + .arg("PX") + .arg(86_400_000i64) // 24 hours in milliseconds .query(&mut conn) .map_err(map_err)?; diff --git a/crates/taskito-core/src/storage/sqlite/dead_letter.rs b/crates/taskito-core/src/storage/sqlite/dead_letter.rs index 41efd94..243e17e 100644 --- a/crates/taskito-core/src/storage/sqlite/dead_letter.rs +++ b/crates/taskito-core/src/storage/sqlite/dead_letter.rs @@ -54,8 +54,8 @@ impl SqliteStorage { // Cascade cancel dependents — log warning on failure since the DLQ // transaction already committed and we can't roll it back. if let Err(e) = self.cascade_cancel(&job_id, "dependency failed") { - eprintln!( - "[taskito] WARNING: cascade_cancel failed for job {}: {}. Dependent jobs may be left pending.", + log::warn!( + "[taskito] cascade_cancel failed for job {}: {}. Dependent jobs may be left pending.", job_id, e ); } diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml index bfeaf46..189bd86 100644 --- a/crates/taskito-python/Cargo.toml +++ b/crates/taskito-python/Cargo.toml @@ -23,3 +23,4 @@ uuid = { workspace = true } async-trait = { workspace = true } taskito-async = { path = "../taskito-async", optional = true } serde_json = { workspace = true } +log = { workspace = true } diff --git a/crates/taskito-python/src/async_worker.rs b/crates/taskito-python/src/async_worker.rs index 19bce48..a7bfeff 100644 --- a/crates/taskito-python/src/async_worker.rs +++ b/crates/taskito-python/src/async_worker.rs @@ -71,7 +71,7 @@ impl WorkerDispatcher for AsyncWorkerPool { let max_retries = job.max_retries; let start = std::time::Instant::now(); - eprintln!("[taskito] Task {task_name}[{job_id}] received"); + log::info!("[taskito] Task {task_name}[{job_id}] received"); let result = Python::with_gil(|py| -> PyResult>> { execute_task(py, ®istry, &job) @@ -82,7 +82,7 @@ impl WorkerDispatcher for AsyncWorkerPool { let job_result = match result { Ok(result_bytes) => { let secs = start.elapsed().as_secs_f64(); - eprintln!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); + log::info!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); JobResult::Success { job_id, result: result_bytes, @@ -109,7 +109,7 @@ impl WorkerDispatcher for AsyncWorkerPool { check_should_retry(py, &filters, &task_name, &exc_class_name, &e) }); - eprintln!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); + log::error!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); JobResult::Failure { job_id, error: error_msg, diff --git a/crates/taskito-python/src/py_queue/worker.rs b/crates/taskito-python/src/py_queue/worker.rs index 2e8c066..e665e9b 100644 --- a/crates/taskito-python/src/py_queue/worker.rs +++ b/crates/taskito-python/src/py_queue/worker.rs @@ -30,6 +30,7 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { ResultOutcome::Retry { job_id, task_name, + queue, error, retry_count, timed_out, @@ -46,12 +47,12 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { // Call on_timeout middleware if this was a timeout if *timed_out { - let ctx = build_lightweight_ctx(py, job_id, task_name)?; + let ctx = build_lightweight_ctx(py, job_id, task_name, queue)?; call_middleware_hook(py, &queue_ref, task_name, "on_timeout", (ctx,))?; } // Call on_retry middleware - let ctx = build_lightweight_ctx(py, job_id, task_name)?; + let ctx = build_lightweight_ctx(py, job_id, task_name, queue)?; let error_obj = pyo3::exceptions::PyRuntimeError::new_err(error.clone()).into_py(py); call_middleware_hook( @@ -65,6 +66,7 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { ResultOutcome::DeadLettered { job_id, task_name, + queue, error, timed_out, } => { @@ -79,12 +81,12 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { // Call on_timeout middleware if this was a timeout if *timed_out { - let ctx = build_lightweight_ctx(py, job_id, task_name)?; + let ctx = build_lightweight_ctx(py, job_id, task_name, queue)?; call_middleware_hook(py, &queue_ref, task_name, "on_timeout", (ctx,))?; } // Call on_dead_letter middleware - let ctx = build_lightweight_ctx(py, job_id, task_name)?; + let ctx = build_lightweight_ctx(py, job_id, task_name, queue)?; let error_obj = pyo3::exceptions::PyRuntimeError::new_err(error.clone()).into_py(py); call_middleware_hook( @@ -95,7 +97,11 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { (ctx, error_obj), )?; } - ResultOutcome::Cancelled { job_id, task_name } => { + ResultOutcome::Cancelled { + job_id, + task_name, + queue, + } => { // Emit JOB_CANCELLED event let events_mod = py.import_bound("taskito.events")?; let event_type = events_mod.getattr("EventType")?.getattr("JOB_CANCELLED")?; @@ -105,7 +111,7 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { queue_ref.call_method1("_emit_event", (event_type, payload))?; // Call on_cancel middleware - let ctx = build_lightweight_ctx(py, job_id, task_name)?; + let ctx = build_lightweight_ctx(py, job_id, task_name, queue)?; call_middleware_hook(py, &queue_ref, task_name, "on_cancel", (ctx,))?; } ResultOutcome::Success { .. } => { @@ -116,7 +122,7 @@ fn dispatch_outcome(py: Python<'_>, outcome: &ResultOutcome) { })(); if let Err(e) = result { - eprintln!("[taskito] middleware dispatch error: {e}"); + log::error!("[taskito] middleware dispatch error: {e}"); } } @@ -126,12 +132,13 @@ fn build_lightweight_ctx<'py>( py: Python<'py>, job_id: &str, task_name: &str, + queue_name: &str, ) -> PyResult> { let types_mod = py.import_bound("types")?; let ns = types_mod.call_method1("SimpleNamespace", ())?; ns.setattr("id", job_id)?; ns.setattr("task_name", task_name)?; - ns.setattr("queue_name", "unknown")?; + ns.setattr("queue_name", queue_name)?; ns.setattr("retry_count", 0)?; Ok(ns) } @@ -355,7 +362,7 @@ impl PyQueue { { Ok(rt) => rt, Err(e) => { - eprintln!("taskito: failed to build tokio runtime: {e}"); + log::error!("taskito: failed to build tokio runtime: {e}"); return; } }; @@ -450,7 +457,7 @@ impl PyQueue { match outcome { Ok(ref o) => dispatch_outcome(py, o), Err(e) => { - eprintln!("[taskito] result handling error: {e}") + log::error!("[taskito] result handling error: {e}") } } } @@ -465,7 +472,7 @@ impl PyQueue { let outcome = py.allow_threads(|| scheduler_for_results.handle_result(result)); match outcome { Ok(ref o) => dispatch_outcome(py, o), - Err(e) => eprintln!("[taskito] result handling error: {e}"), + Err(e) => log::error!("[taskito] result handling error: {e}"), } } PollAction::Continue => continue, diff --git a/crates/taskito-python/src/py_worker.rs b/crates/taskito-python/src/py_worker.rs index 465e031..864c406 100644 --- a/crates/taskito-python/src/py_worker.rs +++ b/crates/taskito-python/src/py_worker.rs @@ -65,7 +65,7 @@ fn worker_loop( let max_retries = job.max_retries; let start = std::time::Instant::now(); - eprintln!("[taskito] Task {task_name}[{job_id}] received"); + log::info!("[taskito] Task {task_name}[{job_id}] received"); let result = Python::with_gil(|py| -> PyResult>> { execute_task(py, &task_registry, &job) @@ -76,7 +76,7 @@ fn worker_loop( let job_result = match result { Ok(result_bytes) => { let secs = start.elapsed().as_secs_f64(); - eprintln!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); + log::info!("[taskito] Task {task_name}[{job_id}] succeeded in {secs:.3}s"); JobResult::Success { job_id, result: result_bytes, @@ -104,7 +104,7 @@ fn worker_loop( check_should_retry(py, &retry_filters, &task_name, &exc_class_name, &e) }); - eprintln!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); + log::error!("[taskito] Task {task_name}[{job_id}] failed: {error_msg}"); JobResult::Failure { job_id, error: error_msg, diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py index 28ca42b..d06e548 100644 --- a/py_src/taskito/__init__.py +++ b/py_src/taskito/__init__.py @@ -20,6 +20,7 @@ SerializationError, SoftTimeoutError, TaskCancelledError, + TaskFailedError, TaskitoError, TaskTimeoutError, ) @@ -68,6 +69,7 @@ "Signature", "SoftTimeoutError", "TaskCancelledError", + "TaskFailedError", "TaskMiddleware", "TaskTimeoutError", "TaskWrapper", diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 6b4a2ee..9e2c42c 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -12,9 +12,9 @@ import threading import urllib.parse import uuid -from collections.abc import Sequence +from collections.abc import Callable, Sequence from concurrent.futures import ThreadPoolExecutor -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from taskito.testing import TestMode @@ -382,9 +382,9 @@ def decorator(fn: Callable) -> TaskWrapper: # Mark async status for native async dispatch is_async = asyncio.iscoroutinefunction(fn) - wrapper._taskito_is_async = is_async # type: ignore[attr-defined] + wrapper._taskito_is_async = is_async if is_async: - wrapper._taskito_async_fn = fn # type: ignore[attr-defined] + wrapper._taskito_async_fn = fn return wrapper @@ -918,10 +918,15 @@ def enqueue_many( kw_list = kwargs_list or [{}] * count task_serializer = self._get_serializer(task_name) if self._interceptor is not None: - pairs = [self._interceptor.intercept(a, kw) for a, kw in zip(args_list, kw_list)] + pairs = [ + self._interceptor.intercept(a, kw) + for a, kw in zip(args_list, kw_list, strict=True) + ] payloads = [task_serializer.dumps((a, kw)) for a, kw in pairs] else: - payloads = [task_serializer.dumps((a, kw)) for a, kw in zip(args_list, kw_list)] + payloads = [ + task_serializer.dumps((a, kw)) for a, kw in zip(args_list, kw_list, strict=True) + ] task_names = [task_name] * count queues_list = [queue or "default"] * count if queue else None diff --git a/py_src/taskito/async_support/mixins.py b/py_src/taskito/async_support/mixins.py index 4a07860..72126ca 100644 --- a/py_src/taskito/async_support/mixins.py +++ b/py_src/taskito/async_support/mixins.py @@ -6,7 +6,8 @@ import contextlib import logging import signal -from typing import TYPE_CHECKING, Any, Callable, TypeVar +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, TypeVar if TYPE_CHECKING: from collections.abc import Sequence diff --git a/py_src/taskito/canvas.py b/py_src/taskito/canvas.py index 8bc705a..c1b1eb4 100644 --- a/py_src/taskito/canvas.py +++ b/py_src/taskito/canvas.py @@ -119,7 +119,7 @@ def apply(self, queue: Queue | None = None) -> list[JobResult]: ) wave_jobs.append(job) # Wait for this wave to complete before starting next - for wj, sig in zip(wave_jobs, wave): + for wj, sig in zip(wave_jobs, wave, strict=True): wj.result(timeout=sig.options.get("timeout", 300)) all_jobs.extend(wave_jobs) diff --git a/py_src/taskito/contrib/fastapi.py b/py_src/taskito/contrib/fastapi.py index 5d8a147..49f20dc 100644 --- a/py_src/taskito/contrib/fastapi.py +++ b/py_src/taskito/contrib/fastapi.py @@ -23,8 +23,8 @@ import asyncio import json import logging -from collections.abc import AsyncGenerator, Sequence -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import AsyncGenerator, Callable, Sequence +from typing import TYPE_CHECKING, Any logger = logging.getLogger(__name__) diff --git a/py_src/taskito/contrib/otel.py b/py_src/taskito/contrib/otel.py index 210cc21..7d6f414 100644 --- a/py_src/taskito/contrib/otel.py +++ b/py_src/taskito/contrib/otel.py @@ -13,7 +13,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Callable +from typing import TYPE_CHECKING, Any from taskito.middleware import TaskMiddleware diff --git a/py_src/taskito/contrib/prometheus.py b/py_src/taskito/contrib/prometheus.py index 870bc47..a095b3b 100644 --- a/py_src/taskito/contrib/prometheus.py +++ b/py_src/taskito/contrib/prometheus.py @@ -17,7 +17,8 @@ import logging import threading import time -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Callable +from typing import TYPE_CHECKING, Any from taskito.middleware import TaskMiddleware diff --git a/py_src/taskito/contrib/sentry.py b/py_src/taskito/contrib/sentry.py index 50468bc..b550777 100644 --- a/py_src/taskito/contrib/sentry.py +++ b/py_src/taskito/contrib/sentry.py @@ -13,7 +13,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Callable +from typing import TYPE_CHECKING, Any from taskito.middleware import TaskMiddleware diff --git a/py_src/taskito/events.py b/py_src/taskito/events.py index a6cf4d8..d8dff03 100644 --- a/py_src/taskito/events.py +++ b/py_src/taskito/events.py @@ -5,8 +5,9 @@ import enum import logging from collections import defaultdict +from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable +from typing import Any logger = logging.getLogger("taskito.events") diff --git a/py_src/taskito/exceptions.py b/py_src/taskito/exceptions.py index acc5e00..c48dc1c 100644 --- a/py_src/taskito/exceptions.py +++ b/py_src/taskito/exceptions.py @@ -17,6 +17,10 @@ class TaskCancelledError(TaskitoError): """Raised when a running task detects it has been cancelled.""" +class TaskFailedError(TaskitoError): + """Raised when a task has failed.""" + + class MaxRetriesExceededError(TaskitoError): """Raised when a task has exhausted all retry attempts.""" diff --git a/py_src/taskito/interception/registry.py b/py_src/taskito/interception/registry.py index cb5915a..cffde9d 100644 --- a/py_src/taskito/interception/registry.py +++ b/py_src/taskito/interception/registry.py @@ -2,8 +2,9 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass, field -from typing import Any, Callable +from typing import Any from taskito.interception.strategy import Strategy diff --git a/py_src/taskito/resources/definition.py b/py_src/taskito/resources/definition.py index 3872d9a..c378360 100644 --- a/py_src/taskito/resources/definition.py +++ b/py_src/taskito/resources/definition.py @@ -2,9 +2,10 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass, field from enum import Enum -from typing import Any, Callable +from typing import Any class ResourceScope(Enum): diff --git a/py_src/taskito/resources/pool.py b/py_src/taskito/resources/pool.py index e5f0ca3..e138c4f 100644 --- a/py_src/taskito/resources/pool.py +++ b/py_src/taskito/resources/pool.py @@ -6,8 +6,9 @@ import threading import time from collections import deque +from collections.abc import Callable from dataclasses import dataclass -from typing import Any, Callable +from typing import Any from taskito.async_support.helpers import run_maybe_async from taskito.exceptions import ResourceUnavailableError diff --git a/py_src/taskito/resources/runtime.py b/py_src/taskito/resources/runtime.py index 0398399..42473f8 100644 --- a/py_src/taskito/resources/runtime.py +++ b/py_src/taskito/resources/runtime.py @@ -3,7 +3,8 @@ from __future__ import annotations import logging -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from taskito.async_support.helpers import run_maybe_async from taskito.exceptions import ( diff --git a/py_src/taskito/resources/thread_local.py b/py_src/taskito/resources/thread_local.py index 6543c8a..b098545 100644 --- a/py_src/taskito/resources/thread_local.py +++ b/py_src/taskito/resources/thread_local.py @@ -4,7 +4,8 @@ import logging import threading -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from taskito.async_support.helpers import run_maybe_async diff --git a/py_src/taskito/resources/toml_config.py b/py_src/taskito/resources/toml_config.py index 9ed0ec3..e7eff27 100644 --- a/py_src/taskito/resources/toml_config.py +++ b/py_src/taskito/resources/toml_config.py @@ -3,7 +3,8 @@ from __future__ import annotations import importlib -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from taskito.resources.definition import ResourceDefinition, ResourceScope diff --git a/py_src/taskito/result.py b/py_src/taskito/result.py index 237fe41..8ecfacf 100644 --- a/py_src/taskito/result.py +++ b/py_src/taskito/result.py @@ -6,6 +6,12 @@ from typing import TYPE_CHECKING, Any from taskito.async_support.result import AsyncJobResultMixin +from taskito.exceptions import ( + MaxRetriesExceededError, + SerializationError, + TaskCancelledError, + TaskFailedError, +) if TYPE_CHECKING: from taskito._taskito import PyJob @@ -89,13 +95,19 @@ def _poll_once(self) -> tuple[str, Any]: try: return status, self._queue._serializer.loads(result_bytes) except Exception as exc: - raise RuntimeError( + raise SerializationError( f"Failed to deserialize result for job {self.id}: {exc}" ) from exc - if status in ("failed", "dead", "cancelled"): + if status == "cancelled": error_msg = self._py_job.error or "job was cancelled" - raise RuntimeError(f"Job {self.id} {status}: {error_msg}") + raise TaskCancelledError(f"Job {self.id} cancelled: {error_msg}") + if status == "dead": + error_msg = self._py_job.error or "max retries exceeded" + raise MaxRetriesExceededError(f"Job {self.id} dead-lettered: {error_msg}") + if status == "failed": + error_msg = self._py_job.error or "task failed" + raise TaskFailedError(f"Job {self.id} failed: {error_msg}") return status, None @@ -121,7 +133,10 @@ def result( Raises: TimeoutError: If the job doesn't complete within the timeout. - RuntimeError: If the job failed or was moved to DLQ. + TaskFailedError: If the job failed. + MaxRetriesExceededError: If the job was moved to DLQ. + TaskCancelledError: If the job was cancelled. + SerializationError: If result deserialization fails. """ if timeout <= 0: raise ValueError("timeout must be positive") diff --git a/py_src/taskito/task.py b/py_src/taskito/task.py index dc8afab..4f9f496 100644 --- a/py_src/taskito/task.py +++ b/py_src/taskito/task.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Callable +from collections.abc import Callable +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from taskito.app import Queue @@ -37,6 +38,8 @@ def __init__( self._default_max_retries = default_max_retries self._default_timeout = default_timeout self._inject = inject or [] + self._taskito_is_async: bool = False + self._taskito_async_fn: Callable | None = None @property def name(self) -> str: diff --git a/pyproject.toml b/pyproject.toml index e15dcff..3fb2c78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,7 @@ testpaths = ["tests/python"] # -- Ruff -- [tool.ruff] -target-version = "py39" +target-version = "py310" line-length = 99 exclude = [".venv", "target", "dist", "build", "*.egg-info"] diff --git a/tests/python/test_native_async.py b/tests/python/test_native_async.py index a622ecf..f42d0b3 100644 --- a/tests/python/test_native_async.py +++ b/tests/python/test_native_async.py @@ -39,7 +39,7 @@ def my_sync_task(): pass assert my_sync_task._taskito_is_async is False - assert not hasattr(my_sync_task, "_taskito_async_fn") + assert my_sync_task._taskito_async_fn is None # ── Async context (contextvars) ────────────────────────────────── From 6132576d22a62a3126aa6cb9c212f1b700e42b26 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:11:18 +0530 Subject: [PATCH 2/6] feat: async canvas primitives and PyResultSender runtime guard - Add apply_async() to Signature, chain, group, and chord canvas classes - chain.apply_async() uses aresult() for truly async step-by-step execution - group.apply_async() uses asyncio.gather for concurrent wave execution - chord.apply_async() awaits all group results then enqueues callback - Expose PyResultSender in __init__.py with clean fallback when native-async feature is not compiled in - Update aresult() docstring to reflect typed exceptions --- py_src/taskito/__init__.py | 9 +++ py_src/taskito/async_support/result.py | 5 +- py_src/taskito/canvas.py | 96 ++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py index d06e548..d9d4704 100644 --- a/py_src/taskito/__init__.py +++ b/py_src/taskito/__init__.py @@ -84,6 +84,15 @@ "group", "starmap", ] +# PyResultSender is only available when built with --features native-async. +# Expose it with a clean error instead of a confusing AttributeError. +try: + from taskito._taskito import PyResultSender # noqa: F401 + + __all__.append("PyResultSender") +except (ImportError, AttributeError): + pass + try: from importlib.metadata import PackageNotFoundError from importlib.metadata import version as _get_version diff --git a/py_src/taskito/async_support/result.py b/py_src/taskito/async_support/result.py index 4f47819..9e55f00 100644 --- a/py_src/taskito/async_support/result.py +++ b/py_src/taskito/async_support/result.py @@ -37,7 +37,10 @@ async def aresult( Raises: TimeoutError: If the job doesn't complete within *timeout*. - RuntimeError: If the job failed or was moved to DLQ. + TaskFailedError: If the job failed. + MaxRetriesExceededError: If the job was moved to DLQ. + TaskCancelledError: If the job was cancelled. + SerializationError: If result deserialization fails. Example:: diff --git a/py_src/taskito/canvas.py b/py_src/taskito/canvas.py index c1b1eb4..a0a9c7a 100644 --- a/py_src/taskito/canvas.py +++ b/py_src/taskito/canvas.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import math from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any @@ -38,6 +39,14 @@ def apply(self, queue: Queue | None = None) -> JobResult: **self.options, ) + async def apply_async(self, queue: Queue | None = None) -> JobResult: + """Enqueue this signature for execution (async-safe). + + The enqueue operation is a fast DB write so it runs synchronously. + Use this in async contexts for API consistency with other ``apply_async`` methods. + """ + return self.apply(queue) + class chain: """Execute signatures sequentially, piping each result to the next.""" @@ -69,6 +78,28 @@ def apply(self, queue: Queue | None = None) -> JobResult: return last_job # type: ignore[return-value] + async def apply_async(self, queue: Queue | None = None) -> JobResult: + """Execute the chain asynchronously, awaiting each step's result.""" + q = queue or self.signatures[0].task._queue + + prev_result: Any = None + last_job: JobResult | None = None + + for sig in self.signatures: + args = sig.args + if prev_result is not None and not sig.immutable: + args = (prev_result, *sig.args) + + last_job = q.enqueue( + task_name=sig.task.name, + args=args, + kwargs=sig.kwargs if sig.kwargs else None, + **sig.options, + ) + prev_result = await last_job.aresult(timeout=sig.options.get("timeout", 300)) + + return last_job # type: ignore[return-value] + class group: """Execute signatures in parallel and collect all results. @@ -125,6 +156,49 @@ def apply(self, queue: Queue | None = None) -> list[JobResult]: return all_jobs + async def apply_async(self, queue: Queue | None = None) -> list[JobResult]: + """Enqueue all signatures for parallel execution (async-safe). + + With ``max_concurrency`` set, dispatches in waves, awaiting each + wave before starting the next. + """ + q = queue or self.signatures[0].task._queue + + if self.max_concurrency is None: + jobs: list[JobResult] = [] + for sig in self.signatures: + job = q.enqueue( + task_name=sig.task.name, + args=sig.args, + kwargs=sig.kwargs if sig.kwargs else None, + **sig.options, + ) + jobs.append(job) + return jobs + + all_jobs: list[JobResult] = [] + mc = self.max_concurrency + for i in range(0, len(self.signatures), mc): + wave = self.signatures[i : i + mc] + wave_jobs: list[JobResult] = [] + for sig in wave: + job = q.enqueue( + task_name=sig.task.name, + args=sig.args, + kwargs=sig.kwargs if sig.kwargs else None, + **sig.options, + ) + wave_jobs.append(job) + await asyncio.gather( + *( + wj.aresult(timeout=sig.options.get("timeout", 300)) + for wj, sig in zip(wave_jobs, wave, strict=True) + ) + ) + all_jobs.extend(wave_jobs) + + return all_jobs + class chord: """Run a group in parallel, then a callback with all results.""" @@ -156,6 +230,28 @@ def apply(self, queue: Queue | None = None) -> JobResult: **self.callback.options, ) + async def apply_async(self, queue: Queue | None = None) -> JobResult: + """Execute group, await all results, then run the callback (async-safe).""" + q = queue or self.callback.task._queue + + jobs = self.group.apply(queue=q) + max_timeout = max( + (sig.options.get("timeout", 300) for sig in self.group.signatures), + default=300, + ) + results = await asyncio.gather(*(job.aresult(timeout=max_timeout) for job in jobs)) + + args = self.callback.args + if not self.callback.immutable: + args = (list(results), *self.callback.args) + + return q.enqueue( + task_name=self.callback.task.name, + args=args, + kwargs=self.callback.kwargs if self.callback.kwargs else None, + **self.callback.options, + ) + def chunks(task: TaskWrapper, items: list[Any], chunk_size: int) -> group: """Split items into chunks and create a group of tasks processing each chunk. From 9fbe5e22ca16088404ea20c676e89d98305a769b Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:24:48 +0530 Subject: [PATCH 3/6] feat: sample-based circuit breaker half-open recovery Replace the naive single-probe half-open strategy with a sample-based approach: allow N probes (default 5), track success/failure counts, and close only when the success rate meets the threshold (default 80%). New config options on circuit_breaker dict: - half_open_probes: number of probe requests in HalfOpen (default 5) - half_open_success_rate: required success rate to close (default 0.8) Behavior: - allow() permits up to max_probes concurrent probes in HalfOpen - record_success() closes circuit when sample meets success threshold - record_failure() immediately re-opens if threshold becomes impossible - Timeout: if probes don't complete within cooldown period, re-opens Schema changes: 5 new columns on circuit_breakers table with backward- compatible defaults. Redis uses serde(default) for JSON compatibility. --- .../src/resilience/circuit_breaker.rs | 163 +++++++++++++++--- crates/taskito-core/src/scheduler/mod.rs | 2 + crates/taskito-core/src/storage/models.rs | 18 ++ .../taskito-core/src/storage/postgres/mod.rs | 29 +++- crates/taskito-core/src/storage/schema.rs | 5 + crates/taskito-core/src/storage/sqlite/mod.rs | 29 +++- .../taskito-core/tests/rust/storage_tests.rs | 5 + crates/taskito-python/src/py_config.rs | 10 +- crates/taskito-python/src/py_queue/worker.rs | 4 + py_src/taskito/_taskito.pyi | 4 + py_src/taskito/app.py | 6 + 11 files changed, 247 insertions(+), 28 deletions(-) diff --git a/crates/taskito-core/src/resilience/circuit_breaker.rs b/crates/taskito-core/src/resilience/circuit_breaker.rs index 0f6a45f..74342cc 100644 --- a/crates/taskito-core/src/resilience/circuit_breaker.rs +++ b/crates/taskito-core/src/resilience/circuit_breaker.rs @@ -36,6 +36,10 @@ pub struct CircuitBreakerConfig { pub threshold: i32, pub window_ms: i64, pub cooldown_ms: i64, + /// Number of probe requests allowed in HalfOpen state (default: 5). + pub half_open_max_probes: i32, + /// Required success rate (0.0–1.0) to close from HalfOpen (default: 0.8 = 80%). + pub half_open_success_rate: f64, } /// Circuit breaker manager backed by SQLite. @@ -65,10 +69,13 @@ impl CircuitBreaker { // Check if cooldown has elapsed let opened = row.opened_at.unwrap_or(0); if now.saturating_sub(opened) >= row.cooldown_ms { - // Transition to half-open: allow one probe + // Transition to half-open: reset probe counters let updated = CircuitBreakerRow { state: CircuitState::HalfOpen as i32, half_open_at: Some(now), + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, ..row }; self.storage.upsert_circuit_breaker(&updated)?; @@ -78,13 +85,37 @@ impl CircuitBreaker { } } CircuitState::HalfOpen => { - // Only one probe at a time — block others - Ok(false) + // Allow up to max_probes concurrent probes + if row.half_open_probe_count < row.half_open_max_probes { + let updated = CircuitBreakerRow { + half_open_probe_count: row.half_open_probe_count + 1, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + Ok(true) + } else { + // Check for timeout: if probes haven't completed within cooldown, re-open + let half_open_since = row.half_open_at.unwrap_or(0); + if now.saturating_sub(half_open_since) >= row.cooldown_ms { + let updated = CircuitBreakerRow { + state: CircuitState::Open as i32, + opened_at: Some(now), + half_open_at: None, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } + Ok(false) + } } } } - /// Record a task success. Resets the circuit breaker to closed. + /// Record a task success. In HalfOpen, tracks probes and closes when + /// the success rate threshold is met. pub fn record_success(&self, task_name: &str) -> Result<()> { let row = match self.storage.get_circuit_breaker(task_name)? { Some(r) => r, @@ -92,19 +123,68 @@ impl CircuitBreaker { }; let state = CircuitState::from_i32(row.state); - if state == CircuitState::Closed && row.failure_count == 0 { - return Ok(()); // Nothing to do - } - let updated = CircuitBreakerRow { - state: CircuitState::Closed as i32, - failure_count: 0, - opened_at: None, - half_open_at: None, - ..row - }; - self.storage.upsert_circuit_breaker(&updated)?; - Ok(()) + match state { + CircuitState::Closed if row.failure_count == 0 => Ok(()), + CircuitState::HalfOpen => { + let successes = row.half_open_success_count + 1; + let total = successes + row.half_open_failure_count; + + if total >= row.half_open_max_probes { + let rate = successes as f64 / total as f64; + if rate >= row.half_open_success_rate { + // Threshold met — close the circuit + let updated = CircuitBreakerRow { + state: CircuitState::Closed as i32, + failure_count: 0, + opened_at: None, + half_open_at: None, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } else { + // Threshold not met — re-open + let now = now_millis(); + let updated = CircuitBreakerRow { + state: CircuitState::Open as i32, + opened_at: Some(now), + half_open_at: None, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } + } else { + // Still collecting samples + let updated = CircuitBreakerRow { + half_open_success_count: successes, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } + Ok(()) + } + _ => { + // Closed with failures or Open — reset to clean Closed + let updated = CircuitBreakerRow { + state: CircuitState::Closed as i32, + failure_count: 0, + opened_at: None, + half_open_at: None, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + Ok(()) + } + } } /// Record a task failure. May trip the breaker open. @@ -120,16 +200,44 @@ impl CircuitBreaker { match state { CircuitState::HalfOpen => { - // Probe failed — go back to open - let updated = CircuitBreakerRow { - state: CircuitState::Open as i32, - failure_count: row.failure_count.saturating_add(1), - last_failure_at: Some(now), - opened_at: Some(now), - half_open_at: None, - ..row + let failures = row.half_open_failure_count + 1; + let total = row.half_open_success_count + failures; + + // Check if the success rate can still be met + let remaining = row.half_open_max_probes - total; + let best_possible_rate = if row.half_open_max_probes > 0 { + (row.half_open_success_count + remaining) as f64 + / row.half_open_max_probes as f64 + } else { + 0.0 }; - self.storage.upsert_circuit_breaker(&updated)?; + + if total >= row.half_open_max_probes + || best_possible_rate < row.half_open_success_rate + { + // Either all samples collected and failed, or impossible to meet threshold + let updated = CircuitBreakerRow { + state: CircuitState::Open as i32, + failure_count: row.failure_count.saturating_add(1), + last_failure_at: Some(now), + opened_at: Some(now), + half_open_at: None, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } else { + // Still collecting samples + let updated = CircuitBreakerRow { + failure_count: row.failure_count.saturating_add(1), + last_failure_at: Some(now), + half_open_failure_count: failures, + ..row + }; + self.storage.upsert_circuit_breaker(&updated)?; + } } CircuitState::Closed => { // Reset count if outside the window @@ -193,6 +301,11 @@ impl CircuitBreaker { threshold: config.threshold, window_ms: config.window_ms, cooldown_ms: config.cooldown_ms, + half_open_max_probes: config.half_open_max_probes, + half_open_success_rate: config.half_open_success_rate, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, }; self.storage.upsert_circuit_breaker(&row)?; Ok(()) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index 535fbe5..db4bfcd 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -458,6 +458,8 @@ mod tests { threshold: 1, window_ms: 60_000, cooldown_ms: 300_000, + half_open_max_probes: 5, + half_open_success_rate: 0.8, }; scheduler.register_task( "cb_task".to_string(), diff --git a/crates/taskito-core/src/storage/models.rs b/crates/taskito-core/src/storage/models.rs index 3f290c1..ac5ca12 100644 --- a/crates/taskito-core/src/storage/models.rs +++ b/crates/taskito-core/src/storage/models.rs @@ -272,6 +272,24 @@ pub struct CircuitBreakerRow { pub threshold: i32, pub window_ms: i64, pub cooldown_ms: i64, + #[serde(default = "default_max_probes")] + pub half_open_max_probes: i32, + #[serde(default = "default_success_rate")] + pub half_open_success_rate: f64, + #[serde(default)] + pub half_open_probe_count: i32, + #[serde(default)] + pub half_open_success_count: i32, + #[serde(default)] + pub half_open_failure_count: i32, +} + +fn default_max_probes() -> i32 { + 5 +} + +fn default_success_rate() -> f64 { + 0.8 } // ── Workers ────────────────────────────────────────────────────── diff --git a/crates/taskito-core/src/storage/postgres/mod.rs b/crates/taskito-core/src/storage/postgres/mod.rs index 91cec8c..5d50341 100644 --- a/crates/taskito-core/src/storage/postgres/mod.rs +++ b/crates/taskito-core/src/storage/postgres/mod.rs @@ -319,7 +319,12 @@ impl PostgresStorage { half_open_at BIGINT, threshold INTEGER NOT NULL DEFAULT 5, window_ms BIGINT NOT NULL DEFAULT 60000, - cooldown_ms BIGINT NOT NULL DEFAULT 300000 + cooldown_ms BIGINT NOT NULL DEFAULT 300000, + half_open_max_probes INTEGER NOT NULL DEFAULT 5, + half_open_success_rate DOUBLE PRECISION NOT NULL DEFAULT 0.8, + half_open_probe_count INTEGER NOT NULL DEFAULT 0, + half_open_success_count INTEGER NOT NULL DEFAULT 0, + half_open_failure_count INTEGER NOT NULL DEFAULT 0 )", ) .execute(&mut conn)?; @@ -407,6 +412,28 @@ impl PostgresStorage { "ALTER TABLE jobs ADD COLUMN IF NOT EXISTS namespace TEXT", ); + // Migration: add sample-based half-open probes to circuit breakers + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_max_probes INTEGER NOT NULL DEFAULT 5", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_success_rate DOUBLE PRECISION NOT NULL DEFAULT 0.8", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_probe_count INTEGER NOT NULL DEFAULT 0", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_success_count INTEGER NOT NULL DEFAULT 0", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_failure_count INTEGER NOT NULL DEFAULT 0", + ); + // ── Distributed Locks ───────────────────────────── diesel::sql_query( "CREATE TABLE IF NOT EXISTS distributed_locks ( diff --git a/crates/taskito-core/src/storage/schema.rs b/crates/taskito-core/src/storage/schema.rs index 9cb3155..9d1d802 100644 --- a/crates/taskito-core/src/storage/schema.rs +++ b/crates/taskito-core/src/storage/schema.rs @@ -134,6 +134,11 @@ diesel::table! { threshold -> Integer, window_ms -> BigInt, cooldown_ms -> BigInt, + half_open_max_probes -> Integer, + half_open_success_rate -> Double, + half_open_probe_count -> Integer, + half_open_success_count -> Integer, + half_open_failure_count -> Integer, } } diff --git a/crates/taskito-core/src/storage/sqlite/mod.rs b/crates/taskito-core/src/storage/sqlite/mod.rs index 2dc6e6d..a2ef348 100644 --- a/crates/taskito-core/src/storage/sqlite/mod.rs +++ b/crates/taskito-core/src/storage/sqlite/mod.rs @@ -321,7 +321,12 @@ impl SqliteStorage { half_open_at INTEGER, threshold INTEGER NOT NULL DEFAULT 5, window_ms INTEGER NOT NULL DEFAULT 60000, - cooldown_ms INTEGER NOT NULL DEFAULT 300000 + cooldown_ms INTEGER NOT NULL DEFAULT 300000, + half_open_max_probes INTEGER NOT NULL DEFAULT 5, + half_open_success_rate REAL NOT NULL DEFAULT 0.8, + half_open_probe_count INTEGER NOT NULL DEFAULT 0, + half_open_success_count INTEGER NOT NULL DEFAULT 0, + half_open_failure_count INTEGER NOT NULL DEFAULT 0 )", ) .execute(&mut conn)?; @@ -403,6 +408,28 @@ impl SqliteStorage { // Migration: add namespace column to jobs migration_alter(&mut conn, "ALTER TABLE jobs ADD COLUMN namespace TEXT"); + // Migration: add sample-based half-open probes to circuit breakers + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN half_open_max_probes INTEGER NOT NULL DEFAULT 5", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN half_open_success_rate REAL NOT NULL DEFAULT 0.8", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN half_open_probe_count INTEGER NOT NULL DEFAULT 0", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN half_open_success_count INTEGER NOT NULL DEFAULT 0", + ); + migration_alter( + &mut conn, + "ALTER TABLE circuit_breakers ADD COLUMN half_open_failure_count INTEGER NOT NULL DEFAULT 0", + ); + // ── Distributed Locks ───────────────────────────── diesel::sql_query( "CREATE TABLE IF NOT EXISTS distributed_locks ( diff --git a/crates/taskito-core/tests/rust/storage_tests.rs b/crates/taskito-core/tests/rust/storage_tests.rs index 8b68e41..b57936d 100644 --- a/crates/taskito-core/tests/rust/storage_tests.rs +++ b/crates/taskito-core/tests/rust/storage_tests.rs @@ -206,6 +206,11 @@ fn test_circuit_breakers(s: &impl Storage) { threshold: 5, window_ms: 60_000, cooldown_ms: 30_000, + half_open_max_probes: 5, + half_open_success_rate: 0.8, + half_open_probe_count: 0, + half_open_success_count: 0, + half_open_failure_count: 0, }; s.upsert_circuit_breaker(&row).unwrap(); diff --git a/crates/taskito-python/src/py_config.rs b/crates/taskito-python/src/py_config.rs index ec911f3..7d84f01 100644 --- a/crates/taskito-python/src/py_config.rs +++ b/crates/taskito-python/src/py_config.rs @@ -30,13 +30,17 @@ pub struct PyTaskConfig { pub max_retry_delay: Option, #[pyo3(get, set)] pub max_concurrent: Option, + #[pyo3(get, set)] + pub circuit_breaker_half_open_probes: Option, + #[pyo3(get, set)] + pub circuit_breaker_half_open_success_rate: Option, } #[pymethods] #[allow(clippy::too_many_arguments)] impl PyTaskConfig { #[new] - #[pyo3(signature = (name, max_retries=3, retry_backoff=1.0, timeout=300, priority=0, rate_limit=None, queue="default".to_string(), circuit_breaker_threshold=None, circuit_breaker_window=None, circuit_breaker_cooldown=None, retry_delays=None, max_retry_delay=None, max_concurrent=None))] + #[pyo3(signature = (name, max_retries=3, retry_backoff=1.0, timeout=300, priority=0, rate_limit=None, queue="default".to_string(), circuit_breaker_threshold=None, circuit_breaker_window=None, circuit_breaker_cooldown=None, retry_delays=None, max_retry_delay=None, max_concurrent=None, circuit_breaker_half_open_probes=None, circuit_breaker_half_open_success_rate=None))] pub fn new( name: String, max_retries: i32, @@ -51,6 +55,8 @@ impl PyTaskConfig { retry_delays: Option>, max_retry_delay: Option, max_concurrent: Option, + circuit_breaker_half_open_probes: Option, + circuit_breaker_half_open_success_rate: Option, ) -> Self { Self { name, @@ -66,6 +72,8 @@ impl PyTaskConfig { retry_delays, max_retry_delay, max_concurrent, + circuit_breaker_half_open_probes, + circuit_breaker_half_open_success_rate, } } } diff --git a/crates/taskito-python/src/py_queue/worker.rs b/crates/taskito-python/src/py_queue/worker.rs index e665e9b..01f4407 100644 --- a/crates/taskito-python/src/py_queue/worker.rs +++ b/crates/taskito-python/src/py_queue/worker.rs @@ -269,6 +269,10 @@ impl PyQueue { threshold, window_ms: tc.circuit_breaker_window.unwrap_or(60) * 1000, cooldown_ms: tc.circuit_breaker_cooldown.unwrap_or(300) * 1000, + half_open_max_probes: tc.circuit_breaker_half_open_probes.unwrap_or(5), + half_open_success_rate: tc + .circuit_breaker_half_open_success_rate + .unwrap_or(0.8), }); scheduler.register_task( tc.name.clone(), diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi index 3b9bd1f..d410cf9 100644 --- a/py_src/taskito/_taskito.pyi +++ b/py_src/taskito/_taskito.pyi @@ -22,6 +22,8 @@ class PyTaskConfig: retry_delays: list[float] | None max_retry_delay: int | None max_concurrent: int | None + circuit_breaker_half_open_probes: int | None + circuit_breaker_half_open_success_rate: float | None def __init__( self, @@ -38,6 +40,8 @@ class PyTaskConfig: retry_delays: list[float] | None = None, max_retry_delay: int | None = None, max_concurrent: int | None = None, + circuit_breaker_half_open_probes: int | None = None, + circuit_breaker_half_open_success_rate: float | None = None, ) -> None: ... class PyJob: diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 9e2c42c..3bc2c47 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -342,10 +342,14 @@ def decorator(fn: Callable) -> TaskWrapper: cb_threshold = None cb_window = None cb_cooldown = None + cb_half_open_probes = None + cb_half_open_success_rate = None if circuit_breaker: cb_threshold = circuit_breaker.get("threshold", 5) cb_window = circuit_breaker.get("window", 60) cb_cooldown = circuit_breaker.get("cooldown", 300) + cb_half_open_probes = circuit_breaker.get("half_open_probes") + cb_half_open_success_rate = circuit_breaker.get("half_open_success_rate") # Store config for worker startup config = PyTaskConfig( @@ -362,6 +366,8 @@ def decorator(fn: Callable) -> TaskWrapper: retry_delays=retry_delays, max_retry_delay=max_retry_delay, max_concurrent=max_concurrent, + circuit_breaker_half_open_probes=cb_half_open_probes, + circuit_breaker_half_open_success_rate=cb_half_open_success_rate, ) self._task_configs.append(config) From da93d50fd3244583b7b548753e4737031202f37a Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:30:30 +0530 Subject: [PATCH 4/6] feat: enqueue_batch parity with single enqueue Add per-job optional parameters to enqueue_batch/enqueue_many that were previously only available on single enqueue: - delay_seconds_list / delay: per-job or uniform delay - unique_keys: per-job deduplication keys - metadata_list / metadata: per-job or uniform metadata JSON - expires_list / expires: per-job or uniform expiry - result_ttl_list / result_ttl: per-job or uniform result TTL Also adds JOB_ENQUEUED event emission and on_enqueue middleware hook dispatch to enqueue_many, matching the single enqueue behavior. --- crates/taskito-python/src/py_queue/mod.rs | 50 ++++++++++++++++++++--- py_src/taskito/_taskito.pyi | 5 +++ py_src/taskito/app.py | 46 ++++++++++++++++++++- 3 files changed, 94 insertions(+), 7 deletions(-) diff --git a/crates/taskito-python/src/py_queue/mod.rs b/crates/taskito-python/src/py_queue/mod.rs index b84bfda..58661d3 100644 --- a/crates/taskito-python/src/py_queue/mod.rs +++ b/crates/taskito-python/src/py_queue/mod.rs @@ -233,7 +233,8 @@ impl PyQueue { } /// Enqueue multiple jobs in a single transaction. - #[pyo3(signature = (task_names, payloads, queues=None, priorities=None, max_retries_list=None, timeouts=None))] + #[pyo3(signature = (task_names, payloads, queues=None, priorities=None, max_retries_list=None, timeouts=None, delay_seconds_list=None, unique_keys=None, metadata_list=None, expires_list=None, result_ttl_list=None))] + #[allow(clippy::too_many_arguments)] pub fn enqueue_batch( &self, task_names: Vec, @@ -242,6 +243,11 @@ impl PyQueue { priorities: Option>, max_retries_list: Option>, timeouts: Option>, + delay_seconds_list: Option>>, + unique_keys: Option>>, + metadata_list: Option>>, + expires_list: Option>>, + result_ttl_list: Option>>, ) -> PyResult> { let count = task_names.len(); if payloads.len() != count { @@ -254,6 +260,34 @@ impl PyQueue { let mut new_jobs = Vec::with_capacity(count); for i in 0..count { + let delay = delay_seconds_list + .as_ref() + .and_then(|d| d.get(i).copied().flatten()); + let scheduled_at = match delay { + Some(d) if d.is_finite() && d >= 0.0 => { + let delay_ms = (d * 1000.0) as i64; + now.saturating_add(delay_ms) + } + _ => now, + }; + + let expires_at = expires_list + .as_ref() + .and_then(|e| e.get(i).copied().flatten()) + .and_then(|e| { + if e.is_finite() && e >= 0.0 { + let ms = (e * 1000.0) as i64; + Some(now.saturating_add(ms)) + } else { + None + } + }); + + let result_ttl_ms = result_ttl_list + .as_ref() + .and_then(|r| r.get(i).copied().flatten()) + .map(|s| s.saturating_mul(1000)); + new_jobs.push(NewJob { queue: queues.as_ref().map_or("default".to_string(), |q| { q.get(i).cloned().unwrap_or_else(|| "default".to_string()) @@ -263,7 +297,7 @@ impl PyQueue { priority: priorities.as_ref().map_or(self.default_priority, |p| { p.get(i).copied().unwrap_or(self.default_priority) }), - scheduled_at: now, + scheduled_at, max_retries: max_retries_list.as_ref().map_or(self.default_retry, |r| { r.get(i).copied().unwrap_or(self.default_retry) }), @@ -275,11 +309,15 @@ impl PyQueue { pyo3::exceptions::PyValueError::new_err("timeout too large, would overflow") })? }, - unique_key: None, - metadata: None, + unique_key: unique_keys + .as_ref() + .and_then(|u| u.get(i).cloned().flatten()), + metadata: metadata_list + .as_ref() + .and_then(|m| m.get(i).cloned().flatten()), depends_on: vec![], - expires_at: None, - result_ttl_ms: None, + expires_at, + result_ttl_ms, namespace: None, }); } diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi index d410cf9..1b55c56 100644 --- a/py_src/taskito/_taskito.pyi +++ b/py_src/taskito/_taskito.pyi @@ -112,6 +112,11 @@ class PyQueue: priorities: list[int] | None = None, max_retries_list: list[int] | None = None, timeouts: list[int] | None = None, + delay_seconds_list: list[float | None] | None = None, + unique_keys: list[str | None] | None = None, + metadata_list: list[str | None] | None = None, + expires_list: list[float | None] | None = None, + result_ttl_list: list[int | None] | None = None, ) -> list[PyJob]: ... def list_jobs( self, diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 3bc2c47..87d02c6 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -900,6 +900,15 @@ def enqueue_many( queue: str | None = None, max_retries: int | None = None, timeout: int | None = None, + delay: float | None = None, + delay_list: list[float | None] | None = None, + unique_keys: list[str | None] | None = None, + metadata: str | None = None, + metadata_list: list[str | None] | None = None, + expires: float | None = None, + expires_list: list[float | None] | None = None, + result_ttl: int | None = None, + result_ttl_list: list[int | None] | None = None, ) -> list[JobResult]: """Enqueue multiple jobs for the same task in a single transaction. @@ -911,6 +920,15 @@ def enqueue_many( queue: Queue name for all jobs (uses "default" if None). max_retries: Max retries for all jobs (uses default if None). timeout: Timeout in seconds for all jobs (uses default if None). + delay: Uniform delay in seconds for all jobs. + delay_list: Per-job delays in seconds. + unique_keys: Per-job deduplication keys. + metadata: Uniform metadata JSON string for all jobs. + metadata_list: Per-job metadata JSON strings. + expires: Uniform expiry in seconds for all jobs. + expires_list: Per-job expiry in seconds. + result_ttl: Uniform result TTL in seconds for all jobs. + result_ttl_list: Per-job result TTL in seconds. Returns: List of JobResult handles, one per enqueued job. @@ -940,6 +958,12 @@ def enqueue_many( retries_list = [max_retries] * count if max_retries is not None else None timeouts_list = [timeout] * count if timeout is not None else None + # Build per-job optional lists + delays = delay_list or ([delay] * count if delay is not None else None) + metas = metadata_list or ([metadata] * count if metadata is not None else None) + exp_list = expires_list or ([expires] * count if expires is not None else None) + ttl_list = result_ttl_list or ([result_ttl] * count if result_ttl is not None else None) + py_jobs = self._inner.enqueue_batch( task_names=task_names, payloads=payloads, @@ -947,9 +971,29 @@ def enqueue_many( priorities=priorities_list, max_retries_list=retries_list, timeouts=timeouts_list, + delay_seconds_list=delays, + unique_keys=unique_keys, + metadata_list=metas, + expires_list=exp_list, + result_ttl_list=ttl_list, ) - return [JobResult(py_job=pj, queue=self) for pj in py_jobs] + results = [JobResult(py_job=pj, queue=self) for pj in py_jobs] + + # Emit events and dispatch on_enqueue middleware + for job_result in results: + self._emit_event( + EventType.JOB_ENQUEUED, + {"job_id": job_result.id, "task_name": task_name}, + ) + for mw in self._get_middleware_chain(task_name): + try: + options: dict[str, Any] = {} + mw.on_enqueue(task_name, args_list[0], {}, options) + except Exception: + pass + + return results # -- Events & Webhooks -- From ac0a7a061080d662deedf9981f773d02b50d0bfe Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:43:59 +0530 Subject: [PATCH 5/6] docs: update changelog, API docs, and circuit breaker guide for v0.7.0 - Add 0.7.0 changelog entry with all features, fixes, and internals - Update result.md: document typed exceptions (TaskFailedError, MaxRetriesExceededError, TaskCancelledError, SerializationError) - Update canvas.md: add apply_async() docs for Signature, chain, group, and chord - Update queue.md: add new enqueue_many() parameters (delay, unique_keys, metadata, expires, result_ttl with per-job variants) - Update circuit-breakers.md: document sample-based half-open recovery with half_open_probes and half_open_success_rate parameters --- docs/api/canvas.md | 37 ++++++++++++++++++++++++++++++++++ docs/api/queue.md | 25 ++++++++++++++++++++++- docs/api/result.md | 23 +++++++++++++++++---- docs/changelog.md | 27 +++++++++++++++++++++++++ docs/guide/circuit-breakers.md | 18 ++++++++++------- 5 files changed, 118 insertions(+), 12 deletions(-) diff --git a/docs/api/canvas.md b/docs/api/canvas.md index 2f905d5..9a06a63 100644 --- a/docs/api/canvas.md +++ b/docs/api/canvas.md @@ -48,6 +48,14 @@ job = sig.apply() print(job.result(timeout=10)) # 5 ``` +### `await sig.apply_async()` + +```python +await sig.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Safe to call from async contexts (FastAPI handlers, etc.). + ### Mutable vs Immutable In a [`chain`](#chain), the previous task's return value is **prepended** to a mutable signature's args: @@ -104,6 +112,19 @@ graph LR B -->|"16"| C["Result: 16"] ``` +### `await chain.apply_async()` + +```python +await chain.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Awaits each step's result using `aresult()` instead of blocking with `result()`. Safe to call from async contexts. + +```python +result = await chain(double.s(3), add_ten.s()).apply_async() +value = await result.aresult(timeout=10) # 16 +``` + --- ## group @@ -144,6 +165,14 @@ graph LR C["add(5,6)"] --> D ``` +### `await group.apply_async()` + +```python +await group.apply_async(queue: Queue | None = None) -> list[JobResult] +``` + +Async version of `apply()`. With `max_concurrency`, uses `asyncio.gather` to await each wave concurrently instead of blocking. + --- ## chord @@ -193,6 +222,14 @@ graph LR C --> D["Combined result"] ``` +### `await chord.apply_async()` + +```python +await chord.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Awaits all group results using `asyncio.gather`, then enqueues the callback. + --- ## Complete Example diff --git a/docs/api/queue.md b/docs/api/queue.md index a4872d8..f10ed56 100644 --- a/docs/api/queue.md +++ b/docs/api/queue.md @@ -159,10 +159,33 @@ queue.enqueue_many( queue: str | None = None, max_retries: int | None = None, timeout: int | None = None, + delay: float | None = None, + delay_list: list[float | None] | None = None, + unique_keys: list[str | None] | None = None, + metadata: str | None = None, + metadata_list: list[str | None] | None = None, + expires: float | None = None, + expires_list: list[float | None] | None = None, + result_ttl: int | None = None, + result_ttl_list: list[int | None] | None = None, ) -> list[JobResult] ``` -Enqueue multiple jobs in a single transaction for high throughput. +Enqueue multiple jobs in a single transaction for high throughput. Supports both uniform parameters (applied to all jobs) and per-job lists. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `delay` | `float \| None` | `None` | Uniform delay in seconds for all jobs | +| `delay_list` | `list[float \| None] \| None` | `None` | Per-job delays in seconds | +| `unique_keys` | `list[str \| None] \| None` | `None` | Per-job deduplication keys | +| `metadata` | `str \| None` | `None` | Uniform metadata JSON for all jobs | +| `metadata_list` | `list[str \| None] \| None` | `None` | Per-job metadata JSON | +| `expires` | `float \| None` | `None` | Uniform expiry in seconds for all jobs | +| `expires_list` | `list[float \| None] \| None` | `None` | Per-job expiry in seconds | +| `result_ttl` | `int \| None` | `None` | Uniform result TTL in seconds | +| `result_ttl_list` | `list[int \| None] \| None` | `None` | Per-job result TTL in seconds | + +Per-job lists (`*_list`) take precedence over uniform values when both are provided. ## Job Management diff --git a/docs/api/result.md b/docs/api/result.md index 3cb8103..5864239 100644 --- a/docs/api/result.md +++ b/docs/api/result.md @@ -119,14 +119,26 @@ job.result( **Raises:** - `TimeoutError` — if the job doesn't complete within `timeout` -- `RuntimeError` — if the job status is `"failed"` or `"dead"` +- `TaskFailedError` — if the job status is `"failed"` +- `MaxRetriesExceededError` — if the job status is `"dead"` (all retries exhausted) +- `TaskCancelledError` — if the job status is `"cancelled"` +- `SerializationError` — if result deserialization fails ```python +from taskito import TaskFailedError, MaxRetriesExceededError, TaskCancelledError + job = add.delay(2, 3) result = job.result(timeout=10) # blocks, returns 5 -# Custom polling for long-running tasks -result = job.result(timeout=600, poll_interval=1.0, max_poll_interval=5.0) +# Handle specific failure modes +try: + result = job.result(timeout=10) +except TaskCancelledError: + print("Job was cancelled") +except MaxRetriesExceededError: + print("Job exhausted all retries") +except TaskFailedError: + print("Job failed") ``` ### `await job.aresult()` @@ -150,7 +162,10 @@ Async version of `result()`. Uses `asyncio.sleep()` instead of `time.sleep()`, s **Raises:** - `TimeoutError` — if the job doesn't complete within `timeout` -- `RuntimeError` — if the job status is `"failed"` or `"dead"` +- `TaskFailedError` — if the job status is `"failed"` +- `MaxRetriesExceededError` — if the job status is `"dead"` +- `TaskCancelledError` — if the job status is `"cancelled"` +- `SerializationError` — if result deserialization fails ```python job = add.delay(2, 3) diff --git a/docs/changelog.md b/docs/changelog.md index 4d57905..82f431a 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,33 @@ All notable changes to taskito are documented here. +## 0.7.0 + +### Features + +- **Async canvas primitives** -- `Signature.apply_async()`, `chain.apply_async()`, `group.apply_async()`, and `chord.apply_async()` for non-blocking workflow execution from async contexts; `chain` uses `aresult()` for truly async step-by-step execution; `group` uses `asyncio.gather` for concurrent wave awaiting; `chord` awaits all group results then enqueues the callback +- **Sample-based circuit breaker recovery** -- half-open state now allows N probe requests (default 5) instead of a single probe; closes only when the success rate meets a configurable threshold (default 80%); immediately re-opens when the threshold becomes mathematically impossible; timeout safety valve re-opens if probes don't complete within the cooldown period; configure via `circuit_breaker={"half_open_probes": 5, "half_open_success_rate": 0.8}` on `@queue.task()` +- **`enqueue_many()` parity with `enqueue()`** -- batch enqueue now supports per-job `delay`/`delay_list`, `unique_keys`, `metadata`/`metadata_list`, `expires`/`expires_list`, and `result_ttl`/`result_ttl_list` parameters; also emits `JOB_ENQUEUED` events and dispatches `on_enqueue` middleware hooks, matching single-enqueue behavior +- **`TaskFailedError` exception** -- new exception type in the hierarchy for tasks that failed (as opposed to cancelled or dead-lettered); `job.result()` now raises `TaskFailedError`, `TaskCancelledError`, `MaxRetriesExceededError`, or `SerializationError` instead of generic `RuntimeError` +- **`PyResultSender` conditional export** -- `from taskito import PyResultSender` works when built with `native-async` feature; silently unavailable otherwise (no confusing `AttributeError`) + +### Fixes + +- **Middleware context `queue_name` was `"unknown"`** -- `on_retry`, `on_dead_letter`, `on_cancel`, and `on_timeout` middleware hooks now receive the actual queue name from the job instead of a hardcoded `"unknown"` string +- **Redis `KEYS *` in lock reaping** -- `reap_expired_locks` replaced `KEYS` (O(N), blocks Redis server) with cursor-based `SCAN` using `COUNT 100` +- **Redis execution claims never expire** -- `claim_execution` now uses `SET NX PX 86400000` (24-hour TTL); orphaned claims from dead workers auto-expire instead of blocking re-execution forever +- **`_taskito_is_async` fragility** -- `_taskito_is_async` and `_taskito_async_fn` are now declared fields on `TaskWrapper.__init__` instead of dynamically monkey-patched attributes; prevents silent fallback to sync execution path if attributes are missing + +### Internal + +- All production Rust `eprintln!` calls replaced with `log` crate macros (`log::info!`, `log::warn!`, `log::error!`); `log` dependency added to `taskito-python` and `taskito-async` crates +- `ResultOutcome::Retry`, `::DeadLettered`, `::Cancelled` now carry `queue: String` for middleware context +- Ruff `target-version` updated from `py39` to `py310` to match `requires-python = ">=3.10"` +- Fixed UP035 (`Callable` import from `collections.abc`) and B905 (`zip()` without `strict=`) lint warnings +- Circuit breakers schema: 5 new columns on `circuit_breakers` table (`half_open_max_probes`, `half_open_success_rate`, `half_open_probe_count`, `half_open_success_count`, `half_open_failure_count`) with backward-compatible defaults + +--- + ## 0.6.0 ### Features diff --git a/docs/guide/circuit-breakers.md b/docs/guide/circuit-breakers.md index e8eeee7..fd8bf1b 100644 --- a/docs/guide/circuit-breakers.md +++ b/docs/guide/circuit-breakers.md @@ -11,13 +11,13 @@ stateDiagram-v2 [*] --> Closed Closed --> Open: failures >= threshold within window Open --> HalfOpen: cooldown elapsed - HalfOpen --> Closed: task succeeds - HalfOpen --> Open: task fails again + HalfOpen --> Closed: success rate >= threshold + HalfOpen --> Open: success rate impossible OR timeout ``` - **Closed** — Normal operation. Tasks execute as usual. Failures are counted. - **Open** — Too many failures. Tasks are immediately rejected without execution. -- **Half-Open** — After the cooldown period, one task is allowed through as a test. If it succeeds, the breaker closes. If it fails, the breaker reopens. +- **Half-Open** — After the cooldown period, up to N probe requests are allowed through (default 5). Success and failure counts are tracked. The circuit closes when the success rate meets the threshold (default 80%). If too many probes fail and the threshold becomes mathematically impossible, the circuit immediately re-opens. If probes don't complete within the cooldown period, the circuit re-opens as a safety valve. ## Configuration @@ -26,9 +26,11 @@ Enable circuit breakers per task using the `circuit_breaker` parameter: ```python @queue.task( circuit_breaker={ - "threshold": 5, # Open after 5 failures - "window": 60, # Within a 60-second window - "cooldown": 300, # Stay open for 5 minutes before half-open + "threshold": 5, # Open after 5 failures + "window": 60, # Within a 60-second window + "cooldown": 300, # Stay open for 5 minutes before half-open + "half_open_probes": 5, # Allow 5 probe requests in half-open + "half_open_success_rate": 0.8, # Close when 80% of probes succeed } ) def call_external_api(endpoint: str) -> dict: @@ -40,6 +42,8 @@ def call_external_api(endpoint: str) -> dict: | `threshold` | `int` | `5` | Number of failures to trigger the breaker | | `window` | `int` | `60` | Time window in seconds for counting failures | | `cooldown` | `int` | `300` | Seconds to wait before allowing a test request | +| `half_open_probes` | `int` | `5` | Number of probe requests allowed in half-open | +| `half_open_success_rate` | `float` | `0.8` | Required success rate (0.0–1.0) to close from half-open | ## Inspecting Circuit Breaker State @@ -116,7 +120,7 @@ def charge_customer(customer_id: str, amount: float): return response.json() ``` -If the payment API goes down, the circuit breaker opens after 3 failures within 60 seconds, preventing a flood of requests to the failing service. After 2 minutes, a single test request is allowed through. +If the payment API goes down, the circuit breaker opens after 3 failures within 60 seconds, preventing a flood of requests to the failing service. After 2 minutes, up to 5 probe requests are allowed through. If at least 80% succeed (4 out of 5), the circuit closes. ### Health Check with Monitoring From 414beddfeafc9803ae8bba8af41e991d7887e263 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 21 Mar 2026 23:49:47 +0530 Subject: [PATCH 6/6] chore: bump version to 0.7.0 --- crates/taskito-async/Cargo.toml | 2 +- crates/taskito-core/Cargo.toml | 2 +- crates/taskito-python/Cargo.toml | 2 +- py_src/taskito/__init__.py | 2 +- pyproject.toml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/taskito-async/Cargo.toml b/crates/taskito-async/Cargo.toml index 27abd60..a62f965 100644 --- a/crates/taskito-async/Cargo.toml +++ b/crates/taskito-async/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-async" -version = "0.6.0" +version = "0.7.0" edition = "2021" [dependencies] diff --git a/crates/taskito-core/Cargo.toml b/crates/taskito-core/Cargo.toml index 6ec526f..0a19fd8 100644 --- a/crates/taskito-core/Cargo.toml +++ b/crates/taskito-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-core" -version = "0.6.0" +version = "0.7.0" edition = "2021" [features] diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml index 189bd86..2656cca 100644 --- a/crates/taskito-python/Cargo.toml +++ b/crates/taskito-python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-python" -version = "0.6.0" +version = "0.7.0" edition = "2021" [features] diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py index d9d4704..8155081 100644 --- a/py_src/taskito/__init__.py +++ b/py_src/taskito/__init__.py @@ -99,4 +99,4 @@ __version__ = _get_version("taskito") except PackageNotFoundError: - __version__ = "0.6.0" + __version__ = "0.7.0" diff --git a/pyproject.toml b/pyproject.toml index 3fb2c78..2447886 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "taskito" -version = "0.6.0" +version = "0.7.0" description = "Rust-powered task queue for Python. No broker required." requires-python = ">=3.10" license = { file = "LICENSE" }