diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..17abf185 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# Python +__pycache__/ +*.pyc diff --git a/.venv/bin/python b/.venv/bin/python deleted file mode 120000 index b8a0adbb..00000000 --- a/.venv/bin/python +++ /dev/null @@ -1 +0,0 @@ -python3 \ No newline at end of file diff --git a/.venv/bin/python3 b/.venv/bin/python3 deleted file mode 120000 index ae65fdaa..00000000 --- a/.venv/bin/python3 +++ /dev/null @@ -1 +0,0 @@ -/usr/bin/python3 \ No newline at end of file diff --git a/.venv/bin/python3.12 b/.venv/bin/python3.12 deleted file mode 120000 index b8a0adbb..00000000 --- a/.venv/bin/python3.12 +++ /dev/null @@ -1 +0,0 @@ -python3 \ No newline at end of file diff --git a/.venv/lib64 b/.venv/lib64 deleted file mode 120000 index 7951405f..00000000 --- a/.venv/lib64 +++ /dev/null @@ -1 +0,0 @@ -lib \ No newline at end of file diff --git a/.venv/pyvenv.cfg b/.venv/pyvenv.cfg deleted file mode 100644 index 244ff70c..00000000 --- a/.venv/pyvenv.cfg +++ /dev/null @@ -1,5 +0,0 @@ -home = /usr/bin -include-system-site-packages = false -version = 3.12.3 -executable = /usr/bin/python3.12 -command = /usr/bin/python3 -m venv /home/repositories/LumintelAnalytics/ChainReport-API/.venv diff --git a/backend/app/core/error_utils.py b/backend/app/core/error_utils.py index 5de8064c..c5316de1 100644 --- a/backend/app/core/error_utils.py +++ b/backend/app/core/error_utils.py @@ -1,6 +1,5 @@ import logging import traceback -import os from typing import Dict, Any from backend.app.core.config import settings diff --git a/backend/app/core/orchestrator.py b/backend/app/core/orchestrator.py index 468bcb9d..4bb977da 100644 --- a/backend/app/core/orchestrator.py +++ b/backend/app/core/orchestrator.py @@ -5,20 +5,20 @@ from backend.app.services.agents.onchain_agent import fetch_onchain_metrics, fetch_tokenomics from backend.app.services.agents.social_sentiment_agent import SocialSentimentAgent from backend.app.services.agents.team_doc_agent import TeamDocAgent -from backend.app.services.agents.code_audit_agent import CodeAuditAgent # Import CodeAuditAgent +from backend.app.services.agents.code_audit_agent import CodeAuditAgent from backend.app.core.config import settings from backend.app.db.repositories.report_repository import ReportRepository from backend.app.core.error_utils import capture_exception -from backend.app.db.models.report_state import ReportStatusEnum, ReportState +from backend.app.db.models.report_state import ReportStatusEnum from sqlalchemy.ext.asyncio import AsyncSession -from backend.app.db.database import get_db, AsyncSessionLocal # Added for agent functions +from backend.app.db.database import AsyncSessionLocal async def dummy_agent(report_id: str, token_id: str) -> Dict[str, Any]: """ A dummy agent for testing purposes. """ orchestrator_logger.info("Dummy agent received report_id: %s, token_id: %s", report_id, token_id) - await asyncio.sleep(1) # Simulate some async work + await asyncio.sleep(1) return {"dummy_data": f"Processed by dummy agent for {report_id}"} class Orchestrator: @@ -39,15 +39,12 @@ async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]: for name, agent_func in self._agents.items() } - # Execute agents concurrently and capture exceptions results = await asyncio.gather(*tasks.values(), return_exceptions=True) - # Process results, capture exceptions, and update report state processed_results = {} errors_flag: Dict[str, bool] = {} for agent_name, result in zip(tasks.keys(), results): if isinstance(result, Exception): - # Capture the exception with additional context orchestrator_logger.exception("Agent %s failed for report %s", agent_name, report_id) capture_exception(result, {"agent_name": agent_name, "report_id": report_id, "token_id": token_id}) errors_flag[agent_name] = True @@ -55,14 +52,13 @@ async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]: else: processed_results[agent_name] = result - # Update the report with agent-specific errors if any agent failed if errors_flag: existing_report = await self.report_repository.get_report_by_id(report_id) if existing_report: existing_errors = existing_report.errors if existing_report.errors else {} await self.report_repository.update_partial( report_id, - {"status": ReportStatusEnum.AGENTS_FAILED, "errors": {**existing_errors, **errors_flag}} + {"state": ReportStatusEnum.AGENTS_FAILED, "errors": {**existing_errors, **errors_flag}} ) else: raise RuntimeError(f"Report {report_id} not found when attempting to update with agent errors. Errors detected: {errors_flag}") @@ -71,10 +67,10 @@ async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]: if existing_report: await self.report_repository.update_partial( report_id, - {"status": ReportStatusEnum.AGENTS_COMPLETED} + {"state": ReportStatusEnum.AGENTS_COMPLETED} ) else: - orchestrator_logger.warning("Report %s not found when attempting to update with AGENTS_COMPLETED status.", report_id) + orchestrator_logger.warning("Report %s not found when attempting to update with AGENTS_COMPLETED state.", report_id) return processed_results @@ -125,46 +121,45 @@ async def onchain_data_agent(report_id: str, token_id: str) -> Dict[str, Any]: try: onchain_metrics_result, tokenomics_result = await asyncio.gather( asyncio.wait_for(onchain_metrics_task, timeout=settings.AGENT_TIMEOUT - 1), - asyncio.wait_for(tokenomics_task, timeout=settings.AGENT_TIMEOUT - 1), - return_exceptions=True # This will allow us to handle exceptions for each task individually - ) + asyncio.wait_for(tokenomics_task, timeout=settings.AGENT_TIMEOUT - 1), + return_exceptions=True + ) - # Handle individual task results and exceptions - if isinstance(onchain_metrics_result, asyncio.TimeoutError): - orchestrator_logger.error("Onchain metrics fetch timed out for report %s", report_id) - onchain_metrics_result = {"status": "failed", "error": "Onchain metrics fetch timed out"} - elif isinstance(onchain_metrics_result, Exception): - orchestrator_logger.error("Onchain metrics fetch failed for report %s", report_id) - onchain_metrics_result = {"status": "failed", "error": str(onchain_metrics_result)} + if isinstance(onchain_metrics_result, asyncio.TimeoutError): + orchestrator_logger.error("Onchain metrics fetch timed out for report %s", report_id) + onchain_metrics_result = {"status": "failed", "error": "Onchain metrics fetch timed out"} + elif isinstance(onchain_metrics_result, Exception): + orchestrator_logger.error("Onchain metrics fetch failed for report %s", report_id) + onchain_metrics_result = {"status": "failed", "error": str(onchain_metrics_result)} - if isinstance(tokenomics_result, asyncio.TimeoutError): - orchestrator_logger.error("Tokenomics fetch timed out for report %s", report_id) - tokenomics_result = {"status": "failed", "error": "Tokenomics fetch timed out"} - elif isinstance(tokenomics_result, Exception): - orchestrator_logger.error("Tokenomics fetch failed for report %s", report_id) - tokenomics_result = {"status": "failed", "error": str(tokenomics_result)} + if isinstance(tokenomics_result, asyncio.TimeoutError): + orchestrator_logger.error("Tokenomics fetch timed out for report %s", report_id) + tokenomics_result = {"status": "failed", "error": "Tokenomics fetch timed out"} + elif isinstance(tokenomics_result, Exception): + orchestrator_logger.error("Tokenomics fetch failed for report %s", report_id) + tokenomics_result = {"status": "failed", "error": str(tokenomics_result)} - overall_agent_status = "completed" - if onchain_metrics_result.get("status") == "failed" or tokenomics_result.get("status") == "failed": - overall_agent_status = "failed" + overall_agent_status = "completed" + if onchain_metrics_result.get("status") == "failed" or tokenomics_result.get("status") == "failed": + overall_agent_status = "failed" - result = { - "status": overall_agent_status, - "data": { - "onchain_metrics": onchain_metrics_result, - "tokenomics": tokenomics_result - } + result = { + "status": overall_agent_status, + "data": { + "onchain_metrics": onchain_metrics_result, + "tokenomics": tokenomics_result } - existing_report = await orch.report_repository.get_report_by_id(report_id) - existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} - await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}}) - return result - except asyncio.TimeoutError as e: - orchestrator_logger.error("Onchain Data Agent timed out for report %s", report_id) - return {"status": "failed", "error": "Agent timed out"} - except Exception as e: - orchestrator_logger.exception("Onchain Data Agent failed for report %s", report_id) - return {"status": "failed", "error": str(e)} + } + existing_report = await orch.report_repository.get_report_by_id(report_id) + existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} + await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}}) + return result + except asyncio.TimeoutError: + orchestrator_logger.error("Onchain Data Agent timed out for report %s", report_id) + return {"status": "failed", "error": "Agent timed out"} + except Exception as e: + orchestrator_logger.exception("Onchain Data Agent failed for report %s", report_id) + return {"status": "failed", "error": str(e)} orch.register_agent('onchain_data_agent', onchain_data_agent) else: orchestrator_logger.warning("Onchain Data Agent will not be registered due to invalid configuration.") @@ -175,28 +170,28 @@ async def social_sentiment_agent_func(report_id: str, token_id: str) -> Dict[str agent = SocialSentimentAgent() try: social_data = await asyncio.wait_for(agent.fetch_social_data(token_id), timeout=settings.AGENT_TIMEOUT - 1) - sentiment_report = await asyncio.wait_for(agent.analyze_sentiment(social_data), timeout=settings.AGENT_TIMEOUT - 1) - orchestrator_logger.info(f"Social Sentiment Agent completed for report {report_id}.") - result = { - "status": "completed", - "data": { - "social_sentiment": { - "overall_sentiment": sentiment_report.get("overall_sentiment"), - "score": sentiment_report.get("score"), - "summary": sentiment_report.get("details") # Storing details as summary for now - } + sentiment_report = await asyncio.wait_for(agent.analyze_sentiment(social_data), timeout=settings.AGENT_TIMEOUT - 1) + orchestrator_logger.info(f"Social Sentiment Agent completed for report {report_id}.") + result = { + "status": "completed", + "data": { + "social_sentiment": { + "overall_sentiment": sentiment_report.get("overall_sentiment"), + "score": sentiment_report.get("score"), + "summary": sentiment_report.get("details") } } - existing_report = await orch.report_repository.get_report_by_id(report_id) - existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} - await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}}) - return result - except asyncio.TimeoutError as e: - orchestrator_logger.error("Social Sentiment Agent timed out for report %s", report_id) - return {"status": "failed", "error": "Agent timed out"} - except Exception as e: - orchestrator_logger.exception("Social Sentiment Agent failed for report %s", report_id) - return {"status": "failed", "error": str(e)} + } + existing_report = await orch.report_repository.get_report_by_id(report_id) + existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} + await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}}) + return result + except asyncio.TimeoutError: + orchestrator_logger.error("Social Sentiment Agent timed out for report %s", report_id) + return {"status": "failed", "error": "Agent timed out"} + except Exception as e: + orchestrator_logger.exception("Social Sentiment Agent failed for report %s", report_id) + return {"status": "failed", "error": str(e)} orch.register_agent('social_sentiment_agent', social_sentiment_agent_func) # Configure and register Team and Documentation Agent @@ -205,108 +200,100 @@ async def team_documentation_agent(report_id: str, token_id: str) -> Dict[str, A agent = TeamDocAgent() team_analysis = [] whitepaper_summary = {} - - # Placeholder for fetching token-related data (URLs, whitepaper text) - # In a real scenario, this data would be fetched based on token_id - # For now, we'll use dummy data or assume it comes from settings - team_profile_urls = settings.TEAM_PROFILE_URLS.get(token_id, []) - whitepaper_text_source = settings.WHITEPAPER_TEXT_SOURCES.get(token_id, "") + + team_profile_urls = settings.TEAM_PROFILE_URLS.get(token_id, []) + whitepaper_text_source = settings.WHITEPAPER_TEXT_SOURCES.get(token_id, "") try: - # Scrape team profiles orchestrator_logger.info(f"Scraping team profiles for token {token_id} from URLs: {team_profile_urls}") - team_analysis = await asyncio.wait_for( - asyncio.to_thread(agent.scrape_team_profiles, team_profile_urls), + team_analysis = await asyncio.wait_for( + asyncio.to_thread(agent.scrape_team_profiles, team_profile_urls), + timeout=settings.AGENT_TIMEOUT - 1 + ) + orchestrator_logger.info(f"Team profile scraping completed for token {token_id}.") + + if whitepaper_text_source: + orchestrator_logger.info(f"Analyzing whitepaper for token {token_id} from source: {whitepaper_text_source}") + whitepaper_summary = await asyncio.wait_for( + asyncio.to_thread(agent.analyze_whitepaper, whitepaper_text_source), timeout=settings.AGENT_TIMEOUT - 1 ) - orchestrator_logger.info(f"Team profile scraping completed for token {token_id}.") + orchestrator_logger.info(f"Whitepaper analysis completed for token {token_id}.") + else: + orchestrator_logger.warning(f"No whitepaper text source provided for token {token_id}. Skipping whitepaper analysis.") - # Analyze whitepaper - if whitepaper_text_source: - orchestrator_logger.info(f"Analyzing whitepaper for token {token_id} from source: {whitepaper_text_source}") - whitepaper_summary = await asyncio.wait_for( - asyncio.to_thread(agent.analyze_whitepaper, whitepaper_text_source), + result = { + "status": "completed", + "data": { + "team_documentation": { + "team_analysis": team_analysis, + "whitepaper_summary": whitepaper_summary + } + } + } + existing_report = await orch.report_repository.get_report_by_id(report_id) + existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} + await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}}) + return result + except asyncio.TimeoutError: + orchestrator_logger.error("Team and Documentation Agent timed out for report %s", report_id) + return {"status": "failed", "error": "Agent timed out"} + except Exception as e: + orchestrator_logger.exception("Team and Documentation Agent failed for report %s", report_id) + return {"status": "failed", "error": str(e)} + orch.register_agent('team_documentation_agent', team_documentation_agent) + + # Configure and register Code/Audit Agent + code_audit_repo_url = settings.CODE_AUDIT_REPO_URL + if _is_valid_url(code_audit_repo_url, "CODE_AUDIT_REPO_URL"): + async def code_audit_agent_func(report_id: str, token_id: str) -> Dict[str, Any]: + orchestrator_logger.info(f"Calling Code/Audit Agent for report_id: {report_id}, token_id: {token_id}") + code_metrics_data = {} + audit_summary_data = [] + try: + async with CodeAuditAgent() as agent: + orchestrator_logger.info(f"Fetching repository metrics for {code_audit_repo_url}") + code_metrics = await asyncio.wait_for( + agent.fetch_repo_metrics(code_audit_repo_url), + timeout=settings.AGENT_TIMEOUT - 1 + ) + code_metrics_data = code_metrics.model_dump() + + orchestrator_logger.info(f"Analyzing code activity for {code_audit_repo_url}") + code_activity_analysis = await asyncio.wait_for( + agent.analyze_code_activity(code_metrics), timeout=settings.AGENT_TIMEOUT - 1 ) - orchestrator_logger.info(f"Whitepaper analysis completed for token {token_id}.") - else: - orchestrator_logger.warning(f"No whitepaper text source provided for token {token_id}. Skipping whitepaper analysis.") + code_metrics_data.update({"activity_analysis": code_activity_analysis}) + + orchestrator_logger.info(f"Searching and summarizing audit reports for {code_audit_repo_url}") + audit_summary = await asyncio.wait_for( + agent.search_and_summarize_audit_reports(code_audit_repo_url), + timeout=settings.AGENT_TIMEOUT - 1 + ) + audit_summary_data = audit_summary result = { "status": "completed", "data": { - "team_documentation": { - "team_analysis": team_analysis, - "whitepaper_summary": whitepaper_summary + "code_audit": { + "code_metrics": code_metrics_data, + "audit_summary": audit_summary_data } } } existing_report = await orch.report_repository.get_report_by_id(report_id) existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} - await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}}) + await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}}) return result - except asyncio.TimeoutError as e: - orchestrator_logger.error("Team and Documentation Agent timed out for report %s", report_id) + except asyncio.TimeoutError: + orchestrator_logger.error("Code/Audit Agent timed out for report %s", report_id) return {"status": "failed", "error": "Agent timed out"} except Exception as e: - orchestrator_logger.exception("Team and Documentation Agent failed for report %s", report_id) + orchestrator_logger.exception("Code/Audit Agent failed for report %s", report_id) return {"status": "failed", "error": str(e)} - orch.register_agent('team_documentation_agent', team_documentation_agent) - - # Configure and register Code/Audit Agent - code_audit_repo_url = settings.CODE_AUDIT_REPO_URL - if _is_valid_url(code_audit_repo_url, "CODE_AUDIT_REPO_URL"): - async def code_audit_agent_func(report_id: str, token_id: str) -> Dict[str, Any]: - orchestrator_logger.info(f"Calling Code/Audit Agent for report_id: {report_id}, token_id: {token_id}") - code_metrics_data = {} - audit_summary_data = [] - try: - async with CodeAuditAgent() as agent: - # Fetch repo metrics - orchestrator_logger.info(f"Fetching repository metrics for {code_audit_repo_url}") - code_metrics = await asyncio.wait_for( - agent.fetch_repo_metrics(code_audit_repo_url), - timeout=settings.AGENT_TIMEOUT - 1 - ) - code_metrics_data = code_metrics.model_dump() - - # Analyze code activity - orchestrator_logger.info(f"Analyzing code activity for {code_audit_repo_url}") - code_activity_analysis = await asyncio.wait_for( - agent.analyze_code_activity(code_metrics), - timeout=settings.AGENT_TIMEOUT - 1 - ) - code_metrics_data.update({"activity_analysis": code_activity_analysis}) - - # Search and summarize audit reports - orchestrator_logger.info(f"Searching and summarizing audit reports for {code_audit_repo_url}") - audit_summary = await asyncio.wait_for( - agent.search_and_summarize_audit_reports(code_audit_repo_url), - timeout=settings.AGENT_TIMEOUT - 1 - ) - audit_summary_data = audit_summary - - result = { - "status": "completed", - "data": { - "code_audit": { - "code_metrics": code_metrics_data, - "audit_summary": audit_summary_data - } - } - } - existing_report = await orch.report_repository.get_report_by_id(report_id) - existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {} - await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}}) - return result - except asyncio.TimeoutError as e: - orchestrator_logger.error("Code/Audit Agent timed out for report %s", report_id) - return {"status": "failed", "error": "Agent timed out"} - except Exception as e: - orchestrator_logger.exception("Code/Audit Agent failed for report %s", report_id) - return {"status": "failed", "error": str(e)} orch.register_agent('code_audit_agent', code_audit_agent_func) else: orchestrator_logger.warning("Code/Audit Agent will not be registered due to invalid CODE_AUDIT_REPO_URL configuration.") - return orch + return orch \ No newline at end of file diff --git a/backend/app/core/time_tracker.py b/backend/app/core/time_tracker.py new file mode 100644 index 00000000..6785c9ce --- /dev/null +++ b/backend/app/core/time_tracker.py @@ -0,0 +1,41 @@ +import logging +from datetime import datetime +from backend.app.cache.redis_client import redis_client + +logger = logging.getLogger(__name__) + +REDIS_KEY_PREFIX = "report_timer:" + +def start_timer(report_id: str): + """ + Records the start timestamp for a given report_id in Redis. + """ + try: + start_time = datetime.now().isoformat() + key = f"{REDIS_KEY_PREFIX}{report_id}" + redis_client.set_cache(key, start_time, ttl=3600 * 24) # Store for 24 hours + logger.info(f"Timer started for report_id: {report_id} at {start_time}") + except Exception as e: + logger.error(f"Failed to start timer for report_id {report_id}: {e}", exc_info=True) + +def finish_timer(report_id: str) -> float | None: + """ + Retrieves the start timestamp, calculates the duration, and removes the timer from Redis. + Returns the duration in seconds or None if the timer was not found or an error occurred. + """ + key = f"{REDIS_KEY_PREFIX}{report_id}" + try: + start_time_str = redis_client.get_cache(key) + if start_time_str: + redis_client.delete_cache(key) + start_time = datetime.fromisoformat(start_time_str.decode('utf-8')) + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + logger.info(f"Timer finished for report_id: {report_id}. Duration: {duration:.2f} seconds.") + return duration + else: + logger.warning(f"Timer not found for report_id: {report_id}") + return None + except Exception as e: + logger.error(f"Failed to finish timer for report_id {report_id}: {e}", exc_info=True) + return None diff --git a/backend/app/services/agents/code_audit_agent.py b/backend/app/services/agents/code_audit_agent.py index 1b0b1fd4..87a3d256 100644 --- a/backend/app/services/agents/code_audit_agent.py +++ b/backend/app/services/agents/code_audit_agent.py @@ -1,6 +1,5 @@ import os import re -import logging import json import hashlib from typing import Dict, Any, List diff --git a/backend/tests/test_orchestrator_updates.py b/backend/tests/test_orchestrator_updates.py index 8ceb3301..c690948e 100644 --- a/backend/tests/test_orchestrator_updates.py +++ b/backend/tests/test_orchestrator_updates.py @@ -1,7 +1,7 @@ import pytest from unittest.mock import AsyncMock, MagicMock from backend.app.core.orchestrator import Orchestrator -from backend.app.db.models.report_state import ReportStatusEnum, ReportState +from backend.app.db.models.report_state import ReportStatusEnum from sqlalchemy.ext.asyncio import AsyncSession from backend.app.db.repositories.report_repository import ReportRepository