-
Couldn't load subscription status.
- Fork 0
Feature: Asynchronous Report Generation for /report/generate Endpoint #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ve error handling
WalkthroughReplaces inline background orchestration with a BackgroundTasks-driven run_report_pipeline, moves status management into orchestrator adapter functions (set/get), adds async status and data endpoints (/reports/{id}/status, /reports/{id}/data), adds get_report_data, updates process_report to use orchestrator APIs, and adds end-to-end tests and a /health endpoint. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Client
participant API as Routes
participant BG as BackgroundTasks
participant Pipeline as run_report_pipeline
participant Orch as Orchestrator
participant Proc as process_report
participant Service as report_service
Client->>API: POST /api/v1/report/generate
API->>BG: add_task(run_report_pipeline, report_id, token)
API-->>Client: 200 {report_id}
par Background execution
BG->>Pipeline: run_report_pipeline(report_id, token)
Pipeline->>Orch: set_report_status(report_id, processing)
Pipeline->>Orch: execute_agents_concurrently(report_id, token)
Orch->>Orch: aggregate agent_results
Orch->>Orch: set_report_status(report_id, aggregated_status)
Pipeline->>Proc: (calls/processes as needed)
end
Client->>API: GET /api/v1/reports/{id}/data
API->>Service: get_report_data(report_id)
alt completed
API-->>Client: 200 {report_id, data: agent_results}
else processing
API-->>Client: 202 Accepted
else failed/terminal
API-->>Client: 422 / 500
else not found
API-->>Client: 404
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (4)
backend/tests/test_routes.py (3)
11-11: Remove unused import.The
anyiomodule is imported but never used in this test file.Apply this diff:
-import anyio
38-40: Simplify the mock function signature.The mock function doesn't use
argsorkwargsparameters, so they can be removed.Apply this diff:
- async def mock_sleep_agent(*args, **kwargs): + async def mock_sleep_agent(report_id: str, token_id: str): await asyncio.sleep(100) # Simulate a very long running task
74-74: Replace hardcoded sleep with polling or timeout mechanism.The hardcoded 6-second sleep makes the test timing-dependent and brittle. If the agents complete faster, you waste test time; if they take longer, the test fails.
Consider implementing a polling mechanism with a timeout:
# Wait for the background task to complete with polling max_wait = 10 # seconds poll_interval = 0.5 elapsed = 0 while elapsed < max_wait: status_response = client.get(f"/api/v1/reports/{report_id}/status") if status_response.json()["status"] in ["completed", "partial_success", "failed"]: break await asyncio.sleep(poll_interval) elapsed += poll_intervalbackend/app/services/report_service.py (1)
32-36: Consider distinguishing between "not found" and "not completed" cases.The function returns
Nonefor both scenarios: when a report doesn't exist and when it exists but isn't completed. This forces callers (likeget_report_data_endpointin routes.py) to make an additional call toget_report_status_from_memoryto determine whyNonewas returned.Consider one of these approaches:
Option 1: Return a status tuple
def get_report_data(report_id: str) -> tuple[Dict | None, str]: """Returns (data, status) where status is 'completed', 'processing', 'not_found', etc.""" report = in_memory_reports.get(report_id) if not report: return None, "not_found" if report.get("status") == "completed": return {"report_id": report_id, "data": report.get("agent_results")}, "completed" return None, report.get("status", "unknown")Option 2: Raise specific exceptions
def get_report_data(report_id: str) -> Dict: """Returns data dict or raises ReportNotFoundError/ReportNotCompletedError.""" report = in_memory_reports.get(report_id) if not report: raise ReportNotFoundError(report_id) if report.get("status") != "completed": raise ReportNotCompletedError(report_id, report.get("status")) return {"report_id": report_id, "data": report.get("agent_results")}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (9)
backend/__pycache__/main.cpython-313.pycis excluded by!**/*.pycbackend/app/api/v1/__pycache__/__init__.cpython-313.pycis excluded by!**/*.pycbackend/app/api/v1/__pycache__/routes.cpython-313.pycis excluded by!**/*.pycbackend/app/core/__pycache__/config.cpython-313.pycis excluded by!**/*.pycbackend/app/core/__pycache__/orchestrator.cpython-313.pycis excluded by!**/*.pycbackend/app/services/__pycache__/report_processor.cpython-313.pycis excluded by!**/*.pycbackend/app/services/__pycache__/report_service.cpython-313.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_routes.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pyc
📒 Files selected for processing (4)
backend/app/api/v1/routes.py(3 hunks)backend/app/services/report_service.py(1 hunks)backend/main.py(1 hunks)backend/tests/test_routes.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/api/v1/routes.py (4)
backend/app/models/report_models.py (2)
ReportRequest(4-6)ReportResponse(8-10)backend/app/services/report_service.py (3)
generate_report(11-20)get_report_status_from_memory(29-30)get_report_data(32-36)backend/app/services/report_processor.py (1)
process_report(14-50)backend/app/core/orchestrator.py (1)
execute_agents_concurrently(56-74)
backend/tests/test_routes.py (2)
backend/app/api/v1/routes.py (1)
get_report_status(43-47)backend/app/core/orchestrator.py (1)
register_agent(17-24)
🪛 Ruff (0.14.1)
backend/tests/test_routes.py
38-38: Unused function argument: args
(ARG001)
38-38: Unused function argument: kwargs
(ARG001)
🔇 Additional comments (1)
backend/main.py (1)
6-7: LGTM!The import path updates correctly align with the project's module structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/orchestrator.py (1)
26-40: Cancel timed‑out tasks to avoid leaks; log at warning level.Timed‑out agent tasks continue running. Cancel them to free resources and avoid side effects.
for name, task in tasks.items(): try: result = await asyncio.wait_for(task, timeout=10) # Added timeout results[name] = {"status": "completed", "data": result} except asyncio.TimeoutError: # Handle timeout specifically - logger.exception("Agent %s timed out for report %s", name, report_id) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + logger.warning("Agent %s timed out for report %s", name, report_id) results[name] = {"status": "failed", "error": "Agent timed out"}
🧹 Nitpick comments (7)
backend/tests/test_routes.py (3)
24-35: Reduce flakiness: poll status instead of fixed sleeps.Replace fixed 15s sleep and single check with a short polling loop until status != processing or timeout. This makes the test deterministic.
Example:
deadline = asyncio.get_event_loop().time() + 20 while asyncio.get_event_loop().time() < deadline: r = client.get(f"/api/v1/reports/{report_id}/status") s = r.json()["status"] if s != "processing": break await asyncio.sleep(0.5) assert s == "partial_success"Also applies to: 41-47
19-23: Prefer httpx.AsyncClient for async tests (or make tests sync).Using synchronous TestClient inside async tests can block the loop. Either:
- Convert tests to sync functions and keep TestClient, or
- Use httpx.AsyncClient with ASGITransport for true async.
8-10: Remove unused import.anyio is not used. Safe to drop.
-import anyiobackend/app/api/v1/routes.py (2)
37-39: Use logging.exception without passing the exception object.logger.exception already records the stack trace; remove redundant %s, e.
- logger.exception('Orchestrator failed for %s: %s', report_id, e) + logger.exception('Orchestrator failed for %s', report_id) @@ - logger.exception('process_report failed for %s: %s', report_id, e) + logger.exception('process_report failed for %s', report_id)Also applies to: 50-52
53-59: Optional: return 202 Accepted for async start.To reflect BackgroundTasks semantics, consider:
-@router.post("/report/generate", response_model=ReportResponse) +@router.post("/report/generate", response_model=ReportResponse, status_code=202)Update tests to expect 202.
Would you like me to update the tests to assert 202 consistently?
backend/app/core/orchestrator.py (2)
60-64: Set overall status to 'failed' when all agents fail.Distinguish all‑failed from partial success.
- overall_status = "completed" - if any(result["status"] == "failed" for result in agent_results.values()): - overall_status = "partial_success" + overall_status = "completed" + statuses = [r["status"] for r in agent_results.values()] + if all(s == "failed" for s in statuses): + overall_status = "failed" + elif any(s == "failed" for s in statuses): + overall_status = "partial_success"
65-70: Check set_report_status result to catch missed updates.Log when status wasn’t updated due to missing ID or terminal state.
- await set_report_status(report_id, { + ok = await set_report_status(report_id, { "status": overall_status, "agent_results": aggregated_data["agent_results"] }) + if not ok: + logger.warning("Failed to update status for report %s (state protected or missing).", report_id)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
backend/app/api/v1/__pycache__/routes.cpython-313.pycis excluded by!**/*.pycbackend/app/core/__pycache__/orchestrator.cpython-313.pycis excluded by!**/*.pycbackend/app/services/__pycache__/report_processor.cpython-313.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_routes.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pyc
📒 Files selected for processing (4)
backend/app/api/v1/routes.py(3 hunks)backend/app/core/orchestrator.py(1 hunks)backend/app/services/report_processor.py(2 hunks)backend/tests/test_routes.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
backend/tests/test_routes.py (1)
backend/app/core/orchestrator.py (1)
register_agent(17-24)
backend/app/core/orchestrator.py (1)
backend/app/api/v1/routes.py (1)
get_report_status(62-66)
backend/app/services/report_processor.py (2)
backend/app/core/orchestrator.py (2)
set_report_status(75-89)get_report_status(91-95)backend/app/api/v1/routes.py (1)
get_report_status(62-66)
backend/app/api/v1/routes.py (4)
backend/app/models/report_models.py (2)
ReportRequest(4-6)ReportResponse(8-10)backend/app/core/orchestrator.py (3)
set_report_status(75-89)get_report_status(91-95)execute_agents_concurrently(56-71)backend/app/services/report_processor.py (1)
process_report(6-36)backend/app/services/report_service.py (2)
get_report_data(32-36)generate_report(11-20)
🪛 Ruff (0.14.1)
backend/tests/test_routes.py
30-30: Unused function argument: args
(ARG001)
30-30: Unused function argument: kwargs
(ARG001)
38-38: Undefined name request_payload
(F821)
45-45: Undefined name report_id
(F821)
backend/app/services/report_processor.py
20-20: Avoid specifying long messages outside the exception class
(TRY003)
26-26: Undefined name asyncio
(F821)
29-29: Consider moving this statement to an else block
(TRY300)
30-30: Undefined name asyncio
(F821)
backend/app/api/v1/routes.py
37-37: Redundant exception object included in logging.exception call
(TRY401)
50-50: Redundant exception object included in logging.exception call
(TRY401)
55-55: Undefined name generate_report
(F821)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/tests/test_report_processor.py (1)
33-35: Adjust expected error messageprocess_report now raises “already in a terminal or processing state”. Update the regex.
- with pytest.raises(ValueError, match=f"Report {report_id} is already being processed"): + with pytest.raises(ValueError, match=f"Report {report_id} is already in a terminal or processing state"):
♻️ Duplicate comments (1)
backend/tests/test_routes.py (1)
24-51: Fix flakiness: 'partial_success' cannot occur after 0.1s with a 10s agent timeoutThe background task adds agents with a 10-second timeout. After only 0.1s, those agents are still executing and haven't timed out yet. The orchestrator only transitions to "partial_success" after processing failed/timed-out agent results, which requires the full ~10s timeout period. The status will still be "processing" at the 0.1s mark.
Apply the fast fix:
@@ - await asyncio.sleep(0.1) # Allow background tasks to start + await asyncio.sleep(0.1) # Allow background task to start @@ - # Now, check the status, expecting partial_success due to agent timeouts + # Now, check the status; with 10s timeout, this should still be "processing" status_response = client.get(f"/api/v1/reports/{report_id}/status") assert status_response.status_code == 200 - assert status_response.json()["status"] == "partial_success" + assert status_response.json()["status"] == "processing"
🧹 Nitpick comments (7)
backend/tests/test_routes.py (2)
29-31: Silence lint: remove unused args in mock agentUse explicit parameters to avoid ARG001 while keeping compatibility.
- async def mock_sleep_agent(*args, **kwargs): + async def mock_sleep_agent(report_id: str, token_id: str): await asyncio.sleep(100) # Simulate a very long running task
10-15: Fixture decorator nitThis fixture is synchronous; prefer
@pytest.fixture(autouse=True)for clarity, or make itasync def. Behavior is unchanged either way.backend/app/core/orchestrator.py (2)
26-40: Consider configurable timeouts and faster aggregationNot blocking, but for testability and perf:
- Accept a timeout parameter (default from settings) instead of hardcoded 10s.
- Use asyncio.as_completed to record results as they finish.
If desired, I can draft a follow-up patch.
60-64: Consider 'failed' if all agents failCurrently any failure sets 'partial_success', even if all failed. Optionally set 'failed' when none succeed.
backend/app/api/v1/routes.py (2)
37-39: Logging nit: remove redundant exception from logger.exceptionlogger.exception already includes the exception info.
- except Exception as e: - logger.exception('Orchestrator failed for %s: %s', report_id, e) - await set_report_status(report_id, {'status': 'failed', 'reason': str(e)}) + except Exception as e: + logger.exception('Orchestrator failed for %s', report_id) + await set_report_status(report_id, {'status': 'failed', 'reason': str(e)})
53-59: Return 202 Accepted for async report creation (optional but idiomatic)Consider signaling async processing with 202.
-@router.post("/report/generate", response_model=ReportResponse) +@router.post("/report/generate", response_model=ReportResponse, status_code=202) async def generate_report_endpoint(request: ReportRequest, background_tasks: BackgroundTasks): @@ - return report_response + return report_responsebackend/tests/test_report_processor.py (1)
17-23: Test depends on upsert behaviorThis test will only pass if set_report_status allows creating entries, since process_report doesn’t seed in_memory_reports. With the proposed upsert in set_report_status, it’ll pass. Alternatively, seed the report first via generate_report or by writing to in_memory_reports.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
backend/__pycache__/main.cpython-313.pycis excluded by!**/*.pycbackend/app/api/v1/__pycache__/routes.cpython-313.pycis excluded by!**/*.pycbackend/app/core/__pycache__/orchestrator.cpython-313.pycis excluded by!**/*.pycbackend/app/services/__pycache__/report_processor.cpython-313.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_routes.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pyc
📒 Files selected for processing (6)
backend/app/api/v1/routes.py(2 hunks)backend/app/core/orchestrator.py(2 hunks)backend/app/services/report_processor.py(2 hunks)backend/main.py(1 hunks)backend/tests/test_report_processor.py(6 hunks)backend/tests/test_routes.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
backend/tests/test_report_processor.py (2)
backend/app/services/report_processor.py (1)
process_report(7-37)backend/app/core/orchestrator.py (2)
set_report_status(75-89)get_report_status(91-95)
backend/app/services/report_processor.py (1)
backend/app/core/orchestrator.py (2)
set_report_status(75-89)get_report_status(91-95)
backend/app/api/v1/routes.py (4)
backend/app/models/report_models.py (2)
ReportRequest(4-6)ReportResponse(8-10)backend/app/core/orchestrator.py (3)
set_report_status(75-89)get_report_status(91-95)execute_agents_concurrently(56-71)backend/app/services/report_processor.py (1)
process_report(7-37)backend/app/services/report_service.py (2)
generate_report(11-20)get_report_data(32-36)
backend/tests/test_routes.py (1)
backend/app/core/orchestrator.py (1)
register_agent(17-24)
🪛 Ruff (0.14.1)
backend/app/services/report_processor.py
21-21: Avoid specifying long messages outside the exception class
(TRY003)
30-30: Consider moving this statement to an else block
(TRY300)
backend/app/api/v1/routes.py
37-37: Redundant exception object included in logging.exception call
(TRY401)
50-50: Redundant exception object included in logging.exception call
(TRY401)
backend/tests/test_routes.py
29-29: Unused function argument: args
(ARG001)
29-29: Unused function argument: kwargs
(ARG001)
🔇 Additional comments (5)
backend/main.py (1)
2-14: LGTM: app wiring and health endpointRouter inclusion and health check look correct. No blockers.
backend/app/services/report_processor.py (1)
31-36: No change needed; logging is fineException branch records failure and re-raises. Good.
backend/tests/test_report_processor.py (3)
52-54: Cancellation status assertion OK (after upsert change)Will pass once set_report_status creates missing entries.
69-71: Failure path assertion OK (after upsert change)Same note: requires set_report_status to upsert.
79-83: Upserting makes this test meaningfulWith upsert, this round-trip verifies get/set behavior as intended.
| async def set_report_status(report_id: str, status_info: Dict[str, Any]) -> bool: | ||
| """ | ||
| Sets the status of a report in in_memory_reports, preventing overwrites of terminal statuses. | ||
| """ | ||
| if report_id not in in_memory_reports: | ||
| logger.warning("Report ID %s not found in in_memory_reports.", report_id) | ||
| return False | ||
|
|
||
| current_status = in_memory_reports[report_id].get("status") | ||
| if current_status in ("failed", "cancelled", "partial_success"): | ||
| logger.info("Not overwriting terminal status for %s: %s", report_id, current_status) | ||
| return False | ||
|
|
||
| in_memory_reports[report_id].update(status_info) | ||
| return True | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Make status updates robust: create missing entries and treat 'completed' as terminal
Current behavior rejects unknown IDs and allows overwriting 'completed'. This breaks tests and can regress finished reports. Create on first write and protect 'completed'.
async def set_report_status(report_id: str, status_info: Dict[str, Any]) -> bool:
- """
- Sets the status of a report in in_memory_reports, preventing overwrites of terminal statuses.
- """
- if report_id not in in_memory_reports:
- logger.warning("Report ID %s not found in in_memory_reports.", report_id)
- return False
+ """
+ Upserts the status of a report in in_memory_reports, preventing overwrites of terminal statuses.
+ Creates the entry if it does not exist.
+ """
+ if report_id not in in_memory_reports:
+ in_memory_reports[report_id] = {"report_id": report_id}
current_status = in_memory_reports[report_id].get("status")
- if current_status in ("failed", "cancelled", "partial_success"):
+ if current_status in ("failed", "cancelled", "partial_success", "completed"):
logger.info("Not overwriting terminal status for %s: %s", report_id, current_status)
return False
in_memory_reports[report_id].update(status_info)
return TrueI'll verify the review comment by examining the code, tests, and usage patterns.
🏁 Script executed:
#!/bin/bash
# Examine the current implementation of set_report_status
cat backend/app/core/orchestrator.py | sed -n '75,90p'Length of output: 660
🏁 Script executed:
#!/bin/bash
# Find test files related to orchestrator
fd -e py -e test | grep -i orchestrator | head -20Length of output: 67
🏁 Script executed:
#!/bin/bash
# Search for all calls to set_report_status in the codebase
rg -n "set_report_status" --type pyLength of output: 1469
🏁 Script executed:
#!/bin/bash
# Search for "completed" status references to understand if it's used
rg -n '"completed"' --type py -C 2Length of output: 5068
🏁 Script executed:
#!/bin/bash
# Search for in_memory_reports usage to understand the data structure
rg -n "in_memory_reports\[" --type py -C 2 | head -50Length of output: 3831
Let me examine the test file more closely to understand the reported failure:Now let me examine test_report_processor.py more closely:
🏁 Script executed:
#!/bin/bash
# Examine test_report_processor.py around line 79
cat backend/tests/test_report_processor.py | head -100Length of output: 3327
🏁 Script executed:
#!/bin/bash
# Check if there are any failing tests related to set_report_status
rg -n "pytest\|assert\|def test" backend/tests/test_report_processor.py -B2 -A5Length of output: 0
🏁 Script executed:
#!/bin/bash
# Look at how create_report is called
rg -n "create_report\|in_memory_reports" backend/tests/test_report_processor.py -C 3Length of output: 0
Add auto-creation and protect 'completed' status in set_report_status
The current implementation causes test failures and status regression:
- Tests call
set_report_statuson unknown IDs (e.g.,test_get_report_statusline 79,process_reportstart) but current code returnsFalseinstead of creating the entry. "completed"status is actively used (orchestrator.py:61, report_processor.py:28) and treated as terminal in business logic (report_processor.py:20:"completed"is in terminal check), but current code allows overwrites.
Apply the suggested changes to create missing entries and protect "completed" as terminal.
async def set_report_status(report_id: str, status_info: Dict[str, Any]) -> bool:
- """
- Sets the status of a report in in_memory_reports, preventing overwrites of terminal statuses.
- """
- if report_id not in in_memory_reports:
- logger.warning("Report ID %s not found in in_memory_reports.", report_id)
- return False
+ """
+ Upserts the status of a report in in_memory_reports, preventing overwrites of terminal statuses.
+ Creates the entry if it does not exist.
+ """
+ if report_id not in in_memory_reports:
+ in_memory_reports[report_id] = {"report_id": report_id}
current_status = in_memory_reports[report_id].get("status")
- if current_status in ("failed", "cancelled", "partial_success"):
+ if current_status in ("failed", "cancelled", "partial_success", "completed"):
logger.info("Not overwriting terminal status for %s: %s", report_id, current_status)
return False
in_memory_reports[report_id].update(status_info)
return True🤖 Prompt for AI Agents
In backend/app/core/orchestrator.py around lines 75 to 90, modify
set_report_status so that if report_id is missing it auto-creates an entry in
in_memory_reports before applying status_info (instead of returning False), and
treat "completed" as a terminal status by adding it to the terminal-check (i.e.,
consider ("failed", "cancelled", "partial_success", "completed")
non-overwritable); ensure you still log the creation case and return True after
updating the entry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/orchestrator.py (1)
30-39: Cancel timed-out tasks to avoid zombie coroutinesAfter asyncio.wait_for timeout, the task continues running unless cancelled. Cancel it and swallow CancelledError.
- except asyncio.TimeoutError: # Handle timeout specifically - logger.exception("Agent %s timed out for report %s", name, report_id) + except asyncio.TimeoutError: # Handle timeout specifically + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + logger.warning("Agent %s timed out for report %s", name, report_id) results[name] = {"status": "failed", "error": "Agent timed out"}
🧹 Nitpick comments (4)
backend/app/api/v1/routes.py (2)
32-36: Derive token_id from stored state instead of trusting request body in backgroundTo avoid mismatches and reduce sensitive data propagation, resolve token_id from in_memory state before orchestration; pass only report_id to the background task.
-async def run_report_pipeline(report_id: str, token_id: str): +async def run_report_pipeline(report_id: str) -> None: + current = await get_status(report_id) + if not current: + await set_report_status(report_id, {'status': 'failed', 'reason': 'report not found'}) + logger.error('Report %s not found in store', report_id) + return + token_id = current.get('token_id') - background_tasks.add_task(run_report_pipeline, report_id, request.token_id) + background_tasks.add_task(run_report_pipeline, report_id)Also applies to: 55-60
15-17: Use logger instead of print in async agentsPrefer logger for consistency and production observability.
- print(f"Dummy Agent One running for report {report_id} and token {token_id}") + logger.info("Dummy Agent One running for report %s and token %s", report_id, token_id) @@ - print(f"Dummy Agent Two running for report {report_id} and token {token_id}") + logger.info("Dummy Agent Two running for report %s and token %s", report_id, token_id)Also applies to: 20-22
backend/app/core/orchestrator.py (2)
61-64: Centralize status constants to prevent driftStatuses ('processing', 'completed', 'partial_success', 'failed', 'cancelled') appear in multiple modules. Use an Enum and helpers (e.g., is_terminal) to avoid typos and scattered logic.
Example:
from enum import StrEnum, auto class ReportStatus(StrEnum): processing = auto() completed = auto() partial_success = auto() failed = auto() cancelled = auto() TERMINAL = {ReportStatus.completed, ReportStatus.partial_success, ReportStatus.failed, ReportStatus.cancelled} def is_terminal(status: str) -> bool: return status in TERMINALThen use ReportStatus.completed instead of string literals and TERMINAL in terminal checks.
Also applies to: 84-86
75-96: Consider making set/get status synchronousThese functions don’t perform I/O; making them sync reduces unnecessary awaits and simplifies call sites. If you keep async for future-proofing, add a brief rationale in docstrings.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/api/v1/routes.py(2 hunks)backend/app/core/orchestrator.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/api/v1/routes.py (4)
backend/app/models/report_models.py (2)
ReportRequest(4-6)ReportResponse(8-10)backend/app/core/orchestrator.py (3)
set_report_status(75-90)get_report_status(92-96)execute_agents_concurrently(56-71)backend/app/services/report_processor.py (1)
process_report(7-37)backend/app/services/report_service.py (2)
generate_report(11-20)get_report_data(32-36)
🪛 Ruff (0.14.1)
backend/app/api/v1/routes.py
37-37: Redundant exception object included in logging.exception call
(TRY401)
🔇 Additional comments (2)
backend/app/api/v1/routes.py (1)
69-85: LGTM: clear, state-aware data endpoint responsesThe mapping to 202/404/422/500 is precise and avoids the earlier 404 ambiguity. Nice.
backend/app/core/orchestrator.py (1)
65-69: LGTM: guarded status update via adapterGood move replacing direct in_memory_reports mutation with set_report_status, keeping terminal-state protection centralized.
| except Exception as e: | ||
| logger.exception('Orchestrator failed for %s: %s', report_id, e) | ||
| await set_report_status(report_id, {'status': 'failed', 'reason': str(e)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix logger.exception usage (TRY401)
Avoid passing the exception object to logger.exception; it already logs the stack trace.
- except Exception as e:
- logger.exception('Orchestrator failed for %s: %s', report_id, e)
+ except Exception as e:
+ logger.exception('Orchestrator failed for %s', report_id)
await set_report_status(report_id, {'status': 'failed', 'reason': str(e)})🧰 Tools
🪛 Ruff (0.14.1)
37-37: Redundant exception object included in logging.exception call
(TRY401)
🤖 Prompt for AI Agents
In backend/app/api/v1/routes.py around lines 36 to 38, the logger.exception call
currently includes the exception object as a formatting argument; remove the
exception object from the call so the stack trace is not passed as a formatted
string. Change the call to log only the contextual message (e.g.,
logger.exception('Orchestrator failed for %s', report_id)) and keep the await
set_report_status(...) unchanged so the error reason still records str(e).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
backend/app/api/v1/routes.py (1)
31-41: Consider improving documentation and observability.The function correctly orchestrates the background pipeline, but could benefit from:
- Removing the "1)" numbering on line 32 (leftover from prior refactoring)
- Adding a docstring explaining the function's purpose and parameters
- Logging successful completion for better observability
Apply this diff to improve clarity and observability:
async def run_report_pipeline(report_id: str, token_id: str): - # 1) Orchestrate agents and handle errors + """ + Execute the report generation pipeline in the background. + Orchestrates concurrent agent execution and updates status on failure. + """ try: await orchestrator.execute_agents_concurrently(report_id, token_id) + logger.info('Report pipeline completed successfully for %s', report_id) except Exception as e: logger.exception('Orchestrator failed for %s', report_id) await set_report_status(report_id, {'status': 'failed', 'reason': str(e)}) return - # The orchestrator sets the terminal status, so there's nothing to do after it. - return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
backend/__pycache__/main.cpython-313.pycis excluded by!**/*.pycbackend/app/api/v1/__pycache__/routes.cpython-313.pycis excluded by!**/*.pycbackend/app/core/__pycache__/orchestrator.cpython-313.pycis excluded by!**/*.pycbackend/tests/__pycache__/test_routes.cpython-313-pytest-8.4.2.pycis excluded by!**/*.pyc
📒 Files selected for processing (1)
backend/app/api/v1/routes.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/api/v1/routes.py (3)
backend/app/models/report_models.py (2)
ReportRequest(4-6)ReportResponse(8-10)backend/app/core/orchestrator.py (3)
set_report_status(75-90)get_report_status(92-96)execute_agents_concurrently(56-71)backend/app/services/report_service.py (2)
generate_report(11-20)get_report_data(32-36)
🔇 Additional comments (3)
backend/app/api/v1/routes.py (3)
42-48: LGTM!The endpoint correctly implements asynchronous report generation using FastAPI's BackgroundTasks. The approach schedules the pipeline as a single background task and returns immediately, which aligns with the PR objectives and addresses the previous race condition concerns.
50-55: LGTM!The endpoint properly retrieves status using the async orchestrator function and avoids the name shadowing issue from previous reviews by using the
_endpointsuffix naming convention.
57-73: LGTM!The endpoint correctly handles different report states with appropriate HTTP status codes, addressing the previous feedback about misleading 404 responses. The distinction between "report not found" (404), "still processing" (202), "processing failed" (422), and "unexpected state" (500) provides clear semantics for API consumers.
|
the shift to asynchronous processing in |
Overview: The
/report/generateendpoint now processes reports asynchronously using FastAPI'sBackgroundTasks.Changes
/report/generateendpoint inbackend/app/api/v1/routes.pyto acceptBackgroundTasks.process_reportfrombackend/app/services/report_processor.pyas a background task.Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Refactor
Tests