diff --git a/.env.example b/.env.example index f496d91..cfb3cb6 100644 --- a/.env.example +++ b/.env.example @@ -13,6 +13,14 @@ BAMBUDDY_PRINTERS_ENDPOINT=/api/v1/printers/ BAMBUDDY_PRINTER_STATUS_ENDPOINT_TEMPLATE=/api/v1/printers/{printer_id}/status PRINTERPRINTER_POLL_INTERVAL_SECONDS=5 +PRINTERPRINTER_LABEL_WAIT_SECONDS=60 +PRINTERPRINTER_LABEL_WAIT_POLL_SECONDS=5 +PRINTERPRINTER_PENDING_LABEL_MAX_AGE_SECONDS=900 +PRINTERPRINTER_ENV_FILE_PATH=.env +PRINTERPRINTER_SERVICE_NAME=printerprinter +PRINTERPRINTER_INSTALL_DIR=/opt/printerprinter +PRINTERPRINTER_UPDATE_BRANCH=main +PRINTERPRINTER_VENV_PATH=/opt/printerprinter/venv # Bambuddy numeric IDs (not IP/serial). Leave blank to include all. PRINTERPRINTER_MONITORED_PRINTER_IDS=11,13,16 # Optional selectors: Bambuddy ID, printer name, serial number, or IP address. diff --git a/README.md b/README.md index 032ffd4..8ed7e5b 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,13 @@ Latest label preview (DK1202 / 62x100): - `POST /admin/poll-once` - `GET /admin/printers` - `GET /admin/events` +- `GET /admin/ui` +- `GET /admin` +- `GET /admin/config` +- `POST /admin/config` +- `POST /admin/actions/restart` +- `POST /admin/actions/update` +- `GET /admin/label-preview/{event_id}.png` - `POST /admin/print-event/{event_id}` ## Configuration @@ -49,6 +56,14 @@ Copy `.env.example` to `.env` and update values. - `PRINTERPRINTER_LOG_LEVEL` (default `INFO`) - `PRINTERPRINTER_DB_PATH` (default `./data/printerprinter.sqlite3`) - `PRINTERPRINTER_POLL_INTERVAL_SECONDS` (default `5`) +- `PRINTERPRINTER_LABEL_WAIT_SECONDS` (default `60`; wait window for missing duration/filament before deferring print) +- `PRINTERPRINTER_LABEL_WAIT_POLL_SECONDS` (default `5`; refresh interval during wait window) +- `PRINTERPRINTER_PENDING_LABEL_MAX_AGE_SECONDS` (default `900`; only auto-print deferred events younger than this) +- `PRINTERPRINTER_ENV_FILE_PATH` (default `.env`; config file used by admin UI save flow) +- `PRINTERPRINTER_SERVICE_NAME` (default `printerprinter`; systemd service name for restart/update actions) +- `PRINTERPRINTER_INSTALL_DIR` (default `/opt/printerprinter`; repository path used by update action) +- `PRINTERPRINTER_UPDATE_BRANCH` (default `main`; branch used by update action) +- `PRINTERPRINTER_VENV_PATH` (default `/opt/printerprinter/venv`; virtualenv used for update install) - `PRINTERPRINTER_MONITORED_PRINTER_IDS` (comma-separated Bambuddy numeric IDs, empty means all) - `PRINTERPRINTER_MONITORED_PRINTER_IDENTIFIERS` (comma-separated IDs, names, serials, or IPs to auto-resolve) diff --git a/src/printerprinter/admin_ui.py b/src/printerprinter/admin_ui.py new file mode 100644 index 0000000..44ca8d3 --- /dev/null +++ b/src/printerprinter/admin_ui.py @@ -0,0 +1,472 @@ +from __future__ import annotations + + +def render_admin_ui_html() -> str: + return """ + + + + + PrinterPrinter Admin + + + +
+
+

PrinterPrinter Admin

+

Configure settings, reprint labels, restart service, and run updates.

+
+ +
+
+

Instance Controls

+
+ + + + +
+
+
+ +
+

Configuration

+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+ + +
+
+
+ +
+
+
+ +
+

Recent Labels

+
+
+ + + + + + + + + + + + + +
IDPrinterFileStartDurationFilamentActions
+
+ \"Label +
+
+
+ + + + +""" diff --git a/src/printerprinter/config.py b/src/printerprinter/config.py index a0cc159..22bccee 100644 --- a/src/printerprinter/config.py +++ b/src/printerprinter/config.py @@ -33,6 +33,17 @@ class Settings(BaseSettings): alias="PRINTERPRINTER_MONITORED_PRINTER_IDENTIFIERS", ) poll_interval_seconds: float = Field(default=5.0, alias="PRINTERPRINTER_POLL_INTERVAL_SECONDS") + label_wait_seconds: float = Field(default=60.0, alias="PRINTERPRINTER_LABEL_WAIT_SECONDS") + label_wait_poll_seconds: float = Field(default=5.0, alias="PRINTERPRINTER_LABEL_WAIT_POLL_SECONDS") + pending_label_max_age_seconds: float = Field( + default=900.0, + alias="PRINTERPRINTER_PENDING_LABEL_MAX_AGE_SECONDS", + ) + env_file_path: str = Field(default=".env", alias="PRINTERPRINTER_ENV_FILE_PATH") + service_name: str = Field(default="printerprinter", alias="PRINTERPRINTER_SERVICE_NAME") + install_dir: str = Field(default="/opt/printerprinter", alias="PRINTERPRINTER_INSTALL_DIR") + update_branch: str = Field(default="main", alias="PRINTERPRINTER_UPDATE_BRANCH") + venv_path: str = Field(default="/opt/printerprinter/venv", alias="PRINTERPRINTER_VENV_PATH") brother_enabled: bool = Field(default=True, alias="BROTHER_ENABLED") brother_model: str = Field(default="QL-820NWB", alias="BROTHER_MODEL") diff --git a/src/printerprinter/labeling.py b/src/printerprinter/labeling.py index 912ec7c..6ed8470 100644 --- a/src/printerprinter/labeling.py +++ b/src/printerprinter/labeling.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime +from datetime import UTC, datetime from brother_ql.devicedependent import label_type_specs from PIL import Image, ImageDraw, ImageFont @@ -55,6 +55,11 @@ def _format_datetime(value: object | None) -> str: except ValueError: return text + # Treat naive timestamps as UTC, then display in the system local timezone. + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + dt = dt.astimezone() + # Use a compact friendly format that still includes month/day and time. # Example: Wed May 28, 10:05 AM return dt.strftime("%a %b %d, %I:%M %p") diff --git a/src/printerprinter/main.py b/src/printerprinter/main.py index 6df5dfe..dc6bf00 100644 --- a/src/printerprinter/main.py +++ b/src/printerprinter/main.py @@ -1,21 +1,29 @@ from __future__ import annotations import asyncio +import io import logging import re +import shlex from contextlib import asynccontextmanager from dataclasses import asdict from datetime import UTC, datetime, timedelta +from pathlib import Path from typing import AsyncIterator from fastapi import FastAPI, HTTPException +from fastapi.responses import HTMLResponse, Response +from pydantic import BaseModel, Field +from printerprinter.admin_ui import render_admin_ui_html from printerprinter.bambuddy_client import BambuddyClient from printerprinter.config import get_settings +from printerprinter.labeling import render_label_image from printerprinter.logging_config import configure_logging from printerprinter.printing import BrotherPrintService from printerprinter.storage import ( get_print_start_event, + has_label_job_attempt, init_db, list_recent_print_start_events, record_label_job_attempt, @@ -27,6 +35,115 @@ LOGGER = logging.getLogger(__name__) +EDITABLE_CONFIG_KEYS: tuple[str, ...] = ( + "BAMBUDDY_BASE_URL", + "BAMBUDDY_API_TOKEN", + "BAMBUDDY_TIMEOUT_SECONDS", + "BAMBUDDY_AUTH_MODE", + "BAMBUDDY_AUTH_HEADER_NAME", + "BAMBUDDY_JOBS_ENDPOINT", + "BAMBUDDY_PRINTERS_ENDPOINT", + "BAMBUDDY_PRINTER_STATUS_ENDPOINT_TEMPLATE", + "PRINTERPRINTER_MONITORED_PRINTER_IDS", + "PRINTERPRINTER_MONITORED_PRINTER_IDENTIFIERS", + "PRINTERPRINTER_POLL_INTERVAL_SECONDS", + "PRINTERPRINTER_LABEL_WAIT_SECONDS", + "PRINTERPRINTER_LABEL_WAIT_POLL_SECONDS", + "PRINTERPRINTER_PENDING_LABEL_MAX_AGE_SECONDS", + "BROTHER_ENABLED", + "BROTHER_MODEL", + "BROTHER_PRINTER_URI", + "BROTHER_LABEL_SIZE", + "BROTHER_CUT", + "SHOW_PRICE_ON_LABEL", + "FILAMENT_PRICE_PER_GRAM", +) + + +class ConfigUpdateRequest(BaseModel): + values: dict[str, str] = Field(default_factory=dict) + + +def _resolve_env_file_path() -> Path: + settings = get_settings() + env_path = Path(settings.env_file_path) + if env_path.is_absolute(): + return env_path + cwd_candidate = Path.cwd() / env_path + if cwd_candidate.exists(): + return cwd_candidate + return Path(settings.install_dir) / env_path + + +def _read_env_lines(env_path: Path) -> list[str]: + if not env_path.exists(): + return [] + return env_path.read_text(encoding="utf-8").splitlines() + + +def _parse_env_map(lines: list[str]) -> dict[str, str]: + values: dict[str, str] = {} + for line in lines: + stripped = line.strip() + if not stripped or stripped.startswith("#") or "=" not in stripped: + continue + key, raw_value = stripped.split("=", 1) + values[key.strip()] = raw_value.strip() + return values + + +def _write_env_updates(env_path: Path, updates: dict[str, str]) -> None: + env_path.parent.mkdir(parents=True, exist_ok=True) + lines = _read_env_lines(env_path) + touched_keys: set[str] = set() + output_lines: list[str] = [] + + for line in lines: + stripped = line.strip() + if not stripped or stripped.startswith("#") or "=" not in stripped: + output_lines.append(line) + continue + + key, _ = stripped.split("=", 1) + normalized_key = key.strip() + if normalized_key in updates: + output_lines.append(f"{normalized_key}={updates[normalized_key]}") + touched_keys.add(normalized_key) + else: + output_lines.append(line) + + missing_keys = [key for key in updates if key not in touched_keys] + if missing_keys and output_lines and output_lines[-1].strip(): + output_lines.append("") + for key in missing_keys: + output_lines.append(f"{key}={updates[key]}") + + content = "\n".join(output_lines).rstrip() + "\n" + env_path.write_text(content, encoding="utf-8") + + +async def _run_exec_command(command: list[str], cwd: str | None = None) -> tuple[int, str, str]: + process = await asyncio.create_subprocess_exec( + *command, + cwd=cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await process.communicate() + return process.returncode, stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") + + +def _shell_join(command: list[str]) -> str: + return " ".join(shlex.quote(part) for part in command) + + +def _tail_text(value: str, max_lines: int = 20) -> str: + lines = [line for line in value.splitlines() if line.strip()] + if len(lines) <= max_lines: + return "\n".join(lines) + return "\n".join(lines[-max_lines:]) + + def _exc_message(exc: Exception) -> str: detail = str(exc).strip() if detail: @@ -249,6 +366,48 @@ def _normalize_name(value: object | None) -> str: return re.sub(r"\s+", " ", text).strip() +def _name_tokens(value: object | None) -> set[str]: + normalized = _normalize_name(value) + if not normalized: + return set() + return {token for token in normalized.split(" ") if token} + + +def _names_compatible(expected: object | None, candidate: object | None) -> bool: + expected_normalized = _normalize_name(expected) + candidate_normalized = _normalize_name(candidate) + + if not expected_normalized and not candidate_normalized: + return True + if not expected_normalized or not candidate_normalized: + return False + if expected_normalized == candidate_normalized: + return True + + expected_tokens = _name_tokens(expected) + candidate_tokens = _name_tokens(candidate) + + if not expected_tokens or not candidate_tokens: + return False + + if expected_tokens == candidate_tokens: + return True + + ignored_suffix_tokens = {"gcode", "3mf", "stl"} + expected_core = expected_tokens - ignored_suffix_tokens + candidate_core = candidate_tokens - ignored_suffix_tokens + if not expected_core or not candidate_core: + return False + + smaller, larger = ( + (expected_core, candidate_core) + if len(expected_core) <= len(candidate_core) + else (candidate_core, expected_core) + ) + # Accept subset matches when at least three meaningful words align. + return len(smaller) >= 3 and smaller.issubset(larger) + + def _job_enrichment_score(job: object) -> tuple[int, int]: duration = _coerce_int(getattr(job, "est_duration_sec", None)) filament = _coerce_float(getattr(job, "filament_estimated_g", None)) @@ -289,7 +448,7 @@ def _pick_running_job_fields( continue job_file_name = _normalize_name(getattr(job, "file_name", None)) - if normalized_file_name and job_file_name and job_file_name != normalized_file_name: + if normalized_file_name and job_file_name and not _names_compatible(normalized_file_name, job_file_name): continue candidates.append(job) @@ -367,6 +526,23 @@ def _event_needs_backfill(event: dict[str, object]) -> bool: ) +def _parse_created_at(value: object | None) -> datetime | None: + if value is None: + return None + text = str(value).strip() + if not text: + return None + + normalized = text.replace(" ", "T").replace("Z", "+00:00") + try: + dt = datetime.fromisoformat(normalized) + except ValueError: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt + + def _job_matches_event(job, event: dict[str, object]) -> bool: job_printer_id = _normalize_name(getattr(job, "printer_id", None)) event_printer_id = _normalize_name(event.get("printer_id")) @@ -376,7 +552,7 @@ def _job_matches_event(job, event: dict[str, object]) -> bool: event_file_name = _normalize_name(event.get("file_name")) job_file_name = _normalize_name(getattr(job, "file_name", None)) if event_file_name: - return event_file_name == job_file_name + return _names_compatible(event_file_name, job_file_name) if job_file_name: return False @@ -436,6 +612,86 @@ async def reconcile_recent_events(client: BambuddyClient, db_path: str) -> int: return updates +async def _attempt_print_for_event( + db_path: str, + print_service: BrotherPrintService, + event: dict[str, object], +) -> bool: + print_result = await asyncio.to_thread(print_service.print_event, event) + status = "printed" if print_result.ok else "failed" + record_label_job_attempt( + db_path, + event_id=int(event["id"]), + status=status, + attempts=1, + error_message=print_result.error, + ) + return print_result.ok + + +async def _wait_for_event_data( + client: BambuddyClient, + db_path: str, + *, + event_id: int, + timeout_seconds: float, + poll_interval_seconds: float, +) -> dict[str, object] | None: + event = get_print_start_event(db_path, event_id) + if event is None: + return None + if not _event_needs_backfill(event): + return event + + timeout = max(0.0, float(timeout_seconds)) + interval = max(1.0, float(poll_interval_seconds)) + if timeout == 0: + return event + + deadline = asyncio.get_running_loop().time() + timeout + while asyncio.get_running_loop().time() < deadline: + await asyncio.sleep(interval) + await reconcile_recent_events(client, db_path) + event = get_print_start_event(db_path, event_id) + if event is None: + return None + if not _event_needs_backfill(event): + return event + + return event + + +async def process_pending_labels( + db_path: str, + print_service: BrotherPrintService, + *, + max_age_seconds: float, + limit: int = 50, +) -> int: + recent_events = list_recent_print_start_events(db_path, limit=limit) + now = datetime.now(UTC) + attempts = 0 + + for event in sorted(recent_events, key=lambda item: int(item["id"])): + event_id = int(event["id"]) + if has_label_job_attempt(db_path, event_id): + continue + + if _event_needs_backfill(event): + continue + + created_at = _parse_created_at(event.get("created_at")) + if created_at is None: + continue + if (now - created_at).total_seconds() > max(0.0, float(max_age_seconds)): + continue + + await _attempt_print_for_event(db_path, print_service, event) + attempts += 1 + + return attempts + + def _resolve_allowed_printer_ids( printers: list, monitored_ids: set[str], @@ -467,11 +723,15 @@ async def run_poll_iteration( monitored_identifiers: set[str], last_states: dict[str, str], print_service: BrotherPrintService, + label_wait_seconds: float, + label_wait_poll_seconds: float, ) -> dict[str, int]: printers = await client.list_printers() allowed_ids = _resolve_allowed_printer_ids(printers, monitored_ids, monitored_identifiers) seen = 0 inserted = 0 + printed = 0 + deferred = 0 for printer in printers: if allowed_ids and printer.printer_id not in allowed_ids: @@ -513,17 +773,31 @@ async def run_poll_iteration( ) if result["inserted"]: inserted += 1 - print_result = await asyncio.to_thread(print_service.print_event, result["event"]) - status = "printed" if print_result.ok else "failed" - record_label_job_attempt( - db_path, - event_id=int(result["event"]["id"]), - status=status, - attempts=1, - error_message=print_result.error, - ) + event_row: dict[str, object] | None = result["event"] + if _event_needs_backfill(event_row): + deferred += 1 + event_row = await _wait_for_event_data( + client, + db_path, + event_id=int(result["event"]["id"]), + timeout_seconds=label_wait_seconds, + poll_interval_seconds=label_wait_poll_seconds, + ) + + if event_row is None: + continue + + if _event_needs_backfill(event_row): + LOGGER.info( + "label deferred: event_id=%s waiting for duration/filament", + event_row.get("id"), + ) + continue + + if await _attempt_print_for_event(db_path, print_service, event_row): + printed += 1 - return {"seen": seen, "inserted": inserted} + return {"seen": seen, "inserted": inserted, "printed": printed, "deferred": deferred} async def poller_loop( @@ -534,6 +808,9 @@ async def poller_loop( monitored_identifiers: set[str], last_states: dict[str, str], print_service: BrotherPrintService, + label_wait_seconds: float, + label_wait_poll_seconds: float, + pending_label_max_age_seconds: float, ) -> None: while True: try: @@ -544,11 +821,26 @@ async def poller_loop( monitored_identifiers, last_states, print_service, + label_wait_seconds, + label_wait_poll_seconds, ) reconciled = await reconcile_recent_events(client, db_path) - LOGGER.info("poll iteration complete: seen=%s inserted=%s", outcome["seen"], outcome["inserted"]) + pending_attempts = await process_pending_labels( + db_path, + print_service, + max_age_seconds=pending_label_max_age_seconds, + ) + LOGGER.info( + "poll iteration complete: seen=%s inserted=%s printed=%s deferred=%s", + outcome["seen"], + outcome["inserted"], + outcome.get("printed", 0), + outcome.get("deferred", 0), + ) if reconciled: LOGGER.info("event reconciliation complete: updated=%s", reconciled) + if pending_attempts: + LOGGER.info("pending label processing complete: attempted=%s", pending_attempts) except Exception as exc: # noqa: BLE001 LOGGER.warning("poll iteration failed: %s", _exc_message(exc)) await asyncio.sleep(interval_seconds) @@ -595,6 +887,9 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: monitored_identifiers, last_states, print_service, + settings.label_wait_seconds, + settings.label_wait_poll_seconds, + settings.pending_label_max_age_seconds, ), name="bambuddy-poller", ) @@ -664,6 +959,8 @@ async def admin_poll_once() -> dict[str, int]: monitored_identifiers, state_cache, print_service, + settings.label_wait_seconds, + settings.label_wait_poll_seconds, ) except Exception as exc: # noqa: BLE001 msg = _exc_message(exc) @@ -703,6 +1000,129 @@ async def admin_events(limit: int = 50) -> dict[str, object]: return {"count": len(items), "items": items} +@app.get("/admin/ui", response_class=HTMLResponse) +async def admin_ui() -> HTMLResponse: + return HTMLResponse(content=render_admin_ui_html()) + + +@app.get("/admin", response_class=HTMLResponse) +async def admin_ui_shortcut() -> HTMLResponse: + return HTMLResponse(content=render_admin_ui_html()) + + +@app.get("/admin/config") +async def admin_get_config() -> dict[str, object]: + env_path = _resolve_env_file_path() + lines = _read_env_lines(env_path) + values = _parse_env_map(lines) + filtered_values = {key: values.get(key, "") for key in EDITABLE_CONFIG_KEYS} + missing = [key for key, value in filtered_values.items() if not value] + return { + "env_file_path": str(env_path), + "editable_keys": list(EDITABLE_CONFIG_KEYS), + "values": filtered_values, + "missing": missing, + } + + +@app.post("/admin/config") +async def admin_update_config(payload: ConfigUpdateRequest) -> dict[str, object]: + unknown_keys = [key for key in payload.values if key not in EDITABLE_CONFIG_KEYS] + if unknown_keys: + raise HTTPException(status_code=400, detail=f"Unsupported config keys: {', '.join(unknown_keys)}") + + updates = {key: str(value).strip() for key, value in payload.values.items()} + env_path = _resolve_env_file_path() + _write_env_updates(env_path, updates) + get_settings.cache_clear() + return { + "ok": True, + "updated": sorted(updates.keys()), + "env_file_path": str(env_path), + "detail": "Saved. Restart service to apply runtime polling/printing changes.", + } + + +@app.post("/admin/actions/restart") +async def admin_action_restart() -> dict[str, object]: + settings = get_settings() + command = ["systemctl", "restart", settings.service_name] + code, stdout, stderr = await _run_exec_command(command) + if code != 0: + raise HTTPException( + status_code=500, + detail=f"Command failed ({_shell_join(command)}): {_tail_text(stderr) or _tail_text(stdout)}", + ) + return { + "ok": True, + "command": _shell_join(command), + "detail": f"Service {settings.service_name} restarted.", + } + + +@app.post("/admin/actions/update") +async def admin_action_update() -> dict[str, object]: + settings = get_settings() + install_dir = settings.install_dir + branch = settings.update_branch + service_name = settings.service_name + pip_path = Path(settings.venv_path) / "bin" / "pip" + + commands: list[tuple[list[str], str | None, str]] = [ + (["git", "fetch", "origin", branch], install_dir, "fetch"), + (["git", "pull", "--ff-only", "origin", branch], install_dir, "pull"), + ] + if pip_path.exists(): + commands.append(([str(pip_path), "install", "-e", install_dir], install_dir, "pip_install")) + else: + commands.append((["python3", "-m", "pip", "install", "-e", install_dir], install_dir, "pip_install")) + commands.append((["systemctl", "restart", service_name], None, "restart")) + + completed_steps: list[str] = [] + logs: list[dict[str, str]] = [] + for command, cwd, step in commands: + code, stdout, stderr = await _run_exec_command(command, cwd=cwd) + logs.append( + { + "step": step, + "command": _shell_join(command), + "stdout": _tail_text(stdout), + "stderr": _tail_text(stderr), + } + ) + if code != 0: + raise HTTPException( + status_code=500, + detail=f"Update failed during {step}: {_tail_text(stderr) or _tail_text(stdout)}", + ) + completed_steps.append(step) + + return { + "ok": True, + "steps": completed_steps, + "logs": logs, + "detail": f"Updated from {branch} and restarted {service_name}.", + } + + +@app.get("/admin/label-preview/{event_id}.png") +async def admin_label_preview(event_id: int) -> Response: + settings = get_settings() + event = get_print_start_event(settings.db_path, event_id) + if event is None: + raise HTTPException(status_code=404, detail="Event not found") + + image = render_label_image( + event, + settings.brother_label_size, + show_price=settings.show_price_on_label, + price_per_gram=settings.filament_price_per_gram, + ) + buffer = io.BytesIO() + image.save(buffer, format="PNG") + return Response(content=buffer.getvalue(), media_type="image/png") + + @app.post("/admin/print-event/{event_id}") async def admin_print_event(event_id: int) -> dict[str, object]: settings = get_settings() diff --git a/src/printerprinter/storage.py b/src/printerprinter/storage.py index cc9e57f..a8276bd 100644 --- a/src/printerprinter/storage.py +++ b/src/printerprinter/storage.py @@ -192,3 +192,17 @@ def record_label_job_attempt( ) conn.commit() return int(cur.lastrowid) + + +def has_label_job_attempt(db_path: str, event_id: int) -> bool: + with sqlite3.connect(db_path) as conn: + row = conn.execute( + """ + SELECT 1 + FROM label_jobs + WHERE event_id = ? + LIMIT 1; + """, + (event_id,), + ).fetchone() + return row is not None