1 change: 1 addition & 0 deletions crates/taskito-python/Cargo.toml
@@ -28,4 +28,5 @@ serde_json = { workspace = true }
serde = { workspace = true }
base64 = "0.22"
log = { workspace = true }
pyo3-log = "0.11"
gethostname = "1.1.0"
11 changes: 11 additions & 0 deletions crates/taskito-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,19 @@ use py_config::PyTaskConfig;
use py_job::PyJob;
use py_queue::PyQueue;

/// Activate the Rust → Python logging bridge.
///
/// Called explicitly from `taskito.log_config.configure()` rather than from
/// module init so that cold imports (which can run while a connection pool
/// is blocking the GIL on retries) don't trip pyo3-log's flush path.
#[pyfunction]
fn _init_rust_logging() {
let _ = pyo3_log::try_init();
}

#[pymodule]
fn _taskito(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(_init_rust_logging, m)?)?;
m.add_class::<PyQueue>()?;
m.add_class::<PyJob>()?;
m.add_class::<PyTaskConfig>()?;
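Taken together with `log_config.py` below, the intended Python-side ordering is a single explicit call. A minimal sketch, assuming the `configure_logging` re-export this PR adds to `taskito/__init__.py`:

```python
import taskito

# configure_logging() invokes _init_rust_logging() under its lock; since
# the Rust side uses pyo3_log::try_init(), repeat initialisation attempts
# are absorbed. The bridge therefore comes up lazily, after import, which
# is exactly the ordering the doc comment above requires.
taskito.configure_logging(level="DEBUG")
```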
87 changes: 48 additions & 39 deletions crates/taskito-python/src/py_queue/mod.rs
@@ -63,6 +63,7 @@ impl PyQueue {
#[pyo3(signature = (db_path=".taskito/taskito.db", workers=0, default_retry=3, default_timeout=300, default_priority=0, result_ttl=None, backend="sqlite", db_url=None, schema="taskito", pool_size=None, scheduler_poll_interval_ms=50, scheduler_reap_interval=100, scheduler_cleanup_interval=1200, namespace=None))]
#[allow(clippy::too_many_arguments)]
pub fn new(
py: Python<'_>,
db_path: &str,
workers: usize,
default_retry: i32,
@@ -78,49 +79,57 @@
scheduler_cleanup_interval: u32,
namespace: Option<String>,
) -> PyResult<Self> {
let storage = match backend {
"sqlite" => {
let s = SqliteStorage::new(db_path)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
StorageBackend::Sqlite(s)
}
#[cfg(feature = "postgres")]
"postgres" | "postgresql" => {
let url = db_url.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err(
"db_url is required for postgres backend",
// Storage init blocks on connection-pool builders that may emit
// `log::*` records from worker threads. With the pyo3-log bridge
// active, those records need the GIL to deliver to Python — so we
// must release it here to avoid a deadlock.
let storage = py.allow_threads(|| -> PyResult<StorageBackend> {
match backend {
"sqlite" => {
let s = SqliteStorage::new(db_path)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
Ok(StorageBackend::Sqlite(s))
}
#[cfg(feature = "postgres")]
"postgres" | "postgresql" => {
let url = db_url.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err(
"db_url is required for postgres backend",
)
})?;
let s = PostgresStorage::with_schema_and_pool_size(
url,
schema,
pool_size.unwrap_or(10),
)
})?;
let s = PostgresStorage::with_schema_and_pool_size(
url,
schema,
pool_size.unwrap_or(10),
)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
StorageBackend::Postgres(s)
}
#[cfg(feature = "redis")]
"redis" => {
let url = db_url.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err("db_url is required for redis backend")
})?;
let s = RedisStorage::new(url)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
StorageBackend::Redis(s)
}
_ => {
#[allow(unused_mut, clippy::useless_vec)]
let mut available = vec!["sqlite"];
#[cfg(feature = "postgres")]
available.push("postgres");
Ok(StorageBackend::Postgres(s))
}
#[cfg(feature = "redis")]
available.push("redis");
return Err(pyo3::exceptions::PyValueError::new_err(format!(
"Unknown backend: '{backend}'. Available backends: {}.",
available.join(", ")
)));
"redis" => {
let url = db_url.ok_or_else(|| {
pyo3::exceptions::PyValueError::new_err(
"db_url is required for redis backend",
)
})?;
let s = RedisStorage::new(url)
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
Ok(StorageBackend::Redis(s))
}
_ => {
#[allow(unused_mut, clippy::useless_vec)]
let mut available = vec!["sqlite"];
#[cfg(feature = "postgres")]
available.push("postgres");
#[cfg(feature = "redis")]
available.push("redis");
Err(pyo3::exceptions::PyValueError::new_err(format!(
"Unknown backend: '{backend}'. Available backends: {}.",
available.join(", ")
)))
}
}
};
})?;

let num_workers = if workers == 0 {
std::thread::available_parallelism()
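The hazard described in the `allow_threads` comment is an ordering problem, so a sketch of the now-safe sequence helps. This assumes `taskito.app.Queue` forwards `backend`/`db_url` to `PyQueue.new`; the import path is confirmed by `cli.py`, but the kwargs and URL here are illustrative:

```python
from taskito.app import Queue
from taskito.log_config import configure

# Bridge first: pool-builder threads spawned during storage init may emit
# log::* records, and delivering those to Python's logging needs the GIL.
configure("DEBUG")

# PyQueue.new now releases the GIL around storage init, so blocking here
# no longer starves the logging bridge and deadlocks the process.
q = Queue(backend="postgres", db_url="postgres://localhost/taskito")
```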
2 changes: 2 additions & 0 deletions py_src/taskito/__init__.py
@@ -26,6 +26,7 @@
)
from taskito.inject import Inject
from taskito.interception import InterceptionError, InterceptionReport
from taskito.log_config import configure as configure_logging
from taskito.middleware import TaskMiddleware
from taskito.proxies.no_proxy import NoProxy
from taskito.result import JobResult
@@ -80,6 +81,7 @@
"chain",
"chord",
"chunks",
"configure_logging",
"current_job",
"group",
"starmap",
4 changes: 4 additions & 0 deletions py_src/taskito/_taskito.pyi
@@ -385,3 +385,7 @@ class PyResultSender:
task_name: str,
wall_time_ns: int,
) -> None: ...

def _init_rust_logging() -> None:
"""Activate the Rust → Python `logging` bridge (idempotent)."""
...
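Since the Rust implementation is `let _ = pyo3_log::try_init()`, the idempotency promise in this stub's docstring holds at the FFI boundary too: a second call produces an `Err` that is discarded rather than raised.

```python
from taskito._taskito import _init_rust_logging

_init_rust_logging()
_init_rust_logging()  # no-op: try_init()'s Err on re-init is ignored
```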
5 changes: 5 additions & 0 deletions py_src/taskito/cli.py
@@ -11,11 +11,16 @@

from taskito.app import Queue
from taskito.dashboard import serve_dashboard
from taskito.log_config import configure as configure_logging
from taskito.scaler import serve_scaler


def main() -> None:
"""Parse CLI arguments and dispatch to the appropriate subcommand."""
# Configure the central taskito logger before any subcommand runs so
# ``info``, ``dashboard``, ``scaler``, etc. all share the same sink.
configure_logging()

parser = argparse.ArgumentParser(
prog="taskito",
description="taskito — Rust-powered task queue for Python",
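Because `configure()` defers to `TASKITO_LOG_LEVEL` when no level is passed, the CLI's verbosity can be driven entirely by the environment. A sketch of the resolution order, poking at the private `_resolve_level` helper from `log_config.py` purely for illustration:

```python
import logging
import os

from taskito.log_config import _resolve_level

assert _resolve_level(logging.DEBUG) == logging.DEBUG  # ints pass through
assert _resolve_level("warning") == logging.WARNING    # names, any case
assert _resolve_level("15") == 15                      # numeric strings
os.environ["TASKITO_LOG_LEVEL"] = "ERROR"
assert _resolve_level(None) == logging.ERROR           # env var fallback
del os.environ["TASKITO_LOG_LEVEL"]
assert _resolve_level(None) == logging.INFO            # final default
```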
130 changes: 130 additions & 0 deletions py_src/taskito/log_config.py
@@ -0,0 +1,130 @@
"""Central log configuration for taskito.

A single entry point — :func:`configure` — owns the ``taskito`` Python logger
and the parallel logger names that PyO3-log uses for Rust crates
(``taskito_core``, ``taskito_python``, ``taskito_async``, ``taskito_workflows``).
Calling it more than once is a no-op when the resolved settings haven't
changed; passing different arguments swaps the managed handler in place
rather than stacking handlers.

The Rust side is bridged into Python's :mod:`logging` by
``_init_rust_logging`` (a thin wrapper over ``pyo3_log::try_init``), which
:func:`configure` invokes itself rather than relying on ``_taskito`` module
init, so a single configure call captures Rust ``log::*`` output too.
"""

from __future__ import annotations

import logging
import os
import sys
import threading
from typing import TextIO

from taskito._taskito import _init_rust_logging

__all__ = ["DEFAULT_DATEFMT", "DEFAULT_FORMAT", "configure"]

DEFAULT_FORMAT = "[%(asctime)s] %(levelname)s %(message)s"
DEFAULT_DATEFMT = "%Y-%m-%d %I:%M:%S %p"

# Logger names that taskito emits under. The Python logger is well-known;
# the Rust ones are derived from cargo crate names (``_`` form) by pyo3-log.
_PY_LOGGER = "taskito"
_RUST_LOGGERS = (
"taskito_core",
"taskito_python",
"taskito_async",
"taskito_workflows",
)
_ALL_LOGGERS = (_PY_LOGGER, *_RUST_LOGGERS)

# Sentinel attribute set on a handler we manage, so we never double-attach
# and we leave caller-installed handlers untouched.
_OWN_HANDLER_ATTR = "_taskito_managed"

# Guards concurrent calls to ``configure`` from multiple threads (CLI startup
# vs. ``Queue.run_worker`` background thread).
_CONFIG_LOCK = threading.Lock()

# Cached signature of the last successful configure() call, so identical
# repeat calls short-circuit instead of churning handlers.
_LAST_CONFIG: tuple[int, int] | None = None


def _resolve_level(level: int | str | None) -> int:
"""Resolve a level argument or ``TASKITO_LOG_LEVEL`` env var to a logging int."""
if level is None:
level = os.environ.get("TASKITO_LOG_LEVEL", "INFO")
if isinstance(level, str):
try:
return int(level)
except ValueError:
pass
resolved = logging.getLevelName(level.upper())
if isinstance(resolved, int):
return resolved
return logging.INFO
return level


def _build_handler(stream: TextIO) -> logging.Handler:
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(DEFAULT_FORMAT, datefmt=DEFAULT_DATEFMT))
setattr(handler, _OWN_HANDLER_ATTR, True)
return handler


def _detach_managed(logger: logging.Logger) -> None:
for existing in list(logger.handlers):
if getattr(existing, _OWN_HANDLER_ATTR, False):
logger.removeHandler(existing)


def configure(
level: int | str | None = None,
*,
stream: TextIO | None = None,
) -> logging.Logger:
"""Configure the central ``taskito`` logger and its Rust counterparts.

Idempotent: identical repeat calls are no-ops; argument changes swap the
managed handler in place rather than stacking handlers. Caller-installed
handlers on the same loggers are left intact.

Args:
level: Logging level as an int (e.g. ``logging.DEBUG``) or string
(``"INFO"``). Falls back to ``TASKITO_LOG_LEVEL``, then ``INFO``.
stream: Where to write log lines. Defaults to ``sys.stderr``.

Returns:
The configured ``taskito`` Python logger.
"""
global _LAST_CONFIG

resolved_level = _resolve_level(level)
target_stream = stream if stream is not None else sys.stderr
signature = (resolved_level, id(target_stream))

with _CONFIG_LOCK:
if signature == _LAST_CONFIG:
return logging.getLogger(_PY_LOGGER)

# Activate the Rust → Python log bridge once. Doing this here (rather
# than from `_taskito` module init) avoids a deadlock during cold
# imports where the GIL is held by a blocking connection-pool retry.
_init_rust_logging()

handler = _build_handler(target_stream)
for name in _ALL_LOGGERS:
logger = logging.getLogger(name)
_detach_managed(logger)
logger.addHandler(handler)
logger.setLevel(resolved_level)
# Leave `propagate` alone. Embedded hosts (Django, FastAPI) that
# already have a root handler can set `propagate = False` on the
# `taskito` logger themselves to avoid duplicate lines; flipping
# it here would silently break pytest `caplog` and any other
# consumer that listens at the root.

_LAST_CONFIG = signature
return logging.getLogger(_PY_LOGGER)
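The idempotency contract is observable from outside the module. A sketch, importing the private `_OWN_HANDLER_ATTR` sentinel only for the demonstration:

```python
import io
import logging

from taskito.log_config import _OWN_HANDLER_ATTR, configure

def managed(logger: logging.Logger) -> int:
    # Count only the handlers taskito itself installed.
    return sum(getattr(h, _OWN_HANDLER_ATTR, False) for h in logger.handlers)

buf = io.StringIO()
log = configure("DEBUG", stream=buf)
configure("DEBUG", stream=buf)   # identical signature: short-circuits
assert log is logging.getLogger("taskito") and managed(log) == 1

configure("INFO")                # new signature (sys.stderr default):
assert managed(log) == 1         # handler swapped in place, never stacked
```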
8 changes: 2 additions & 6 deletions py_src/taskito/mixins/lifecycle.py
@@ -17,6 +17,7 @@
from taskito._taskito import PyQueue, PyTaskConfig
from taskito.context import _set_queue_ref
from taskito.events import EventType
from taskito.log_config import configure as configure_logging
from taskito.resources.health import HealthChecker
from taskito.resources.runtime import ResourceRuntime
from taskito.testing import TestMode
@@ -142,12 +143,7 @@ def run_worker(
timezone=pc.get("timezone"),
)

if not logging.root.handlers:
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
configure_logging()

worker_queues = queue_list or ["default"]
self._print_banner(worker_queues)
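The deleted guard only fired when the root logger had no handlers, so a worker embedded in a host that had already configured root (Django, pytest, uvicorn) inherited the host's root threshold and format for taskito output. `configure()` instead scopes a handler and level to the taskito loggers, independent of root; a sketch:

```python
import logging

from taskito.log_config import configure

logging.basicConfig(level=logging.WARNING)  # host app already owns root

# Previously, `if not logging.root.handlers` was False here, so taskito
# INFO records were left to root's WARNING threshold and dropped. Now the
# taskito logger carries its own managed handler and level.
log = configure("INFO")
assert any(getattr(h, "_taskito_managed", False) for h in log.handlers)
assert log.level == logging.INFO
```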