From 9757c0026c9dfe0c36947d4bccabc86cd6aee53a Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Fri, 8 May 2026 16:10:20 +0530 Subject: [PATCH 1/2] feat(core): bridge Rust logs to Python and release GIL in PyQueue::new pyo3-log forwards Rust log::* records into Python's logging module. Activated lazily via _init_rust_logging() so cold imports don't deadlock against r2d2 pool builders, and PyQueue::new releases the GIL during storage init so connection-pool worker threads can flush log records. --- crates/taskito-python/Cargo.toml | 1 + crates/taskito-python/src/lib.rs | 11 +++ crates/taskito-python/src/py_queue/mod.rs | 87 +++++++++++++---------- py_src/taskito/_taskito.pyi | 4 ++ 4 files changed, 64 insertions(+), 39 deletions(-) diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml index 17b455b..ca9c4cb 100644 --- a/crates/taskito-python/Cargo.toml +++ b/crates/taskito-python/Cargo.toml @@ -28,4 +28,5 @@ serde_json = { workspace = true } serde = { workspace = true } base64 = "0.22" log = { workspace = true } +pyo3-log = "0.11" gethostname = "1.1.0" diff --git a/crates/taskito-python/src/lib.rs b/crates/taskito-python/src/lib.rs index c3b0b34..6fcea61 100644 --- a/crates/taskito-python/src/lib.rs +++ b/crates/taskito-python/src/lib.rs @@ -14,8 +14,19 @@ use py_config::PyTaskConfig; use py_job::PyJob; use py_queue::PyQueue; +/// Activate the Rust → Python logging bridge. +/// +/// Called explicitly from `taskito.log_config.configure()` rather than from +/// module init so that cold imports (which can run while a connection pool +/// is blocking the GIL on retries) don't trip pyo3-log's flush path. 
+#[pyfunction] +fn _init_rust_logging() { + let _ = pyo3_log::try_init(); +} + #[pymodule] fn _taskito(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_function(wrap_pyfunction!(_init_rust_logging, m)?)?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/crates/taskito-python/src/py_queue/mod.rs b/crates/taskito-python/src/py_queue/mod.rs index aac4eff..124b085 100644 --- a/crates/taskito-python/src/py_queue/mod.rs +++ b/crates/taskito-python/src/py_queue/mod.rs @@ -63,6 +63,7 @@ impl PyQueue { #[pyo3(signature = (db_path=".taskito/taskito.db", workers=0, default_retry=3, default_timeout=300, default_priority=0, result_ttl=None, backend="sqlite", db_url=None, schema="taskito", pool_size=None, scheduler_poll_interval_ms=50, scheduler_reap_interval=100, scheduler_cleanup_interval=1200, namespace=None))] #[allow(clippy::too_many_arguments)] pub fn new( + py: Python<'_>, db_path: &str, workers: usize, default_retry: i32, @@ -78,49 +79,57 @@ impl PyQueue { scheduler_cleanup_interval: u32, namespace: Option, ) -> PyResult { - let storage = match backend { - "sqlite" => { - let s = SqliteStorage::new(db_path) - .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; - StorageBackend::Sqlite(s) - } - #[cfg(feature = "postgres")] - "postgres" | "postgresql" => { - let url = db_url.ok_or_else(|| { - pyo3::exceptions::PyValueError::new_err( - "db_url is required for postgres backend", + // Storage init blocks on connection-pool builders that may emit + // `log::*` records from worker threads. With the pyo3-log bridge + // active, those records need the GIL to deliver to Python — so we + // must release it here to avoid a deadlock. 
+ let storage = py.allow_threads(|| -> PyResult { + match backend { + "sqlite" => { + let s = SqliteStorage::new(db_path) + .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; + Ok(StorageBackend::Sqlite(s)) + } + #[cfg(feature = "postgres")] + "postgres" | "postgresql" => { + let url = db_url.ok_or_else(|| { + pyo3::exceptions::PyValueError::new_err( + "db_url is required for postgres backend", + ) + })?; + let s = PostgresStorage::with_schema_and_pool_size( + url, + schema, + pool_size.unwrap_or(10), ) - })?; - let s = PostgresStorage::with_schema_and_pool_size( - url, - schema, - pool_size.unwrap_or(10), - ) - .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; - StorageBackend::Postgres(s) - } - #[cfg(feature = "redis")] - "redis" => { - let url = db_url.ok_or_else(|| { - pyo3::exceptions::PyValueError::new_err("db_url is required for redis backend") - })?; - let s = RedisStorage::new(url) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; - StorageBackend::Redis(s) - } - _ => { - #[allow(unused_mut, clippy::useless_vec)] - let mut available = vec!["sqlite"]; - #[cfg(feature = "postgres")] - available.push("postgres"); + Ok(StorageBackend::Postgres(s)) + } #[cfg(feature = "redis")] - available.push("redis"); - return Err(pyo3::exceptions::PyValueError::new_err(format!( - "Unknown backend: '{backend}'. 
Available backends: {}.", - available.join(", ") - ))); + "redis" => { + let url = db_url.ok_or_else(|| { + pyo3::exceptions::PyValueError::new_err( + "db_url is required for redis backend", + ) + })?; + let s = RedisStorage::new(url) + .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; + Ok(StorageBackend::Redis(s)) + } + _ => { + #[allow(unused_mut, clippy::useless_vec)] + let mut available = vec!["sqlite"]; + #[cfg(feature = "postgres")] + available.push("postgres"); + #[cfg(feature = "redis")] + available.push("redis"); + Err(pyo3::exceptions::PyValueError::new_err(format!( + "Unknown backend: '{backend}'. Available backends: {}.", + available.join(", ") + ))) + } } - }; + })?; let num_workers = if workers == 0 { std::thread::available_parallelism() diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi index cf445da..71c636d 100644 --- a/py_src/taskito/_taskito.pyi +++ b/py_src/taskito/_taskito.pyi @@ -385,3 +385,7 @@ class PyResultSender: task_name: str, wall_time_ns: int, ) -> None: ... + +def _init_rust_logging() -> None: + """Activate the Rust → Python `logging` bridge (idempotent).""" + ... From ae93265261565dd23e10bf579c3c8320d37f6a21 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Fri, 8 May 2026 16:10:38 +0530 Subject: [PATCH 2/2] feat(logging): central taskito logger with AM/PM timestamps One configure() call attaches a single managed handler to the taskito Python logger and the four taskito_* loggers populated by pyo3-log. The default format keeps today's [ts] LEVEL message shape; the timestamp gains AM/PM via %I:%M:%S %p. Idempotent and thread-safe. Wired into the CLI entry and run_worker so every 'app runs' path uses the same sink. configure_logging is exported from the package for embedded users (Django/FastAPI) to call themselves. 
--- py_src/taskito/__init__.py | 2 + py_src/taskito/cli.py | 5 + py_src/taskito/log_config.py | 130 +++++++++++++++++++++++++ py_src/taskito/mixins/lifecycle.py | 8 +- tests/observability/test_log_config.py | 126 ++++++++++++++++++++++++ 5 files changed, 265 insertions(+), 6 deletions(-) create mode 100644 py_src/taskito/log_config.py create mode 100644 tests/observability/test_log_config.py diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py index baa36d1..cbdf5fd 100644 --- a/py_src/taskito/__init__.py +++ b/py_src/taskito/__init__.py @@ -26,6 +26,7 @@ ) from taskito.inject import Inject from taskito.interception import InterceptionError, InterceptionReport +from taskito.log_config import configure as configure_logging from taskito.middleware import TaskMiddleware from taskito.proxies.no_proxy import NoProxy from taskito.result import JobResult @@ -80,6 +81,7 @@ "chain", "chord", "chunks", + "configure_logging", "current_job", "group", "starmap", diff --git a/py_src/taskito/cli.py b/py_src/taskito/cli.py index da90943..d0fd0fb 100644 --- a/py_src/taskito/cli.py +++ b/py_src/taskito/cli.py @@ -11,11 +11,16 @@ from taskito.app import Queue from taskito.dashboard import serve_dashboard +from taskito.log_config import configure as configure_logging from taskito.scaler import serve_scaler def main() -> None: """Parse CLI arguments and dispatch to the appropriate subcommand.""" + # Configure the central taskito logger before any subcommand runs so + # ``info``, ``dashboard``, ``scaler``, etc. all share the same sink. + configure_logging() + parser = argparse.ArgumentParser( prog="taskito", description="taskito — Rust-powered task queue for Python", diff --git a/py_src/taskito/log_config.py b/py_src/taskito/log_config.py new file mode 100644 index 0000000..60ec509 --- /dev/null +++ b/py_src/taskito/log_config.py @@ -0,0 +1,130 @@ +"""Central log configuration for taskito. 
+
+A single entry point — :func:`configure` — owns the ``taskito`` Python logger
+and the parallel logger names that PyO3-log uses for Rust crates
+(``taskito_core``, ``taskito_python``, ``taskito_async``, ``taskito_workflows``).
+Calling it more than once is a no-op when the resolved settings haven't
+changed; passing different arguments swaps the managed handler in place
+rather than stacking handlers.
+
+The Rust side is bridged into Python's :mod:`logging` by ``pyo3_log::try_init``,
+which :func:`configure` invokes lazily via ``_taskito._init_rust_logging()`` —
+not at extension-module init — so one configure call captures Rust output too.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import sys
+import threading
+from typing import TextIO
+
+from taskito._taskito import _init_rust_logging
+
+__all__ = ["DEFAULT_DATEFMT", "DEFAULT_FORMAT", "configure"]
+
+DEFAULT_FORMAT = "[%(asctime)s] %(levelname)s %(message)s"
+DEFAULT_DATEFMT = "%Y-%m-%d %I:%M:%S %p"
+
+# Logger names that taskito emits under. The Python logger is well-known;
+# the Rust ones are derived from cargo crate names (``_`` form) by pyo3-log.
+_PY_LOGGER = "taskito"
+_RUST_LOGGERS = (
+    "taskito_core",
+    "taskito_python",
+    "taskito_async",
+    "taskito_workflows",
+)
+_ALL_LOGGERS = (_PY_LOGGER, *_RUST_LOGGERS)
+
+# Sentinel attribute set on a handler we manage, so we never double-attach
+# and we leave caller-installed handlers untouched.
+_OWN_HANDLER_ATTR = "_taskito_managed"
+
+# Guards concurrent calls to ``configure`` from multiple threads (CLI startup
+# vs. ``Queue.run_worker`` background thread).
+_CONFIG_LOCK = threading.Lock()
+
+# Cached signature of the last successful configure() call, so identical
+# repeat calls short-circuit instead of churning handlers. 
+_LAST_CONFIG: tuple[int, int] | None = None + + +def _resolve_level(level: int | str | None) -> int: + """Resolve a level argument or ``TASKITO_LOG_LEVEL`` env var to a logging int.""" + if level is None: + level = os.environ.get("TASKITO_LOG_LEVEL", "INFO") + if isinstance(level, str): + try: + return int(level) + except ValueError: + pass + resolved = logging.getLevelName(level.upper()) + if isinstance(resolved, int): + return resolved + return logging.INFO + return level + + +def _build_handler(stream: TextIO) -> logging.Handler: + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter(DEFAULT_FORMAT, datefmt=DEFAULT_DATEFMT)) + setattr(handler, _OWN_HANDLER_ATTR, True) + return handler + + +def _detach_managed(logger: logging.Logger) -> None: + for existing in list(logger.handlers): + if getattr(existing, _OWN_HANDLER_ATTR, False): + logger.removeHandler(existing) + + +def configure( + level: int | str | None = None, + *, + stream: TextIO | None = None, +) -> logging.Logger: + """Configure the central ``taskito`` logger and its Rust counterparts. + + Idempotent: identical repeat calls are no-ops; argument changes swap the + managed handler in place rather than stacking handlers. Caller-installed + handlers on the same loggers are left intact. + + Args: + level: Logging level as an int (e.g. ``logging.DEBUG``) or string + (``"INFO"``). Falls back to ``TASKITO_LOG_LEVEL``, then ``INFO``. + stream: Where to write log lines. Defaults to ``sys.stderr``. + + Returns: + The configured ``taskito`` Python logger. + """ + global _LAST_CONFIG + + resolved_level = _resolve_level(level) + target_stream = stream if stream is not None else sys.stderr + signature = (resolved_level, id(target_stream)) + + with _CONFIG_LOCK: + if signature == _LAST_CONFIG: + return logging.getLogger(_PY_LOGGER) + + # Activate the Rust → Python log bridge once. 
Doing this here (rather + # than from `_taskito` module init) avoids a deadlock during cold + # imports where the GIL is held by a blocking connection-pool retry. + _init_rust_logging() + + handler = _build_handler(target_stream) + for name in _ALL_LOGGERS: + logger = logging.getLogger(name) + _detach_managed(logger) + logger.addHandler(handler) + logger.setLevel(resolved_level) + # Leave `propagate` alone. Embedded hosts (Django, FastAPI) that + # already have a root handler can set `propagate = False` on the + # `taskito` logger themselves to avoid duplicate lines; flipping + # it here would silently break pytest `caplog` and any other + # consumer that listens at the root. + + _LAST_CONFIG = signature + return logging.getLogger(_PY_LOGGER) diff --git a/py_src/taskito/mixins/lifecycle.py b/py_src/taskito/mixins/lifecycle.py index 298953f..9eb16fd 100644 --- a/py_src/taskito/mixins/lifecycle.py +++ b/py_src/taskito/mixins/lifecycle.py @@ -17,6 +17,7 @@ from taskito._taskito import PyQueue, PyTaskConfig from taskito.context import _set_queue_ref from taskito.events import EventType +from taskito.log_config import configure as configure_logging from taskito.resources.health import HealthChecker from taskito.resources.runtime import ResourceRuntime from taskito.testing import TestMode @@ -142,12 +143,7 @@ def run_worker( timezone=pc.get("timezone"), ) - if not logging.root.handlers: - logging.basicConfig( - level=logging.INFO, - format="[%(asctime)s] %(levelname)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) + configure_logging() worker_queues = queue_list or ["default"] self._print_banner(worker_queues) diff --git a/tests/observability/test_log_config.py b/tests/observability/test_log_config.py new file mode 100644 index 0000000..0140540 --- /dev/null +++ b/tests/observability/test_log_config.py @@ -0,0 +1,126 @@ +"""Tests for the central log configuration module.""" + +from __future__ import annotations + +import io +import logging +from collections.abc import 
Iterator + +import pytest + +from taskito import log_config +from taskito.log_config import ( + DEFAULT_DATEFMT, + DEFAULT_FORMAT, + configure, +) + +_MANAGED_LOGGERS = ( + "taskito", + "taskito_core", + "taskito_python", + "taskito_async", + "taskito_workflows", +) + + +@pytest.fixture(autouse=True) +def _reset_taskito_loggers() -> Iterator[None]: + """Strip any handlers/state on the taskito loggers between tests. + + The module keeps a process-wide cache so tests must isolate themselves + from each other (and from anything an earlier import may have set up). + """ + log_config._LAST_CONFIG = None + for name in _MANAGED_LOGGERS: + logger = logging.getLogger(name) + logger.handlers.clear() + logger.setLevel(logging.NOTSET) + logger.propagate = True + yield + log_config._LAST_CONFIG = None + for name in _MANAGED_LOGGERS: + logger = logging.getLogger(name) + logger.handlers.clear() + logger.setLevel(logging.NOTSET) + logger.propagate = True + + +def test_configure_attaches_handler_to_all_known_loggers() -> None: + stream = io.StringIO() + configure(level="DEBUG", stream=stream) + + for name in _MANAGED_LOGGERS: + logger = logging.getLogger(name) + managed = [h for h in logger.handlers if getattr(h, "_taskito_managed", False)] + assert len(managed) == 1, f"{name} should have exactly one managed handler" + assert logger.level == logging.DEBUG + + +def test_configure_is_idempotent_for_identical_calls() -> None: + stream = io.StringIO() + configure(level="INFO", stream=stream) + handler_before = logging.getLogger("taskito").handlers[0] + + configure(level="INFO", stream=stream) + handler_after = logging.getLogger("taskito").handlers[0] + + assert handler_before is handler_after, "no-op call must not rebuild the handler" + assert len(logging.getLogger("taskito").handlers) == 1 + + +def test_configure_swaps_handler_when_level_changes() -> None: + stream = io.StringIO() + configure(level="INFO", stream=stream) + first = logging.getLogger("taskito").handlers[0] + + 
configure(level="DEBUG", stream=stream) + second = logging.getLogger("taskito").handlers[0] + + assert first is not second + assert logging.getLogger("taskito").level == logging.DEBUG + # Still exactly one managed handler — the old one was detached. + assert len(logging.getLogger("taskito").handlers) == 1 + + +def test_configure_leaves_caller_handlers_alone() -> None: + stream = io.StringIO() + foreign = logging.StreamHandler(io.StringIO()) + logging.getLogger("taskito").addHandler(foreign) + + configure(level="INFO", stream=stream) + handlers = logging.getLogger("taskito").handlers + assert foreign in handlers, "caller-installed handlers must be preserved" + managed = [h for h in handlers if getattr(h, "_taskito_managed", False)] + assert len(managed) == 1 + + +def test_level_resolution_from_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("TASKITO_LOG_LEVEL", "WARNING") + configure() + assert logging.getLogger("taskito").level == logging.WARNING + + +def test_level_resolution_falls_back_to_info(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("TASKITO_LOG_LEVEL", raising=False) + configure() + assert logging.getLogger("taskito").level == logging.INFO + + +def test_default_format_emits_am_pm_timestamp() -> None: + stream = io.StringIO() + configure(level="INFO", stream=stream) + + logging.getLogger("taskito").info("hello") + output = stream.getvalue() + + assert "INFO" in output + assert "hello" in output + assert ("AM" in output) or ("PM" in output), "default datefmt must include AM/PM marker" + + +def test_format_constants_are_stable() -> None: + # Catch accidental drift in the default format strings — these are part of + # the public surface (downstream tools that grep logs may rely on them). + assert DEFAULT_FORMAT == "[%(asctime)s] %(levelname)s %(message)s" + assert "%p" in DEFAULT_DATEFMT