diff --git a/backend/__pycache__/__init__.cpython-313.pyc b/backend/__pycache__/__init__.cpython-313.pyc index e11fd64..0a56e1a 100644 Binary files a/backend/__pycache__/__init__.cpython-313.pyc and b/backend/__pycache__/__init__.cpython-313.pyc differ diff --git a/backend/app/api/v1/routes.py b/backend/app/api/v1/routes.py index 15712c9..b7b204c 100644 --- a/backend/app/api/v1/routes.py +++ b/backend/app/api/v1/routes.py @@ -1,7 +1,7 @@ import logging from fastapi import APIRouter, HTTPException from backend.app.models.report_models import ReportRequest, ReportResponse -from backend.app.services.report_service import generate_report, in_memory_reports +from backend.app.services.report_service import generate_report, in_memory_reports, get_report_status_from_memory from backend.app.core.orchestrator import orchestrator import asyncio @@ -43,8 +43,9 @@ def _on_done(t: asyncio.Task): task.add_done_callback(_on_done) return report_response -@router.get('/report/{report_id}/status') +@router.get("/reports/{report_id}/status") async def get_report_status(report_id: str): - if report_id not in in_memory_reports: - raise HTTPException(status_code=404, detail='Report not found') - return in_memory_reports[report_id] + report = get_report_status_from_memory(report_id) + if not report: + raise HTTPException(status_code=404, detail="Report not found") + return {"report_id": report_id, "status": report["status"]} diff --git a/backend/app/core/__pycache__/orchestrator.cpython-313.pyc b/backend/app/core/__pycache__/orchestrator.cpython-313.pyc index c82d613..333178e 100644 Binary files a/backend/app/core/__pycache__/orchestrator.cpython-313.pyc and b/backend/app/core/__pycache__/orchestrator.cpython-313.pyc differ diff --git a/backend/app/core/orchestrator.py b/backend/app/core/orchestrator.py index 13365fe..e9f650f 100644 --- a/backend/app/core/orchestrator.py +++ b/backend/app/core/orchestrator.py @@ -1,4 +1,9 @@ import asyncio +import logging +from typing import Callable, Dict, Any, List +from backend.app.services.report_service import in_memory_reports + +logger = logging.getLogger(__name__) class AIOrchestrator: """ @@ -7,33 +12,65 @@ class AIOrchestrator: """ def __init__(self): - self.agents = [] + self.agents: Dict[str, Callable] = {} - def register_agent(self, agent): + def register_agent(self, name: str, agent_func: Callable): """ Registers an AI agent with the orchestrator. Args: - agent: An instance of an AI agent. + name (str): The name of the agent. + agent_func (Callable): The asynchronous function representing the agent. """ - raise NotImplementedError + self.agents[name] = agent_func - async def execute_agents(self, *args, **kwargs): - """ - Executes all registered AI agents in parallel asynchronously. - Args: - *args: Variable length argument list for agent execution. - **kwargs: Arbitrary keyword arguments for agent execution. - Returns: - A list of results from each agent. - """ - raise NotImplementedError + async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]: + tasks = {name: asyncio.create_task(agent_func(report_id, token_id)) for name, agent_func in self.agents.items()} + results = {} + + for name, task in tasks.items(): + try: + result = await asyncio.wait_for(task, timeout=10) # Added timeout + results[name] = {"status": "completed", "data": result} + except asyncio.TimeoutError: # Handle timeout specifically + logger.exception("Agent %s timed out for report %s", name, report_id) + results[name] = {"status": "failed", "error": "Agent timed out"} + except Exception as e: + logger.exception("Agent %s failed for report %s", name, report_id) + results[name] = {"status": "failed", "error": str(e)} + return results - def aggregate_results(self, results): + def aggregate_results(self, results: Dict[str, Any]) -> Dict[str, Any]: """ Aggregates the results from the executed AI agents. Args: - results (list): A list of results from the executed agents. + results (dict): A dictionary of results from the executed agents. Returns: The aggregated result. """ - raise NotImplementedError \ No newline at end of file + return {"agent_results": results} + +class Orchestrator(AIOrchestrator): + """ + Concrete implementation of AIOrchestrator. + """ + async def execute_agents_concurrently(self, report_id: str, token_id: str) -> Dict[str, Any]: + agent_results = await self.execute_agents(report_id, token_id) + aggregated_data = self.aggregate_results(agent_results) + + # Determine overall status + overall_status = "completed" + if any(result["status"] == "failed" for result in agent_results.values()): + overall_status = "partial_success" + + # Update in_memory_reports + if report_id in in_memory_reports: + in_memory_reports[report_id].update({ + "status": overall_status, + "agent_results": aggregated_data["agent_results"] + }) + else: + logger.warning("Report ID %s not found in in_memory_reports during orchestration.", report_id) + + return aggregated_data + +orchestrator = Orchestrator() \ No newline at end of file diff --git a/backend/app/services/__pycache__/report_processor.cpython-313.pyc b/backend/app/services/__pycache__/report_processor.cpython-313.pyc new file mode 100644 index 0000000..56b9ee3 Binary files /dev/null and b/backend/app/services/__pycache__/report_processor.cpython-313.pyc differ diff --git a/backend/app/services/__pycache__/report_service.cpython-313.pyc b/backend/app/services/__pycache__/report_service.cpython-313.pyc index 82433c1..2f175bc 100644 Binary files a/backend/app/services/__pycache__/report_service.cpython-313.pyc and b/backend/app/services/__pycache__/report_service.cpython-313.pyc differ diff --git a/backend/app/services/report_service.py b/backend/app/services/report_service.py index ba6617e..72feffc 100644 --- a/backend/app/services/report_service.py +++ b/backend/app/services/report_service.py @@ -25,3 +25,6 @@ async def save_report_data(report_id: str, data: Dict): else: # Handle case where report_id does not exist, or log a warning logger.warning("Report ID %s not found for saving data.", report_id) + +def get_report_status_from_memory(report_id: str) -> Dict | None: + return in_memory_reports.get(report_id) diff --git a/backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc b/backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc index d8e1144..0ecf065 100644 Binary files a/backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc and b/backend/tests/__pycache__/test_orchestrator.cpython-313-pytest-8.4.2.pyc differ diff --git a/backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc b/backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc new file mode 100644 index 0000000..6b64968 Binary files /dev/null and b/backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc differ