57 changes: 57 additions & 0 deletions backend/app/services/report_processor.py
@@ -0,0 +1,57 @@
import asyncio
import logging

logger = logging.getLogger(__name__)

# For now, a simple dictionary simulates the shared state.
# NOTE: This in-memory store and lock are only suitable for single-process deployments.
# For multi-process or distributed deployments, use an external store (e.g., Redis or a
# database) with an appropriate distributed locking mechanism; a sketch of a Redis-backed
# variant follows this file's diff.
report_status = {}
report_status_lock = asyncio.Lock()

async def process_report(report_id: str, token_id: str) -> bool:
"""
Simulates a background report generation process.
    Updates report_status to 'processing', then to 'completed' on success.

    Returns:
        True on success.

    Raises:
        ValueError: If report_id is already being processed.
        Exception: Any underlying exception is re-raised after the status is
            marked 'failed' (or 'cancelled' on cancellation).
"""
# Mark processing under lock
async with report_status_lock:
if report_id in report_status:
raise ValueError(f"Report {report_id} is already being processed")
report_status[report_id] = {"status": "processing", "token_id": token_id}

logger.info("Processing report %s for token %s", report_id, token_id)

try:
await asyncio.sleep(5) # Simulate work
async with report_status_lock:
if report_id in report_status and isinstance(report_status[report_id], dict):
report_status[report_id]["status"] = "completed"
logger.info("Report %s completed.", report_id)
return True
except asyncio.CancelledError:
async with report_status_lock:
if report_id in report_status:
report_status[report_id]["status"] = "cancelled"
raise
except Exception:
async with report_status_lock:
if report_id in report_status:
report_status[report_id]["status"] = "failed"
logger.exception("Report %s failed.", report_id)
raise

async def get_report_status(report_id: str):
"""
    Retrieves the status entry for a report, or None if the report is unknown.
"""
async with report_status_lock:
return report_status.get(report_id)
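The module comment above defers multi-process support to an external store. For reference only, here is a rough sketch of what a Redis-backed variant of the same status tracking could look like; it is not part of this PR and assumes redis-py >= 4.2 (redis.asyncio), a reachable local Redis instance, and hypothetical key names.

# Hedged sketch (not part of this PR): a Redis-backed status store for
# multi-process deployments, assuming redis-py >= 4.2 and hypothetical key names.
import redis.asyncio as redis

_redis = redis.from_url("redis://localhost:6379", decode_responses=True)

async def claim_report(report_id: str, token_id: str) -> bool:
    # SET ... NX acts as the cross-process "already being processed" guard.
    claimed = await _redis.set(f"report:{report_id}:status", "processing", nx=True)
    if claimed:
        await _redis.set(f"report:{report_id}:token_id", token_id)
    return bool(claimed)

async def mark_report(report_id: str, status: str) -> None:
    await _redis.set(f"report:{report_id}:status", status)

async def get_report_status_redis(report_id: str):
    status = await _redis.get(f"report:{report_id}:status")
    if status is None:
        return None
    token_id = await _redis.get(f"report:{report_id}:token_id")
    return {"status": status, "token_id": token_id}

The NX flag on SET provides across worker processes the same "already being processed" guarantee that the asyncio.Lock in this module only provides within a single process.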
105 changes: 105 additions & 0 deletions backend/tests/test_report_processor.py
@@ -0,0 +1,105 @@
import pytest
import pytest_asyncio
import asyncio
from backend.app.services.report_processor import process_report, report_status, report_status_lock, get_report_status

# The async fixture needs pytest-asyncio's decorator so it is actually awaited.
@pytest_asyncio.fixture(autouse=True)
async def clear_report_status():
async with report_status_lock:
report_status.clear()
yield
async with report_status_lock:
report_status.clear()

@pytest.mark.asyncio
async def test_process_report_success():
report_id = "test_report_1"
token_id = "test_token_1"

result = await process_report(report_id, token_id)
assert result is True

async with report_status_lock:
assert report_status[report_id]["status"] == "completed"
assert report_status[report_id]["token_id"] == token_id

@pytest.mark.asyncio
async def test_process_report_already_processing():
report_id = "test_report_2"
token_id = "test_token_2"

# Start processing but don't await it to simulate concurrency
task = asyncio.create_task(process_report(report_id, token_id))
await asyncio.sleep(0.1) # Give it a moment to set status to 'processing'

with pytest.raises(ValueError, match=f"Report {report_id} is already being processed"):
await process_report(report_id, token_id)

task.cancel()
with pytest.raises(asyncio.CancelledError):
await task # Await the cancelled task to ensure it raises CancelledError

@pytest.mark.asyncio
async def test_process_report_cancellation():
report_id = "test_report_3"
token_id = "test_token_3"

task = asyncio.create_task(process_report(report_id, token_id))
await asyncio.sleep(0.1) # Let it start processing

task.cancel()
with pytest.raises(asyncio.CancelledError):
await task

async with report_status_lock:
assert report_status[report_id]["status"] == "cancelled"

@pytest.mark.asyncio
async def test_process_report_exception_handling(monkeypatch):
    report_id = "test_report_4"
    token_id = "test_token_4"

    # Make the simulated work step fail; monkeypatch restores asyncio.sleep
    # automatically, even if the test itself fails.
    async def mock_sleep_raise(*args, **kwargs):
        raise Exception("Simulated processing error")
    monkeypatch.setattr(asyncio, "sleep", mock_sleep_raise)

    with pytest.raises(Exception, match="Simulated processing error"):
        await process_report(report_id, token_id)

    async with report_status_lock:
        assert report_status[report_id]["status"] == "failed"

@pytest.mark.asyncio
async def test_get_report_status():
report_id = "test_report_5"
token_id = "test_token_5"

async with report_status_lock:
report_status[report_id] = {"status": "initial", "token_id": token_id}

status = await get_report_status(report_id)
assert status == {"status": "initial", "token_id": token_id}

status = await get_report_status("non_existent_report")
assert status is None

@pytest.mark.asyncio
async def test_concurrent_different_reports():
report_id_1 = "concurrent_report_1"
token_id_1 = "concurrent_token_1"
report_id_2 = "concurrent_report_2"
token_id_2 = "concurrent_token_2"

task1 = asyncio.create_task(process_report(report_id_1, token_id_1))
task2 = asyncio.create_task(process_report(report_id_2, token_id_2))

await asyncio.gather(task1, task2)

async with report_status_lock:
assert report_status[report_id_1]["status"] == "completed"
assert report_status[report_id_2]["status"] == "completed"
assert report_status[report_id_1]["token_id"] == token_id_1
assert report_status[report_id_2]["token_id"] == token_id_2
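For reference, a minimal caller-side sketch (hypothetical identifiers, not part of this PR) of how the two service functions are meant to be combined: schedule process_report as a background task and poll get_report_status until the report leaves the 'processing' state.

# Hedged usage sketch (hypothetical, not part of this PR).
import asyncio

from backend.app.services.report_processor import get_report_status, process_report

async def main():
    # Kick off report generation without blocking the caller.
    task = asyncio.create_task(process_report("report_42", "token_42"))

    # Poll the shared status store while the report is being generated.
    while True:
        status = await get_report_status("report_42")
        print(status)
        if status and status["status"] != "processing":
            break
        await asyncio.sleep(1)

    await task  # Propagate any failure from the background task.

asyncio.run(main())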