-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Implement asynchronous report processing service #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds a new in-memory report processing service with concurrency control, async processing and status queries, plus a comprehensive test suite covering success, concurrent starts, cancellation, exception handling, and status retrieval. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant process_report as process_report()
participant report_status as report_status dict
participant report_status_lock as report_status_lock
Caller->>process_report: call process_report(report_id, token_id)
process_report->>report_status_lock: acquire lock
report_status_lock-->>process_report: locked
process_report->>report_status: set status="processing", token_id
process_report->>process_report: log "starting"
report_status_lock->>process_report: release lock
alt normal completion
process_report->>process_report: await 5s (simulated work)
process_report->>report_status_lock: acquire lock
report_status_lock-->>process_report: locked
process_report->>report_status: set status="completed"
process_report->>process_report: log "completed"
report_status_lock->>process_report: release lock
process_report-->>Caller: return True
else cancellation (CancelledError)
process_report->>process_report: CancelledError raised
process_report->>report_status_lock: acquire lock
report_status_lock-->>process_report: locked
process_report->>report_status: set status="cancelled"
report_status_lock->>process_report: release lock
process_report-->>Caller: raise CancelledError
else exception
process_report->>process_report: exception raised
process_report->>report_status_lock: acquire lock
report_status_lock-->>process_report: locked
process_report->>report_status: set status="failed"
process_report->>process_report: log "failed"
report_status_lock->>process_report: release lock
process_report-->>Caller: re-raise exception
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (1)
backend/app/services/report_processor.py (1)
8-17: Consider task lifecycle and observability for production.For production deployment, consider:
- Task management: How will tasks be tracked? Should
asyncio.create_task()be used by callers?- Cancellation: What happens if the service shuts down mid-processing?
- Monitoring: Add metrics for processing time, success/failure rates, and queue depth
- Resource limits: Should there be a maximum number of concurrent reports?
- Persistence: On restart, how are in-flight reports recovered?
Example adding observability:
import time from contextlib import asynccontextmanager @asynccontextmanager async def track_processing_time(report_id: str): start_time = time.time() try: yield finally: duration = time.time() - start_time logger.info("Processing duration", extra={ "report_id": report_id, "duration_seconds": duration })Then use it in
process_report:async with track_processing_time(report_id): await asyncio.sleep(5)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
backend/tests/test_report_processor.py (1)
56-73: Refactor monkey-patching to use proper test fixtures.The global monkey-patching of
asyncio.sleepis risky because:
- If the test fails before line 73, the original
sleepis never restored, potentially breaking subsequent tests- The mock function has unused parameters (lines 63)
Use pytest's
monkeypatchfixture orunittest.mock.patchwith a context manager for safer cleanup:+from unittest.mock import patch + @pytest.mark.asyncio async def test_process_report_exception_handling(): report_id = "test_report_4" token_id = "test_token_4" - # Temporarily modify process_report to raise an exception - original_sleep = asyncio.sleep - async def mock_sleep_raise(*args, **kwargs): + async def mock_sleep_raise(_duration): raise Exception("Simulated processing error") - asyncio.sleep = mock_sleep_raise - with pytest.raises(Exception, match="Simulated processing error"): - await process_report(report_id, token_id) + with patch('asyncio.sleep', side_effect=mock_sleep_raise): + with pytest.raises(Exception, match="Simulated processing error"): + await process_report(report_id, token_id) async with report_status_lock: assert report_status[report_id]["status"] == "failed" - - asyncio.sleep = original_sleep # Restore original sleepThis ensures automatic cleanup even if the test fails.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/services/report_processor.py(1 hunks)backend/tests/test_report_processor.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/tests/test_report_processor.py (1)
backend/app/services/report_processor.py (2)
process_report(14-50)get_report_status(52-57)
🪛 Ruff (0.14.1)
backend/tests/test_report_processor.py
16-16: Possible hardcoded password assigned to: "token_id"
(S105)
28-28: Possible hardcoded password assigned to: "token_id"
(S105)
44-44: Possible hardcoded password assigned to: "token_id"
(S105)
59-59: Possible hardcoded password assigned to: "token_id"
(S105)
63-63: Unused function argument: args
(ARG001)
63-63: Unused function argument: kwargs
(ARG001)
64-64: Create your own exception
(TRY002)
64-64: Avoid specifying long messages outside the exception class
(TRY003)
78-78: Possible hardcoded password assigned to: "token_id"
(S105)
92-92: Possible hardcoded password assigned to: "token_id_1"
(S105)
94-94: Possible hardcoded password assigned to: "token_id_2"
(S105)
backend/app/services/report_processor.py
28-28: Avoid specifying long messages outside the exception class
(TRY003)
39-39: Consider moving this statement to an else block
(TRY300)
🔇 Additional comments (9)
backend/tests/test_report_processor.py (6)
5-11: Excellent test isolation pattern.The autouse fixture ensures clean state before and after each test, which is essential for testing concurrent state management. The proper use of the lock guarantees thread-safe cleanup.
13-23: Well-structured success case test.The test properly verifies both the return value and the state changes under lock, ensuring complete validation of the happy path.
25-39: Thorough concurrent processing test.The test correctly simulates concurrent processing attempts and validates both the ValueError for duplicate processing and proper cleanup of the cancelled task.
41-54: Good cancellation path coverage.The test properly validates that task cancellation updates the status to "cancelled" and propagates the CancelledError as expected.
75-87: Complete coverage of status retrieval.The test validates both successful retrieval of existing report status and proper handling of non-existent reports with None return.
89-105: Excellent concurrent processing validation.This test confirms that the locking mechanism correctly allows concurrent processing of different reports while preventing duplicate processing of the same report. The use of
asyncio.gatherproperly validates true concurrency.backend/app/services/report_processor.py (3)
1-12: Excellent resolution of previous critical issues.The addition of
report_status_lock(line 12) addresses the race condition flagged in previous reviews. The comprehensive comments (lines 6-10) clearly document the single-process limitation, and the proper logging setup (lines 2-4) replaces the previousprint()statements.
14-50: Well-implemented async processing with proper concurrency control.The function correctly addresses all previous review concerns:
- Comprehensive locking prevents race conditions (lines 26, 35, 41, 46)
- Proper error handling for cancellation (lines 40-44) and failures (lines 45-50)
- Clear return value and raised exceptions enable proper caller handling
- Defensive isinstance check (line 36) prevents KeyError in edge cases
The implementation is production-ready for single-process deployments.
52-57: Clean and correct status retrieval.The function properly protects read access with the lock and returns None for non-existent reports, providing a simple and safe interface.
Overview: This PR introduces a new service for asynchronously processing reports, laying the groundwork for future AI orchestration.
Changes
app/services/report_processor.pyto handle report generation.process_reportfunction that simulates background work usingasyncio.sleep().Summary by CodeRabbit