Feat: Update report state in orchestrator after each stage #67
Changes from all commits: 2e95ee7, 60ceb4d, ae6b1fd, 27bfc1d, eff89aa
First file in the diff, `@@ -0,0 +1,116 @@` (a new Alembic configuration):

```ini
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = backend/app/db/migrations

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to backend/app/db/migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:backend/app/db/migrations/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = sqlite+aiosqlite:///./sql_app.db


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
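The `sqlalchemy.url` above uses the async `sqlite+aiosqlite` driver, so the migration environment has to drive Alembic through an async engine. The project's `env.py` under `backend/app/db/migrations` is not part of this diff; the following is only a minimal sketch of Alembic's documented async pattern, with `target_metadata` left as a placeholder and offline mode omitted:

```python
# Sketch only: Alembic's async template shape, adapted to the
# sqlite+aiosqlite URL configured above. The real env.py in
# backend/app/db/migrations is not shown in this PR.
import asyncio

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config

config = context.config
target_metadata = None  # placeholder: normally the project's MetaData


def do_run_migrations(connection):
    # Configure the migration context against the sync connection facade.
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations():
    # Build an AsyncEngine from the [alembic] section's sqlalchemy.url.
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section) or {},
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


asyncio.run(run_async_migrations())
```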
Second file in the diff, `@@ -1,87 +1,78 @@` (the report API router):

```diff
-from fastapi import APIRouter, BackgroundTasks
+from fastapi import APIRouter, BackgroundTasks, Depends
 from fastapi.responses import JSONResponse
+from sqlalchemy.ext.asyncio import AsyncSession
+from backend.app.db.database import get_db
+from backend.app.db.repositories.report_repository import ReportRepository
 from backend.app.models.report_models import ReportRequest, ReportResponse
-from backend.app.services.report_service import generate_report, in_memory_reports, get_report_status_from_memory, get_report_data
-from backend.app.core.orchestrator import create_orchestrator
+from backend.app.services.report_service import generate_report, get_report_status, get_report_data
+from backend.app.services.report_processor import process_report
 from backend.app.core.logger import api_logger
 from backend.app.core.exceptions import ReportNotFoundException
-import asyncio
+from backend.app.db.models.report_state import ReportStatusEnum

 router = APIRouter()

-# Dummy Agent for demonstration
-async def dummy_agent_one(report_id: str, token_id: str) -> dict:
-    print(f"Dummy Agent One running for report {report_id} and token {token_id}")
-    await asyncio.sleep(2) # Simulate async work
-    return {"agent_one_data": "data_from_agent_one"}
-
-async def dummy_agent_two(report_id: str, token_id: str) -> dict:
-    print(f"Dummy Agent Two running for report {report_id} and token {token_id}")
-    await asyncio.sleep(1.5) # Simulate async work
-    return {"agent_two_data": "data_from_agent_two"}
-
-# Register agents
-orchestrator_instance = create_orchestrator()
-orchestrator_instance.register_agent("AgentOne", dummy_agent_one)
-orchestrator_instance.register_agent("AgentTwo", dummy_agent_two)
-
 @router.get("/")
 async def read_root():
     return {"message": "Welcome to API v1"}

 async def _run_agents_in_background(report_id: str, token_id: str):
-    try:
-        await orchestrator_instance.execute_agents_concurrently(report_id, token_id)
-    except Exception as e:
-        api_logger.error(f"Agent execution failed for report {report_id}: {e}")
-        # Here you might want to update the report status to 'failed' in in_memory_reports
-        # For now, we'll just log it.
-        if report_id in in_memory_reports:
-            in_memory_reports[report_id]["status"] = "failed"
-            in_memory_reports[report_id]["detail"] = f"Agent execution failed: {e}"
+    async for session in get_db():
+        report_repository = ReportRepository(session)
+        try:
+            await report_repository.update_report_status(report_id, ReportStatusEnum.RUNNING_AGENTS)
+            await process_report(report_id, token_id, report_repository)
+            await report_repository.update_report_status(report_id, ReportStatusEnum.COMPLETED)
+            break # Exit the async for loop after successful processing
+        except Exception as e:
+            api_logger.error(f"Report processing failed for report {report_id}: {e}")
+            await report_repository.update_partial(report_id, {"status": ReportStatusEnum.FAILED, "error_message": str(e)})
+            break # Exit the async for loop on failure
```
Comment on lines 19 to +30

According to the `ReportRepository` interface, it appears to expect a session factory rather than an `AsyncSession` instance, so constructing it from the session yielded by `get_db()` here is questionable. Either pass the factory directly:

```diff
 async def _run_agents_in_background(report_id: str, token_id: str):
-    async for session in get_db():
-        report_repository = ReportRepository(session)
-        try:
-            await report_repository.update_report_status(report_id, ReportStatusEnum.RUNNING_AGENTS)
-            await process_report(report_id, token_id, report_repository)
-            await report_repository.update_report_status(report_id, ReportStatusEnum.COMPLETED)
-            break # Exit the async for loop after successful processing
-        except Exception as e:
-            api_logger.error(f"Report processing failed for report {report_id}: {e}")
-            await report_repository.update_partial(report_id, {"status": ReportStatusEnum.FAILED, "error_message": str(e)})
-            break # Exit the async for loop on failure
+    from backend.app.db.database import AsyncSessionLocal
+    report_repository = ReportRepository(AsyncSessionLocal)
+    try:
+        await report_repository.update_report_status(report_id, ReportStatusEnum.RUNNING_AGENTS)
+        await process_report(report_id, token_id, report_repository)
+        await report_repository.update_report_status(report_id, ReportStatusEnum.COMPLETED)
+    except Exception as e:
+        api_logger.error(f"Report processing failed for report {report_id}: {e}")
+        await report_repository.update_partial(report_id, {"status": ReportStatusEnum.FAILED, "error_message": str(e)})
```

Or refactor `ReportRepository` so it can work with a plain `AsyncSession`.

Ruff (0.14.5): line 27 — BLE001, do not catch blind `Exception`.
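A related option, corresponding to the "refactor" branch above: let the background task own its session lifecycle explicitly. This is only a sketch — it assumes `AsyncSessionLocal` is an async sessionmaker exported by `backend.app.db.database` (as in the suggestion) and that `ReportRepository` accepts a plain `AsyncSession`, neither of which is confirmed by this diff:

```python
# Sketch only: the background task opens and closes its own session
# instead of iterating the request-scoped get_db() dependency.
# AsyncSessionLocal and a session-accepting ReportRepository are assumptions.
from backend.app.core.logger import api_logger
from backend.app.db.database import AsyncSessionLocal
from backend.app.db.models.report_state import ReportStatusEnum
from backend.app.db.repositories.report_repository import ReportRepository
from backend.app.services.report_processor import process_report


async def _run_agents_in_background(report_id: str, token_id: str) -> None:
    async with AsyncSessionLocal() as session:  # fresh session per task
        report_repository = ReportRepository(session)
        try:
            await report_repository.update_report_status(report_id, ReportStatusEnum.RUNNING_AGENTS)
            await process_report(report_id, token_id, report_repository)
            await report_repository.update_report_status(report_id, ReportStatusEnum.COMPLETED)
        except Exception as e:  # noqa: BLE001 - last-resort handler for the task
            api_logger.error(f"Report processing failed for report {report_id}: {e}")
            await report_repository.update_partial(
                report_id,
                {"status": ReportStatusEnum.FAILED, "error_message": str(e)},
            )
```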
```diff
 @router.post("/report/generate", response_model=ReportResponse)
-async def generate_report_endpoint(request: ReportRequest, background_tasks: BackgroundTasks):
+async def generate_report_endpoint(request: ReportRequest, background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session)):
     api_logger.info(f"Received report generation request for token_id: {request.token_id}")
-    report_response = await generate_report(request)
+    report_repository = ReportRepository(session)
+    report_response = await generate_report(request, report_repository)
```
Comment on lines +33 to +36

Same concern as above: the endpoint passes a raw `AsyncSession` to `ReportRepository`, which appears to expect a session factory. This applies to all three endpoints (lines 33-35, 42-44, 52-54). Either wrap the request-scoped session, e.g.:

```python
report_repository = ReportRepository(lambda: contextmanager_yielding_session(session))
```

Or consider a design where request-scoped endpoints use a different repository pattern than background tasks.

Ruff (0.14.5): line 33 — B008, do not perform a function call in argument defaults; line 33 — F821, undefined name.
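For the "different repository pattern" mentioned above, one possible shape is a repository that is handed an `AsyncSession` it does not own. This is a hypothetical sketch only; the real `ReportRepository` interface and the `ReportState` ORM model are not shown in this diff:

```python
# Sketch only: a repository bound to a caller-managed AsyncSession, so
# request handlers pass the Depends-injected session and background tasks
# pass one they open themselves. ReportState is an assumed model name.
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.db.models.report_state import ReportState, ReportStatusEnum  # ReportState is an assumption


class SessionScopedReportRepository:
    """Hypothetical repository that does not manage its session's lifecycle."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def update_report_status(self, report_id: str, status: ReportStatusEnum) -> None:
        report = await self._session.get(ReportState, report_id)
        if report is None:
            return
        report.status = status
        await self._session.commit()
```

Endpoints would then construct it from the injected request session, while `_run_agents_in_background` would construct it from a session it opens (and closes) itself.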
```diff
     report_id = report_response.report_id
     background_tasks.add_task(_run_agents_in_background, report_id, request.token_id)
     return report_response

 @router.get("/reports/{report_id}/status")
-async def get_report_status(report_id: str):
+async def get_report_status_endpoint(report_id: str, session: AsyncSession = Depends(get_session)):
     api_logger.info(f"Received status request for report_id: {report_id}")
-    report = get_report_status_from_memory(report_id)
+    report_repository = ReportRepository(session)
+    report = await get_report_status(report_id, report_repository)
     if not report:
         api_logger.error(f"Report with id {report_id} not found for status request.")
         raise ReportNotFoundException(detail="Report not found")
     return {"report_id": report_id, "status": report["status"]}

 @router.get("/reports/{report_id}/data")
-async def get_report_data_endpoint(report_id: str):
+async def get_report_data_endpoint(report_id: str, session: AsyncSession = Depends(get_session)):
     api_logger.info(f"Received data request for report_id: {report_id}")
-    report_result = get_report_data(report_id)
+    report_repository = ReportRepository(session)
+    report_result = await get_report_data(report_id, report_repository)
     if report_result:
-        if "data" in report_result:
+        if report_result.get("status") == ReportStatusEnum.COMPLETED.value:
             api_logger.info(f"Returning data for report_id: {report_id}")
             return report_result
-        elif report_result.get("status") == "processing":
+        elif report_result.get("status") == ReportStatusEnum.RUNNING_AGENTS.value or report_result.get("status") == ReportStatusEnum.PENDING.value or report_result.get("status") == ReportStatusEnum.RUNNING_AGENTS.value or report_result.get("status") == ReportStatusEnum.GENERATING_NLG.value or report_result.get("status") == ReportStatusEnum.GENERATING_SUMMARY.value:
```
Duplicate condition: the check compares against `ReportStatusEnum.RUNNING_AGENTS.value` twice. Consider membership testing instead of the chained `or` comparisons:

```diff
-        elif report_result.get("status") == ReportStatusEnum.RUNNING_AGENTS.value or report_result.get("status") == ReportStatusEnum.PENDING.value or report_result.get("status") == ReportStatusEnum.RUNNING_AGENTS.value or report_result.get("status") == ReportStatusEnum.GENERATING_NLG.value or report_result.get("status") == ReportStatusEnum.GENERATING_SUMMARY.value:
+        elif report_result.get("status") in (
+            ReportStatusEnum.PENDING.value,
+            ReportStatusEnum.RUNNING_AGENTS.value,
+            ReportStatusEnum.GENERATING_NLG.value,
+            ReportStatusEnum.GENERATING_SUMMARY.value,
+        ):
```

This refactor also improves readability by using a single membership test.
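If the same "still in progress" set is needed elsewhere (for example in the status endpoint or in tests), it could be defined once. A small sketch; the constant name `IN_PROGRESS_STATUSES` is an invention, not something from this PR:

```python
# Sketch only: a single source of truth for "still in progress" states.
from backend.app.db.models.report_state import ReportStatusEnum

IN_PROGRESS_STATUSES = frozenset({
    ReportStatusEnum.PENDING.value,
    ReportStatusEnum.RUNNING_AGENTS.value,
    ReportStatusEnum.GENERATING_NLG.value,
    ReportStatusEnum.GENERATING_SUMMARY.value,
})

# Usage in the endpoint:
# elif report_result.get("status") in IN_PROGRESS_STATUSES:
```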
```diff
             api_logger.warning(f"Report {report_id} is still processing.")
             # Match test expectations exactly
             return JSONResponse(
                 status_code=202,
                 content={
                     "detail": "Report is still processing.",
                 },
             )
-        elif report_result.get("status") == "failed":
-            api_logger.error(f"Report {report_id} failed with detail: {report_result.get("detail", "N/A")}")
+        elif report_result.get("status") == ReportStatusEnum.FAILED.value:
+            api_logger.error(f"Report {report_id} failed with detail: {report_result.get('detail', 'N/A')}")
             return JSONResponse(
                 status_code=409,
                 content={
                     "report_id": report_id,
                     "message": "Report failed",
-                    "detail": report_result.get("detail", "Report processing failed."),
+                    "detail": report_result.get('detail', 'Report processing failed.'),
                 },
             )
     api_logger.error(f"Report with id {report_id} not found or not completed for data request.")
```
`get_session` is undefined; should use `get_db`.

Line 4 imports `get_db`, but lines 33, 42, and 52 reference `get_session`, which doesn't exist. This will raise a `NameError` at runtime. Update all usages on lines 33, 42, and 52 to use `get_db` instead of `get_session`.
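For reference, a typical shape for the `get_db` dependency the comment points to — the actual `backend/app/db/database.py` is not shown in this diff, so the engine setup and names below are illustrative only:

```python
# Sketch only: a common async-generator session dependency for FastAPI.
# The real get_db in backend/app/db/database.py may differ; the URL matches
# the alembic.ini above but is otherwise an assumption.
from collections.abc import AsyncGenerator

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///./sql_app.db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

router = APIRouter()


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # Yield one session per request; FastAPI closes it after the response.
    async with AsyncSessionLocal() as session:
        yield session


@router.get("/reports/{report_id}/status")
async def get_report_status_endpoint(report_id: str, session: AsyncSession = Depends(get_db)):
    ...  # build the repository from `session` and query, as in the diff above
```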