Feature: Integrate AI Orchestrator into Report Processing #21
Walkthrough
Three agents now accept and return a
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client
participant Processor as Report Processor
participant Storage
participant Orchestrator as AI Orchestrator
participant Price as Price Agent
participant Trend as Trend Agent
participant Volume as Volume Agent
Client->>Processor: process_report(report_id, token_id)
Processor->>Storage: try_set_processing(report_id)
alt already processing
Processor-->>Client: raise ProcessingError
else accepted
Processor->>Orchestrator: register agents (price, trend, volume)
par Concurrent agent runs
Orchestrator->>Price: price.run(report_id, token_id)
Orchestrator->>Trend: trend.run(report_id, token_id)
Orchestrator->>Volume: volume.run(report_id, token_id)
and
Price-->>Orchestrator: {"data":..., "token_id":..., "report_id":...}
Trend-->>Orchestrator: {"data":..., "token_id":..., "report_id":...}
Volume-->>Orchestrator: {"data":..., "token_id":..., "report_id":...}
end
Orchestrator->>Orchestrator: aggregate_results() %% merge completed agents' data
Processor->>Storage: save_report_data(report_id, aggregated_data)
Processor->>Storage: set_report_status(report_id, "completed")
Processor-->>Client: return True
end
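The concurrent fan-out and merge in the diagram can be sketched as follows. This is a minimal illustration, not the PR's `AIOrchestrator`: the class name `SketchOrchestrator`, the stub agents, and the choice of `asyncio.gather` with `return_exceptions=True` are all assumptions made for the example. Only the method names (`register_agent`, `execute_agents`, `aggregate_results`) and the payload shape come from the review.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# An agent takes (report_id, token_id) and returns a payload dict.
AgentFn = Callable[[str, str], Awaitable[Dict[str, Any]]]


class SketchOrchestrator:
    """Hypothetical stand-in for AIOrchestrator; not the PR's implementation."""

    def __init__(self) -> None:
        self._agents: Dict[str, AgentFn] = {}

    def register_agent(self, name: str, agent: AgentFn) -> None:
        self._agents[name] = agent

    async def execute_agents(self, report_id: str, token_id: str) -> Dict[str, Any]:
        names = list(self._agents)
        # return_exceptions=True returns a failing agent's exception as its
        # result instead of propagating it out of gather().
        results = await asyncio.gather(
            *(self._agents[name](report_id, token_id) for name in names),
            return_exceptions=True,
        )
        return dict(zip(names, results))

    def aggregate_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
        # Merge only the agents that completed; skip the ones that raised.
        return {
            name: result["data"]
            for name, result in results.items()
            if not isinstance(result, BaseException)
        }


# Stub agents (assumed payloads, mirroring the shape shown in the diagram).
async def price_run(report_id: str, token_id: str) -> Dict[str, Any]:
    return {"data": {"price": 100.0}, "token_id": token_id, "report_id": report_id}


async def trend_run(report_id: str, token_id: str) -> Dict[str, Any]:
    return {"data": {"trend": "up"}, "token_id": token_id, "report_id": report_id}


async def failing_volume_run(report_id: str, token_id: str) -> Dict[str, Any]:
    raise RuntimeError("volume source unavailable")


async def build_report(report_id: str, token_id: str) -> Dict[str, Any]:
    orchestrator = SketchOrchestrator()
    orchestrator.register_agent("price", price_run)
    orchestrator.register_agent("trend", trend_run)
    orchestrator.register_agent("volume", failing_volume_run)
    results = await orchestrator.execute_agents(report_id, token_id)
    return orchestrator.aggregate_results(results)
```

Whether a failed agent should be skipped or should fail the whole report is a design choice; the sketch skips it only to show the "merge completed agents' data" step.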
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (8)
📒 Files selected for processing (3)
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/services/report_processor.py (5)
backend/app/tests/test_report_processor.py (2)
🪛 Ruff (0.14.2)
backend/app/core/orchestrator.py
70-70: Loop control variable … Rename unused …
(B007)
backend/app/services/report_processor.py
28-28: Avoid specifying long messages outside the exception class
(TRY003)
44-44: Consider moving this statement to an else block
(TRY300)
backend/app/tests/test_report_processor.py
16-16: Possible hardcoded password assigned to: "token_id"
(S105)
38-38: Possible hardcoded password assigned to: "token_id"
(S105)
49-49: Possible hardcoded password assigned to: "token_id"
(S105)
77-77: Possible hardcoded password assigned to: "token_id"
(S105)
96-98: try-except-pass detected, consider logging the exception
(S110)
96-96: Do not catch blind exception: Exception
(BLE001)
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/tests/test_report_processor.py (1)
65-75: Remove unused variable token_id.
The variable token_id is assigned on line 67 but never used in this test. Since the test focuses on status management rather than report processing, this variable can be removed.
Apply this diff:
     @pytest.mark.asyncio
     async def test_get_report_status():
         report_id = "test_report_5"
    -    token_id = "test_token_5"
         set_report_status(report_id, "initial")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (13)
backend/app/api/v1/__pycache__/routes.cpython-313.pyc is excluded by !**/*.pyc
backend/app/core/__pycache__/logger.cpython-313.pyc is excluded by !**/*.pyc
backend/app/core/__pycache__/orchestrator.cpython-313.pyc is excluded by !**/*.pyc
backend/app/core/__pycache__/storage.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/__pycache__/report_processor.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
backend/app/services/__pycache__/report_processor.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/__pycache__/report_service.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/__init__.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/price_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/trend_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/volume_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/logs/app.log is excluded by !**/*.log
backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
📒 Files selected for processing (5)
backend/app/services/agents/price_agent.py (1 hunks)
backend/app/services/agents/trend_agent.py (1 hunks)
backend/app/services/agents/volume_agent.py (1 hunks)
backend/app/services/report_processor.py (2 hunks)
backend/tests/test_report_processor.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
backend/app/services/agents/volume_agent.py (2)
  backend/app/services/agents/price_agent.py (1)
    run(3-8)
  backend/app/services/agents/trend_agent.py (1)
    run(3-8)
backend/app/services/agents/trend_agent.py (2)
  backend/app/services/agents/price_agent.py (1)
    run(3-8)
  backend/app/services/agents/volume_agent.py (1)
    run(3-8)
backend/app/services/report_processor.py (5)
  backend/app/core/orchestrator.py (4)
    AIOrchestrator(14-68)
    register_agent(23-31)
    execute_agents(41-57)
    aggregate_results(59-68)
  backend/app/services/agents/price_agent.py (1)
    run(3-8)
  backend/app/services/agents/trend_agent.py (1)
    run(3-8)
  backend/app/services/agents/volume_agent.py (1)
    run(3-8)
  backend/app/core/storage.py (1)
    set_report_status(4-9)
backend/tests/test_report_processor.py (2)
  backend/app/services/report_processor.py (1)
    process_report(13-52)
  backend/app/core/storage.py (1)
    set_report_status(4-9)
backend/app/services/agents/price_agent.py (2)
  backend/app/services/agents/trend_agent.py (1)
    run(3-8)
  backend/app/services/agents/volume_agent.py (1)
    run(3-8)
🪛 Ruff (0.14.2)
backend/app/services/report_processor.py
28-28: Avoid specifying long messages outside the exception class
(TRY003)
45-45: Consider moving this statement to an else block
(TRY300)
52-52: Use raise without specifying exception name
Remove exception name
(TRY201)
backend/tests/test_report_processor.py
29-29: Possible hardcoded password assigned to: "token_id"
(S105)
54-54: Possible hardcoded password assigned to: "token_id"
(S105)
67-67: Local variable token_id is assigned to but never used
Remove assignment to unused variable token_id
(F841)
67-67: Possible hardcoded password assigned to: "token_id"
(S105)
🔇 Additional comments (12)
backend/app/services/agents/trend_agent.py (1)
3-8: LGTM! Signature and return payload updated correctly.
The function signature and return payload have been updated to include report_id, which aligns with the orchestration workflow requirements. This change is consistent across all agent functions (price, trend, and volume).
backend/app/services/agents/price_agent.py (1)
3-8: LGTM! Signature and return payload updated correctly.
The function signature and return payload have been updated to include report_id, maintaining consistency with the other agent functions in this PR.
backend/app/services/agents/volume_agent.py (1)
3-8: LGTM! Signature and return payload updated correctly.
The function signature and return payload have been updated to include report_id, completing the consistent update across all three agent functions.
backend/app/services/report_processor.py (3)
3-7: LGTM! Imports correctly structured for orchestration.
The new imports properly integrate the AIOrchestrator, agent functions, and storage-backed status management, aligning with the PR's objective to introduce AI orchestration into the report processing workflow.
26-31: LGTM! Storage-backed status management implemented correctly.
The status check and initialization properly replace the previous in-memory approach with storage-backed functions. The guard against concurrent processing for the same report is appropriate.
33-42: LGTM! Orchestration flow implemented correctly.
The orchestration workflow is well-structured:
- Creates AIOrchestrator instance
- Registers all three agents (price, trend, volume)
- Executes agents with proper parameters
- Aggregates results
- Persists data and updates status
This implementation aligns with the PR objective to integrate AI orchestration into report processing.
backend/tests/test_report_processor.py (6)
3-10: LGTM! Test setup updated for storage-backed architecture.
The imports and fixture properly integrate storage-backed status management, clearing REPORT_STORE before and after each test to ensure test isolation.
13-24: LGTM! Success test correctly validates storage-backed status.
The test properly verifies that process_report completes successfully and updates the status to "completed" using the storage-backed get_report_status function.
27-34: LGTM! Concurrency guard test implemented correctly.
The test properly validates that attempting to process a report that's already in "processing" status raises a ValueError, ensuring the concurrency guard works as expected.
37-49: LGTM! Cancellation handling test implemented correctly.
The test properly verifies that when a report processing task is cancelled, the status is updated to "cancelled" in storage.
52-62: LGTM! Exception handling test validates failure status correctly.
The test mocks the orchestrator to raise an exception and verifies that the report status is properly set to "failed" in storage.
78-92: LGTM! Concurrent processing test validates independent report handling.
The test correctly verifies that multiple reports can be processed concurrently without interfering with each other, demonstrating that the storage-backed status management properly isolates different report IDs.
Actionable comments posted: 2
🧹 Nitpick comments (3)
backend/app/services/report_processor.py (3)
33-36: Consider reusing the orchestrator instance.
Creating a new AIOrchestrator instance and re-registering agents for every report adds minor overhead. If the agents are stateless (which they appear to be), consider instantiating and configuring the orchestrator once at the module level.
Example:
    # At module level
    _orchestrator = AIOrchestrator()
    _orchestrator.register_agent("price_agent", price_agent_run)
    _orchestrator.register_agent("trend_agent", trend_agent_run)
    _orchestrator.register_agent("volume_agent", volume_agent_run)

    async def process_report(report_id: str, token_id: str) -> bool:
        # ... status checks ...
        agent_results = await _orchestrator.execute_agents(report_id, token_id)
        combined_report_data = _orchestrator.aggregate_results(agent_results)
        # ... rest of function ...
45-45: Consider moving the return statement to an else block.
Placing the return True statement in an else block would make the control flow more explicit and clarify that it only executes when no exception occurs.
Apply this diff:
             save_report_data(report_id, combined_report_data)
             set_report_status(report_id, "completed")
             logger.info("Report %s completed.", report_id)
    -        return True
         except asyncio.CancelledError:
             set_report_status(report_id, "cancelled")
             raise
         except Exception:
             set_report_status(report_id, "failed")
             logger.exception("Report %s failed.", report_id)
             raise
    +    else:
    +        return True
28-28: Consider extracting the error message to a constant or the exception class.
Long exception messages embedded directly in raise statements can reduce code readability and make maintenance harder if the message needs to be reused or updated.
Example:
    # At module level
    class ReportAlreadyProcessingError(ValueError):
        """Raised when attempting to process a report that is already being processed."""

        def __init__(self, report_id: str):
            super().__init__(f"Report {report_id} is already being processed")
Then in the function:
    -    raise ValueError(f"Report {report_id} is already being processed")
    +    raise ReportAlreadyProcessingError(report_id)
📜 Review details
⛔ Files ignored due to path filters (6)
backend/app/services/__pycache__/report_processor.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/price_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/trend_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/volume_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/logs/app.log is excluded by !**/*.log
backend/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
📒 Files selected for processing (1)
backend/app/services/report_processor.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/report_processor.py (5)
  backend/app/core/orchestrator.py (4)
    AIOrchestrator(14-68)
    register_agent(23-31)
    execute_agents(41-57)
    aggregate_results(59-68)
  backend/app/services/agents/price_agent.py (1)
    run(3-8)
  backend/app/services/agents/volume_agent.py (1)
    run(3-8)
  backend/app/services/agents/trend_agent.py (1)
    run(3-8)
  backend/app/core/storage.py (1)
    set_report_status(4-9)
🪛 Ruff (0.14.2)
backend/app/services/report_processor.py
28-28: Avoid specifying long messages outside the exception class
(TRY003)
45-45: Consider moving this statement to an else block
(TRY300)
49-49: Local variable e is assigned to but never used
Remove assignment to unused variable e
(F841)
Actionable comments posted: 5
🧹 Nitpick comments (2)
backend/app/core/storage.py (1)
2-5: In-process lock won't protect across processes.
The threading.Lock() provides thread safety within a single Python process. If the application runs with multiple worker processes (e.g., Gunicorn with multiple workers, Celery workers), this lock won't prevent race conditions across processes.
For multi-process deployments, consider using a distributed lock mechanism (Redis, database row locks, or a distributed coordination service).
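One way to make the check-and-set hold across worker processes on a single host is to move it into a shared database and let the database serialize writers. The sketch below uses SQLite from the standard library purely as an illustration of that idea; it is not part of the PR, and the table name `report_status`, the helper names, and the `"new"` placeholder status are assumptions. A deployment spanning several machines would need a networked store such as the Redis or database options the comment mentions.

```python
import sqlite3


def connect(path: str) -> sqlite3.Connection:
    """Open the shared status database, creating the table if needed."""
    conn = sqlite3.connect(path, timeout=5.0)  # wait up to 5 s for a competing writer
    conn.execute(
        "CREATE TABLE IF NOT EXISTS report_status ("
        "report_id TEXT PRIMARY KEY, status TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def set_status(conn: sqlite3.Connection, report_id: str, status: str) -> None:
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO report_status (report_id, status) VALUES (?, ?)",
            (report_id, status),
        )


def try_set_processing(conn: sqlite3.Connection, report_id: str) -> bool:
    """Return True only for the caller that moves the report into 'processing'."""
    with conn:
        # Make sure a row exists; 'new' is a placeholder status (assumption).
        conn.execute(
            "INSERT OR IGNORE INTO report_status (report_id, status) VALUES (?, 'new')",
            (report_id,),
        )
        # The conditional UPDATE is the atomic check-and-set: SQLite allows one
        # writer at a time, so only one caller can change the row.
        cursor = conn.execute(
            "UPDATE report_status SET status = 'processing' "
            "WHERE report_id = ? AND status != 'processing'",
            (report_id,),
        )
        return cursor.rowcount == 1
```

Each worker process would call `connect()` on the same file path and then `try_set_processing()`; a `False` return corresponds to the "already being processed" case.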
backend/app/services/report_processor.py (1)
32-41: Optional: Redundant status update.
Line 40 calls save_report_data(), which automatically sets the report status to "completed" (see backend/app/core/storage.py lines 38, 41). Line 41 then redundantly sets the status to "completed" again.
Consider removing line 41 to avoid the redundant call:
         agent_results = await orchestrator.execute_agents(report_id, token_id)
         combined_report_data = orchestrator.aggregate_results(agent_results)
         save_report_data(report_id, combined_report_data)
    -    set_report_status(report_id, "completed")
         logger.info("Report %s completed.", report_id)
Alternatively, modify save_report_data to not set the status if explicit control is desired.
📜 Review details
⛔ Files ignored due to path filters (6)
backend/app/core/__pycache__/storage.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/__pycache__/report_processor.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/price_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/trend_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/services/agents/__pycache__/volume_agent.cpython-313.pyc is excluded by !**/*.pyc
backend/app/tests/__pycache__/test_report_processor.cpython-313-pytest-8.4.2.pyc is excluded by !**/*.pyc
📒 Files selected for processing (3)
backend/app/core/storage.py (1 hunks)
backend/app/services/report_processor.py (2 hunks)
backend/app/tests/test_report_processor.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/services/report_processor.py (5)
  backend/app/core/orchestrator.py (4)
    AIOrchestrator(14-68)
    register_agent(23-31)
    execute_agents(41-57)
    aggregate_results(59-68)
  backend/app/services/agents/trend_agent.py (1)
    run(3-8)
  backend/app/services/agents/price_agent.py (1)
    run(3-8)
  backend/app/services/agents/volume_agent.py (1)
    run(3-8)
  backend/app/core/storage.py (4)
    save_report_data(34-41)
    set_report_status(7-13)
    get_report_status(15-18)
    try_set_processing(20-32)
backend/app/core/storage.py (1)
  backend/app/api/v1/routes.py (1)
    get_report_status(41-47)
backend/app/tests/test_report_processor.py (2)
  backend/app/services/report_processor.py (1)
    process_report(13-51)
  backend/app/core/storage.py (3)
    get_report_status(15-18)
    set_report_status(7-13)
    try_set_processing(20-32)
🪛 Ruff (0.14.2)
backend/app/services/report_processor.py
26-26: Undefined name storage
(F821)
28-28: Avoid specifying long messages outside the exception class
(TRY003)
44-44: Consider moving this statement to an else block
(TRY300)
50-50: Undefined name storage
(F821)
backend/app/tests/test_report_processor.py
14-14: Possible hardcoded password assigned to: "token_id"
(S105)
35-35: Possible hardcoded password assigned to: "token_id"
(S105)
45-45: Possible hardcoded password assigned to: "token_id"
(S105)
73-73: Possible hardcoded password assigned to: "token_id"
(S105)
92-94: try-except-pass detected, consider logging the exception
(S110)
92-92: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (7)
backend/app/core/storage.py (2)
20-32: LGTM: Atomic check-and-set correctly implemented.
The atomic operation properly prevents duplicate processing. Note that this allows transitioning from any non-"processing" status (including "completed" or "failed") back to "processing", which enables report reprocessing.
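The behaviour described here, including the re-entry from "completed" or "failed", can be sketched as below. This is an illustration under assumptions rather than the code in backend/app/core/storage.py: `REPORT_STORE` and `try_set_processing` are names that appear in the review, while the store's dict layout and the `_STORE_LOCK` name are hypothetical.

```python
import threading
from typing import Dict

# Assumed layout: report_id -> {"status": ...}; the real store may differ.
REPORT_STORE: Dict[str, Dict[str, str]] = {}
_STORE_LOCK = threading.Lock()  # hypothetical name


def try_set_processing(report_id: str) -> bool:
    """Atomically claim a report; False means another caller already holds it."""
    with _STORE_LOCK:
        # The read and the write happen under the same lock, so no other
        # thread can slip in between the check and the update.
        entry = REPORT_STORE.setdefault(report_id, {})
        if entry.get("status") == "processing":
            return False
        entry["status"] = "processing"
        return True
```

Because the only rejected state is "processing", a report that has finished or failed can be claimed again, which matches the reprocessing note above.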
34-41: Verify that save_report_data should always mark reports as completed.
The function unconditionally sets the report status to "completed" (lines 38, 41). Ensure this aligns with the intended behavior and that save_report_data is only called after successful processing. If called during partial/intermediate saves, this could prematurely mark a report as completed.
backend/app/tests/test_report_processor.py (3)
8-10: LGTM: Proper test isolation.
The autouse fixture ensures each test starts with a clean report store, preventing test interdependencies.
54-69: LGTM: Thorough coverage of try_set_processing behavior.
These tests properly validate the atomic operation under different scenarios: new reports, already processing, and transitioning from completed status.
71-108: Critical: Async function called without await in threaded context.
Line 85 calls process_report without await. Since process_report is async, this test needs to run the coroutine properly in each thread using asyncio.run().
Apply this diff:
     def worker():
         nonlocal processed_count, exception_count
         try:
             # Mock the agents to prevent actual external calls
             with patch('backend.app.services.agents.price_agent.PriceAgent.get_price', return_value=100.0):
                 with patch('backend.app.services.agents.volume_agent.VolumeAgent.get_volume', return_value=1000.0):
                     with patch('backend.app.services.agents.trend_agent.TrendAgent.get_trend', return_value="up"):
    -                    process_report(report_id, token_id)
    +                    asyncio.run(process_report(report_id, token_id))
             processed_count += 1
         except ValueError as e:
             if "already being processed" in str(e):
                 exception_count += 1
             else:
                 raise
         except Exception:
             # Catch other unexpected exceptions during processing
             pass
Likely an incorrect or invalid review comment.
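For context on the `asyncio.run()` suggestion: calling an async function from plain threaded code only creates a coroutine object, and the body does not execute until an event loop runs it. A minimal sketch of driving a coroutine from worker threads, with `fake_process_report` standing in as a hypothetical substitute for `process_report`:

```python
import asyncio
import threading
from typing import List


async def fake_process_report(report_id: str) -> bool:
    """Hypothetical stand-in for process_report; does no real work."""
    await asyncio.sleep(0)
    return True


def run_in_threads(report_ids: List[str]) -> List[bool]:
    results: List[bool] = []
    results_lock = threading.Lock()

    def worker(report_id: str) -> None:
        # asyncio.run() creates a fresh event loop in this thread, runs the
        # coroutine to completion, then closes the loop.
        outcome = asyncio.run(fake_process_report(report_id))
        with results_lock:
            results.append(outcome)

    threads = [threading.Thread(target=worker, args=(rid,)) for rid in report_ids]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

In an async test, awaiting several `process_report` calls with `asyncio.gather` would exercise the same guard without threads; which form fits better depends on what the test is meant to cover.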
backend/app/services/report_processor.py (2)
26-28: LGTM: Race condition properly addressed.
The atomic try_set_processing check correctly prevents duplicate processing, addressing the TOCTOU race condition from previous reviews. (Note: import issue flagged separately.)
45-51: LGTM: Error handling properly implemented.
The exception handling correctly:
- Distinguishes between cancellation and failure scenarios
- Sets appropriate status for each error type
- Uses bare raise to preserve exception tracebacks
- Logs errors before re-raising
(Note: import issue on line 50 flagged separately.)
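The four points above can be seen together in a small self-contained sketch. It mirrors the try/except shape quoted in the earlier diff, but the `STATUS` dict and the `run_with_status` wrapper are hypothetical names used only for this example; the PR itself goes through `set_report_status`.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict

logger = logging.getLogger(__name__)
STATUS: Dict[str, str] = {}  # hypothetical in-memory status store


async def run_with_status(report_id: str, work: Callable[[], Awaitable[None]]) -> bool:
    STATUS[report_id] = "processing"
    try:
        await work()
    except asyncio.CancelledError:
        # Cancellation is not a failure: record it, then let it propagate so
        # the caller that cancelled the task sees the cancellation.
        STATUS[report_id] = "cancelled"
        raise
    except Exception:
        STATUS[report_id] = "failed"
        logger.exception("Report %s failed.", report_id)
        raise  # bare raise keeps the original traceback
    else:
        STATUS[report_id] = "completed"
        return True
```

Since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so the `except Exception` clause would not catch it; the dedicated clause is what allows the "cancelled" status to be recorded.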
Nice, the mock agents registration in the orchestrator makes the process more streamlined. 👍
Overview: This PR integrates AI orchestration into the report processing workflow to enhance data aggregation and analysis.
Changes
AIOrchestrator within the process_report() function in backend/app/services/report_processor.py.
save_report_data().
Summary by CodeRabbit
Improvements
Tests