Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/memory/feature-flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
| Date | ID | Feature | Flow |
|------|-----|---------|------|
| 2026-04-19 | #211 | Auto-propagate global GitHub PAT to running agents on update — per-agent PAT holders and agents without `GITHUB_PAT` in `.env` are skipped; delete does NOT propagate | [github-sync.md](feature-flows/github-sync.md), [platform-settings.md](feature-flows/platform-settings.md) |
| 2026-04-19 | #378 | Cleanup service Phase 3 just-in-time re-verify + parallel per-agent fan-out — eliminates phantom stale-slot failures for still-running tasks; adds residual-race observability log | [cleanup-service.md](feature-flows/cleanup-service.md) |
| 2026-04-18 | DOCS-QA-001 | Trinity Docs Q&A — public Vertex AI Search endpoint + in-app floating help widget (#391) | [trinity-docs-qa.md](feature-flows/trinity-docs-qa.md) |
| 2026-04-17 | #376 | Proactive messaging UI toggle — SharingPanel shows allow_proactive switch per shared user | [proactive-messaging.md](feature-flows/proactive-messaging.md), [agent-sharing.md](feature-flows/agent-sharing.md) |
| 2026-04-16 | #321 | Proactive agent messaging — agents send messages to users by verified email via Telegram/Slack/web | [proactive-messaging.md](feature-flows/proactive-messaging.md) |
Expand Down
53 changes: 40 additions & 13 deletions docs/memory/feature-flows/cleanup-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,19 @@ Seven sequential operations, each wrapped in individual try/except. Watchdog run
```
Calls `DatabaseManager.mark_stale_activities_failed()` which delegates to `ActivityOperations.mark_stale_activities_failed()`.

5. **Cleanup stale Redis slots and fail execution records** (lines 123-148, Issue #219, #226, #61)
5. **Cleanup stale Redis slots and fail execution records** (Issues #219, #226, #61, #378)
```python
slot_service = get_slot_service()
agent_timeouts = db.get_all_execution_timeouts() # #226: per-agent TTL
reclaimed = await slot_service.cleanup_stale_slots(agent_timeouts=agent_timeouts)
report.stale_slots = sum(len(ids) for ids in reclaimed.values())
# Fail execution records whose slots were reclaimed, skip confirmed running (#226)
for agent_name, execution_ids in reclaimed.items():
for execution_id in execution_ids:
if execution_id in confirmed_running_ids:
continue # Watchdog verified still running
# Issue #61: Attempt to terminate process before marking failed (best-effort)
await self._terminate_on_agent(client, agent_name, execution_id)
db.fail_stale_slot_execution(execution_id, error=...)
# #378: delegates to _process_stale_slot_reclaims which re-verifies
# each agent just-in-time before writing FAILED
await self._process_stale_slot_reclaims(
reclaimed, confirmed_running_ids, report
)
```
Calls `SlotService.cleanup_stale_slots()` with per-agent timeouts (#226). The service scans all `agent:slots:*` keys, computes each agent's TTL as `timeout_seconds + 5 min buffer` (or default 20 min if no timeout configured), removes entries older than that TTL, and returns a dict mapping agent names to reclaimed execution IDs. **Issue #61**: Before failing the execution, the cleanup service attempts to terminate any orphaned Claude process on the agent (best-effort, failures logged). The cleanup service then fails the corresponding `schedule_executions` DB records using a guarded update (`WHERE status = 'running'`), skipping any IDs the watchdog confirmed as still running.
Calls `SlotService.cleanup_stale_slots()` with per-agent timeouts (#226). The service scans all `agent:slots:*` keys, computes each agent's TTL as `timeout_seconds + 5 min buffer` (or default 20 min if no timeout configured), removes entries older than that TTL, and returns a dict mapping agent names to reclaimed execution IDs. Phase 3 is then implemented by `_process_stale_slot_reclaims()` — see [Phase 3 Slot Reclaim Re-verification](#phase-3-slot-reclaim-re-verification-issue-378) below.

### Watchdog Reconciliation (Issue #129)

Expand Down Expand Up @@ -159,6 +156,35 @@ Shared DRY helper for both orphan recovery and auto-terminate:
#### `_terminate_on_agent(client, agent_name, execution_id)` → `bool`
`POST http://agent-{name}:8000/api/executions/{id}/terminate`. Returns True if HTTP 2xx (agent confirmed termination), False otherwise. Callers only proceed with DB/resource cleanup on success — failed terminations are deferred to the 120-min stale cleanup safety net.

### Phase 3 Slot Reclaim Re-verification (Issue #378)

Before this fix, Phase 3 could mark an execution `FAILED` with "Stale execution — slot TTL expired" while the task was actually still running on the agent (agent had just dropped it from its registry before Phase 0's batch query, so `confirmed_running_ids` missed it). The agent's authoritative `SUCCESS` response then arrived seconds later and overwrote `FAILED` → `SUCCESS`, causing a phantom failure flash in the UI.

#### `_process_stale_slot_reclaims(reclaimed, confirmed_running_ids, report)` → `None`

Replaces the inline Phase 3 loop. Extracted as its own method for direct unit testing (mirrors `_reconcile_orphaned_executions` testability pattern). Key additions over the old inline loop:

1. **Parallel per-agent re-verify fan-out** — one `GET /api/executions/running` call per agent (not per-execution), dispatched concurrently via `asyncio.gather(..., return_exceptions=True)`. Mirrors Phase 0's pattern. Worst-case Phase 3 wall-time goes from O(N_agents × 5s) serial to O(5s) parallel when agents are slow.
2. **Just-in-time re-verify** — the agent is re-queried as close as possible to the `fail_stale_slot_execution` write, minimizing the race window that Phase 0's earlier batch query leaves open.
3. **Per-execution decision matrix**:

| Phase 0 said running? | Re-verify says? | Action |
|---|---|---|
| Yes (`confirmed_running_ids`) | — | **SKIP** (trust Phase 0, save an HTTP call) |
| No | Agent unreachable (None) | **SKIP this cycle** — Phase 1 (120-min stale cleanup) is the backstop |
| No | Agent says still running | **SKIP** — #378 race closed; agent's own SUCCESS write will land correctly |
| No | Agent says not running | **FAIL** — terminate (best-effort, #61) + `fail_stale_slot_execution` with phantom-stale error |

4. **No cross-cycle state** — `slot_service.cleanup_stale_slots` removes reclaimed IDs from Redis permanently (`zremrangebyscore`), so a deferred ID cannot reappear in a later cycle's `reclaimed` dict. Any "retry on next cycle" state machine would be dead code. Transiently-unreachable agents are caught by Phase 0's orphan recovery on subsequent cycles (when the agent becomes reachable again) and by Phase 1's 120-min stale cleanup as a final backstop.

#### Residual-race observability

`db.schedules.update_execution_status` emits a narrowly-scoped `logger.warning` whenever a `SUCCESS` write overwrites a row whose existing error matches the `_STALE_SLOT_ERROR_PATTERN = "Stale execution — slot TTL expired"` marker. Purely observational — update semantics are unchanged (the agent's SUCCESS still wins). The pattern match prevents misattribution of other legitimate FAILED→SUCCESS transitions (Phase 0 auto-terminate, Phase 1 stale cleanup, startup recovery) to #378. Grep with:

```bash
docker logs trinity-backend | grep "residual race condition (#378)"
```

### Startup Loop (`_cleanup_loop`)

```
Expand Down Expand Up @@ -475,8 +501,8 @@ This is a purely backend service. The only "UI" is the two admin API endpoints u

| File | Role |
|------|------|
| `src/backend/services/cleanup_service.py` | Service class, watchdog reconciliation, and global instance |
| `src/backend/db/schedules.py` | `get_running_executions_with_agent_info()` (Issue #129), `mark_execution_failed_by_watchdog()` (Issue #129), `mark_stale_executions_failed()`, `mark_execution_dispatched()`, `mark_no_session_executions_failed()` (Issue #106), `fail_stale_slot_execution()` (Issue #219), `finalize_orphaned_skipped_executions()` (Issue #106) |
| `src/backend/services/cleanup_service.py` | Service class, watchdog reconciliation, Phase 3 re-verification (Issue #378), and global instance |
| `src/backend/db/schedules.py` | `get_running_executions_with_agent_info()` (Issue #129), `mark_execution_failed_by_watchdog()` (Issue #129), `mark_stale_executions_failed()`, `mark_execution_dispatched()`, `mark_no_session_executions_failed()` (Issue #106), `fail_stale_slot_execution()` (Issue #219), `finalize_orphaned_skipped_executions()` (Issue #106), residual-race observability log in `update_execution_status()` (Issue #378) |
| `src/backend/db/activities.py` | `mark_stale_activities_failed()` |
| `src/backend/database.py` | Delegation methods on DatabaseManager |
| `src/backend/services/slot_service.py` | `cleanup_stale_slots()` Redis cleanup, returns reclaimed IDs (Issue #219), `release_slot()` used by watchdog |
Expand All @@ -487,4 +513,5 @@ This is a purely backend service. The only "UI" is the two admin API endpoints u
| `docker/base-image/agent_server/services/process_registry.py` | `get_last_error()` method scans log buffer for errors (Issue #286) |
| `tests/test_cleanup_service.py` | API integration tests for cleanup (Issue #106) |
| `tests/test_watchdog.py` | API integration tests for watchdog fields (Issue #129) |
| `tests/test_watchdog_unit.py` | Unit tests for watchdog reconciliation logic (Issue #129), error context tests (Issue #286) |
| `tests/test_watchdog_unit.py` | Unit tests for watchdog reconciliation logic (Issue #129), error context tests (Issue #286), Phase 3 re-verify tests (Issue #378) |
| `tests/unit/test_schedule_status_observability.py` | Residual-race observability log tests (Issue #378) |
33 changes: 31 additions & 2 deletions src/backend/db/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@

logger = logging.getLogger(__name__)

# #378: Error-message marker written by cleanup_service._process_stale_slot_reclaims
# when Phase 3 fails an execution. Used to scope the residual-race WARNING log
# below so it doesn't misfire on other legitimate FAILED→SUCCESS transitions
# (e.g. Phase 0 auto-terminate, Phase 1 stale cleanup, startup recovery).
_STALE_SLOT_ERROR_PATTERN = "Stale execution — slot TTL expired"


class ScheduleOperations:
"""Schedule and execution database operations."""
Expand Down Expand Up @@ -909,12 +915,35 @@ def update_execution_status(
with get_db_connection() as conn:
cursor = conn.cursor()

# Get started_at for duration calculation
cursor.execute("SELECT started_at FROM schedule_executions WHERE id = ?", (execution_id,))
# Get started_at for duration calculation and current status/error
# for #378 residual-race observability (see log below).
cursor.execute(
"SELECT started_at, status, error FROM schedule_executions WHERE id = ?",
(execution_id,),
)
row = cursor.fetchone()
if not row:
return False

# #378: warn when SUCCESS overwrites a Phase-3 phantom-stale FAILED.
# This lets us observe residual races in production without
# changing update semantics (agent's response still wins). Scoped
# to the stale-slot error pattern so other legitimate FAILED→SUCCESS
# transitions (Phase 0/1 recovery, startup recovery) don't misfire.
current_status = row["status"] if "status" in row.keys() else None
current_error = row["error"] if "error" in row.keys() else None
if (
status == TaskExecutionStatus.SUCCESS
and current_status == TaskExecutionStatus.FAILED
and current_error
and _STALE_SLOT_ERROR_PATTERN in current_error
):
logger.warning(
f"[DB] SUCCESS overwrote Phase-3 stale-slot FAILED for execution "
f"{execution_id} — residual race condition (#378). Prior error: "
f"{current_error[:200]}"
)

# Use parse_iso_timestamp to handle both 'Z' and non-'Z' timestamps
started_at = parse_iso_timestamp(row["started_at"])
completed_at = parse_iso_timestamp(utc_now_iso())
Expand Down
124 changes: 102 additions & 22 deletions src/backend/services/cleanup_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ async def _run_cleanup_inner(self) -> CleanupReport:
except Exception as e:
logger.error(f"[Cleanup] Error marking stale activities: {e}")

# 3. Cleanup stale Redis slots and fail corresponding execution records (#219, #226)
# 3. Cleanup stale Redis slots and fail corresponding execution records
# (#219, #226, #378 — see _process_stale_slot_reclaims docstring).
try:
slot_service = get_slot_service()

Expand All @@ -189,24 +190,112 @@ async def _run_cleanup_inner(self) -> CleanupReport:
)
report.stale_slots = sum(len(ids) for ids in reclaimed.values())

# Fail execution records whose slots were reclaimed,
# but skip IDs the watchdog confirmed as still running (#226).
await self._process_stale_slot_reclaims(
reclaimed, confirmed_running_ids, report
)
except Exception as e:
logger.error(f"[Cleanup] Error cleaning stale slots: {e}")

self.last_run_at = utc_now_iso()
self.last_report = report

if report.total > 0:
logger.info(f"[Cleanup] Cycle complete: {report.to_dict()}")

return report

async def _process_stale_slot_reclaims(
self,
reclaimed: Dict[str, List[str]],
confirmed_running_ids: set,
report: CleanupReport,
) -> None:
"""Fail execution records whose slots were reclaimed, with just-in-time
re-verify to prevent phantom failures (#378).

The bug: cleanup service's Phase 3 sometimes marked executions FAILED
with "Stale execution — slot TTL expired" even though the task was
still running (agent had just dropped it from its registry after
completion, so Phase 0's batch query missed it). The SUCCESS response
then arrived after Phase 3 already wrote FAILED — user saw the flip.

The fix:
- Do a just-in-time re-verify with each agent RIGHT BEFORE writing
FAILED, closing the window between Phase 0 and Phase 3.
- Parallel fan-out via asyncio.gather (mirrors Phase 0 pattern at
_reconcile_orphaned_executions).
- On agent unreachable: skip this cycle. Do NOT accumulate cross-cycle
state — slot_service.cleanup_stale_slots removes reclaimed IDs from
Redis permanently, so the same ID will not reappear in a later
cycle. The 120-min Phase 1 stale cleanup is the backstop for truly
stuck agents.
"""
if not reclaimed:
return

agent_names = list(reclaimed.keys())
async with httpx.AsyncClient(timeout=WATCHDOG_HTTP_TIMEOUT) as client:
results = await asyncio.gather(
*(self._get_agent_running_ids(client, name) for name in agent_names),
return_exceptions=True,
)
per_agent_running: Dict[str, Optional[set]] = {}
for name, result in zip(agent_names, results):
if isinstance(result, BaseException):
logger.warning(
f"[Cleanup] Phase 3 re-verify failed for '{name}': {result}"
)
per_agent_running[name] = None
else:
# result is Optional[set] here after the BaseException branch
per_agent_running[name] = result

for agent_name, execution_ids in reclaimed.items():
running_ids = per_agent_running.get(agent_name)

for execution_id in execution_ids:
# #226: Phase 0 already confirmed this exec as running —
# trust it to save an HTTP call.
if execution_id in confirmed_running_ids:
logger.info(
f"[Cleanup] Skipping execution {execution_id} for agent "
f"'{agent_name}' — watchdog confirmed still running"
f"[Cleanup] Skipping {execution_id} for '{agent_name}' "
f"— watchdog confirmed still running"
)
continue

# Just-in-time re-verify interpretation
if running_ids is None:
# Agent unreachable during re-verify. Skip this cycle;
# Phase 1 (120-min stale cleanup) is the backstop.
logger.info(
f"[Cleanup] Skipping {execution_id} for '{agent_name}' "
f"— agent unreachable during re-verify (#378); "
f"Phase 1 stale cleanup is fallback"
)
continue

if execution_id in running_ids:
# #378: agent says this exec is still running — the
# slot TTL fired prematurely relative to the task.
# Skip; the task's own SUCCESS/FAILED write will
# land correctly later.
logger.info(
f"[Cleanup] Skipping {execution_id} for '{agent_name}' "
f"— re-verification shows still running (#378)"
)
continue

# Re-verify confirmed inactive → safe to fail.
try:
# Issue #61: Attempt to terminate execution on agent before
# marking it failed. Best-effort — may fail if agent unreachable.
# Issue #61: best-effort terminate before marking failed.
try:
async with httpx.AsyncClient(timeout=WATCHDOG_HTTP_TIMEOUT) as term_client:
await self._terminate_on_agent(term_client, agent_name, execution_id)
await self._terminate_on_agent(
client, agent_name, execution_id
)
except Exception as term_err:
logger.debug(f"[Cleanup] Could not terminate {execution_id}: {term_err}")
logger.debug(
f"[Cleanup] Could not terminate {execution_id}: {term_err}"
)

updated = db.fail_stale_slot_execution(
execution_id=execution_id,
Expand All @@ -215,22 +304,13 @@ async def _run_cleanup_inner(self) -> CleanupReport:
if updated:
report.stale_slot_executions += 1
logger.info(
f"[Cleanup] Failed execution {execution_id} for agent '{agent_name}' (slot reclaimed)"
f"[Cleanup] Failed execution {execution_id} for agent "
f"'{agent_name}' (slot reclaimed)"
)
except Exception as e:
logger.error(
f"[Cleanup] Error failing execution {execution_id} after slot reclaim: {e}"
f"[Cleanup] Error failing {execution_id} after slot reclaim: {e}"
)
except Exception as e:
logger.error(f"[Cleanup] Error cleaning stale slots: {e}")

self.last_run_at = utc_now_iso()
self.last_report = report

if report.total > 0:
logger.info(f"[Cleanup] Cycle complete: {report.to_dict()}")

return report

async def _reconcile_orphaned_executions(self) -> tuple[int, int, set]:
"""Reconcile DB execution state against agent process registries.
Expand Down
Loading