Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added backend/__init__.py
Empty file.
Binary file added backend/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
Binary file added backend/app/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
45 changes: 41 additions & 4 deletions backend/app/api/v1/routes.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
from fastapi import APIRouter
from app.models.report_models import ReportRequest, ReportResponse
from app.services.report_service import generate_report
import logging
from fastapi import APIRouter, HTTPException
from backend.app.models.report_models import ReportRequest, ReportResponse
from backend.app.services.report_service import generate_report, in_memory_reports
from backend.app.core.orchestrator import orchestrator
import asyncio

logger = logging.getLogger(__name__)

router = APIRouter()

# Dummy Agent for demonstration
async def dummy_agent_one(report_id: str, token_id: str) -> dict:
print(f"Dummy Agent One running for report {report_id} and token {token_id}")
await asyncio.sleep(1) # Simulate async work
return {"agent_one_data": "data_from_agent_one"}

async def dummy_agent_two(report_id: str, token_id: str) -> dict:
print(f"Dummy Agent Two running for report {report_id} and token {token_id}")
await asyncio.sleep(0.5) # Simulate async work
return {"agent_two_data": "data_from_agent_two"}

# Register agents
orchestrator.register_agent("AgentOne", dummy_agent_one)
orchestrator.register_agent("AgentTwo", dummy_agent_two)

@router.get("/")
async def read_root():
return {"message": "Welcome to API v1"}

@router.post("/report/generate", response_model=ReportResponse)
async def generate_report_endpoint(request: ReportRequest):
return await generate_report(request)
report_response = await generate_report(request)
report_id = report_response.report_id
# Execute agents concurrently in a background task
task = asyncio.create_task(orchestrator.execute_agents_concurrently(report_id, request.token_id))
def _on_done(t: asyncio.Task):
try:
t.result()
except Exception as e:
logger.exception('Background orchestration failed for %s: %s', report_id, e)
# Optionally update report status to failed here as well
task.add_done_callback(_on_done)
return report_response

@router.get('/report/{report_id}/status')
async def get_report_status(report_id: str):
if report_id not in in_memory_reports:
raise HTTPException(status_code=404, detail='Report not found')
return in_memory_reports[report_id]
Binary file not shown.
56 changes: 56 additions & 0 deletions backend/app/core/orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import logging
from typing import Dict, Callable, Awaitable
from backend.app.services.report_service import save_report_data

logger = logging.getLogger(__name__)

class Orchestrator:
def __init__(self):
self.registered_agents: Dict[str, Callable[[str, str], Awaitable[Dict]]] = {}

def register_agent(self, name: str, agent_func: Callable[[str, str], Awaitable[Dict]]):
self.registered_agents[name] = agent_func

async def execute_agents_concurrently(self, report_id: str, token_id: str):
agent_tasks = []
agent_names = []

for name, agent_func in self.registered_agents.items():
agent_names.append(name)
agent_tasks.append(self._run_agent_safely(name, agent_func, report_id, token_id))

results = await asyncio.gather(*agent_tasks, return_exceptions=True)

aggregated_results = {}
for i, result in enumerate(results):
agent_name = agent_names[i]
if isinstance(result, Exception):
logger.error("Agent '%s' failed with error: %s", agent_name, result, exc_info=isinstance(result, BaseException))
aggregated_results[agent_name] = {'status': 'failed', 'error': str(result)}
else:
aggregated_results[agent_name] = {'status': 'completed', 'data': result}

failed_count = sum(1 for r in aggregated_results.values() if r['status'] == 'failed')
total = len(aggregated_results)
if failed_count == total:
overall_status = 'failed'
elif failed_count > 0:
overall_status = 'partial_success'
else:
overall_status = 'completed'

await save_report_data(report_id, {
'agent_results': aggregated_results,
'status': overall_status,
'summary': {'total': total, 'success': total - failed_count, 'failed': failed_count}
})

async def _run_agent_safely(self, name: str, agent_func: Callable[[str, str], Awaitable[Dict]], report_id: str, token_id: str) -> Dict:
try:
return await agent_func(report_id, token_id)
except Exception as e:
logger.error("Error running agent '%s': %s", name, e, exc_info=True)
raise # Re-raise to be caught by asyncio.gather

orchestrator = Orchestrator()
Binary file not shown.
Binary file not shown.
14 changes: 12 additions & 2 deletions backend/app/services/report_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from app.models.report_models import ReportRequest, ReportResponse
from app.utils.id_generator import generate_report_id
import logging
from backend.app.models.report_models import ReportRequest, ReportResponse
from backend.app.utils.id_generator import generate_report_id
from typing import Dict

logger = logging.getLogger(__name__)

# In-memory storage for reports (to be replaced with persistent storage)
in_memory_reports: Dict[str, Dict] = {}

Expand All @@ -15,3 +18,10 @@ async def generate_report(request: ReportRequest) -> ReportResponse:
"report_id": report_id
}
return ReportResponse(report_id=report_id, status="processing")

async def save_report_data(report_id: str, data: Dict):
if report_id in in_memory_reports:
in_memory_reports[report_id].update(data)
else:
# Handle case where report_id does not exist, or log a warning
logger.warning("Report ID %s not found for saving data.", report_id)
Binary file not shown.
Binary file not shown.
62 changes: 62 additions & 0 deletions backend/tests/test_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pytest
from unittest.mock import AsyncMock
from backend.app.core.orchestrator import Orchestrator
from backend.app.services.report_service import in_memory_reports

@pytest.fixture(autouse=True)
def clear_in_memory_reports():
in_memory_reports.clear()
yield
in_memory_reports.clear()

@pytest.mark.asyncio
async def test_execute_agents_concurrently_success():
orchestrator = Orchestrator()
mock_agent_one = AsyncMock(return_value={"agent_one_result": "data1"})
mock_agent_two = AsyncMock(return_value={"agent_two_result": "data2"})

orchestrator.register_agent("AgentOne", mock_agent_one)
orchestrator.register_agent("AgentTwo", mock_agent_two)

report_id = "test_report_id_success"
token_id = "test_token_id"

# Initialize report in in_memory_reports as generate_report would
in_memory_reports[report_id] = {"token_id": token_id, "status": "processing"}

await orchestrator.execute_agents_concurrently(report_id, token_id)

mock_agent_one.assert_called_once_with(report_id, token_id)
mock_agent_two.assert_called_once_with(report_id, token_id)

assert in_memory_reports[report_id]["status"] == "completed"
assert "agent_results" in in_memory_reports[report_id]
assert in_memory_reports[report_id]["agent_results"]["AgentOne"] == {"status": "completed", "data": {"agent_one_result": "data1"}}
assert in_memory_reports[report_id]["agent_results"]["AgentTwo"] == {"status": "completed", "data": {"agent_two_result": "data2"}}

@pytest.mark.asyncio
async def test_execute_agents_concurrently_with_failure():
orchestrator = Orchestrator()
mock_agent_one = AsyncMock(return_value={"agent_one_result": "data1"})
mock_agent_failing = AsyncMock(side_effect=Exception("Agent failed"))

orchestrator.register_agent("AgentOne", mock_agent_one)
orchestrator.register_agent("AgentFailing", mock_agent_failing)

report_id = "test_report_id_failure"
token_id = "test_token_id"

# Initialize report in in_memory_reports as generate_report would
in_memory_reports[report_id] = {"token_id": token_id, "status": "processing"}

await orchestrator.execute_agents_concurrently(report_id, token_id)

mock_agent_one.assert_called_once_with(report_id, token_id)
mock_agent_failing.assert_called_once_with(report_id, token_id)

assert in_memory_reports[report_id]["status"] == "partial_success"
assert "agent_results" in in_memory_reports[report_id]
assert in_memory_reports[report_id]["agent_results"]["AgentOne"] == {"status": "completed", "data": {"agent_one_result": "data1"}}
assert in_memory_reports[report_id]["agent_results"]["AgentFailing"]["status"] == "failed"
assert "error" in in_memory_reports[report_id]["agent_results"]["AgentFailing"]
assert "Agent failed" in in_memory_reports[report_id]["agent_results"]["AgentFailing"]["error"]
15 changes: 0 additions & 15 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,15 +0,0 @@
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():s
return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}