Merged
@@ -261,8 +261,26 @@ func (r *AgenticSessionReconciler) SetupWithManager(mgr ctrl.Manager) error {
if !strings.HasSuffix(e.ObjectNew.Name, "-runner") {
return false
}
// Trigger if phase changed
return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase
// Trigger if pod phase changed
if e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase {
return true
}
// Trigger if a container newly terminated (e.g. runner OOM).
// Pod phase stays Running when one container dies but
// the sidecar is still alive, so we must also check
// individual container statuses.
oldTerminated := make(map[string]bool, len(e.ObjectOld.Status.ContainerStatuses))
for _, cs := range e.ObjectOld.Status.ContainerStatuses {
if cs.State.Terminated != nil {
oldTerminated[cs.Name] = true
}
}
for _, cs := range e.ObjectNew.Status.ContainerStatuses {
if cs.State.Terminated != nil && !oldTerminated[cs.Name] {
return true
}
}
return false
Comment on lines +264 to +283
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether reconcileRunning handles container termination

echo "=== Checking reconcileRunning for ContainerStatuses handling ==="
rg -n -A5 -B2 'ContainerStatus' components/operator/internal/controller/reconcile_phases.go

echo ""
echo "=== Checking if any reconcile function handles OOM/Terminated containers ==="
rg -n 'Terminated|OOM' components/operator/internal/controller/

echo ""
echo "=== Full reconcileRunning function to confirm no container status checks ==="
ast-grep --pattern $'func ($_ *AgenticSessionReconciler) reconcileRunning($_, $_) ($_, $_) {
  $$$
}'

Repository: ambient-code/platform

Length of output: 9137


Predicate fires on container OOM, but reconcileRunning ignores container status.

The predicate correctly detects when a container newly terminates. However, reconcileRunning (lines 281-356) never inspects Status.ContainerStatuses. When the runner container OOMs:

  1. Pod phase stays Running (sidecar still alive)
  2. Predicate fires and enqueues reconciliation
  3. reconcileRunning checks pod existence (found), annotations (unchanged), and generation (unchanged), then returns a 30-second requeue
  4. AgenticSession remains in Running state indefinitely despite the dead runner

Add logic to reconcileRunning to detect a terminated runner container and transition the session appropriately (e.g., to Failed or initiate recovery).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/operator/internal/controller/agenticsession_controller.go`, around
lines 264 - 283: the reconcileRunning function currently ignores
pod.Status.ContainerStatuses, so when the runner container terminates (e.g., OOM)
while the sidecar keeps the pod phase at Running, the session never transitions.
Update reconcileRunning to iterate pod.Status.ContainerStatuses, detect when the
runner container (identified by the name used for the runner container) has
State.Terminated or LastTerminationState.Terminated with a non-zero exit code or
an OOM reason, and then set the AgenticSession status to the appropriate terminal
state (e.g., Failed) or trigger the recovery path (update session.Status, emit
events, and stop the requeue).
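
A minimal sketch of the check the review asks for. It uses simplified stand-in types in place of the real `corev1.ContainerStatus` structs so it is self-contained; the `runnerFailed` helper name and the `"runner"` container name are assumptions (the diff only establishes a `-runner` suffix on the pod name), while `"OOMKilled"` is the standard Kubernetes termination reason:

```go
package main

import "fmt"

// Simplified stand-ins for the corev1 pod-status types the controller uses.
type Terminated struct {
	ExitCode int32
	Reason   string
}

type ContainerState struct {
	Terminated *Terminated
}

type ContainerStatus struct {
	Name                 string
	State                ContainerState
	LastTerminationState ContainerState
}

// runnerFailed reports whether the runner container terminated abnormally.
// reconcileRunning could call a helper like this on pod.Status.ContainerStatuses
// and transition the AgenticSession to Failed instead of blindly requeuing.
func runnerFailed(statuses []ContainerStatus, runnerName string) (bool, string) {
	for _, cs := range statuses {
		if cs.Name != runnerName {
			continue
		}
		// Check the current state first, then the last termination state,
		// in case the container has already been restarted.
		term := cs.State.Terminated
		if term == nil {
			term = cs.LastTerminationState.Terminated
		}
		if term != nil && (term.ExitCode != 0 || term.Reason == "OOMKilled") {
			return true, term.Reason
		}
	}
	return false, ""
}

func main() {
	statuses := []ContainerStatus{
		{Name: "sidecar"}, // still running, keeps the pod phase at Running
		{Name: "runner", State: ContainerState{
			Terminated: &Terminated{ExitCode: 137, Reason: "OOMKilled"},
		}},
	}
	failed, reason := runnerFailed(statuses, "runner")
	fmt.Println(failed, reason) // true OOMKilled
}
```

With a real pod object the loop body is the same; only the types come from `k8s.io/api/core/v1`.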

},
DeleteFunc: func(e event.TypedDeleteEvent[*corev1.Pod]) bool {
return strings.HasSuffix(e.Object.Name, "-runner")
@@ -178,10 +178,21 @@ async def run(
async for event in wrapped_stream:
yield event

# Persist session ID after turn completes (for --resume on pod restart)
if worker.session_id:
self._session_manager._session_ids[thread_id] = worker.session_id
self._session_manager._persist_session_ids()
# Detect resume failure (session ID already persisted
# eagerly by the _on_session_id callback at init time).
if (
saved_session_id
and worker.session_id
and worker.session_id != saved_session_id
):
logger.warning(
"Session resume failed: requested --resume %s "
"but CLI created new session %s. "
"Previous conversation history was lost "
"(likely caused by ungraceful runner shutdown).",
saved_session_id,
worker.session_id,
)

# Capture halt state for this thread to avoid race conditions
# with concurrent runs modifying the shared adapter's halted flag
@@ -31,7 +31,7 @@
import os
from contextlib import suppress
from pathlib import Path
from typing import Any, AsyncIterator, Optional
from typing import Any, AsyncIterator, Callable, Optional

logger = logging.getLogger(__name__)

@@ -67,10 +67,12 @@ def __init__(
thread_id: str,
options: Any,
api_key: str,
on_session_id: Optional[Callable[[str, str], None]] = None,
):
self.thread_id = thread_id
self._options = options
self._api_key = api_key
self._on_session_id = on_session_id

# Inbound: (prompt, session_id, output_queue) | _SHUTDOWN
self._input_queue: asyncio.Queue = asyncio.Queue()
@@ -140,6 +142,13 @@ async def _run(self) -> None:
sid = data.get("session_id")
if sid:
self.session_id = sid
# Persist immediately so the session ID
# survives even if this turn never completes
# (e.g. runner OOM during tool execution).
if self._on_session_id:
self._on_session_id(
self.thread_id, sid
)

await output_queue.put(msg)

@@ -289,7 +298,9 @@ async def get_or_create(
)
await self.destroy(thread_id)

worker = SessionWorker(thread_id, options, api_key)
worker = SessionWorker(
thread_id, options, api_key, on_session_id=self._on_session_id
)
await worker.start()
self._workers[thread_id] = worker
self._locks[thread_id] = asyncio.Lock()
@@ -345,6 +356,15 @@ async def shutdown(self) -> None:

# ── session ID persistence ──

def _on_session_id(self, thread_id: str, session_id: str) -> None:
"""Called by workers as soon as the CLI returns a session ID.

Persists immediately so the mapping survives even if the current
turn never completes (e.g. runner OOM during tool execution).
"""
self._session_ids[thread_id] = session_id
self._persist_session_ids()

def _session_ids_path(self) -> Path | None:
if not self._state_dir:
return None
9 changes: 9 additions & 0 deletions components/runners/state-sync/sync.sh
@@ -10,6 +10,7 @@ NAMESPACE="${NAMESPACE:-default}"
SESSION_NAME="${SESSION_NAME:-unknown}"
SYNC_INTERVAL="${SYNC_INTERVAL:-60}"
MAX_SYNC_SIZE="${MAX_SYNC_SIZE:-1073741824}" # 1GB default
REPO_BACKUP_INTERVAL="${REPO_BACKUP_INTERVAL:-5}" # Backup repos every Nth sync cycle

# Sanitize inputs to prevent path traversal
NAMESPACE="${NAMESPACE//[^a-zA-Z0-9-]/}"
@@ -261,6 +262,7 @@ echo "Session: ${NAMESPACE}/${SESSION_NAME}"
echo "S3 Endpoint: ${S3_ENDPOINT}"
echo "S3 Bucket: ${S3_BUCKET}"
echo "Sync interval: ${SYNC_INTERVAL}s"
echo "Repo backup every: ${REPO_BACKUP_INTERVAL} sync cycles"
echo "Max sync size: ${MAX_SYNC_SIZE} bytes"
echo "========================================="

@@ -283,8 +285,15 @@ echo "Waiting 30s for workspace to populate..."
sleep 30

# Main sync loop
sync_count=0
while true; do
check_size || echo "Size check warning (continuing anyway)"
# Periodically backup git repos (every Nth cycle) so repo state
# is preserved even if the runner container OOMs without SIGTERM
sync_count=$((sync_count + 1))
if [ $((sync_count % REPO_BACKUP_INTERVAL)) -eq 0 ]; then
backup_git_repos || echo "Repo backup had errors (continuing)"
fi
sync_to_s3 || echo "Sync failed, will retry in ${SYNC_INTERVAL}s..."
sleep ${SYNC_INTERVAL}
Comment on lines +288 to 298
⚠️ Potential issue | 🟡 Minor

Guard against REPO_BACKUP_INTERVAL=0 to prevent division by zero.

If a user sets REPO_BACKUP_INTERVAL=0 (perhaps expecting "disabled"), the modulo operation on line 294 will produce a division-by-zero error in bash. Depending on shell behavior with set -e, this could crash the sync loop.

Consider adding a guard near the configuration section to default invalid values:

Suggested fix
 REPO_BACKUP_INTERVAL="${REPO_BACKUP_INTERVAL:-5}"  # Backup repos every Nth sync cycle
+# Ensure REPO_BACKUP_INTERVAL is a positive integer to avoid division by zero;
+# the negated -ge test (with stderr silenced) also rejects empty or non-numeric values
+if ! [ "${REPO_BACKUP_INTERVAL}" -ge 1 ] 2>/dev/null; then
+    REPO_BACKUP_INTERVAL=5
+fi

Otherwise, the periodic backup logic correctly addresses the OOM scenario by ensuring repo state is captured even without SIGTERM.

🧰 Tools
🪛 Shellcheck (0.11.0)

[info] 298-298: Double quote to prevent globbing and word splitting.

(SC2086)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/runners/state-sync/sync.sh`, around lines 288 - 298: the loop can
divide by zero when REPO_BACKUP_INTERVAL is 0. Add a startup guard that validates
REPO_BACKUP_INTERVAL and treats non-positive, non-numeric, or unset values as
"disabled" (or falls back to a safe default greater than zero). In the sync loop,
check that REPO_BACKUP_INTERVAL is a positive integer before performing the
modulo, and skip backup_git_repos when backups are disabled; do the
parsing/validation once at init so the sync_to_s3 and check_size logic remain
unchanged.

done