Feat: Implement concurrent agent execution with asyncio.gather #8
Conversation
Walkthrough

Adds an Orchestrator to register and run agents concurrently, starts background agent execution from the report generation endpoint, exposes a report status endpoint, and provides persistence for aggregated agent results; includes tests for successful and failed agent runs.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as generate_report_endpoint
    participant Orch as Orchestrator
    participant Agents as Agents
    participant RS as ReportService
    participant Status as get_report_status
    Client->>API: POST /report
    API->>API: generate_report() -> report_id
    API-->>Client: 200 OK (report_id)
    Note over API,Orch: Background task: execute_agents_concurrently
    API->>Orch: execute_agents_concurrently(report_id, token_id)
    activate Orch
    par Parallel agent runs
        Orch->>Agents: _run_agent_safely(AgentOne)
        Orch->>Agents: _run_agent_safely(AgentTwo)
    end
    Agents-->>Orch: results / exceptions
    Orch->>RS: save_report_data(report_id, aggregated_results)
    deactivate Orch
    loop Polling
        Client->>Status: GET /report_status/{report_id}
        Status-->>Client: current report data or 404
    end
```
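For orientation, here is a minimal, self-contained sketch of the request-then-background-task pattern the diagram shows. Route and function names mirror the diagram; the request/response models, ID generation, and done-callback handling in the actual `routes.py` are simplified away and should be treated as assumptions.

```python
# Hypothetical sketch only: the real endpoint uses the project's
# ReportRequest/ReportResponse models, report_service.generate_report(),
# and the shared orchestrator instance; this condenses that wiring.
import asyncio
import uuid

from fastapi import APIRouter

router = APIRouter()
in_memory_reports: dict = {}

async def execute_agents_concurrently(report_id: str, token_id: str) -> None:
    """Stand-in for Orchestrator.execute_agents_concurrently."""
    await asyncio.sleep(1)  # agents would run here via asyncio.gather
    in_memory_reports[report_id]["status"] = "completed"

@router.post("/report")
async def generate_report_endpoint(token_id: str) -> dict:
    report_id = uuid.uuid4().hex
    in_memory_reports[report_id] = {"token_id": token_id, "status": "processing"}
    # Kick off orchestration in the background so the response returns immediately.
    task = asyncio.create_task(execute_agents_concurrently(report_id, token_id))
    task.add_done_callback(lambda t: t.exception())  # retrieve any exception so it isn't reported as "never retrieved"
    return {"report_id": report_id}
```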
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 6
🧹 Nitpick comments (2)
backend/tests/test_orchestrator.py (1)
13-63: Comprehensive test coverage for success and failure scenarios.

Both tests properly validate:
- Agent function calls with correct arguments
- Result aggregation structure
- Status updates in in_memory_reports
- Error handling and propagation
Consider adding these additional test cases for more complete coverage:
```python
@pytest.mark.asyncio
async def test_execute_agents_concurrently_no_agents():
    """Test behavior when no agents are registered."""
    orchestrator = Orchestrator()
    report_id = "test_report_id_empty"
    token_id = "test_token_id"
    in_memory_reports[report_id] = {"token_id": token_id, "status": "processing"}

    await orchestrator.execute_agents_concurrently(report_id, token_id)

    assert in_memory_reports[report_id]["status"] == "completed"
    assert in_memory_reports[report_id]["agent_results"] == {}


@pytest.mark.asyncio
async def test_execute_agents_concurrently_all_fail():
    """Test behavior when all agents fail."""
    orchestrator = Orchestrator()
    mock_agent_one = AsyncMock(side_effect=Exception("Agent 1 failed"))
    mock_agent_two = AsyncMock(side_effect=Exception("Agent 2 failed"))
    orchestrator.register_agent("AgentOne", mock_agent_one)
    orchestrator.register_agent("AgentTwo", mock_agent_two)
    report_id = "test_report_id_all_fail"
    token_id = "test_token_id"
    in_memory_reports[report_id] = {"token_id": token_id, "status": "processing"}

    await orchestrator.execute_agents_concurrently(report_id, token_id)

    assert in_memory_reports[report_id]["status"] == "completed"
    assert in_memory_reports[report_id]["agent_results"]["AgentOne"]["status"] == "failed"
    assert in_memory_reports[report_id]["agent_results"]["AgentTwo"]["status"] == "failed"
```

backend/app/core/orchestrator.py (1)
13-18: Consider using dict comprehension to simplify agent task creation.

The parallel `agent_names` and `agent_tasks` lists can be streamlined by iterating over items directly during result processing.

```diff
 async def execute_agents_concurrently(self, report_id: str, token_id: str):
-    agent_tasks = []
-    agent_names = []
-
-    for name, agent_func in self.registered_agents.items():
-        agent_names.append(name)
-        agent_tasks.append(self._run_agent_safely(name, agent_func, report_id, token_id))
+    agent_tasks = {
+        name: self._run_agent_safely(name, agent_func, report_id, token_id)
+        for name, agent_func in self.registered_agents.items()
+    }

-    results = await asyncio.gather(*agent_tasks, return_exceptions=True)
+    results = await asyncio.gather(*agent_tasks.values(), return_exceptions=True)

     aggregated_results = {}
-    for i, result in enumerate(results):
-        agent_name = agent_names[i]
+    for agent_name, result in zip(agent_tasks.keys(), results):
         if isinstance(result, Exception):
```

Alternatively, use `asyncio.TaskGroup` (Python 3.11+) for more structured concurrency:

```python
async def execute_agents_concurrently(self, report_id: str, token_id: str):
    async with asyncio.TaskGroup() as tg:
        tasks = {
            name: tg.create_task(self._run_agent_safely(name, agent_func, report_id, token_id))
            for name, agent_func in self.registered_agents.items()
        }

    aggregated_results = {}
    for name, task in tasks.items():
        try:
            result = task.result()
            aggregated_results[name] = {"status": "completed", "data": result}
        except Exception as e:
            logger.error(f"Agent '{name}' failed with error: {e}")
            aggregated_results[name] = {"status": "failed", "error": str(e)}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (7)
- `backend/__pycache__/__init__.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/__pycache__/__init__.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/core/__pycache__/orchestrator.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/models/__pycache__/report_models.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/services/__pycache__/report_service.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/utils/__pycache__/id_generator.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc` is excluded by `!**/*.pyc`
📒 Files selected for processing (6)
- `backend/app/api/v1/routes.py` (1 hunks)
- `backend/app/core/orchestrator.py` (1 hunks)
- `backend/app/services/report_service.py` (2 hunks)
- `backend/main.py` (1 hunks)
- `backend/tests/test_orchestrator.py` (1 hunks)
- `main.py` (0 hunks)
💤 Files with no reviewable changes (1)
- main.py
🧰 Additional context used
🧬 Code graph analysis (4)
backend/app/core/orchestrator.py (1)
- backend/app/services/report_service.py (1)
  - `save_report_data` (19-24)

backend/tests/test_orchestrator.py (1)
- backend/app/core/orchestrator.py (3)
  - `Orchestrator` (5-38), `register_agent` (9-10), `execute_agents_concurrently` (12-31)

backend/app/services/report_service.py (2)
- backend/app/models/report_models.py (2)
  - `ReportRequest` (4-6), `ReportResponse` (8-10)
- backend/app/utils/id_generator.py (1)
  - `generate_report_id` (3-8)

backend/app/api/v1/routes.py (3)
- backend/app/models/report_models.py (2)
  - `ReportRequest` (4-6), `ReportResponse` (8-10)
- backend/app/services/report_service.py (1)
  - `generate_report` (8-17)
- backend/app/core/orchestrator.py (2)
  - `register_agent` (9-10), `execute_agents_concurrently` (12-31)
🪛 Ruff (0.14.1)
backend/tests/test_orchestrator.py
23-23: Possible hardcoded password assigned to: "token_id"
(S105)
48-48: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (4)
backend/app/services/report_service.py (1)
1-2: LGTM! Import paths updated correctly.

The import paths have been updated to use the `backend.app.*` namespace, which aligns with the module reorganization.

backend/app/api/v1/routes.py (1)

9-22: Dummy agents look good for demonstration purposes.

The two dummy agents are properly structured as async functions with appropriate signatures. The simulated async work with `asyncio.sleep()` effectively demonstrates concurrent execution.

backend/tests/test_orchestrator.py (1)

7-11: LGTM! Fixture properly manages test state.

The autouse fixture ensures `in_memory_reports` is cleared before and after each test, preventing test pollution.

backend/app/core/orchestrator.py (1)

5-10: LGTM! Clean orchestrator initialization and registration.

The class structure is clear, and the `register_agent` method properly types the agent function signature.
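As a point of reference, a registration API along the lines this comment describes might look like the sketch below; the exact type aliases and attribute names in `orchestrator.py` are not quoted in this review, so they are assumptions here.

```python
# Hypothetical sketch of a typed agent registry; the Orchestrator in this PR
# has the same shape but its exact annotations may differ.
from typing import Awaitable, Callable, Dict

# An agent is an async callable taking (report_id, token_id) and returning a dict.
AgentFunc = Callable[[str, str], Awaitable[dict]]

class Orchestrator:
    def __init__(self) -> None:
        self.registered_agents: Dict[str, AgentFunc] = {}

    def register_agent(self, name: str, agent_func: AgentFunc) -> None:
        self.registered_agents[name] = agent_func
```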
Actionable comments posted: 1
🧹 Nitpick comments (3)
backend/app/api/v1/routes.py (2)
13-21: Replace `print()` with proper logging in dummy agents.

For consistency with the rest of the codebase and better observability, use the logger instead of `print()` statements.

Apply this diff:

```diff
 async def dummy_agent_one(report_id: str, token_id: str) -> dict:
-    print(f"Dummy Agent One running for report {report_id} and token {token_id}")
+    logger.info("Dummy Agent One running for report %s and token %s", report_id, token_id)
     await asyncio.sleep(1)  # Simulate async work
     return {"agent_one_data": "data_from_agent_one"}

 async def dummy_agent_two(report_id: str, token_id: str) -> dict:
-    print(f"Dummy Agent Two running for report {report_id} and token {token_id}")
+    logger.info("Dummy Agent Two running for report %s and token %s", report_id, token_id)
     await asyncio.sleep(0.5)  # Simulate async work
     return {"agent_two_data": "data_from_agent_two"}
```
37-43: Remove redundant exception parameter from `logger.exception`.

The `logger.exception()` method automatically logs the current exception, so passing the exception object as a parameter is redundant.

Apply this diff:

```diff
 def _on_done(t: asyncio.Task):
     try:
         t.result()
     except Exception as e:
-        logger.exception('Background orchestration failed for %s: %s', report_id, e)
+        logger.exception('Background orchestration failed for %s', report_id)
         # Optionally update report status to failed here as well
```

Based on static analysis hints.
backend/app/core/orchestrator.py (1)
26-32: Consider simplifying the `exc_info` parameter.

The current usage of `exc_info=isinstance(result, BaseException)` is technically correct but could be simplified. Since the exception is already logged with full traceback in `_run_agent_safely` (line 53), this orchestration-level log serves as a summary and doesn't need the `exc_info` parameter.

Apply this diff:

```diff
 for i, result in enumerate(results):
     agent_name = agent_names[i]
     if isinstance(result, Exception):
-        logger.error("Agent '%s' failed with error: %s", agent_name, result, exc_info=isinstance(result, BaseException))
+        logger.error("Agent '%s' failed with error: %s", agent_name, result)
         aggregated_results[agent_name] = {'status': 'failed', 'error': str(result)}
     else:
         aggregated_results[agent_name] = {'status': 'completed', 'data': result}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- `backend/app/api/v1/routes.py` (1 hunks)
- `backend/app/core/orchestrator.py` (1 hunks)
- `backend/app/services/report_service.py` (2 hunks)
- `backend/tests/test_orchestrator.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
backend/app/services/report_service.py (2)
- backend/app/models/report_models.py (2)
  - `ReportRequest` (4-6), `ReportResponse` (8-10)
- backend/app/utils/id_generator.py (1)
  - `generate_report_id` (3-8)

backend/app/core/orchestrator.py (1)
- backend/app/services/report_service.py (1)
  - `save_report_data` (22-27)

backend/app/api/v1/routes.py (3)
- backend/app/models/report_models.py (2)
  - `ReportRequest` (4-6), `ReportResponse` (8-10)
- backend/app/services/report_service.py (1)
  - `generate_report` (11-20)
- backend/app/core/orchestrator.py (2)
  - `register_agent` (12-13), `execute_agents_concurrently` (15-47)

backend/tests/test_orchestrator.py (1)
- backend/app/core/orchestrator.py (3)
  - `Orchestrator` (8-54), `register_agent` (12-13), `execute_agents_concurrently` (15-47)
🪛 Ruff (0.14.1)
backend/app/api/v1/routes.py
41-41: Redundant exception object included in logging.exception call
(TRY401)
backend/tests/test_orchestrator.py
22-22: Possible hardcoded password assigned to: "token_id"
(S105)
47-47: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (11)
backend/app/services/report_service.py (2)
1-6: LGTM! The logging setup follows best practices, and the import path updates are correct.

22-27: LGTM! The `save_report_data` function correctly handles both success and missing report ID cases with proper logging.

backend/tests/test_orchestrator.py (2)

6-10: LGTM! The autouse fixture ensures proper test isolation by cleaning up `in_memory_reports` before and after each test.

12-35: LGTM! The test correctly validates successful concurrent agent execution, including proper agent invocation and result aggregation.
backend/app/api/v1/routes.py (2)
1-8: LGTM! The imports and logger initialization are properly configured.

46-50: LGTM! The status endpoint correctly validates the report ID and returns appropriate responses.
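For readers skimming the review, a minimal sketch of an endpoint with the behaviour described here (return the stored report, 404 for an unknown id) could look like the following; the route path and store name are taken from this review, everything else is an assumption.

```python
# Hypothetical, simplified version of the status endpoint; the actual
# routes.py may use different response models and storage.
from fastapi import APIRouter, HTTPException

router = APIRouter()
in_memory_reports: dict = {}  # stand-in for the shared report store

@router.get("/report_status/{report_id}")
async def get_report_status(report_id: str) -> dict:
    report = in_memory_reports.get(report_id)
    if report is None:
        raise HTTPException(status_code=404, detail="Report not found")
    return report
```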
backend/app/core/orchestrator.py (5)
1-13: LGTM! The class initialization and agent registration logic are clean and well-typed.

15-23: LGTM! The concurrent task execution using `asyncio.gather` with `return_exceptions=True` correctly enables parallel agent execution with graceful error handling.

34-47: LGTM! The overall status determination logic correctly handles all scenarios (complete success, partial failure, complete failure), and the summary provides useful metrics.

49-54: LGTM! The safe agent execution wrapper properly logs exceptions with full traceback before re-raising them for `asyncio.gather` to handle.

56-56: LGTM! The module-level orchestrator instance provides a clean singleton pattern for agent registration and execution.
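The status determination and summary logic praised in the 34-47 comment above is not quoted in this review; a plausible sketch, assuming the `completed` / `partial_success` / `failed` statuses and the `{"total", "success", "failed"}` summary shape that the tests assert, is:

```python
# Hypothetical reconstruction for illustration; the exact logic in
# orchestrator.py may differ in naming and detail.
def determine_overall_status(aggregated_results: dict) -> tuple[str, dict]:
    total = len(aggregated_results)
    failed = sum(1 for r in aggregated_results.values() if r["status"] == "failed")
    success = total - failed

    if total == 0 or failed == 0:
        overall = "completed"
    elif failed == total:
        overall = "failed"
    else:
        overall = "partial_success"

    return overall, {"total": total, "success": success, "failed": failed}

# Example: one successful agent and one failed agent.
status, summary = determine_overall_status({
    "AgentOne": {"status": "completed", "data": {"agent_one_result": "data1"}},
    "AgentFailing": {"status": "failed", "error": "Agent failed"},
})
assert status == "partial_success"
assert summary == {"total": 2, "success": 1, "failed": 1}
```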
Actionable comments posted: 0
🧹 Nitpick comments (2)
backend/tests/test_orchestrator.py (2)
12-35: Test logic is correct and validates the success path.

The test properly validates that both agents execute concurrently and their results are aggregated correctly.

Optional enhancement: Consider asserting the `summary` field for more complete coverage:

```diff
 assert in_memory_reports[report_id]["agent_results"]["AgentTwo"] == {"status": "completed", "data": {"agent_two_result": "data2"}}
+assert in_memory_reports[report_id]["summary"] == {"total": 2, "success": 2, "failed": 0}
```
37-62: Test correctly validates the partial success scenario.

The test properly handles the case where one agent succeeds and another fails. Line 57 correctly asserts the `"partial_success"` status, which aligns with the orchestrator logic. The past review concern has been addressed.

Optional enhancement: Consider asserting the `summary` field for more complete coverage:

```diff
 assert "Agent failed" in in_memory_reports[report_id]["agent_results"]["AgentFailing"]["error"]
+assert in_memory_reports[report_id]["summary"] == {"total": 2, "success": 1, "failed": 1}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
- `backend/__pycache__/__init__.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/core/__pycache__/orchestrator.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/app/services/__pycache__/report_service.cpython-313.pyc` is excluded by `!**/*.pyc`
- `backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc` is excluded by `!**/*.pyc`
📒 Files selected for processing (1)
- `backend/tests/test_orchestrator.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/test_orchestrator.py (1)
backend/app/core/orchestrator.py (3)
`Orchestrator` (8-54), `register_agent` (12-13), `execute_agents_concurrently` (15-47)
🪛 Ruff (0.14.1)
backend/tests/test_orchestrator.py
22-22: Possible hardcoded password assigned to: "token_id"
(S105)
47-47: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (1)
backend/tests/test_orchestrator.py (1)
6-10: LGTM! Good test isolation.

The autouse fixture correctly clears the shared `in_memory_reports` state before and after each test, ensuring proper test isolation.
Nice, the concurrent execution should speed things up a lot. Approved!
Overview: This PR introduces concurrent execution for agents to significantly improve report generation performance.
Changes
- Added an `execute_agents_concurrently` method to `app/core/orchestrator.py`.
- Uses `asyncio.gather` to run multiple registered agents in parallel (a minimal sketch of this pattern appears at the end of this description).
- Persists aggregated agent results via `save_report_data()`.

Summary by CodeRabbit
New Features
Tests
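The sketch referenced in the Changes list: a minimal, self-contained illustration of running registered agents in parallel with `asyncio.gather`. The agent names and payloads mirror the dummy agents from this PR; the real `Orchestrator` additionally aggregates per-agent status, builds a summary, and persists the result via `save_report_data()`, all of which this sketch omits.

```python
# Hypothetical standalone sketch; not the PR's Orchestrator implementation.
import asyncio

async def dummy_agent_one(report_id: str, token_id: str) -> dict:
    await asyncio.sleep(1)      # simulate async work
    return {"agent_one_data": "data_from_agent_one"}

async def dummy_agent_two(report_id: str, token_id: str) -> dict:
    await asyncio.sleep(0.5)    # simulate async work
    return {"agent_two_data": "data_from_agent_two"}

async def main() -> None:
    agents = {"AgentOne": dummy_agent_one, "AgentTwo": dummy_agent_two}
    # return_exceptions=True keeps one failing agent from cancelling the others.
    results = await asyncio.gather(
        *(fn("report-123", "token-abc") for fn in agents.values()),
        return_exceptions=True,
    )
    print(dict(zip(agents.keys(), results)))

if __name__ == "__main__":
    asyncio.run(main())  # both agents run concurrently, total ~1s
```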