Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 35 additions & 52 deletions backend/app/core/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,39 @@
import asyncio
import logging
from typing import Dict, Callable, Awaitable
from backend.app.services.report_service import save_report_data

logger = logging.getLogger(__name__)
class AIOrchestrator:
"""
Base class for coordinating multiple AI agents.
Designed to handle parallel asynchronous agent calls.
"""

class Orchestrator:
def __init__(self):
self.registered_agents: Dict[str, Callable[[str, str], Awaitable[Dict]]] = {}

def register_agent(self, name: str, agent_func: Callable[[str, str], Awaitable[Dict]]):
self.registered_agents[name] = agent_func

async def execute_agents_concurrently(self, report_id: str, token_id: str):
agent_tasks = []
agent_names = []

for name, agent_func in self.registered_agents.items():
agent_names.append(name)
agent_tasks.append(self._run_agent_safely(name, agent_func, report_id, token_id))

results = await asyncio.gather(*agent_tasks, return_exceptions=True)

aggregated_results = {}
for i, result in enumerate(results):
agent_name = agent_names[i]
if isinstance(result, Exception):
logger.error("Agent '%s' failed with error: %s", agent_name, result, exc_info=isinstance(result, BaseException))
aggregated_results[agent_name] = {'status': 'failed', 'error': str(result)}
else:
aggregated_results[agent_name] = {'status': 'completed', 'data': result}

failed_count = sum(1 for r in aggregated_results.values() if r['status'] == 'failed')
total = len(aggregated_results)
if failed_count == total:
overall_status = 'failed'
elif failed_count > 0:
overall_status = 'partial_success'
else:
overall_status = 'completed'

await save_report_data(report_id, {
'agent_results': aggregated_results,
'status': overall_status,
'summary': {'total': total, 'success': total - failed_count, 'failed': failed_count}
})

async def _run_agent_safely(self, name: str, agent_func: Callable[[str, str], Awaitable[Dict]], report_id: str, token_id: str) -> Dict:
try:
return await agent_func(report_id, token_id)
except Exception as e:
logger.error("Error running agent '%s': %s", name, e, exc_info=True)
raise # Re-raise to be caught by asyncio.gather

orchestrator = Orchestrator()
self.agents = []

def register_agent(self, agent):
"""
Registers an AI agent with the orchestrator.
Args:
agent: An instance of an AI agent.
"""
raise NotImplementedError

async def execute_agents(self, *args, **kwargs):
"""
Executes all registered AI agents in parallel asynchronously.
Args:
*args: Variable length argument list for agent execution.
**kwargs: Arbitrary keyword arguments for agent execution.
Returns:
A list of results from each agent.
"""
raise NotImplementedError
Comment on lines +20 to +29
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Abstract method + suggested default concurrent pattern.

Make the API abstract; if you prefer a default, use gather with a snapshot.

-    async def execute_agents(self, *args, **kwargs):
+    @abstractmethod
+    async def execute_agents(self, *args, **kwargs):
         """
         Executes all registered AI agents in parallel asynchronously.
@@
-        raise NotImplementedError
+        raise NotImplementedError

Optional default implementation:

agents = tuple(self.agents)
if not agents:
    return []
coros = [a.run(*args, **kwargs) for a in agents]
return await asyncio.gather(*coros)

Verify call-site breakages from prior API:


🏁 Script executed:

#!/bin/bash
# Find old symbols to update across the repo
rg -n -C2 -S '\bOrchestrator\b|execute_agents_concurrently\b|_run_agent_safely\b|registered_agents\b'

Length of output: 2500


🏁 Script executed:

#!/bin/bash
# Get the full Orchestrator class definition to see all methods
cat backend/app/core/orchestrator.py

Length of output: 1123


🏁 Script executed:

#!/bin/bash
# Check for all uses of execute_agents (both variants) in the entire codebase
rg -n 'def execute_agents|\.execute_agents' --type=py

Length of output: 442


🏁 Script executed:

#!/bin/bash
# Check if there are subclasses of Orchestrator
rg -n 'class.*Orchestrator\)' --type=py

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check for imports/aliases in orchestrator.py
head -20 backend/app/core/orchestrator.py
tail -5 backend/app/core/orchestrator.py

Length of output: 647


🏁 Script executed:

#!/bin/bash
# Search for where execute_agents_concurrently is defined
rg -n 'def execute_agents_concurrently' --type=py

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if Orchestrator is aliased to AIOrchestrator in orchestrator.py
rg -n 'Orchestrator' backend/app/core/orchestrator.py

Length of output: 23


🏁 Script executed:

#!/bin/bash
# Check __init__.py for exports
cat backend/app/core/__init__.py

Length of output: 60


🏁 Script executed:

#!/bin/bash
# Get full test file to see if it would even execute
cat backend/tests/test_orchestrator.py

Length of output: 2876


Critical API inconsistencies prevent code execution; incomplete refactoring.

The review suggestion to add @abstractmethod is valid but incomplete. Multiple breaking issues exist:

  1. Import failure: Tests import Orchestrator but orchestrator.py defines AIOrchestrator—class name mismatch.

  2. Missing method: All call sites invoke execute_agents_concurrently() (test lines 27, 52; routes line 36), but only execute_agents() exists in the base class. The called method doesn't exist in the codebase.

  3. Signature mismatch: Base class defines register_agent(self, agent) with one parameter, but tests call orchestrator.register_agent("AgentOne", mock_agent_one) with two parameters (name, agent).

  4. Missing concrete implementation: No Orchestrator class or implementation of execute_agents_concurrently() found anywhere.

Tests will fail immediately on import.


def aggregate_results(self, results):
"""
Aggregates the results from the executed AI agents.
Args:
results (list): A list of results from the executed agents.
Returns:
The aggregated result.
"""
raise NotImplementedError