Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 72 additions & 6 deletions src/executor_runtime/runners/async_http_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@
states meaning success (default:
``completed``).
- ``http.poll_interval_seconds``: seconds between polls (default ``2.0``).
- ``http.poll_pending_codes`` : comma-separated list of HTTP codes that
mean "still pending, keep polling"
(e.g. ``404`` for backends that 404
until the run is registered). Default
empty (only 200 is accepted; everything
else fails the poll loop).

Kickoff status codes:
- ``202``: standard async accept; proceed to poll loop.
- ``200``: if the response body carries a status at ``poll_status_path``
whose value is in ``poll_terminal_states``, the kickoff is
treated as a synchronous terminal result. Otherwise the 200 is
treated as kickoff acknowledgement and the poll loop runs.
(Some backends, e.g. Archon, return 200 with a non-terminal
status like ``"started"`` to acknowledge dispatch.)
- everything else: failure.

Sync from the caller's POV: ``run()`` blocks until a terminal status is
observed or the invocation timeout elapses. For genuinely concurrent
Expand Down Expand Up @@ -119,6 +135,17 @@ def run(self, invocation: RuntimeInvocation) -> RuntimeResult:
success_raw = meta.get("http.poll_success_states") or ",".join(_DEFAULT_SUCCESS_STATES)
success_states = tuple(s.strip() for s in success_raw.split(",") if s.strip())

pending_raw = meta.get("http.poll_pending_codes") or ""
try:
pending_codes = tuple(
int(s.strip()) for s in pending_raw.split(",") if s.strip()
)
except ValueError:
return _rejected(
invocation, started,
"http.poll_pending_codes must be comma-separated integers",
)

poll_interval = _parse_float(meta.get("http.poll_interval_seconds"), _DEFAULT_POLL_INTERVAL)
if poll_interval < 0:
return _rejected(invocation, started, "poll_interval_seconds must be non-negative")
Expand Down Expand Up @@ -154,12 +181,20 @@ def run(self, invocation: RuntimeInvocation) -> RuntimeResult:
return _failed(invocation, started, f"kickoff http error: {exc}")

if kickoff_resp.status_code == 200:
# Server returned a synchronous result — treat as terminal.
return _terminal_from_kickoff(
invocation, started, kickoff_resp,
success_states, poll_status_path,
)
if kickoff_resp.status_code != 202:
# 200 has two modes:
# - server returned a synchronous terminal result (status
# field present and in terminal_states)
# - server acknowledged async dispatch (status field absent
# or non-terminal, e.g. Archon's {accepted, status:"started"})
if _is_synchronous_terminal(
kickoff_resp, poll_status_path, terminal_states,
):
return _terminal_from_kickoff(
invocation, started, kickoff_resp,
success_states, poll_status_path,
)
# Fall through to poll loop — kickoff was an ack, not a result.
elif kickoff_resp.status_code != 202:
preview = kickoff_resp.text[:200] if kickoff_resp.text else ""
msg = (
f"kickoff expected 202 (or 200), got HTTP "
Expand Down Expand Up @@ -206,6 +241,12 @@ def run(self, invocation: RuntimeInvocation) -> RuntimeResult:
return _failed(invocation, started, f"poll http error: {exc}")

if poll_resp.status_code != 200:
if poll_resp.status_code in pending_codes:
# Backend reports "still pending" with this code (e.g.
# Archon's 404 before the run is registered to a
# worker). Sleep and poll again.
self._sleep(poll_interval)
continue
preview = poll_resp.text[:200] if poll_resp.text else ""
return _failed(
invocation, started,
Expand Down Expand Up @@ -314,6 +355,31 @@ def _utc_now_iso() -> str:
return datetime.now(UTC).isoformat()


def _is_synchronous_terminal(
    response: Any,
    poll_status_path: str,
    terminal_states: tuple[str, ...],
) -> bool:
    """Decide whether a 200 kickoff response is a synchronous terminal result.

    A 200 kickoff counts as a synchronous result only when the response body
    carries a status at ``poll_status_path`` whose string form appears in
    ``terminal_states``. In every other case the 200 is read as a kickoff
    acknowledgement and the caller proceeds to the poll loop.

    Args:
        response: Kickoff response object exposing a ``json()`` method.
        poll_status_path: Path handed to ``_extract_path`` to locate the
            status value inside the decoded body.
        terminal_states: Status strings that mark a finished run.

    Returns:
        True when a status is present and terminal; False when the body
        cannot be parsed, the status is missing, or it is non-terminal.
    """
    try:
        payload = response.json()
    except ValueError:
        # ``json.JSONDecodeError`` and ``UnicodeDecodeError`` both derive from
        # ValueError, so malformed JSON *and* undecodable bytes are treated as
        # "not a terminal result" instead of escaping from the kickoff path.
        return False
    status = _extract_path(payload, poll_status_path)
    if status is None:
        return False
    return str(status) in terminal_states


def _terminal_from_kickoff(
invocation: RuntimeInvocation,
started: str,
Expand Down
4 changes: 4 additions & 0 deletions src/executor_runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def register(self, runtime_kind: str, runner: RuntimeRunner) -> None:
"""Register a runner for a runtime_kind."""
self._runners[runtime_kind] = runner

def is_registered(self, runtime_kind: str) -> bool:
    """Report whether ``runtime_kind`` currently has a runner registered.

    This is a pure membership check against the runner registry; nothing is
    created or modified.
    """
    registry = self._runners
    return runtime_kind in registry

def run(self, invocation: RuntimeInvocation) -> RuntimeResult:
runner = self._runners.get(invocation.runtime_kind)
if runner is None:
Expand Down
134 changes: 134 additions & 0 deletions tests/runners/test_async_http_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,137 @@ def test_zero_interval_skips_sleep_arg(self):
result = runner.run(_invocation()) # poll_interval=0.0 in default metadata
assert result.status == "succeeded"
assert sleeps == [0.0]


# ---------------------------------------------------------------------------
# 200 kickoff: ack vs synchronous terminal
# ---------------------------------------------------------------------------


class TestKickoff200Ack:
    """Tell a 200 kickoff acknowledgement apart from a 200 terminal result.

    Some backends (e.g. Archon's ``{accepted, status: "started"}``) answer the
    kickoff with 200 merely to acknowledge async dispatch. Such responses must
    lead into the poll loop rather than being read as a synchronous terminal
    result.
    """

    @staticmethod
    def _fixed_url_invocation(extra_metadata=None):
        """Build an invocation whose poll URL has no ``{run_id}`` placeholder.

        The ``http.poll_run_id_path`` entry is dropped from the metadata
        because the template needs no run id extracted from the kickoff body.
        """
        seeded = _invocation(metadata_extra={
            "http.poll_url_template": "http://example.test/api/runs/by-conv/abc",
            **(extra_metadata or {}),
        })
        fields = seeded.model_dump()
        fields["metadata"] = {
            key: value
            for key, value in seeded.metadata.items()
            if key != "http.poll_run_id_path"
        }
        return RuntimeInvocation(**fields)

    def test_200_with_non_terminal_status_falls_through_to_poll(self):
        # The kickoff body carries a non-terminal status, so polling must run.
        client = _scripted_client([
            httpx.Response(200, json={"accepted": True, "status": "started"}),
            httpx.Response(200, json={"status": "completed"}),
        ])
        invocation = self._fixed_url_invocation()
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "succeeded"

    def test_200_with_missing_status_field_falls_through_to_poll(self):
        # No status at poll_status_path: the 200 is only an acknowledgement.
        client = _scripted_client([
            httpx.Response(200, json={"accepted": True}),
            httpx.Response(200, json={"status": "completed"}),
        ])
        invocation = self._fixed_url_invocation()
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "succeeded"

    def test_200_with_terminal_status_still_treated_as_sync_result(self):
        # Backward compatibility: 200 plus a terminal status stays synchronous.
        client = _scripted_client([
            httpx.Response(200, json={"status": "completed"}),
        ])
        outcome = _make_runner(client).run(_invocation())
        assert outcome.status == "succeeded"

    def test_200_kickoff_then_404_pending_then_terminal(self):
        # Realistic Archon flow: a 200 ack, then 404s while the run is not yet
        # registered (tolerated through http.poll_pending_codes), and finally
        # a 200 carrying a terminal status.
        client = _scripted_client([
            httpx.Response(200, json={"accepted": True, "status": "started"}),
            httpx.Response(404, text="not registered yet"),
            httpx.Response(404, text="not registered yet"),
            httpx.Response(200, json={"run": {"status": "completed"}}),
        ])
        invocation = self._fixed_url_invocation({
            "http.poll_status_path": "run.status",
            "http.poll_pending_codes": "404",
        })
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "succeeded"


class TestPollPendingCodes:
    """Exercise the ``http.poll_pending_codes`` metadata option."""

    def test_pending_code_keeps_polling(self):
        # A listed pending code must not abort the loop; the later 200 decides.
        scripted = [
            httpx.Response(202, json={"run_id": "abc"}),
            httpx.Response(404, text="pending"),
            httpx.Response(200, json={"status": "completed"}),
        ]
        client = _scripted_client(scripted)
        invocation = _invocation(
            metadata_extra={"http.poll_pending_codes": "404"},
        )
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "succeeded"

    def test_non_pending_non_200_still_fails(self):
        # 503 is not in the pending list, so the poll loop must fail on it.
        scripted = [
            httpx.Response(202, json={"run_id": "abc"}),
            httpx.Response(503, text="upstream"),
        ]
        client = _scripted_client(scripted)
        invocation = _invocation(
            metadata_extra={"http.poll_pending_codes": "404"},
        )
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "failed"
        summary = outcome.error_summary or ""
        assert "503" in summary

    def test_pending_codes_must_be_integers(self):
        # An unparsable entry is rejected; no responses are scripted.
        client = _scripted_client([])
        invocation = _invocation(
            metadata_extra={"http.poll_pending_codes": "404,not-a-code"},
        )
        outcome = _make_runner(client).run(invocation)
        assert outcome.status == "rejected"
        summary = outcome.error_summary or ""
        assert "poll_pending_codes" in summary

    def test_pending_code_sleeps_between_retries(self):
        recorded_sleeps: list[float] = []
        scripted = [
            httpx.Response(202, json={"run_id": "abc"}),
            httpx.Response(404, text="pending"),
            httpx.Response(404, text="pending"),
            httpx.Response(200, json={"status": "completed"}),
        ]
        client = _scripted_client(scripted)
        runner = AsyncHttpRunner(client=client, sleep=recorded_sleeps.append)
        metadata = {
            "http.poll_pending_codes": "404",
            "http.poll_interval_seconds": "0.5",
        }
        outcome = runner.run(_invocation(metadata_extra=metadata))
        assert outcome.status == "succeeded"
        # Two pending 404s and no non-terminal 200s precede the terminal
        # response, so exactly two sleeps are expected.
        assert recorded_sleeps == [0.5, 0.5]
6 changes: 6 additions & 0 deletions tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ def test_default_facade_uses_subprocess_runner() -> None:
assert isinstance(runtime.runner, SubprocessRunner)


def test_is_registered_reports_registered_kinds() -> None:
    """A default facade reports ``subprocess`` as registered, ``manual`` not."""
    facade = ExecutorRuntime()
    expectations = (("subprocess", True), ("manual", False))
    for kind, expected in expectations:
        assert facade.is_registered(kind) is expected


def test_facade_returns_runtime_result(tmp_path: Path) -> None:
runtime = ExecutorRuntime()
invocation = RuntimeInvocation(
Expand Down
Loading