Feat: Add endpoint for report processing status #12
Walkthrough
Changes add a concrete Orchestrator implementation with named agent registration, concurrent agent execution and per-agent result aggregation; introduce a public …
Sequence Diagram(s)
sequenceDiagram
participant Client
participant API as "API Routes (/reports/{report_id}/status)"
participant Service as "Report Service"
participant Memory as "in_memory_reports"
participant Orch as "Orchestrator"
Client->>API: GET /reports/{report_id}/status
API->>Service: get_report_status_from_memory(report_id)
Service->>Memory: lookup report_id
Memory-->>Service: report dict | None
Service-->>API: report dict | None
alt found
API-->>Client: { "report_id": id, "status": report.status }
else not found
API-->>Client: 404 Report not found
end
Note over Orch,Memory: Background: concurrent agent execution
Orch->>Orch: execute_agents(report_id, token_id) (async)
par Agents (concurrent)
Orch->>AgentA: invoke agent_func()
AgentA-->>Orch: result | error (with timeout handling)
end
Orch->>Orch: aggregate_results(per_agent_results)
Orch->>Memory: update in_memory_reports[report_id] with overall status & agent_results
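The request path in the diagram above can be sketched without a web framework. This is a minimal illustration, not the PR's actual code: `ReportNotFound` is a hypothetical stand-in for the 404 response, `report_status` is a hypothetical handler name, and the shape of the `in_memory_reports` entries is assumed from the diagram.

```python
from typing import Any, Dict, Optional

# Assumed stand-in for the service's in-memory store.
in_memory_reports: Dict[str, Dict[str, Any]] = {}


class ReportNotFound(Exception):
    """Hypothetical stand-in for the '404 Report not found' response."""


def get_report_status_from_memory(report_id: str) -> Optional[Dict[str, Any]]:
    # Returns the full stored record, or None when the id is unknown.
    return in_memory_reports.get(report_id)


def report_status(report_id: str) -> Dict[str, str]:
    # Hypothetical handler body: expose only the id and status,
    # not per-agent results or other internal fields.
    report = get_report_status_from_memory(report_id)
    if report is None:
        raise ReportNotFound("Report not found")
    return {"report_id": report_id, "status": report["status"]}
```

In a real route the exception would presumably be replaced by the framework's own not-found response.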
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/api/v1/routes.py (1)
37-43: On background orchestration failure, flip report status to failed and fix logging.
Surface failures to clients by setting status, and avoid including the exception object in the logger.exception message (TRY401).

 def _on_done(t: asyncio.Task):
     try:
         t.result()
-    except Exception as e:
-        logger.exception('Background orchestration failed for %s: %s', report_id, e)
-        # Optionally update report status to failed here as well
+    except Exception as e:
+        logger.exception('Background orchestration failed for %s', report_id)
+        # Update report status so status endpoint reflects failure
+        if report_id in in_memory_reports:
+            in_memory_reports[report_id].update({"status": "failed", "error": str(e)})

Optional: dispatch an async save via asyncio.create_task(save_report_data(report_id, {"status": "failed", "error": str(e)})) if you prefer to keep writes encapsulated in the service layer. (As per static analysis hints)
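A self-contained sketch of the done-callback pattern described above may help when trying it out in isolation. It assumes `in_memory_reports` is a plain dict keyed by report id; `make_on_done` is a hypothetical factory name introduced only for this example, and the explicit cancellation check is an addition that is not part of the suggestion.

```python
import asyncio
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)

# Assumed stand-in for the service's in-memory store.
in_memory_reports: Dict[str, Dict[str, Any]] = {}


def make_on_done(report_id: str) -> Callable[[asyncio.Task], None]:
    """Hypothetical factory: builds a done-callback bound to one report."""

    def _on_done(task: asyncio.Task) -> None:
        if task.cancelled():
            # task.result() would raise CancelledError here; nothing to record.
            return
        try:
            task.result()
        except Exception as exc:
            # logger.exception already attaches the traceback, so the
            # exception object is not passed as a format argument.
            logger.exception("Background orchestration failed for %s", report_id)
            if report_id in in_memory_reports:
                in_memory_reports[report_id].update(
                    {"status": "failed", "error": str(exc)}
                )

    return _on_done
```

Usage would look like `task = asyncio.create_task(...)` followed by `task.add_done_callback(make_on_done(report_id))`.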
🧹 Nitpick comments (4)
backend/app/core/orchestrator.py (2)
59-63: Classify overall status as failed when all agents fail.
Current logic never returns "failed". Compute fail count to distinguish completed, partial_success, and failed.

-        overall_status = "completed"
-        if any(result["status"] == "failed" for result in agent_results.values()):
-            overall_status = "partial_success"
+        fail_count = sum(1 for r in agent_results.values() if r["status"] == "failed")
+        if fail_count == 0:
+            overall_status = "completed"
+        elif fail_count == len(agent_results):
+            overall_status = "failed"
+        else:
+            overall_status = "partial_success"
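The three-way classification suggested above can be isolated into a small pure function, which makes it easy to test. This is a sketch only: `classify_overall_status` is a hypothetical helper name, and the per-agent result shape (`{"status": ...}`) is assumed from the diff.

```python
from typing import Any, Dict


def classify_overall_status(agent_results: Dict[str, Dict[str, Any]]) -> str:
    """Hypothetical helper: derive an overall status from per-agent results."""
    fail_count = sum(1 for r in agent_results.values() if r["status"] == "failed")
    if fail_count == 0:
        # Also covers the case where no agents are registered.
        return "completed"
    if fail_count == len(agent_results):
        return "failed"
    return "partial_success"
```

A mixed outcome still maps to "partial_success" here; only the all-agents-failed case changes.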
3-3: Tighten agent typing to ensure async call signature.
Annotate agents as Callable[[str, str], Awaitable[Dict[str, Any]]] for clarity and static safety.

-from typing import Callable, Dict, Any, List
+from typing import Callable, Dict, Any, Awaitable, List
@@
-        self.agents: Dict[str, Callable] = {}
+        self.agents: Dict[str, Callable[[str, str], Awaitable[Dict[str, Any]]]] = {}
@@
-    def register_agent(self, name: str, agent_func: Callable):
+    def register_agent(self, name: str, agent_func: Callable[[str, str], Awaitable[Dict[str, Any]]]):
@@
-            agent_func (Callable): The asynchronous function representing the agent.
+            agent_func (Callable[[str, str], Awaitable[Dict[str, Any]]]): The asynchronous function representing the agent.

Also applies to: 15-25
backend/app/services/report_service.py (1)
29-30: Accessor returns full record; consider a minimal, typed status view.
Expose only status to callers to avoid leaking internal fields and to decouple storage shape. Option: add get_report_status(report_id) -> str | None returning just the status, or a small Pydantic model.
Also, confirm runtime is Python 3.10+ for Dict | None union syntax.
backend/app/api/v1/routes.py (1)
14-21: Use logger instead of print in async agents.
Prefer structured logging over stdout for observability.

-    print(f"Dummy Agent One running for report {report_id} and token {token_id}")
+    logger.info("Dummy Agent One running for report %s and token %s", report_id, token_id)
@@
-    print(f"Dummy Agent Two running for report {report_id} and token {token_id}")
+    logger.info("Dummy Agent Two running for report %s and token %s", report_id, token_id)

Additionally, consider moving agent registration to an application startup event to avoid duplicate registration on reloads.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
- backend/__pycache__/__init__.cpython-313.pyc is excluded by !**/*.pyc
- backend/app/core/__pycache__/orchestrator.cpython-313.pyc is excluded by !**/*.pyc
- backend/app/services/__pycache__/report_processor.cpython-313.pyc is excluded by !**/*.pyc
- backend/app/services/__pycache__/report_service.cpython-313.pyc is excluded by !**/*.pyc
- backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
- backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
📒 Files selected for processing (3)
- backend/app/api/v1/routes.py (2 hunks)
- backend/app/core/orchestrator.py (2 hunks)
- backend/app/services/report_service.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/api/v1/routes.py (1)
backend/app/services/report_service.py (2)
- generate_report (11-20)
- get_report_status_from_memory (29-30)
🪛 Ruff (0.14.1)
backend/app/core/orchestrator.py
36-36: Redundant exception object included in logging.exception call
(TRY401)
38-38: Local variable has_failed_agent is assigned to but never used
Remove assignment to unused variable has_failed_agent
(F841)
Actionable comments posted: 0
🧹 Nitpick comments (3)
backend/app/core/orchestrator.py (3)
26-40: Bind timeouts at scheduling; iterate with as_completed; don't swallow cancellations.
Loop-based wait_for(..., timeout=10) grants each agent extra time relative to when it's awaited, not when scheduled. Bind the timeout at task creation and process tasks as they finish. Also re-raise CancelledError and downgrade timeouts to warning.
Apply:

-        tasks = {name: asyncio.create_task(agent_func(report_id, token_id)) for name, agent_func in self.agents.items()}
-        results = {}
-
-        for name, task in tasks.items():
-            try:
-                result = await asyncio.wait_for(task, timeout=10)  # Added timeout
-                results[name] = {"status": "completed", "data": result}
-            except asyncio.TimeoutError:  # Handle timeout specifically
-                logger.exception("Agent %s timed out for report %s", name, report_id)
-                results[name] = {"status": "failed", "error": "Agent timed out"}
-            except Exception as e:
-                logger.exception("Agent %s failed for report %s", name, report_id)
-                results[name] = {"status": "failed", "error": str(e)}
+        task_to_name = {
+            asyncio.create_task(asyncio.wait_for(agent_func(report_id, token_id), timeout=self.agent_timeout)): name
+            for name, agent_func in self.agents.items()
+        }
+        results = {}
+        for task in asyncio.as_completed(task_to_name):
+            name = task_to_name[task]
+            try:
+                result = await task
+                results[name] = {"status": "completed", "data": result}
+            except asyncio.TimeoutError:
+                logger.warning("Agent %s timed out for report %s", name, report_id)
+                results[name] = {"status": "failed", "error": "Agent timed out"}
+            except asyncio.CancelledError:
+                # Preserve cooperative cancellation (shutdown, client disconnect, etc.)
+                raise
+            except Exception as e:
+                logger.exception("Agent %s failed for report %s", name, report_id)
+                results[name] = {"status": "failed", "error": str(e)}
         return results
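One caveat worth checking before applying the suggestion above: when `asyncio.as_completed` is iterated with a plain `for` loop, it yields new awaitables, not the original task objects, so a dict lookup keyed by the original task may fail. The sketch below takes a different route to the same goals (timeout bound when the agent is started, cancellation preserved, timeouts logged at warning level) by wrapping each agent in a helper coroutine. `run_agent` and the free-function form of `execute_agents` are hypothetical names for illustration, not the PR's code.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger(__name__)

AgentFunc = Callable[[str, str], Awaitable[Dict[str, Any]]]


async def run_agent(
    name: str, agent_func: AgentFunc, report_id: str, token_id: str, timeout: float
) -> Dict[str, Any]:
    """Hypothetical helper: run one agent under its own timeout."""
    try:
        data = await asyncio.wait_for(agent_func(report_id, token_id), timeout=timeout)
        return {"status": "completed", "data": data}
    except asyncio.TimeoutError:
        logger.warning("Agent %s timed out for report %s", name, report_id)
        return {"status": "failed", "error": "Agent timed out"}
    except asyncio.CancelledError:
        # Preserve cooperative cancellation (shutdown, client disconnect, etc.).
        raise
    except Exception as exc:
        logger.exception("Agent %s failed for report %s", name, report_id)
        return {"status": "failed", "error": str(exc)}


async def execute_agents(
    agents: Dict[str, AgentFunc], report_id: str, token_id: str, timeout: float = 10.0
) -> Dict[str, Dict[str, Any]]:
    # All agents start together, so each timeout is measured from scheduling.
    names = list(agents)
    outcomes = await asyncio.gather(
        *(run_agent(n, agents[n], report_id, token_id, timeout) for n in names)
    )
    return dict(zip(names, outcomes))
```

Because each helper returns its own result dict, no mapping from task object back to agent name is needed.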
14-16: Make per-agent timeout configurable.
Hardcoding 10s reduces flexibility. Add a parameter and store it.
Apply:

-    def __init__(self):
-        self.agents: Dict[str, Callable] = {}
+    def __init__(self, agent_timeout: float = 10.0):
+        self.agents: Dict[str, Callable] = {}
+        self.agent_timeout = agent_timeout
17-24: Strengthen typing and validate agent registration.
Constrain agent_func to an async callable and fail fast on mis-registration.
Apply:

-    def register_agent(self, name: str, agent_func: Callable):
+    def register_agent(self, name: str, agent_func: Callable[[str, str], Any]):
         """
         Registers an AI agent with the orchestrator.

         Args:
             name (str): The name of the agent.
-            agent_func (Callable): The asynchronous function representing the agent.
+            agent_func (Callable): The asynchronous function representing the agent.
         """
-        self.agents[name] = agent_func
+        # Runtime guard: must be async
+        if not inspect.iscoroutinefunction(agent_func):
+            raise TypeError(f"agent_func for '{name}' must be an async function")
+        self.agents[name] = agent_func

Add imports (outside this hunk):

import inspect
from typing import Awaitable  # if you later refine to Callable[[str, str], Awaitable[Any]]
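Taken together, the configurable timeout and the registration guard suggested above might look like the following reduced class. It is a sketch under assumptions: only the constructor and `register_agent` are shown, the `AgentFunc` alias is introduced here for readability, and the real Orchestrator in the PR has more methods.

```python
import inspect
from typing import Any, Awaitable, Callable, Dict

# Alias introduced for this example only.
AgentFunc = Callable[[str, str], Awaitable[Dict[str, Any]]]


class Orchestrator:
    """Reduced illustration: registration and timeout configuration only."""

    def __init__(self, agent_timeout: float = 10.0) -> None:
        self.agents: Dict[str, AgentFunc] = {}
        self.agent_timeout = agent_timeout

    def register_agent(self, name: str, agent_func: AgentFunc) -> None:
        # Runtime guard: reject plain (non-async) functions at registration
        # time, not when they are first awaited.
        if not inspect.iscoroutinefunction(agent_func):
            raise TypeError(f"agent_func for '{name}' must be an async function")
        self.agents[name] = agent_func
```

One trade-off of this guard is that it only accepts functions declared with `async def`; callable objects that merely return an awaitable would be rejected, which may or may not be desirable.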
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- backend/app/api/v1/routes.py (2 hunks)
- backend/app/core/orchestrator.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/api/v1/routes.py
🔇 Additional comments (2)
backend/app/core/orchestrator.py (2)
66-74: The review comment's primary concern is already addressed by the code.
The status endpoint at lines 47–51 correctly filters the response to {"report_id": report_id, "status": report["status"]}, explicitly excluding agent_results and any internal data. The route does not expose per-agent internals to clients. No further action is required for the data exposure concern.
The persistence concern (in-memory storage across multi-process workers) is valid architectural guidance but is not a bug in the code being reviewed; it relates to deployment resilience rather than this specific change.
Likely an incorrect or invalid review comment.
58-64: The review comment is incorrect; "partial_success" is the intended API contract.
The test at backend/tests/test_orchestrator.py:57 explicitly asserts assert in_memory_reports[report_id]["status"] == "partial_success" for a scenario where one agent completes successfully and one fails. This status value accurately reflects a mixed outcome (partial work completed, not total failure). Changing it to "failed" would misrepresent the result and break the test.
The endpoint directly returns report["status"] (routes.py:50), so the status vocabulary in in_memory_reports is the public API contract. The existing design correctly uses:
- "completed" when all agents succeed
- "partial_success" when some agents succeed and some fail
- "failed" for individual agent failures
Ignore the proposed status change to "failed". The only reasonable suggestion from the original comment, adding status to aggregated_data for convenience, is independent of the vocabulary concern.
Likely an incorrect or invalid review comment.
Overview: This PR introduces a new API endpoint to allow users to check the current processing status of a report.
Changes
- Adds the endpoint /reports/{report_id}/status within app/api/v1/routes.py.
- Adds a function in report_service.py to fetch the report's state from the in-memory store.
- Returns the report_id and its status (e.g., 'processing', 'completed', 'failed').
- Raises an HTTPException if the provided report_id does not exist.
Summary by CodeRabbit
New Features
Bug Fixes & Improvements