Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
572 changes: 0 additions & 572 deletions py_src/taskito/dashboard.py

This file was deleted.

30 changes: 30 additions & 0 deletions py_src/taskito/dashboard/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Built-in web dashboard for taskito — zero extra dependencies.

Usage::

taskito dashboard --app myapp:queue
# → http://127.0.0.1:8080

Or programmatically::

from taskito.dashboard import serve_dashboard
serve_dashboard(queue, host="0.0.0.0", port=8080)

Serves the SPA at ``py_src/taskito/static/dashboard/`` (``index.html`` plus
hashed ``assets/`` produced by the Vite build) plus the JSON API under
``/api/*``. Requests for client-side routes fall back to ``index.html`` so
deep links work.
"""

from taskito.dashboard.handlers.scaler import build_scaler_response
from taskito.dashboard.server import _make_handler, serve_dashboard
from taskito.dashboard.static import StaticAssets, _content_type_for, _resolve_static_node

# NOTE(review): underscore-prefixed helpers are re-exported deliberately —
# presumably so tests (or callers of the old single-module dashboard.py this
# package replaced) keep working. Confirm before trimming them from the API.
__all__ = [
    "StaticAssets",
    "_content_type_for",
    "_make_handler",
    "_resolve_static_node",
    "build_scaler_response",
    "serve_dashboard",
]
17 changes: 17 additions & 0 deletions py_src/taskito/dashboard/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Internal exception sentinels raised by route handlers."""

from __future__ import annotations


class _BadRequest(Exception):
"""Raised by route handlers to signal a 400 response."""

def __init__(self, message: str) -> None:
self.message = message


class _NotFound(Exception):
"""Raised by route handlers to signal a 404 response."""

def __init__(self, message: str) -> None:
self.message = message
1 change: 1 addition & 0 deletions py_src/taskito/dashboard/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Route handler functions, organized by feature area."""
16 changes: 16 additions & 0 deletions py_src/taskito/dashboard/handlers/_qs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Query-string parsing helpers shared by dashboard handlers."""

from __future__ import annotations

from taskito.dashboard.errors import _BadRequest


def _parse_int_qs(qs: dict, key: str, default: int) -> int:
"""Parse a non-negative integer from query string, raising ``_BadRequest``."""
try:
val = int(qs.get(key, [str(default)])[0])
except (ValueError, IndexError):
raise _BadRequest(f"{key} must be an integer") from None
if val < 0:
raise _BadRequest(f"{key} must be non-negative")
return val
16 changes: 16 additions & 0 deletions py_src/taskito/dashboard/handlers/dead_letters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Dead-letter route handlers."""

from __future__ import annotations

from typing import TYPE_CHECKING

from taskito.dashboard.handlers._qs import _parse_int_qs

if TYPE_CHECKING:
from taskito.app import Queue


def _handle_dead_letters(queue: Queue, qs: dict) -> list:
    """Return a page of dead-lettered jobs, honouring ``limit``/``offset``."""
    page_size = _parse_int_qs(qs, "limit", 20)
    page_start = _parse_int_qs(qs, "offset", 0)
    return queue.dead_letters(limit=page_size, offset=page_start)
53 changes: 53 additions & 0 deletions py_src/taskito/dashboard/handlers/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Job-related route handlers."""

from __future__ import annotations

from typing import TYPE_CHECKING

from taskito.dashboard.errors import _BadRequest, _NotFound
from taskito.dashboard.handlers._qs import _parse_int_qs

if TYPE_CHECKING:
from taskito.app import Queue


def _handle_list_jobs(queue: Queue, qs: dict) -> list[dict]:
    """List jobs as dicts, filtered by query-string parameters.

    Simple filters (``status``, ``queue``, ``task``) use the cheap
    ``list_jobs`` path; any advanced filter (``metadata``, ``error``,
    ``created_after``, ``created_before``) switches to
    ``list_jobs_filtered``. Raises ``_BadRequest`` on malformed integers.
    """

    def _first(key: str) -> str | None:
        # parse_qs maps every key to a list; [None] default keeps the
        # "absent" case uniform.
        return qs.get(key, [None])[0]

    def _epoch(name: str, raw: str | None) -> int | None:
        # Previously a malformed timestamp raised an unhandled ValueError
        # (→ 500); surface it as a 400 like the other integer params.
        if not raw:
            return None
        try:
            return int(raw)
        except ValueError:
            raise _BadRequest(f"{name} must be an integer") from None

    status = _first("status")
    q = _first("queue")
    task = _first("task")
    metadata_like = _first("metadata")
    error_like = _first("error")
    created_after = _first("created_after")
    created_before = _first("created_before")
    limit = _parse_int_qs(qs, "limit", 20)
    offset = _parse_int_qs(qs, "offset", 0)

    if any(x is not None for x in (metadata_like, error_like, created_after, created_before)):
        jobs = queue.list_jobs_filtered(
            status=status,
            queue=q,
            task_name=task,
            metadata_like=metadata_like,
            error_like=error_like,
            created_after=_epoch("created_after", created_after),
            created_before=_epoch("created_before", created_before),
            limit=limit,
            offset=offset,
        )
    else:
        jobs = queue.list_jobs(status=status, queue=q, task_name=task, limit=limit, offset=offset)
    return [j.to_dict() for j in jobs]


def _handle_get_job(queue: Queue, _qs: dict, job_id: str) -> dict:
job = queue.get_job(job_id)
if job is None:
raise _NotFound("Job not found")
return job.to_dict()


def _handle_replay_post(queue: Queue, job_id: str) -> dict:
result = queue.replay(job_id)
return {"replay_job_id": result.id}
18 changes: 18 additions & 0 deletions py_src/taskito/dashboard/handlers/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Log query route handlers."""

from __future__ import annotations

from typing import TYPE_CHECKING

from taskito.dashboard.handlers._qs import _parse_int_qs

if TYPE_CHECKING:
from taskito.app import Queue


def _handle_logs(queue: Queue, qs: dict) -> list:
    """Query task logs filtered by task name, level, and a time window."""
    task_name = qs.get("task", [None])[0]
    level_filter = qs.get("level", [None])[0]
    window_secs = _parse_int_qs(qs, "since", 3600)
    max_rows = _parse_int_qs(qs, "limit", 100)
    return queue.query_logs(task_name=task_name, level=level_filter, since=window_secs, limit=max_rows)
23 changes: 23 additions & 0 deletions py_src/taskito/dashboard/handlers/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Metrics route handlers."""

from __future__ import annotations

from typing import TYPE_CHECKING

from taskito.dashboard.handlers._qs import _parse_int_qs

if TYPE_CHECKING:
from taskito.app import Queue


def _handle_metrics(queue: Queue, qs: dict) -> dict:
    """Aggregate metrics, optionally scoped to one task, over ``since`` seconds."""
    task_name = qs.get("task", [None])[0]
    window_secs = _parse_int_qs(qs, "since", 3600)
    return queue.metrics(task_name=task_name, since=window_secs)


def _handle_metrics_timeseries(queue: Queue, qs: dict) -> list:
    """Bucketed metrics time series (``bucket`` seconds per point) over ``since``."""
    task_name = qs.get("task", [None])[0]
    window_secs = _parse_int_qs(qs, "since", 3600)
    bucket_secs = _parse_int_qs(qs, "bucket", 60)
    return queue.metrics_timeseries(task_name=task_name, since=window_secs, bucket=bucket_secs)
15 changes: 15 additions & 0 deletions py_src/taskito/dashboard/handlers/queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Per-queue stats route handlers."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from taskito.app import Queue


def _handle_stats_queues(queue: Queue, qs: dict) -> dict:
q_name = qs.get("queue", [None])[0]
if q_name:
return queue.stats_by_queue(q_name)
return queue.stats_all_queues()
56 changes: 56 additions & 0 deletions py_src/taskito/dashboard/handlers/scaler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""KEDA scaler payload assembly."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from taskito.app import Queue


logger = logging.getLogger("taskito.dashboard")


def build_scaler_response(
    queue: Queue,
    queue_name: str | None = None,
    target_queue_depth: int = 10,
) -> dict[str, Any]:
    """Build KEDA-compatible scaler payload for a queue.

    Reports queue depth as the scaling metric plus worker liveness/capacity
    so an external autoscaler can reason about utilization. When
    ``queue_name`` is given, the metric is scoped to that queue.
    """
    overall = queue.stats()
    pending = overall.get("pending", 0)
    running = overall.get("running", 0)

    live_count = len(queue.workers())
    # NOTE(review): reads the queue's private worker-slot count; confirm a
    # public accessor isn't available before relying on this long-term.
    capacity = queue._workers

    payload: dict[str, Any] = {
        "metricName": "taskito_queue_depth",
        "metricValue": pending,
        "isActive": pending > 0,
        "liveWorkers": live_count,
        "totalCapacity": capacity,
        "targetQueueDepth": target_queue_depth,
    }
    if capacity > 0:
        # Fraction of configured slots currently busy, 3 decimal places.
        payload["workerUtilization"] = round(running / capacity, 3)

    if queue_name:
        scoped = queue.stats_by_queue(queue_name)
        scoped_pending = scoped.get("pending", 0)
        payload["metricValue"] = scoped_pending
        payload["isActive"] = scoped_pending > 0
        payload["metricName"] = f"taskito_queue_depth_{queue_name}"

    # Per-queue breakdown is best-effort: the core payload must still be
    # served even if the breakdown query fails.
    try:
        breakdown = queue.stats_all_queues()
        payload["perQueue"] = {
            name: {"pending": s.get("pending", 0), "running": s.get("running", 0)}
            for name, s in breakdown.items()
        }
    except Exception:
        logger.warning("Failed to collect per-queue stats for scaler", exc_info=True)

    return payload
77 changes: 77 additions & 0 deletions py_src/taskito/dashboard/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Routing tables for the dashboard HTTP server.

Each entry maps a path (or path pattern) to a callable that produces
JSON-serializable data. Handlers may raise
:class:`~taskito.dashboard.errors._BadRequest` (→ 400) or
:class:`~taskito.dashboard.errors._NotFound` (→ 404).
"""

from __future__ import annotations

import re
from typing import Any

from taskito.dashboard.handlers.dead_letters import _handle_dead_letters
from taskito.dashboard.handlers.jobs import (
_handle_get_job,
_handle_list_jobs,
_handle_replay_post,
)
from taskito.dashboard.handlers.logs import _handle_logs
from taskito.dashboard.handlers.metrics import _handle_metrics, _handle_metrics_timeseries
from taskito.dashboard.handlers.queues import _handle_stats_queues
from taskito.dashboard.handlers.scaler import build_scaler_response

# ── Exact-match GET routes: path → handler(queue, qs) → JSON data ──
GET_ROUTES: dict[str, Any] = {
"/api/stats": lambda q, qs: q.stats(),
"/api/jobs": _handle_list_jobs,
"/api/dead-letters": _handle_dead_letters,
"/api/metrics": _handle_metrics,
"/api/metrics/timeseries": _handle_metrics_timeseries,
"/api/logs": _handle_logs,
"/api/circuit-breakers": lambda q, qs: q.circuit_breakers(),
"/api/workers": lambda q, qs: q.workers(),
"/api/resources": lambda q, qs: q.resource_status(),
"/api/proxy-stats": lambda q, qs: q.proxy_stats(),
"/api/interception-stats": lambda q, qs: q.interception_stats(),
"/api/queues/paused": lambda q, qs: q.paused_queues(),
"/api/stats/queues": _handle_stats_queues,
"/api/scaler": lambda q, qs: build_scaler_response(q, queue_name=qs.get("queue", [None])[0]),
}

# ── Parameterized GET routes: regex → handler(queue, qs, captured_id) ──
# Order matters — more specific patterns first.
GET_PARAM_ROUTES: list[tuple[re.Pattern, Any]] = [
(re.compile(r"^/api/jobs/([^/]+)/errors$"), lambda q, qs, jid: q.job_errors(jid)),
(re.compile(r"^/api/jobs/([^/]+)/logs$"), lambda q, qs, jid: q.task_logs(jid)),
(
re.compile(r"^/api/jobs/([^/]+)/replay-history$"),
lambda q, qs, jid: q.replay_history(jid),
),
(re.compile(r"^/api/jobs/([^/]+)/dag$"), lambda q, qs, jid: q.job_dag(jid)),
(re.compile(r"^/api/jobs/([^/]+)$"), _handle_get_job),
]

# ── Exact-match POST routes: path → handler(queue) → JSON data ──
POST_ROUTES: dict[str, Any] = {
"/api/dead-letters/purge": lambda q: {"purged": q.purge_dead(0)},
}

# ── Parameterized POST routes: regex → handler(queue, captured_id) ──
POST_PARAM_ROUTES: list[tuple[re.Pattern, Any]] = [
(
re.compile(r"^/api/jobs/([^/]+)/cancel$"),
lambda q, jid: {"cancelled": q.cancel_job(jid)},
),
(re.compile(r"^/api/jobs/([^/]+)/replay$"), _handle_replay_post),
(
re.compile(r"^/api/dead-letters/([^/]+)/retry$"),
lambda q, did: {"new_job_id": q.retry_dead(did)},
),
(re.compile(r"^/api/queues/([^/]+)/pause$"), lambda q, n: (q.pause(n), {"paused": n})[1]),
(
re.compile(r"^/api/queues/([^/]+)/resume$"),
lambda q, n: (q.resume(n), {"resumed": n})[1],
),
]
Loading
Loading