In [1]:
from __future__ import annotations

import os
from pathlib import Path
from datetime import datetime

# Normalise the working directory to the repository root: when the notebook is
# started from inside notebooks_v2/, step up one level so that the relative
# paths used below ("src/...") resolve against the repo root.
ROOT = Path.cwd().resolve()
if ROOT.name == "notebooks_v2":
    ROOT = ROOT.parent
    os.chdir(ROOT)
# Explicit raise rather than `assert`, so the check still runs under `python -O`.
if not (ROOT / "src").exists():
    raise RuntimeError(f"Not at repo root. CWD={ROOT}")
print("✅ CWD =", ROOT)

def backup_write(rel_path: str, content: str) -> None:
    """Write *content* to ``ROOT / rel_path``, backing up any existing file first.

    Parent directories are created as needed. If the target already exists, a
    sibling copy named ``<name><suffix>.bak_<YYYYmmdd_HHMMSS>`` is written
    before the target is overwritten. Both paths are printed.

    Args:
        rel_path: Destination path, relative to the module-level ``ROOT``.
        content: Text to write; it is encoded as UTF-8.
    """
    target = ROOT / rel_path
    target.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        bak = target.with_suffix(target.suffix + f".bak_{ts}")
        # Copy raw bytes: the backup must be exact, and the existing file is
        # not guaranteed to be valid UTF-8 or to use "\n" line endings.
        bak.write_bytes(target.read_bytes())
        print("BACKUP:", bak)
    target.write_text(content, encoding="utf-8")
    print("WROTE:", target)

backup_write("src/careeragent/api/run_manager_service.py", r'''
from __future__ import annotations

import asyncio
import copy
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
from uuid import uuid4

from careeragent.services.db_service import SqliteStateStore
from careeragent.langgraph.runtime_nodes import run_single_layer, approve_ranking_flow, approve_drafts_flow

try:
    from careeragent.config import artifacts_root  # type: ignore
except Exception:
    # The project config could not be imported: fall back to a default location.
    def artifacts_root() -> Path:
        """Return the default artifacts directory as an absolute path.

        The relative default is resolved against the current working directory.
        """
        fallback = Path("src/careeragent/artifacts")
        return fallback.resolve()


def utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    stamp = datetime.now(tz=timezone.utc).replace(microsecond=0)
    # isoformat() renders the UTC offset as "+00:00"; the file uses a "Z" suffix.
    return stamp.isoformat().replace("+00:00", "Z")


class RunManagerService:
    """
    Description: Background run manager that streams state updates per layer (thread-safe timeouts).
    Layer: L8
    """

    # Ordered (layer, timeout in seconds) pairs executed by the background pipeline.
    _PLAN = (("L0", 30), ("L2", 60), ("L3", 120), ("L4", 180), ("L5", 60))
    # Run statuses that stop the background pipeline before the next layer starts.
    _HALT_STATUSES = ("blocked", "needs_human_approval", "failed", "completed")

    def __init__(self) -> None:
        """Create the service with a default-constructed ``SqliteStateStore``."""
        self._store = SqliteStateStore()

    def _runs_dir(self, run_id: str) -> Path:
        """Return ``<artifacts_root>/runs/<run_id>``, creating it if missing."""
        d = artifacts_root() / "runs" / run_id
        d.mkdir(parents=True, exist_ok=True)
        return d

    def save_state(self, *, run_id: str, state: Dict[str, Any]) -> None:
        """Stamp a heartbeat on *state* (in place) and upsert it into the store.

        Args:
            run_id: Identifier of the run being persisted.
            state: Run state; ``state["meta"]["heartbeat_utc"]`` is overwritten.
        """
        # One timestamp for both fields so the heartbeat and the row's
        # updated_at can never disagree across a second boundary.
        stamp = utc_now()
        state.setdefault("meta", {})
        state["meta"]["heartbeat_utc"] = stamp
        self._store.upsert_state(
            run_id=run_id,
            status=str(state.get("status", "unknown")),
            state=state,
            updated_at_utc=stamp,
        )

    def get_state(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Return whatever the store holds for *run_id* (falsy when unknown)."""
        return self._store.get_state(run_id=run_id)

    def create_run(self, *, resume_filename: str, resume_text: str, resume_bytes: bytes, preferences: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new run: write the resume artifacts, build and persist the initial state.

        Args:
            resume_filename: Original name of the uploaded resume.
            resume_text: Extracted resume text, written to ``resume_raw.txt``.
            resume_bytes: Raw upload, written to ``resume_upload.bin``.
            preferences: User preferences; ``thresholds`` and ``max_refinements``
                are read from it and the whole dict is stored in the state.

        Returns:
            The initial state dict, already saved under a fresh ``run_id``.
        """
        run_id = uuid4().hex
        run_dir = self._runs_dir(run_id)

        (run_dir / "resume_upload.bin").write_bytes(resume_bytes)
        (run_dir / "resume_raw.txt").write_text(resume_text, encoding="utf-8")

        # When the caller supplied a thresholds dict this is that same object,
        # so the per-layer defaults filled in below are also visible through
        # preferences["thresholds"].
        thresholds = (preferences.get("thresholds") or {})
        if "default" in thresholds:
            d = float(thresholds["default"])
            for key in ("parser", "discovery", "match", "draft"):
                thresholds.setdefault(key, d)

        now = utc_now()
        state: Dict[str, Any] = {
            "run_id": run_id,
            "status": "running",
            "pending_action": None,
            "hitl_reason": None,
            "hitl_payload": {},
            "thresholds": thresholds,
            "max_retries": int(preferences.get("max_refinements", 3)),
            "layer_retry_count": {},
            "preferences": preferences,
            "resume_filename": resume_filename,
            "resume_text": resume_text,
            "profile": {},
            "jobs_raw": [],
            "jobs_scored": [],
            "ranking": [],
            "drafts": {},
            "bridge_docs": {},
            "meta": {
                "created_at_utc": now,
                "heartbeat_utc": now,
                "last_layer": None,
                # Derived from the plan the background runner actually executes.
                "plan_layers": [layer for layer, _ in self._PLAN],
            },
            "steps": [],
            "live_feed": [{"layer": "L1", "agent": "API", "message": "Run created. Starting background pipeline…"}],
            "attempts": [],
            "gates": [],
            "evaluations": [],
            "artifacts": {
                "resume_raw": {"path": str(run_dir / "resume_raw.txt"), "content_type": "text/plain"},
                "resume_upload": {"path": str(run_dir / "resume_upload.bin"), "content_type": "application/octet-stream"},
            },
        }

        self.save_state(run_id=run_id, state=state)
        return state

    def start_background(self, run_id: str) -> None:
        """Start the pipeline for *run_id* in a daemon thread and return immediately."""
        t = threading.Thread(target=self._bg_runner, args=(run_id,), daemon=True)
        t.start()

    def _bg_runner(self, run_id: str) -> None:
        """Thread target: drive the async pipeline on a private event loop.

        Any exception that escapes the pipeline is recorded on the run
        (best effort) and then re-raised, so the traceback still surfaces
        through the thread's exception hook instead of the run silently
        staying in ``"running"``.
        """
        try:
            asyncio.run(self._bg_async(run_id))
        except Exception as exc:
            self._record_background_crash(run_id, exc)
            raise

    def _record_background_crash(self, run_id: str, exc: BaseException) -> None:
        """Best-effort: mark a still-``running`` run as failed after a pipeline crash."""
        try:
            state = self.get_state(run_id)
            if not state or state.get("status") != "running":
                return
            last_layer = (state.get("meta") or {}).get("last_layer") or "pipeline"
            state["status"] = "failed"
            state["pending_action"] = f"error_{str(last_layer).lower()}"
            state.setdefault("live_feed", []).append({"layer": last_layer, "agent": "CrashGuard", "message": f"Background pipeline crashed: {exc}"})
            self.save_state(run_id=run_id, state=state)
        except Exception:
            # Deliberately swallowed: the store may be the thing that is broken,
            # and the caller re-raises the original exception either way.
            pass

    async def _run_layer_threadsafe(self, state: Dict[str, Any], layer: str, timeout_sec: int) -> Dict[str, Any]:
        """
        Description: Run layer in a separate thread using a deep-copied state so blocking code can't freeze loop.
        Layer: L6

        Returns the state produced by the layer. On timeout or on any exception
        the *original* ``state`` is updated in place (status, pending_action and
        a live_feed entry) and returned instead.

        NOTE: ``wait_for`` only abandons the wait; Python cannot interrupt the
        worker thread, so a hung layer may keep running in the background.
        """
        def _blocking_call() -> Dict[str, Any]:
            # Fresh event loop in the worker thread; the deep copy isolates the
            # layer's mutations from the caller's state.
            isolated = copy.deepcopy(state)
            return asyncio.run(run_single_layer(isolated, layer))

        try:
            result = await asyncio.wait_for(asyncio.to_thread(_blocking_call), timeout=timeout_sec)
            if not isinstance(result, dict):
                # A layer that returns nothing would otherwise crash the caller
                # outside of this guard.
                raise TypeError(f"layer returned {type(result).__name__}, expected a state dict")
            return result
        except asyncio.TimeoutError:
            state["status"] = "needs_human_approval"
            state["pending_action"] = f"timeout_{layer.lower()}"
            state.setdefault("live_feed", []).append({"layer": layer, "agent": "TimeoutGuard", "message": f"{layer} timed out after {timeout_sec}s (likely blocking sync I/O)."})
            return state
        except Exception as e:
            state["status"] = "failed"
            state["pending_action"] = f"error_{layer.lower()}"
            state.setdefault("live_feed", []).append({"layer": layer, "agent": "CrashGuard", "message": f"{layer} crashed: {e}"})
            return state

    async def _bg_async(self, run_id: str) -> None:
        """Run the planned layers in order, persisting the state around each one.

        Stops early (after saving) as soon as the run reaches a halting status
        or a layer requests ``review_ranking``. If every layer finishes while
        the run is still ``"running"``, the run is parked for ranking review.
        """
        state = self.get_state(run_id)
        if not state:
            return

        for layer, tmo in self._PLAN:
            if state.get("status") in self._HALT_STATUSES:
                self.save_state(run_id=run_id, state=state)
                return

            state.setdefault("meta", {})
            state["meta"]["last_layer"] = layer

            state.setdefault("steps", []).append({"layer_id": layer, "status": "running", "started_at_utc": utc_now()})
            state.setdefault("live_feed", []).append({"layer": layer, "agent": "Orchestrator", "message": f"Running {layer}…"})
            self.save_state(run_id=run_id, state=state)

            # Execute off-loop with a hard timeout.
            state = await self._run_layer_threadsafe(state, layer, tmo)

            # The layer returns its own copy of the state; tolerate one that
            # dropped the step list instead of raising on steps[-1].
            steps = state.get("steps") or []
            if steps:
                # NOTE(review): a layer that succeeds leaves the run status at
                # "running", so its finished step is also recorded as "running"
                # — confirm this is what consumers expect.
                steps[-1]["status"] = state.get("status", "running")
                steps[-1]["finished_at_utc"] = utc_now()
            self.save_state(run_id=run_id, state=state)

            if state.get("pending_action") == "review_ranking":
                state["status"] = "needs_human_approval"
                self.save_state(run_id=run_id, state=state)
                return

        if state.get("status") == "running":
            state["status"] = "needs_human_approval"
            state["pending_action"] = "review_ranking"
            state.setdefault("live_feed", []).append({"layer": "L5", "agent": "Orchestrator", "message": "Ranking ready for review."})
            self.save_state(run_id=run_id, state=state)

    async def handle_action(self, *, run_id: str, action_type: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Apply a human-in-the-loop action to a run and persist the result.

        Supported ``action_type`` values are ``execute_layer`` (reads
        ``payload["layer"]``), ``approve_ranking`` and ``approve_drafts``; each
        runs on a deep copy of the stored state. Any other value only appends
        a note to the live feed.

        NOTE(review): unlike the background pipeline, these flows are awaited
        directly on the caller's event loop with no timeout — confirm that is
        intended.

        Returns:
            The saved state after the action.

        Raises:
            ValueError: If no state exists for *run_id*.
        """
        state = self.get_state(run_id)
        if not state:
            raise ValueError("run_id not found")

        if action_type == "execute_layer":
            layer = str(payload.get("layer", "")).upper()
            result = await run_single_layer(copy.deepcopy(state), layer)
        elif action_type == "approve_ranking":
            result = await approve_ranking_flow(copy.deepcopy(state))
        elif action_type == "approve_drafts":
            result = await approve_drafts_flow(copy.deepcopy(state))
        else:
            state.setdefault("live_feed", []).append({"layer":"L5","agent":"HITL","message":f"Unhandled action_type={action_type}"})
            result = state

        self.save_state(run_id=run_id, state=result)
        return result
''')

# Tell the operator what was changed and what to do next.
print("✅ Patched RunManagerService with thread-safe layer execution + real timeouts.")
print("Restart backend now (NO --reload), then start a NEW run.")

✅ CWD = /Users/ganeshprasadbhandari/Documents/D_drive/clark/careeragent-ai
BACKUP: /Users/ganeshprasadbhandari/Documents/D_drive/clark/careeragent-ai/src/careeragent/api/run_manager_service.py.bak_20260221_160406
WROTE: /Users/ganeshprasadbhandari/Documents/D_drive/clark/careeragent-ai/src/careeragent/api/run_manager_service.py
✅ Patched RunManagerService with thread-safe layer execution + real timeouts.
Restart backend now (NO --reload), then start a NEW run.


In [None]:



# Fetch a run's status JSON from the local API and print a short summary.
RUN_ID=444fffc391aa44328d688c7134a8fbbc
# The summary script is passed with -c so that stdin stays connected to curl's
# output. With `python - <<'PY'` the heredoc replaces stdin, so the script
# itself is what python reads and json.load(sys.stdin) never sees the response.
curl -sS "http://127.0.0.1:8000/status/$RUN_ID" | python -c '
import sys, json
s=json.load(sys.stdin)
meta=s.get("meta") or {}
feed=s.get("live_feed") or []
steps=s.get("steps") or []
print("status=", s.get("status"), "pending=", s.get("pending_action"))
print("last_layer=", meta.get("last_layer"), "heartbeat=", meta.get("heartbeat_utc"))
print("steps=", len(steps), "feed=", len(feed))
print("last_feed=", feed[-1] if feed else None)
'