From 2061e71bafa0dadf5a10242bc2a96be73f06d5a5 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 23 Apr 2026 11:40:03 -0700 Subject: [PATCH 1/6] initial commit --- eval_protocol/adapters/fireworks_tracing.py | 32 ++++++++++ .../pytest/remote_rollout_processor.py | 59 ++++++++----------- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 85aa6739..937d23a3 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -375,6 +375,38 @@ async def async_search_logs( ) return results + def get_status(self, rollout_id: str) -> Optional[Dict[str, Any]]: + """Fetch rollout status from the lightweight /status endpoint. + + Returns the parsed JSON response or None if the status is not yet available. + Response shape: {"rollout_id": "...", "status": {"code": ...} | null} + """ + from ..common_utils import get_user_agent + + headers = { + "Authorization": f"Bearer {self._get_api_key()}", + "User-Agent": get_user_agent(), + } + params: Dict[str, Any] = {"rollout_id": rollout_id} + + urls_to_try = [f"{self.base_url}/status", f"{self.base_url}/v1/status"] + last_error: Optional[str] = None + for url in urls_to_try: + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code == 404: + last_error = f"404 for {url}" + continue + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + last_error = str(e) + continue + + if last_error: + logger.error("Failed to fetch status from Fireworks (tried %s): %s", urls_to_try, last_error) + return None + def get_evaluation_rows( self, tags: List[str], diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 6b622218..bfccf72e 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -121,46 +121,39 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: deadline = time.time() + timeout_seconds while time.time() < deadline: - session = self._get_or_create_session() - completed_logs = await self._tracing_adapter.async_search_logs( - session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"] + # Poll status (run in thread to avoid blocking event loop) + status_result = await asyncio.to_thread( + self._tracing_adapter.get_status, rollout_id=row.execution_metadata.rollout_id ) - # Filter for logs that actually have status information - status_logs = [] - for log in completed_logs: - status_dict = log.get("status") - if status_dict and isinstance(status_dict, dict) and "code" in status_dict: - status_logs.append(log) - - if status_logs: - if len(status_logs) > 1: - logger.warning( - "Found %s status logs for rollout %s; expected at most 1. Using the first one: %s", - len(status_logs), - row.execution_metadata.rollout_id, - status_logs[0], - ) - # Use the first log with status information - status_log = status_logs[0] - status_dict = status_log.get("status") - raw_extras = status_log.get("extras") or {} - status_extras = { - k: v for k, v in raw_extras.items() if k not in ("logger_name", "level", "timestamp") - } + if status_result and status_result.get("status"): + status_code = status_result["status"]["code"] logger.info( - f"Found status log for rollout {row.execution_metadata.rollout_id}: {status_log.get('message', '')}" + "Found status for rollout %s with code %s", + row.execution_metadata.rollout_id, + status_code, ) - status_code = status_dict.get("code") - status_message = status_dict.get("message", "") - status_details = status_dict.get("details", []) - - logger.info( - f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" + # Backfill message/details/extras from the full Logs table (one-shot) + completed_logs = await asyncio.to_thread( + self._tracing_adapter.search_logs, + tags=[f"rollout_id:{row.execution_metadata.rollout_id}"], ) + status_message = "" + status_details: list = [] + status_extras: dict = {} + for log in completed_logs: + sd = log.get("status") + if sd and isinstance(sd, dict) and "code" in sd: + status_message = sd.get("message", "") + status_details = sd.get("details", []) + raw_extras = log.get("extras") or {} + status_extras = { + k: v for k, v in raw_extras.items() + if k not in ("logger_name", "level", "timestamp") + } + break - # Create and raise exception if appropriate, preserving original message exception = exception_for_status_code(status_code, status_message) if exception is not None: raise exception From 5488bfeb62b183aa2b64dfd58c47fa57f5f50868 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 23 Apr 2026 18:43:07 -0700 Subject: [PATCH 2/6] quick fix --- eval_protocol/pytest/remote_rollout_processor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index bfccf72e..7b73d7a1 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -125,8 +125,13 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: status_result = await asyncio.to_thread( self._tracing_adapter.get_status, rollout_id=row.execution_metadata.rollout_id ) - if status_result and status_result.get("status"): - status_code = status_result["status"]["code"] + status = (status_result or {}).get("status") + if status and "code" in status: + status_code = status["code"] + + if status_code == Status.Code.RUNNING: + await asyncio.sleep(poll_interval) + continue logger.info( "Found status for rollout %s with code %s", From a17c9b8fb9d77c0155bdd38df0e9dcaedd411ae7 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Fri, 24 Apr 2026 11:51:57 -0700 Subject: [PATCH 3/6] Rebase --- eval_protocol/pytest/remote_rollout_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 7b73d7a1..82d75e5e 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -140,9 +140,9 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: ) # Backfill message/details/extras from the full Logs table (one-shot) - completed_logs = await asyncio.to_thread( - self._tracing_adapter.search_logs, - tags=[f"rollout_id:{row.execution_metadata.rollout_id}"], + session = self._get_or_create_session() + completed_logs = await self._tracing_adapter.async_search_logs( + session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"], ) status_message = "" status_details: list = [] From 20b0f230869d6c3bd6ab2095cd96822edef56967 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 28 Apr 2026 19:31:56 -0700 Subject: [PATCH 4/6] fix: poll rollout status asynchronously Use the lightweight status endpoint from RemoteRolloutProcessor via the shared aiohttp session and avoid the logs backfill after terminal status is observed. Made-with: Cursor --- eval_protocol/adapters/fireworks_tracing.py | 21 +++++---- .../pytest/remote_rollout_processor.py | 44 ++++++------------- 2 files changed, 23 insertions(+), 42 deletions(-) diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 937d23a3..04157353 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -375,31 +375,30 @@ async def async_search_logs( ) return results - def get_status(self, rollout_id: str) -> Optional[Dict[str, Any]]: + async def async_get_status(self, session: aiohttp.ClientSession, rollout_id: str) -> Optional[Dict[str, Any]]: """Fetch rollout status from the lightweight /status endpoint. Returns the parsed JSON response or None if the status is not yet available. Response shape: {"rollout_id": "...", "status": {"code": ...} | null} """ - from ..common_utils import get_user_agent - headers = { "Authorization": f"Bearer {self._get_api_key()}", "User-Agent": get_user_agent(), } params: Dict[str, Any] = {"rollout_id": rollout_id} + timeout = aiohttp.ClientTimeout(total=self.timeout) - urls_to_try = [f"{self.base_url}/status", f"{self.base_url}/v1/status"] + urls_to_try = [f"{self.base_url}/v1/status", f"{self.base_url}/status"] last_error: Optional[str] = None for url in urls_to_try: try: - response = requests.get(url, params=params, timeout=self.timeout, headers=headers) - if response.status_code == 404: - last_error = f"404 for {url}" - continue - response.raise_for_status() - return response.json() - except requests.exceptions.RequestException as e: + async with session.get(url, params=params, headers=headers, timeout=timeout) as resp: + if resp.status == 404: + last_error = f"404 for {url}" + continue + resp.raise_for_status() + return (await resp.json(content_type=None)) or {} + except (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError) as e: last_error = str(e) continue diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 82d75e5e..f9f6ed31 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -121,43 +121,23 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: deadline = time.time() + timeout_seconds while time.time() < deadline: - # Poll status (run in thread to avoid blocking event loop) - status_result = await asyncio.to_thread( - self._tracing_adapter.get_status, rollout_id=row.execution_metadata.rollout_id + session = self._get_or_create_session() + status_result = await self._tracing_adapter.async_get_status( + session, + rollout_id=row.execution_metadata.rollout_id, ) status = (status_result or {}).get("status") - if status and "code" in status: + if isinstance(status, dict) and "code" in status: status_code = status["code"] - if status_code == Status.Code.RUNNING: - await asyncio.sleep(poll_interval) - continue - logger.info( "Found status for rollout %s with code %s", row.execution_metadata.rollout_id, status_code, ) - # Backfill message/details/extras from the full Logs table (one-shot) - session = self._get_or_create_session() - completed_logs = await self._tracing_adapter.async_search_logs( - session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"], - ) - status_message = "" - status_details: list = [] - status_extras: dict = {} - for log in completed_logs: - sd = log.get("status") - if sd and isinstance(sd, dict) and "code" in sd: - status_message = sd.get("message", "") - status_details = sd.get("details", []) - raw_extras = log.get("extras") or {} - status_extras = { - k: v for k, v in raw_extras.items() - if k not in ("logger_name", "level", "timestamp") - } - break + status_message = status.get("message", "") or "" + status_details = status.get("details", []) or [] exception = exception_for_status_code(status_code, status_message) if exception is not None: @@ -169,10 +149,12 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: details=status_details, ) - if row.execution_metadata.extra: - row.execution_metadata.extra.update(status_extras) - else: - row.execution_metadata.extra = status_extras + status_extras = status.get("extras") + if isinstance(status_extras, dict): + if row.execution_metadata.extra: + row.execution_metadata.extra.update(status_extras) + else: + row.execution_metadata.extra = status_extras logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) break From 8e0e0ef9fb60aca28216425318345ece129e17fb Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 28 Apr 2026 19:37:07 -0700 Subject: [PATCH 5/6] fix: keep polling running rollout statuses Continue polling when the lightweight status endpoint returns RUNNING so the remote rollout processor only exits the poll loop for terminal statuses. Made-with: Cursor --- eval_protocol/pytest/remote_rollout_processor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index f9f6ed31..5d3e86ee 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -129,6 +129,9 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: status = (status_result or {}).get("status") if isinstance(status, dict) and "code" in status: status_code = status["code"] + if status_code == Status.Code.RUNNING: + await asyncio.sleep(poll_interval) + continue logger.info( "Found status for rollout %s with code %s", From fd4358c26f9cbe57bb9c0091f28ae6642194941b Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 28 Apr 2026 20:08:36 -0700 Subject: [PATCH 6/6] fix: preserve rollout status extras Read rollout status extras from the top-level status response so RemoteRolloutProcessor preserves metadata that previously came from log entries. Made-with: Cursor --- eval_protocol/adapters/fireworks_tracing.py | 2 +- eval_protocol/pytest/remote_rollout_processor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 04157353..2d8316d2 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -379,7 +379,7 @@ async def async_get_status(self, session: aiohttp.ClientSession, rollout_id: str """Fetch rollout status from the lightweight /status endpoint. Returns the parsed JSON response or None if the status is not yet available. - Response shape: {"rollout_id": "...", "status": {"code": ...} | null} + Response shape: {"rollout_id": "...", "status": {"code": ...} | null, "extras": {...} | null} """ headers = { "Authorization": f"Bearer {self._get_api_key()}", diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 5d3e86ee..632d5e00 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -152,7 +152,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: details=status_details, ) - status_extras = status.get("extras") + status_extras = (status_result or {}).get("extras") if isinstance(status_extras, dict): if row.execution_metadata.extra: row.execution_metadata.extra.update(status_extras)