Fix: Improve Agent Failure Logging and State Management #75
Conversation
Walkthrough: Orchestrator now runs registered agents concurrently and captures per-agent exceptions.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as Caller
    participant Orch as Orchestrator
    participant Agents as Agent(s)
    participant Repo as ReportRepository
    participant Err as capture_exception
    Caller->>Orch: execute_agents(report_id, token_id, ...)
    Orch->>Agents: Launch agent tasks concurrently
    par Parallel agent runs
        Agents->>Agents: Execute agent logic (per-agent timeout)
        Agents-->>Orch: Return result OR raise exception
    end
    Orch->>Err: capture_exception(context) for failures
    Err-->>Orch: Exception metadata
    Orch->>Orch: Build processed_results (results + per-agent error flags)
    alt any agent failed
        Orch->>Repo: get_report_by_id(report_id)
        Repo-->>Orch: report or None
        Orch->>Repo: update_partial(report_id, errors=merged_errors, status=AGENTS_FAILED)
    else all agents succeeded
        Orch->>Repo: update_partial(report_id, partial_agent_output, status=AGENTS_COMPLETED)
    end
    Orch-->>Caller: Return processed_results
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (3)
backend/app/core/orchestrator.py (3)
16-22: Consider moving `dummy_agent` to a test utilities module. Module-level functions intended solely for testing are typically placed in test directories (e.g., `tests/fixtures/` or `tests/helpers/`) to avoid polluting production code and to make test-only code explicit.
48-48: Add `strict=True` to `zip()` for safety. Per static analysis, using `zip()` without `strict=True` can silently drop items if the iterables have mismatched lengths. While `asyncio.gather` guarantees matching lengths here, explicit strictness is defensive.

```diff
- for agent_name, result in zip(tasks.keys(), results):
+ for agent_name, result in zip(tasks.keys(), results, strict=True):
```
159-166: Missing `capture_exception` in individual agent exception handlers. The new error handling in `execute_agents` (line 52) calls `capture_exception` for failed agents, but the exception handlers within individual agents (e.g., here and at lines 196-203, 255-262, 314-321) don't call `capture_exception`. This creates inconsistent Sentry reporting:

- Exceptions caught in `execute_agents` → logged to Sentry
- Exceptions caught within agent functions → not logged to Sentry

Consider adding `capture_exception` calls to the individual agent exception handlers for consistent observability, or refactor to let exceptions propagate to `execute_agents` for centralized handling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- backend/app/core/orchestrator.py (2 hunks)
- backend/app/db/models/report_state.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.14.6)
backend/app/core/orchestrator.py
48-48: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
🔇 Additional comments (2)
backend/app/db/models/report_state.py (1)
36-36: LGTM! New `errors` column for agent-specific error tracking. The addition of a JSON column to store per-agent error flags is appropriate for the use case. Ensure a corresponding Alembic migration is created for this schema change if not already done.

Consider adding a docstring or inline comment clarifying the expected structure (e.g., `{"agent_name": bool}`) to help future maintainers.

backend/app/core/orchestrator.py (1)

49-54: Good error capture pattern with context. The integration with `capture_exception` including `agent_name`, `report_id`, and `token_id` as context provides valuable debugging information in Sentry. The combination with `orchestrator_logger.exception` ensures both local logging and centralized error tracking.
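The dual logging plus centralized capture pattern can be sketched in isolation. Here `capture_exception` is a hypothetical local stand-in for `backend.app.core.error_utils.capture_exception` (whose real signature may differ), and the `captured` list stands in for Sentry:

```python
import logging

# Minimal sketch of the pattern praised above: a local log line for operators,
# plus a centralized capture call carrying structured context.
# `capture_exception` is a stand-in, NOT the project's real helper.
logger = logging.getLogger("orchestrator")
captured = []  # stands in for the Sentry backend

def capture_exception(exc, context):
    # Record the exception together with structured context for later triage.
    captured.append({"error": str(exc), **context})

def handle_agent_failure(exc, agent_name, report_id, token_id):
    # Local logging and centralized capture happen side by side.
    logger.error("Agent %s failed for report %s", agent_name, report_id)
    capture_exception(
        exc,
        {"agent_name": agent_name, "report_id": report_id, "token_id": token_id},
    )

handle_agent_failure(RuntimeError("boom"), "onchain_data_agent", "r-1", "t-1")
```

The context dict is what makes failures searchable per agent and per report in the error tracker.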
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/app/core/orchestrator.py (4)
105-158: Critical: Fix syntax error and undefined variable. Multiple issues in this function:

1. Syntax error, incorrect indentation (line 116): the `try:` block is indented at the wrong level, causing a Python syntax error. It should be dedented to align properly with the code flow.
2. Undefined variable (lines 149, 151): `report_repository` is not defined in this scope. Use `orch.report_repository`, since this is a closure inside `create_orchestrator`.

The key repository-access changes from the suggested diff:

```diff
-    existing_report = await report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}})
```

The remaining hunks dedent the `try:` / `except asyncio.TimeoutError` / `except Exception` blocks to the correct level.
164-191: Critical: Fix syntax error and incorrect repository access. Two critical issues:

1. Syntax error, incorrect indentation (line 167): the `try:` block is indented at the wrong level and should be dedented to align with the line above.
2. Incorrect scope reference (lines 181, 183): `self.report_repository` is not accessible in this closure function. Use `orch.report_repository`.

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}})
```

The remaining hunks dedent the `try:` / `except` blocks.
194-245: Critical: Fix syntax error and incorrect repository access. Same issues as the previous agents:

1. Syntax error, incorrect indentation (line 206): the `try:` block should be dedented to align properly.
2. Incorrect scope reference (lines 235, 237): `self.report_repository` is not accessible in this closure. Use `orch.report_repository`.

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}})
```

The remaining hunks dedent the `try:` / `except` blocks.
250-298: Critical: Fix compound statement syntax error and repository access. Multiple critical issues:

1. Syntax error, compound statement (line 253): `audit_summary_data = [] try:` has both a variable assignment and `try:` on the same line, separated by whitespace. This is a Python syntax error.
2. Incorrect indentation (line 250): the function definition appears to have excessive indentation.
3. Incorrect scope reference (lines 288, 290): `self.report_repository` is not accessible in this closure. Use `orch.report_repository`.

```diff
 code_metrics_data = {}
-audit_summary_data = [] try:
+audit_summary_data = []
+try:
     async with CodeAuditAgent() as agent:
```

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}})
```

The remaining hunks dedent the function definition and the `try:` / `except` blocks.
🧹 Nitpick comments (1)
backend/app/core/orchestrator.py (1)
14-14: Remove unused import. The `get_db` import is not used anywhere in this file.

```diff
-from backend.app.db.database import get_db  # Added for agent functions
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- backend/app/core/orchestrator.py (8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/core/orchestrator.py (3)
backend/app/core/error_utils.py (1)
- `capture_exception` (10-70)

backend/app/db/models/report_state.py (2)
- `ReportStatusEnum` (8-20)
- `ReportState` (25-36)

backend/app/db/repositories/report_repository.py (2)
- `get_report_by_id` (72-76)
- `update_partial` (78-88)
🪛 Ruff (0.14.6)
backend/app/core/orchestrator.py
191-191: Expected a statement
(invalid-syntax)
245-245: Expected a statement
(invalid-syntax)
253-253: Compound statements are not allowed on the same line as simple statements
(invalid-syntax)
🔇 Additional comments (1)
backend/app/core/orchestrator.py (1)
16-22: LGTM! The dummy agent implementation is appropriate for testing purposes.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/app/core/orchestrator.py (4)
114-168: Critical: Indentation error and missing `self` reference will cause runtime failures. Two issues in `onchain_data_agent`:

1. Indentation error (line 125): the `try` block is incorrectly indented as if it's inside another block, but the preceding code (lines 116-124) is at function body level. This causes a syntax error.
2. Missing `self` prefix (lines 158, 160): `report_repository` is referenced without `self.`, causing a `NameError` at runtime since this is a closure inside `create_orchestrator`, not a method of `Orchestrator`.

For the repository access, since this closure is inside `create_orchestrator`, you need to either:

- Pass `orch.report_repository` into the closure, or
- Use `orch.report_repository` directly, since `orch` is in scope:

```diff
-    existing_report = await report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}})
```
173-200: Critical: Indentation error and undefined `self` reference. Similar issues to `onchain_data_agent`:

1. Indentation error (line 176): the `try` block is incorrectly indented.
2. `self` is undefined (lines 190, 192): this is a closure inside `create_orchestrator`, not a method, so `self` doesn't exist here.

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "social_sentiment_agent": result}})
```
203-254: Critical: Same indentation and `self` reference issues. The `team_documentation_agent` closure has the same pattern of issues:

1. Indentation errors: lines 208+ have inconsistent indentation around the `try` block.
2. Undefined `self` (lines 244, 246): replace with `orch.report_repository`.

Apply similar fixes as the other agent functions:

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "team_documentation_agent": result}})
```
257-307: Critical: Multiple syntax errors prevent this code from running. Several critical issues in `code_audit_agent_func`:

1. Compound statement (line 262): `audit_summary_data = []` and `try:` are on the same line, which is invalid Python syntax.
2. Indentation inconsistencies: the function definition and body have misaligned indentation.
3. Undefined `self` (lines 297, 299): same issue as the other agents.

```diff
 code_metrics_data = {}
-audit_summary_data = [] try:
+audit_summary_data = []
+try:
     async with CodeAuditAgent() as agent:
```

```diff
-    existing_report = await self.report_repository.get_report_by_id(report_id)
+    existing_report = await orch.report_repository.get_report_by_id(report_id)
     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-    await self.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}})
+    await orch.report_repository.update_partial(report_id, {"partial_agent_output": {**existing_partial_agent_output, "code_audit_agent": result}})
```

The entire `create_orchestrator` function needs indentation cleanup. Consider running a formatter like `black` or `ruff format` after fixing the compound statement issue.
🧹 Nitpick comments (1)
backend/tests/test_orchestrator_updates.py (1)
16-31: Consider using a callable mock for `session_factory`. The `mock_session_factory` fixture returns a `MagicMock(spec=AsyncSession)`, but `Orchestrator.__init__` expects a callable (factory function). This works because `mock_report_repository` completely replaces the repository methods with `AsyncMock`, but for correctness, and if you ever need to test actual repository internals:

```diff
 @pytest.fixture
 def mock_session_factory():
-    return MagicMock(spec=AsyncSession)
+    mock_session = MagicMock(spec=AsyncSession)
+    return MagicMock(return_value=mock_session)
```
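The suggested fixture shape can be checked in isolation. This sketch uses a plain class as a stand-in spec; the real fixture would spec against SQLAlchemy's `AsyncSession`:

```python
from unittest.mock import MagicMock

# Sketch of the callable-factory fixture shape suggested above.
# FakeAsyncSession is a hypothetical stand-in for sqlalchemy.ext.asyncio.AsyncSession.
class FakeAsyncSession:
    pass

mock_session = MagicMock(spec=FakeAsyncSession)
# The factory itself is a callable that returns the session mock,
# matching what a session_factory parameter is expected to be.
mock_session_factory = MagicMock(return_value=mock_session)

session = mock_session_factory()  # calling the factory yields the session mock
```

The key difference from the original fixture: the mock is now one level of indirection up, so code that calls `session_factory()` receives a session rather than treating the factory itself as the session.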
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- .venv/bin/python (1 hunks)
- .venv/bin/python3 (1 hunks)
- .venv/bin/python3.12 (1 hunks)
- .venv/lib64 (1 hunks)
- .venv/pyvenv.cfg (1 hunks)
- backend/app/core/orchestrator.py (8 hunks)
- backend/tests/test_orchestrator_updates.py (1 hunks)
✅ Files skipped from review due to trivial changes (5)
- .venv/bin/python3.12
- .venv/bin/python3
- .venv/bin/python
- .venv/pyvenv.cfg
- .venv/lib64
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/test_orchestrator_updates.py (2)
backend/app/core/orchestrator.py (3)
- `Orchestrator` (24-88)
- `register_agent` (33-34)
- `execute_agents` (36-79)

backend/app/db/models/report_state.py (2)
- `ReportStatusEnum` (8-20)
- `ReportState` (25-36)
🪛 Ruff (0.14.6)
backend/app/core/orchestrator.py
200-200: Expected a statement
(invalid-syntax)
254-254: Expected a statement
(invalid-syntax)
262-262: Compound statements are not allowed on the same line as simple statements
(invalid-syntax)
backend/tests/test_orchestrator_updates.py
36-36: Possible hardcoded password assigned to: "token_id"
(S105)
58-58: Possible hardcoded password assigned to: "token_id"
(S105)
80-80: Possible hardcoded password assigned to: "token_id"
(S105)
100-100: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (8)
backend/tests/test_orchestrator_updates.py (6)
1-6: LGTM! Imports are appropriate for the test module. The necessary dependencies for mocking async behavior, testing the orchestrator, and validating repository interactions are all present.

9-14: LGTM! The mock class adequately represents the `ReportState` model fields needed for testing the orchestrator's error handling and status update logic.

33-53: LGTM! Test correctly validates the success path: agent execution, status retrieval, and status update to `AGENTS_COMPLETED`. Assertions are comprehensive.

55-75: LGTM! Test correctly validates the failure path: exception handling, status update to `AGENTS_FAILED`, and per-agent error flag recording. The expected payload `{"status": ReportStatusEnum.AGENTS_FAILED, "errors": {"AgentOne": True}}` aligns with the orchestrator implementation.

77-95: LGTM! Test properly validates that `RuntimeError` is raised with appropriate context when the report is not found during failure handling. The assertion on error message content and the verification that `update_partial` is not called are correct.

97-115: LGTM! Test correctly validates the warning behavior when a report is not found during the success path. Using `caplog` to verify log output is the appropriate pytest pattern. The distinction between this graceful warning and the `RuntimeError` in the failure path is properly tested.

backend/app/core/orchestrator.py (2)
11-14: LGTM! Import additions for `capture_exception`, `ReportState`, and `get_db` are appropriate for the new error handling and database access patterns.

36-79: Well-structured error handling and state management. The implementation correctly:

- Runs agents concurrently with `return_exceptions=True`
- Captures exceptions with structured context for Sentry
- Merges per-agent error flags with existing errors
- Differentiates handling for missing reports (`RuntimeError` on failure, warning on success)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/app/core/orchestrator.py (3)
36-79: `execute_agents` treats only raised exceptions as failures, ignoring agents that return `status="failed"`.

The current failure detection logic in `execute_agents` only checks for `isinstance(result, Exception)` when building `errors_flag`. Agents that catch their own errors and return a structured failure payload like `{"status": "failed", "error": ...}` (as your built-in agents do) will:

- Not set an entry in `errors_flag`
- Cause the report status to be updated to `AGENTS_COMPLETED`
- Not add an entry to `ReportState.errors`

This conflicts with the intent of "agent failures are properly logged and recorded in the report state" and means real agent failures can be surfaced only via logs, while the report is marked as fully successful.

Consider also treating agent results with `status == "failed"` as failures when computing `errors_flag`, for example:

```diff
 for agent_name, result in zip(tasks.keys(), results):
-    if isinstance(result, Exception):
+    is_exception = isinstance(result, Exception)
+    is_failed_status = isinstance(result, dict) and result.get("status") == "failed"
+
+    if is_exception or is_failed_status:
         # Capture the exception with additional context
-        orchestrator_logger.exception("Agent %s failed for report %s", agent_name, report_id)
-        capture_exception(result, {"agent_name": agent_name, "report_id": report_id, "token_id": token_id})
+        if is_exception:
+            orchestrator_logger.exception("Agent %s failed for report %s", agent_name, report_id)
+            capture_exception(result, {"agent_name": agent_name, "report_id": report_id, "token_id": token_id})
         errors_flag[agent_name] = True
-        processed_results[agent_name] = {"status": "failed", "error": str(result)}
+        if is_exception:
+            processed_results[agent_name] = {"status": "failed", "error": str(result)}
+        else:
+            processed_results[agent_name] = result
     else:
         processed_results[agent_name] = result
```

You can refine this further (e.g., deciding when to call `capture_exception` for non-exception failures), but the key point is to keep report status and `errors` aligned with agents' own failure statuses.
114-161: Concurrent partial_agent_output updates can race and silently drop data; also no handling when report is missingEach agent function (
onchain_data_agent,social_sentiment_agent_func,team_documentation_agent,code_audit_agent_func) performs:
existing_report = await orch.report_repository.get_report_by_id(report_id)- Builds
existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}- Calls
update_partialwith a full merged dict forpartial_agent_outputBecause
execute_agentsruns all registered agents concurrently viaasyncio.gather, these agent functions may read the same stalepartial_agent_outputvalue and race to write back different merged versions. The last writer wins, so earlier agents’ contributions topartial_agent_outputcan be lost.Additionally, if
get_report_by_idreturnsNone(missing row), the code still callsupdate_partial, which is effectively a no-op in your repository but doesn’t log or raise. That means agent outputs can be silently discarded in these flows, whereasexecute_agentsexplicitly raises or logs when the report is missing.To make this more robust and aligned with the new central error-handling:
- Prefer centralizing partial output persistence in
`execute_agents` once all agent results are available:

```diff
- # inside each agent
- existing_report = await orch.report_repository.get_report_by_id(report_id)
- existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
- await orch.report_repository.update_partial(
-     report_id,
-     {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}}
- )
+ # inside each agent
+ return result  # do not touch the DB here
```

Then, after processing results in `execute_agents`:

```diff
- # Update the report with agent-specific errors if any agent failed
- if errors_flag:
-     existing_report = await self.report_repository.get_report_by_id(report_id)
+ # Fetch once so we can update both status/errors and partial outputs atomically
+ existing_report = await self.report_repository.get_report_by_id(report_id)
+
+ # Optionally merge partial_agent_output from all agents here
+ if existing_report:
+     existing_partial = existing_report.partial_agent_output or {}
+     updated_partial = {**existing_partial, **processed_results}
+
+ # Update the report with agent-specific errors if any agent failed
+ if errors_flag:
      if existing_report:
          existing_errors = existing_report.errors if existing_report.errors else {}
          await self.report_repository.update_partial(
              report_id,
-             {"status": ReportStatusEnum.AGENTS_FAILED, "errors": {**existing_errors, **errors_flag}}
+             {
+                 "status": ReportStatusEnum.AGENTS_FAILED,
+                 "errors": {**existing_errors, **errors_flag},
+                 "partial_agent_output": updated_partial,
+             },
          )
      else:
          raise RuntimeError(...)
  else:
-     existing_report = await self.report_repository.get_report_by_id(report_id)
      if existing_report:
          await self.report_repository.update_partial(
              report_id,
-             {"status": ReportStatusEnum.AGENTS_COMPLETED}
+             {
+                 "status": ReportStatusEnum.AGENTS_COMPLETED,
+                 "partial_agent_output": updated_partial,
+             },
          )
      else:
          orchestrator_logger.warning(...)
```

Even if you don’t centralize everything immediately, at minimum consider:

- Logging or raising when `existing_report` is `None` in the agent functions, to avoid silent data loss.
- Using a repository-level helper that can atomically merge a single agent’s output into the JSON field (e.g., with JSONB operators) instead of read-modify-write patterns across multiple concurrent sessions.
Also applies to: 173-193, 203-247, 259-301
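The read-modify-write hazard mentioned above can be shown with a tiny simulation. This is a hedged sketch, not code from the PR: a plain dict stands in for the report row's JSON column, `asyncio.sleep(0)` stands in for a DB round-trip, and the agent names are illustrative.

```python
import asyncio

# In-memory stand-in for the report row's JSON column.
db = {"partial_agent_output": {}}

async def racy_agent_update(agent_name: str, result: dict) -> None:
    # Read-modify-write: each "agent" reads the current JSON, then writes
    # back a merged copy. Between the read and the write, a sibling agent
    # may read the same stale snapshot.
    snapshot = dict(db["partial_agent_output"])  # read
    await asyncio.sleep(0)                       # yield, like a DB round-trip
    snapshot[agent_name] = result                # modify the stale snapshot
    db["partial_agent_output"] = snapshot        # write back, clobbering siblings

async def main() -> dict:
    await asyncio.gather(
        racy_agent_update("onchain_data_agent", {"status": "completed"}),
        racy_agent_update("social_sentiment_agent", {"status": "completed"}),
    )
    return db["partial_agent_output"]

merged = asyncio.run(main())
# Both tasks read the empty snapshot before either wrote, so one update is lost:
# the merged dict contains only one of the two agent keys.
```

An atomic server-side merge (or a single centralized write after `gather`) avoids this lost-update pattern entirely.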
**172-199: Agent-level exceptions are swallowed without updating `ReportState`, diverging from `execute_agents`’ error semantics**

In each agent function, errors are caught and converted into `{"status": "failed", "error": ...}` responses:

- Onchain Data Agent: `except asyncio.TimeoutError` / generic `Exception`
- Social Sentiment Agent: same pattern
- Team & Documentation Agent: same pattern
- Code/Audit Agent: same pattern
However, in these exception paths you:
- Do not update `partial_agent_output` with a failed status or error message.
- Do not touch `ReportState.errors`.
- Do not re-raise, so `execute_agents` never sees an exception and will (with current logic) mark the overall run as `AGENTS_COMPLETED`.

This makes built‑in agents behave differently from the mocked agents in tests (which raise and therefore set `errors_flag`). To align with the PR’s goal of consistent failure tracking, consider one of:

- Re-raising after logging, so `execute_agents` can use its centralized failure handling:

```diff
  except asyncio.TimeoutError as e:
      orchestrator_logger.error("... timed out ...")
-     return {"status": "failed", "error": "Agent timed out"}
+     raise
```

- Or, if you want to keep soft-failure behavior, explicitly updating `partial_agent_output` and `errors` for these paths (and adjusting `execute_agents` to treat `status == "failed"` as an error, as suggested earlier).

Right now, these failures are only visible via logs and the returned dict, not in the persisted report state.
Also applies to: 256-307
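The re-raise option can be exercised standalone. In this sketch (names are illustrative; `flaky_agent` is a hypothetical stand-in for a real agent body, and `print` stands in for `orchestrator_logger.error`), the exception propagates to `gather(..., return_exceptions=True)` so centralized error handling sees it:

```python
import asyncio

async def flaky_agent(report_id: str) -> dict:
    # Hypothetical agent body that times out internally.
    raise asyncio.TimeoutError("simulated agent timeout")

async def agent_with_reraise(report_id: str) -> dict:
    try:
        return await flaky_agent(report_id)
    except asyncio.TimeoutError:
        # stand-in for orchestrator_logger.error(...)
        print(f"agent timed out for report {report_id}")
        raise  # let the orchestrator's centralized handling flag the error

async def main():
    # Mirrors how execute_agents collects results without crashing the batch.
    return await asyncio.gather(agent_with_reraise("r1"), return_exceptions=True)

results = asyncio.run(main())
# results[0] is the TimeoutError instance, not a {"status": "failed"} dict,
# so the isinstance(result, Exception) branch in execute_agents would fire.
```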
🧹 Nitpick comments (3)
backend/tests/test_orchestrator_updates.py (1)
**36-118: Consider also asserting `execute_agents`’ returned payload for stronger guarantees**

The tests comprehensively cover repository interactions and status transitions for success, failure, and “report not found” paths. To make them more robust against regressions in the error-payload contract, you might also assert on the structure of the `execute_agents` return value (e.g., that failed agents return `{"status": "failed", "error": ...}` and successful agents return their data).

This would tie the tests more directly to the PR goal of “partial_agent_output explicitly includes status: 'failed' and an error message for individual agents,” without changing the existing interaction checks.
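Concretely, the extra assertions could look like the following sketch. The dict literal mirrors the shape `execute_agents` is expected to return; the agent names are hypothetical:

```python
# Hypothetical execute_agents return value with one failed and one successful agent.
processed_results = {
    "bad_agent": {"status": "failed", "error": "boom"},
    "good_agent": {"status": "completed", "data": {"tvl": 1}},
}

# Failed agents must expose the error-payload contract...
assert processed_results["bad_agent"]["status"] == "failed"
assert "error" in processed_results["bad_agent"]

# ...while successful agents return their data untouched.
assert processed_results["good_agent"]["data"] == {"tvl": 1}
```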
backend/app/core/orchestrator.py (2)
**43-48: Avoid duplicate DB lookups and consider reusing a single `ReportState` per `execute_agents` call**

Right now `execute_agents` calls `self.report_repository.get_report_by_id(report_id)` once in the failure path and again in the success path. While each branch is mutually exclusive, you could simplify and slightly optimize by reading the report once up front and branching off that cached value, especially if more metadata from `ReportState` is needed later (e.g., merging `partial_agent_output` here instead of inside each agent).

Not mandatory, but something to consider if this method evolves further.
Also applies to: 58-77
**16-22: Optional: Make `dummy_agent`’s output consistent with `aggregate_results` expectations**

`dummy_agent` returns `{"dummy_data": ...}` without a `"status"` or `"data"` key. `aggregate_results` only aggregates entries where `result.get("status") == "completed"`, and logs an error otherwise, so any use of `dummy_agent` with `aggregate_results` will be treated as an error.

If `dummy_agent` is intended purely for connectivity testing this is fine, but if it’s used in end‑to‑end flows you might want it to follow the same contract as other agents:

```diff
- return {"dummy_data": f"Processed by dummy agent for {report_id}"}
+ return {
+     "status": "completed",
+     "data": {"dummy_data": f"Processed by dummy agent for {report_id}"},
+ }
```
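To see why the contract matters, here is a sketch of the filtering `aggregate_results` is described as doing. The function body below is an assumption based on that description, not the real implementation:

```python
def aggregate_results(processed_results: dict) -> dict:
    # Only entries following the {"status": "completed", "data": ...} contract
    # contribute to the aggregate; anything else would be logged as an error.
    aggregated = {}
    for agent_name, result in processed_results.items():
        if isinstance(result, dict) and result.get("status") == "completed":
            aggregated[agent_name] = result.get("data")
    return aggregated

out = aggregate_results({
    "dummy_agent": {"dummy_data": "no status key"},  # current shape: dropped
    "onchain_data_agent": {"status": "completed", "data": {"holders": 42}},
})
# Only the conforming agent survives aggregation.
```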
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `backend/app/core/orchestrator.py` (8 hunks)
- `backend/tests/test_orchestrator_updates.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/test_orchestrator_updates.py (3)
backend/app/core/orchestrator.py (3)
- `Orchestrator` (24-88)
- `register_agent` (33-34)
- `execute_agents` (36-79)

backend/app/db/models/report_state.py (2)

- `ReportStatusEnum` (8-20)
- `ReportState` (25-36)

backend/app/db/repositories/report_repository.py (3)

- `ReportRepository` (8-88)
- `get_report_by_id` (72-76)
- `update_partial` (78-88)
🪛 Ruff (0.14.6)
backend/app/core/orchestrator.py
200-200: Expected a statement
(invalid-syntax)
212-212: Unexpected indentation
(invalid-syntax)
254-254: Expected a statement
(invalid-syntax)
backend/tests/test_orchestrator_updates.py
39-39: Possible hardcoded password assigned to: "token_id"
(S105)
61-61: Possible hardcoded password assigned to: "token_id"
(S105)
83-83: Possible hardcoded password assigned to: "token_id"
(S105)
103-103: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (1)
backend/tests/test_orchestrator_updates.py (1)
**8-35: Fixtures and `MockReportState` are well-structured and representative**

The `MockReportState` class and the `mock_session_factory` / `mock_report_repository` / `orchestrator` fixtures model the real components closely enough for these tests while keeping things simple. The `errors` default handling in `MockReportState` also avoids the mutable-default pitfall.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/core/orchestrator.py (2)
**36-80: Report status only flips to `AGENTS_FAILED` on raised exceptions, not on agents returning `{"status": "failed"}`**

Right now `errors_flag` is only populated when `result` is an `Exception`. All your agents, however, return structured dicts with `"status": "failed"` for expected failures/timeouts rather than raising. That means:

- `processed_results[agent_name]["status"]` can be `"failed"` while `errors_flag` stays empty.
- In that case the `else:` branch at Lines 70‑77 sets the report status to `AGENTS_COMPLETED`, which contradicts the PR intent of marking reports as failed when any agent fails.

To align top‑level status and `errors` with per‑agent outcomes, consider also treating a returned `"status": "failed"` as an error:

```diff
  processed_results = {}
  errors_flag: Dict[str, bool] = {}
  for agent_name, result in zip(tasks.keys(), results):
      if isinstance(result, Exception):
          # Capture the exception with additional context
          orchestrator_logger.exception("Agent %s failed for report %s", agent_name, report_id)
          capture_exception(result, {"agent_name": agent_name, "report_id": report_id, "token_id": token_id})
          errors_flag[agent_name] = True
          processed_results[agent_name] = {"status": "failed", "error": str(result)}
      else:
-         processed_results[agent_name] = result
+         processed_results[agent_name] = result
+         if isinstance(result, dict) and result.get("status") == "failed":
+             errors_flag[agent_name] = True
```

This way, both unexpected exceptions and normalised `"status": "failed"` payloads will set `AGENTS_FAILED` and be reflected in the `errors` JSON.
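The suggested post-gather processing can be exercised standalone. In this sketch, logging and `capture_exception` are omitted and the agent names are illustrative:

```python
def process_results(agent_names, results):
    # Flag both raised exceptions and returned {"status": "failed"} dicts,
    # so the report is marked failed consistently for either failure shape.
    processed, errors_flag = {}, {}
    for agent_name, result in zip(agent_names, results):
        if isinstance(result, Exception):
            errors_flag[agent_name] = True
            processed[agent_name] = {"status": "failed", "error": str(result)}
        else:
            processed[agent_name] = result
            if isinstance(result, dict) and result.get("status") == "failed":
                errors_flag[agent_name] = True
    return processed, errors_flag

processed, flags = process_results(
    ["onchain", "social", "code_audit"],
    [
        RuntimeError("boom"),                       # raised exception
        {"status": "failed", "error": "timed out"},  # soft failure
        {"status": "completed"},                     # success
    ],
)
# flags contains "onchain" and "social"; "code_audit" stays unflagged.
```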
**203-254: Team/Documentation agent has invalid indentation and missing persistence for failure cases**

There are two issues in this block:

1. **Syntax/indentation error (module won't import).** Lines 209‑214 (`team_profile_urls = ...`, `whitepaper_text_source = ...`) are indented deeper than the preceding statements but are not inside any enclosing block (no `if`/`for`/`with`/`try` introducing that level). This will raise an `IndentationError` at import time. These lines should be aligned with the rest of the function body:

```diff
-     team_analysis = []
-     whitepaper_summary = {}
-
-     # Placeholder for fetching token-related data (URLs, whitepaper text)
-     # In a real scenario, this data would be fetched based on token_id
-     # For now, we'll use dummy data or assume it comes from settings
-         team_profile_urls = settings.TEAM_PROFILE_URLS.get(token_id, [])
-         whitepaper_text_source = settings.WHITEPAPER_TEXT_SOURCES.get(token_id, "")
+     team_analysis = []
+     whitepaper_summary = {}
+
+     # Placeholder for fetching token-related data (URLs, whitepaper text)
+     # In a real scenario, this data would be fetched based on token_id
+     # For now, we'll use dummy data or assume it comes from settings
+     team_profile_urls = settings.TEAM_PROFILE_URLS.get(token_id, [])
+     whitepaper_text_source = settings.WHITEPAPER_TEXT_SOURCES.get(token_id, "")
```

2. **Failure outcomes not written to `partial_agent_output`.** The DB update at lines 244‑247 only occurs on success. If scraping or analysis times out or raises an exception, you return a failure dict from the `except` blocks (lines 238‑241) but never persist that result under `"team_documentation_agent"` in `partial_agent_output`. Both success and failure states should be stored. After fixing the indentation, restructure to build a `result` value across both `try` and `except` blocks, then perform a single `update_partial` call followed by `return`, ensuring all outcomes are persisted.
🧹 Nitpick comments (3)
backend/app/core/orchestrator.py (3)
**115-162: Onchain agent only persists successful runs into `partial_agent_output`**

The onchain agent correctly:

- Runs the two downstream calls concurrently with per‑task timeouts.
- Normalises failures into `{"status": "failed", "error": ...}` and sets `overall_agent_status` accordingly.

However, the `partial_agent_output` update (Lines 159‑162) is only reached on the happy path. In the `asyncio.TimeoutError` and generic `Exception` branches (Lines 163‑167), you return a failure dict but never persist that into `partial_agent_output`, so the DB has no record that the onchain agent ran and failed.

To keep DB state aligned with the PR objective (“partial_agent_output includes explicit status: 'failed' and an error message”), a pattern like this would be more robust:

```diff
- try:
-     onchain_metrics_result, tokenomics_result = await asyncio.gather(
-         asyncio.wait_for(onchain_metrics_task, timeout=settings.AGENT_TIMEOUT - 1),
-         asyncio.wait_for(tokenomics_task, timeout=settings.AGENT_TIMEOUT - 1),
-         return_exceptions=True
-     )
-     ...
-     result = { ... }  # success or per-task failure handling
-     existing_report = await orch.report_repository.get_report_by_id(report_id)
-     existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
-     await orch.report_repository.update_partial(
-         report_id,
-         {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}},
-     )
-     return result
- except asyncio.TimeoutError as e:
-     orchestrator_logger.error("Onchain Data Agent timed out for report %s", report_id)
-     return {"status": "failed", "error": "Agent timed out"}
- except Exception as e:
-     orchestrator_logger.exception("Onchain Data Agent failed for report %s", report_id)
-     return {"status": "failed", "error": str(e)}
+ try:
+     onchain_metrics_result, tokenomics_result = await asyncio.gather(
+         asyncio.wait_for(onchain_metrics_task, timeout=settings.AGENT_TIMEOUT - 1),
+         asyncio.wait_for(tokenomics_task, timeout=settings.AGENT_TIMEOUT - 1),
+         return_exceptions=True,
+     )
+     # ... existing per-task handling ...
+     result = {...}  # as currently built
+ except asyncio.TimeoutError:
+     orchestrator_logger.error("Onchain Data Agent timed out for report %s", report_id)
+     result = {"status": "failed", "error": "Agent timed out"}
+ except Exception as e:
+     orchestrator_logger.exception("Onchain Data Agent failed for report %s", report_id)
+     result = {"status": "failed", "error": str(e)}
+
+ existing_report = await orch.report_repository.get_report_by_id(report_id)
+ existing_partial_agent_output = existing_report.partial_agent_output if existing_report else {}
+ await orch.report_repository.update_partial(
+     report_id,
+     {"partial_agent_output": {**existing_partial_agent_output, "onchain_data_agent": result}},
+ )
+ return result
```

Same pattern could be applied to the other agents to ensure all outcomes (success or failure) are reflected in `partial_agent_output`.
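The “compute result once, persist once” shape can be demonstrated end to end with a fake repository. This is a hedged sketch: `FakeRepo`, the agent name, and the simulated timeout are all illustrative stand-ins, not the project's real classes.

```python
import asyncio

class FakeRepo:
    # Minimal stand-in for ReportRepository.update_partial.
    def __init__(self):
        self.partial_agent_output = {}

    async def update_partial(self, report_id: str, patch: dict) -> None:
        self.partial_agent_output.update(patch.get("partial_agent_output", {}))

async def onchain_agent(repo: FakeRepo, report_id: str) -> dict:
    # Build `result` in every branch, then persist exactly once below,
    # so failures reach the DB just like successes.
    try:
        raise asyncio.TimeoutError()  # simulate the downstream calls timing out
    except asyncio.TimeoutError:
        result = {"status": "failed", "error": "Agent timed out"}
    except Exception as e:
        result = {"status": "failed", "error": str(e)}
    await repo.update_partial(
        report_id, {"partial_agent_output": {"onchain_data_agent": result}}
    )
    return result

repo = FakeRepo()
result = asyncio.run(onchain_agent(repo, "r1"))
# The failure is now visible in the persisted partial output,
# not just in the returned dict.
```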
**175-200: Social Sentiment agent: failures not persisted to `partial_agent_output`**

Similar to the onchain agent, this function:

- Wraps its work in a `try` and returns structured failure dicts in the `except` branches.
- Only calls `update_partial` (Lines 190‑193) on the success path.

If `fetch_social_data` or `analyze_sentiment` times out or raises, the caller gets `{"status": "failed", "error": ...}`, but:

- No `social_sentiment_agent` entry appears in `partial_agent_output`.
- `execute_agents` (as currently written) does not see an `Exception`, so the new `errors` JSON may also remain empty unless you adopt status‑based error flagging as suggested earlier.

Consider following the “compute `result` once, persist once” pattern so that failures are also captured in `partial_agent_output`, and so DB state matches the in‑memory outcome.
**259-301: Code/Audit agent only records successful runs in `partial_agent_output`**

The code/audit agent correctly:

- Uses `CodeAuditAgent` as an async context manager.
- Normalises all error conditions into `{"status": "failed", "error": ...}` in the `except` branches.
- Persists `partial_agent_output["code_audit_agent"]` only on the success path (Lines 299‑301).

This means any timeout or failure produces a failure dict for the caller but leaves no trace in `partial_agent_output`, which weakens observability and partially contradicts the PR goal of having an explicit failed status/error stored per agent.

Consider mirroring the pattern suggested for the onchain agent:

- Compute a `result` in both the happy path and `except` branches.
- After the `try`/`except`, fetch the report once and call `update_partial` to store `"code_audit_agent": result`, then return it.

This would also reduce duplication in the repository‑update logic across agents.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `backend/app/core/orchestrator.py` (8 hunks)
🧰 Additional context used
🪛 Ruff (0.14.6)
backend/app/core/orchestrator.py
200-200: Expected a statement
(invalid-syntax)
212-212: Unexpected indentation
(invalid-syntax)
254-254: Expected a statement
(invalid-syntax)
🔇 Additional comments (1)
backend/app/core/orchestrator.py (1)
**11-14: New error/report-state imports look appropriate**

The additions of `capture_exception` and `ReportStatusEnum` integrate cleanly with the rest of the module and match their later usage in `execute_agents`; no issues here.
That error handling update in
**Overview:** This change enhances the error handling for agents within the `Orchestrator`, ensuring that agent failures are properly logged and recorded in the report state.

**Changes**

- Added `sentry_sdk.capture_exception` for structured error logging when an agent fails or times out.
- Updated `orchestrator.py` to update the `ReportState` with agent-specific error flags under the `partial_agent_output`'s `errors` key.
- `partial_agent_output` now explicitly includes a `status: 'failed'` and `error` message for individual agents that encounter issues.

**Summary by CodeRabbit**
New Features
Bug Fixes
Performance
Tests
Chores