Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified backend/app/api/v1/__pycache__/routes.cpython-313.pyc
Binary file not shown.
Binary file modified backend/app/core/__pycache__/logger.cpython-313.pyc
Binary file not shown.
Binary file modified backend/app/core/__pycache__/orchestrator.cpython-313.pyc
Binary file not shown.
Binary file not shown.
7 changes: 6 additions & 1 deletion backend/app/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]:
except Exception as e:
orchestrator_logger.exception("Agent %s failed for report %s", name, report_id)
results[name] = {"status": "failed", "error": str(e)}
raise # Re-raise the exception
return results

def aggregate_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge the data of all successfully completed agents into one flat dict.

    Args:
        results: Mapping of agent name -> agent result entry. Completed
            entries are expected to look like ``{"status": "completed",
            "data": {...}}``; failed entries carry ``{"status": "failed",
            "error": ...}``.

    Returns:
        The aggregated result: the union of every completed agent's
        ``data`` payload. Later agents overwrite colliding keys.
    """
    aggregated_data = {}
    # .get() keeps malformed entries (missing "status") from raising KeyError;
    # they are simply skipped, like failed agents.
    for agent_result in results.values():
        if agent_result.get("status") == "completed" and "data" in agent_result:
            aggregated_data.update(agent_result["data"])
    return aggregated_data

class Orchestrator(AIOrchestrator):
"""
Expand Down
40 changes: 30 additions & 10 deletions backend/app/core/storage.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@

import threading

REPORT_STORE = {}
_report_store_lock = threading.Lock()

def set_report_status(report_id: str, status: str):
"""Sets the status of a report."""
if report_id not in REPORT_STORE:
REPORT_STORE[report_id] = {"status": status, "data": None}
else:
REPORT_STORE[report_id]["status"] = status
with _report_store_lock:
if report_id not in REPORT_STORE:
REPORT_STORE[report_id] = {"status": status, "data": None}
else:
REPORT_STORE[report_id]["status"] = status

def get_report_status(report_id: str) -> str | None:
"""Gets the status of a report."""
return REPORT_STORE.get(report_id, {}).get("status")
with _report_store_lock:
return REPORT_STORE.get(report_id, {}).get("status")

def try_set_processing(report_id: str) -> bool:
"""
Atomically checks if a report is not processing and, if so, sets its status to "processing".
Returns True if successful, False otherwise.
"""
with _report_store_lock:
if REPORT_STORE.get(report_id, {}).get("status") != "processing":
if report_id not in REPORT_STORE:
REPORT_STORE[report_id] = {"status": "processing", "data": None}
else:
REPORT_STORE[report_id]["status"] = "processing"
return True
return False

def save_report_data(report_id: str, data: dict):
"""Saves the data for a report."""
if report_id not in REPORT_STORE:
REPORT_STORE[report_id] = {"status": "completed", "data": data}
else:
REPORT_STORE[report_id]["data"] = data
REPORT_STORE[report_id]["status"] = "completed"
with _report_store_lock:
if report_id not in REPORT_STORE:
REPORT_STORE[report_id] = {"status": "completed", "data": data}
else:
REPORT_STORE[report_id]["data"] = data
REPORT_STORE[report_id]["status"] = "completed"
Binary file not shown.
Binary file modified backend/app/services/__pycache__/report_processor.cpython-313.pyc
Binary file not shown.
Binary file modified backend/app/services/__pycache__/report_service.cpython-313.pyc
Binary file not shown.
Binary file modified backend/app/services/agents/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 2 additions & 2 deletions backend/app/services/agents/price_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio


async def run(report_id: str, token_id: str) -> dict:
    """
    Mock fetching price data for a given token.

    Args:
        report_id: Identifier of the report this lookup belongs to.
        token_id: Token whose price is requested.

    Returns:
        A dict with the mocked price plus the identifying fields.
    """
    await asyncio.sleep(0.1)  # Simulate a small network delay
    return {"price": 123.45, "token_id": token_id, "report_id": report_id}
4 changes: 2 additions & 2 deletions backend/app/services/agents/trend_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio


async def run(report_id: str, token_id: str) -> dict:
    """
    Mock fetching trend data for a given token.

    Args:
        report_id: Identifier of the report this lookup belongs to.
        token_id: Token whose trend is requested.

    Returns:
        A dict with the mocked trend data plus the identifying fields.
    """
    await asyncio.sleep(0.1)  # Simulate a small network delay
    return {"trend": "up", "change_24h": 5.67, "token_id": token_id, "report_id": report_id}
4 changes: 2 additions & 2 deletions backend/app/services/agents/volume_agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio


async def run(report_id: str, token_id: str) -> dict:
    """
    Mock fetching volume data for a given token.

    Args:
        report_id: Identifier of the report this lookup belongs to.
        token_id: Token whose trading volume is requested.

    Returns:
        A dict with the mocked volume plus the identifying fields.
    """
    await asyncio.sleep(0.1)  # Simulate a small network delay
    return {"volume": 987654.32, "token_id": token_id, "report_id": report_id}
58 changes: 26 additions & 32 deletions backend/app/services/report_processor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import asyncio
import logging

from backend.app.core.orchestrator import AIOrchestrator
from backend.app.core.storage import save_report_data, set_report_status, try_set_processing
from backend.app.services.agents.price_agent import run as price_agent_run
from backend.app.services.agents.trend_agent import run as trend_agent_run
from backend.app.services.agents.volume_agent import run as volume_agent_run

logger = logging.getLogger(__name__)

# NOTE: The storage module's in-memory lock is only suitable for single-process
# deployments. For multi-process or distributed deployments use an external
# store (e.g. Redis or a database) with a distributed locking mechanism.


async def process_report(report_id: str, token_id: str) -> bool:
    """
    Run all registered agents for *token_id* and persist the aggregated report.

    Args:
        report_id: Unique identifier of the report being generated.
        token_id: Token the agents should analyze.

    Returns:
        True on success.

    Raises:
        ValueError: If the report is already being processed.
        asyncio.CancelledError: Re-raised after marking the report "cancelled".
        Exception: Any processing failure is logged, the report is marked
            "failed", and the original exception is re-raised.
    """
    # Atomically claim the report; prevents duplicate concurrent processing.
    if not try_set_processing(report_id):
        logger.info("Report %s is already being processed, skipping.", report_id)
        raise ValueError(f"Report {report_id} is already being processed")

    logger.info("Processing report %s for token %s", report_id, token_id)
    try:
        orchestrator = AIOrchestrator()
        orchestrator.register_agent("price_agent", price_agent_run)
        orchestrator.register_agent("trend_agent", trend_agent_run)
        orchestrator.register_agent("volume_agent", volume_agent_run)

        agent_results = await orchestrator.execute_agents(report_id, token_id)
        combined_report_data = orchestrator.aggregate_results(agent_results)

        # save_report_data already marks the report "completed"; the explicit
        # set_report_status keeps the status transition obvious at call sites.
        save_report_data(report_id, combined_report_data)
        set_report_status(report_id, "completed")

        logger.info("Report %s completed.", report_id)
        return True
    except asyncio.CancelledError:
        # Record the cancellation but preserve cancellation semantics.
        set_report_status(report_id, "cancelled")
        raise
    except Exception:
        logger.exception("Error processing report %s for token %s", report_id, token_id)
        set_report_status(report_id, "failed")
        raise
Binary file not shown.
112 changes: 112 additions & 0 deletions backend/app/tests/test_report_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import pytest
import threading
import asyncio
from unittest.mock import patch

from backend.app.services.report_processor import process_report
from backend.app.core import storage

@pytest.fixture(autouse=True)
def clear_report_store():
    """Start every test with an empty report store, and clean up afterwards."""
    storage.REPORT_STORE.clear()
    yield
    # Also clear on teardown so leftover state cannot leak into other modules.
    storage.REPORT_STORE.clear()

@pytest.mark.asyncio
async def test_process_report_success():
    """Happy path: agent outputs are aggregated and the report is completed."""
    report_id = "test_report_1"
    token_id = "test_token_1"

    # Patch all three agents at once so no real agent work runs.
    with patch('backend.app.services.report_processor.price_agent_run',
               return_value={"price": 100.0, "token_id": token_id, "report_id": report_id}), \
         patch('backend.app.services.report_processor.volume_agent_run',
               return_value={"volume": 1000.0, "token_id": token_id, "report_id": report_id}), \
         patch('backend.app.services.report_processor.trend_agent_run',
               return_value={"trend": "up", "token_id": token_id, "report_id": report_id}):
        await process_report(report_id, token_id)

    assert storage.get_report_status(report_id) == "completed"
    report_data = storage.REPORT_STORE[report_id]["data"]
    assert report_data["price"] == 100.0
    assert report_data["volume"] == 1000.0
    assert report_data["trend"] == "up"

@pytest.mark.asyncio
async def test_process_report_already_processing():
    """A second caller for an in-flight report is rejected with ValueError."""
    report_id = "test_report_2"
    token_id = "test_token_2"
    storage.set_report_status(report_id, "processing")

    expected_message = f"Report {report_id} is already being processed"
    with pytest.raises(ValueError, match=expected_message):
        await process_report(report_id, token_id)

    # The rejected attempt must leave the existing status untouched.
    assert storage.get_report_status(report_id) == "processing"

@pytest.mark.asyncio
async def test_process_report_failure_sets_failed_status():
    """An agent exception propagates and the report is marked failed."""
    report_id = "test_report_3"
    token_id = "test_token_3"

    with patch('backend.app.services.report_processor.price_agent_run') as failing_agent:
        failing_agent.side_effect = Exception("Agent error")
        with pytest.raises(Exception, match="Agent error"):
            await process_report(report_id, token_id)

    assert storage.get_report_status(report_id) == "failed"

def test_try_set_processing_success():
    """An unknown report can be claimed and ends up in the processing state."""
    report_id = "new_report"
    claimed = storage.try_set_processing(report_id)
    assert claimed is True
    assert storage.get_report_status(report_id) == "processing"

def test_try_set_processing_failure_already_processing():
    """A report already being processed cannot be claimed again."""
    report_id = "existing_report"
    storage.set_report_status(report_id, "processing")
    claimed = storage.try_set_processing(report_id)
    assert claimed is False
    assert storage.get_report_status(report_id) == "processing"

def test_try_set_processing_failure_other_status():
    """A finished report CAN be re-claimed for reprocessing (claim succeeds)."""
    report_id = "existing_report_completed"
    storage.set_report_status(report_id, "completed")
    claimed = storage.try_set_processing(report_id)
    assert claimed is True
    assert storage.get_report_status(report_id) == "processing"

def test_concurrent_processing_only_one_succeeds():
    """Exactly one of several concurrent workers may process the same report.

    The original version was flaky: with instantly-returning mocked agents the
    winning thread could finish (status "completed") before the other threads
    called try_set_processing, letting more than one worker "succeed". It also
    applied mock patches per-thread (patching mutates module globals and is not
    thread-safe) and silently swallowed unexpected exceptions.
    """
    report_id = "concurrent_report"
    token_id = "concurrent_token"
    n_workers = 5
    outcomes = []  # list.append is atomic under the GIL
    barrier = threading.Barrier(n_workers)

    async def slow_agent(report_id, token_id):
        # Hold the "processing" state long enough for every loser to observe it.
        await asyncio.sleep(0.3)
        return {"token_id": token_id, "report_id": report_id}

    def worker():
        barrier.wait()  # maximize contention: all workers race at once
        try:
            asyncio.run(process_report(report_id, token_id))
            outcomes.append("processed")
        except ValueError as exc:
            if "already being processed" not in str(exc):
                raise  # do not hide unrelated failures
            outcomes.append("rejected")

    # Patch once, in the main thread, for the whole duration of the race.
    with patch('backend.app.services.report_processor.price_agent_run', new=slow_agent), \
         patch('backend.app.services.report_processor.volume_agent_run', new=slow_agent), \
         patch('backend.app.services.report_processor.trend_agent_run', new=slow_agent):
        threads = [threading.Thread(target=worker) for _ in range(n_workers)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    assert outcomes.count("processed") == 1
    assert outcomes.count("rejected") == n_workers - 1
    assert storage.get_report_status(report_id) == "completed"
    assert "data" in storage.REPORT_STORE[report_id]
Loading