1 change: 0 additions & 1 deletion .env.example
@@ -22,7 +22,6 @@ FRONTEND_HOST=http://localhost:5173

ENVIRONMENT=local


PROJECT_NAME="AI Platform"
STACK_NAME=ai-platform

4 changes: 2 additions & 2 deletions .github/workflows/continuous_integration.yml
@@ -2,9 +2,9 @@ name: AI Platform CI

on:
push:
-    branches: [staging]
+    branches: [main]
pull_request:
-    branches: [staging]
+    branches: [main]

jobs:
checks:
4 changes: 3 additions & 1 deletion backend/app/api/main.py
@@ -1,12 +1,14 @@
from fastapi import APIRouter
-from app.api.routes import items, login, private, users, utils,project,organization, project_user, api_keys
+
+from app.api.routes import items, login, private, users, utils, project, organization, project_user, api_keys, threads
from app.core.config import settings

api_router = APIRouter()
api_router.include_router(login.router)
api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(items.router)
+api_router.include_router(threads.router)
api_router.include_router(organization.router)
api_router.include_router(project.router)
api_router.include_router(project_user.router)
140 changes: 140 additions & 0 deletions backend/app/api/routes/threads.py
@@ -0,0 +1,140 @@
import re
import requests

import openai
from openai import OpenAI
from fastapi import APIRouter, BackgroundTasks

from app.utils import APIResponse
from app.core import settings, logging

logger = logging.getLogger(__name__)
router = APIRouter(tags=["threads"])


def send_callback(callback_url: str, data: dict):
"""Send results to the callback URL (synchronously)."""
try:
session = requests.Session()
# uncomment this to run locally without SSL
# session.verify = False
response = session.post(callback_url, json=data)
response.raise_for_status()
        return True
except requests.RequestException as e:
logger.error(f"Callback failed: {str(e)}")
return False


def process_run(request: dict, client: OpenAI):
"""
Background task to run create_and_poll, then send the callback with the result.
This function is run in the background after we have already returned an initial response.
"""
try:
# Start the run
run = client.beta.threads.runs.create_and_poll(
thread_id=request["thread_id"],
assistant_id=request["assistant_id"],
)

if run.status == "completed":
messages = client.beta.threads.messages.list(
thread_id=request["thread_id"])
latest_message = messages.data[0]
message_content = latest_message.content[0].text.value

remove_citation = request.get("remove_citation", False)

if remove_citation:
message = re.sub(r"【\d+(?::\d+)?†[^】]*】", "", message_content)
else:
message = message_content

            # Forward any additional fields from the request, excluding keys already handled
            additional_data = {
                k: v
                for k, v in request.items()
                if k not in {"question", "assistant_id", "callback_url", "thread_id"}
            }
            callback_response = APIResponse.success_response(data={
                "status": "success",
                "message": message,
                "thread_id": request["thread_id"],
                # request is a plain dict, so use .get(); getattr would always return the default
                "endpoint": request.get("endpoint", "some-default-endpoint"),
                **additional_data
            })
else:
callback_response = APIResponse.failure_response(
error=f"Run failed with status: {run.status}")

# Send callback with results
send_callback(request["callback_url"], callback_response.model_dump())

    except openai.OpenAIError as e:
        # Handle OpenAI API errors
if isinstance(e.body, dict) and "message" in e.body:
            error_message = e.body["message"]
else:
            error_message = str(e)

        callback_response = APIResponse.failure_response(error=error_message)

        send_callback(request["callback_url"], callback_response.model_dump())


@router.post("/threads")
async def threads(request: dict, background_tasks: BackgroundTasks):
"""
Accepts a question, assistant_id, callback_url, and optional thread_id from the request body.
    Returns an immediate "processing" response, then runs create_and_poll in the background.
Once completed, calls send_callback with the final result.
"""
client = OpenAI(api_key=settings.OPENAI_API_KEY)

# Use get method to safely access thread_id
thread_id = request.get("thread_id")

# 1. Validate or check if there's an existing thread with an in-progress run
if thread_id:
try:
            runs = client.beta.threads.runs.list(thread_id=thread_id)
            # Get the most recent run (first in the list), if any
            if runs.data:
                latest_run = runs.data[0]
                if latest_run.status in ["queued", "in_progress", "requires_action"]:
                    return APIResponse.failure_response(
                        error=f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete."
                    )
        except openai.OpenAIError:
            # Handle invalid thread ID
            return APIResponse.failure_response(error=f"Invalid thread ID provided: {thread_id}")

# Use existing thread
        client.beta.threads.messages.create(
thread_id=thread_id, role="user", content=request["question"]
)
else:
try:
# Create new thread
thread = client.beta.threads.create()
client.beta.threads.messages.create(
thread_id=thread.id, role="user", content=request["question"]
)
request["thread_id"] = thread.id
        except openai.OpenAIError as e:
            # Handle OpenAI API errors
if isinstance(e.body, dict) and "message" in e.body:
                error_message = e.body["message"]
else:
error_message = str(e)
            return APIResponse.failure_response(error=error_message)

# 2. Send immediate response to complete the API call
initial_response = APIResponse.success_response(data={
"status": "processing",
"message": "Run started",
"thread_id": request.get("thread_id"),
"success": True,
})

# 3. Schedule the background task to run create_and_poll and send callback
background_tasks.add_task(process_run, request, client)

# 4. Return immediately so the client knows we've accepted the request
return initial_response
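For reference, here is a minimal sketch of how a caller might exercise the new endpoint and receive the callback. The base URL, the /api/v1 prefix, and the receiver address are illustrative assumptions (the actual prefix depends on how api_router is mounted), not part of this PR:

import requests

# Hypothetical deployment URL; the /api/v1 prefix assumes api_router is
# mounted under settings.API_V1_STR.
THREADS_URL = "http://localhost:8000/api/v1/threads"

payload = {
    "question": "What is Glific?",
    "assistant_id": "assistant_123",                 # an existing OpenAI assistant ID
    "callback_url": "https://example.com/callback",  # where the final result is POSTed
    "remove_citation": True,                         # optional: strip citation markers
    # "thread_id": "thread_abc",                     # optional: continue an existing thread
}

# The endpoint answers immediately with {"success": true, "data": {"status": "processing", ...}};
# the final APIResponse payload (status, message, thread_id, plus any extra
# request fields) arrives later as a POST to callback_url.
resp = requests.post(THREADS_URL, json=payload, timeout=30)
print(resp.json())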
4 changes: 4 additions & 0 deletions backend/app/core/__init__.py
@@ -0,0 +1,4 @@
from .config import settings
from .logger import logging

__all__ = ['settings', 'logging']
5 changes: 5 additions & 0 deletions backend/app/core/config.py
@@ -1,5 +1,6 @@
import secrets
import warnings
import os
from typing import Annotated, Any, Literal

from pydantic import (
@@ -31,6 +32,7 @@ class Settings(BaseSettings):
env_ignore_empty=True,
extra="ignore",
)
OPENAI_API_KEY: str
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = secrets.token_urlsafe(32)
# 60 minutes * 24 hours * 1 days = 1 days
@@ -95,6 +97,9 @@ def emails_enabled(self) -> bool:
FIRST_SUPERUSER: EmailStr
FIRST_SUPERUSER_PASSWORD: str

    LOG_DIR: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs")

def _check_default_secret(self, var_name: str, value: str | None) -> None:
if value == "changethis":
message = (
22 changes: 22 additions & 0 deletions backend/app/core/logger.py
@@ -0,0 +1,22 @@
import logging
import os
from logging.handlers import RotatingFileHandler
from app.core.config import settings

LOG_DIR = settings.LOG_DIR
os.makedirs(LOG_DIR, exist_ok=True)

LOG_FILE_PATH = os.path.join(LOG_DIR, "app.log")

LOGGING_LEVEL = logging.INFO
LOGGING_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

logging.basicConfig(level=LOGGING_LEVEL, format=LOGGING_FORMAT)

file_handler = RotatingFileHandler(
LOG_FILE_PATH, maxBytes=10485760, backupCount=5)
file_handler.setLevel(LOGGING_LEVEL)
file_handler.setFormatter(logging.Formatter(LOGGING_FORMAT))

logging.getLogger("").addHandler(file_handler)
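As a usage sketch, any module can now obtain a logger that writes both to stdout and, through the handler installed on the root logger above, to logs/app.log; this mirrors how threads.py uses it:

from app.core import logging

logger = logging.getLogger(__name__)
logger.info("Visible on stdout and appended to logs/app.log")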
111 changes: 111 additions & 0 deletions backend/app/tests/api/routes/test_threads.py
@@ -0,0 +1,111 @@
import pytest
import openai

from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.api.routes.threads import router, process_run
from app.utils import APIResponse

# Wrap the router in a FastAPI app instance.
app = FastAPI()
app.include_router(router)
client = TestClient(app)


@patch("app.api.routes.threads.OpenAI")
def test_threads_endpoint(mock_openai):
"""
Test the /threads endpoint when creating a new thread.
The patched OpenAI client simulates:
    - Assistant retrieval stubbed so it does not raise.
- New thread creation with a dummy thread id.
- No existing runs.
The expected response should have status "processing" and include a thread_id.
"""
# Create a dummy client to simulate OpenAI API behavior.
dummy_client = MagicMock()
# Simulate a valid assistant ID by ensuring retrieve doesn't raise an error.
dummy_client.beta.assistants.retrieve.return_value = None
# Simulate thread creation.
dummy_thread = MagicMock()
dummy_thread.id = "dummy_thread_id"
dummy_client.beta.threads.create.return_value = dummy_thread
# Simulate message creation.
dummy_client.beta.threads.messages.create.return_value = None
# Simulate that no active run exists.
dummy_client.beta.threads.runs.list.return_value = MagicMock(data=[])

mock_openai.return_value = dummy_client

request_data = {
"question": "What is Glific?",
"assistant_id": "assistant_123",
"callback_url": "http://example.com/callback",
}
response = client.post("/threads", json=request_data)
assert response.status_code == 200
response_json = response.json()
assert response_json["success"] is True
assert response_json["data"]["status"] == "processing"
assert response_json["data"]["message"] == "Run started"
assert response_json["data"]["thread_id"] == "dummy_thread_id"


@patch("app.api.routes.threads.OpenAI")
@pytest.mark.parametrize(
"remove_citation, expected_message",
[
(
True,
"Glific is an open-source, two-way messaging platform designed for nonprofits to scale their outreach via WhatsApp",
),
(
False,
"Glific is an open-source, two-way messaging platform designed for nonprofits to scale their outreach via WhatsApp【1:2†citation】",
),
],
)
def test_process_run_variants(mock_openai, remove_citation, expected_message):
"""
Test process_run for both remove_citation variants:
- Mocks the OpenAI client to simulate a completed run.
- Verifies that send_callback is called with the expected message based on the remove_citation flag.
"""
# Setup the mock client.
mock_client = MagicMock()
mock_openai.return_value = mock_client

# Create the request with the variable remove_citation flag.
request = {
"question": "What is Glific?",
"assistant_id": "assistant_123",
"callback_url": "http://example.com/callback",
"thread_id": "thread_123",
"remove_citation": remove_citation,
}

# Simulate a completed run.
mock_run = MagicMock()
mock_run.status = "completed"
mock_client.beta.threads.runs.create_and_poll.return_value = mock_run

    # The dummy raw message always carries a citation marker, so the
    # remove_citation=True case actually exercises the stripping regex.
    base_message = "Glific is an open-source, two-way messaging platform designed for nonprofits to scale their outreach via WhatsApp"
    citation_message = f"{base_message}【1:2†citation】"
dummy_message = MagicMock()
dummy_message.content = [MagicMock(text=MagicMock(value=citation_message))]
mock_client.beta.threads.messages.list.return_value.data = [dummy_message]

# Patch send_callback and invoke process_run.
with patch("app.api.routes.threads.send_callback") as mock_send_callback:
process_run(request, mock_client)
mock_send_callback.assert_called_once()
callback_url, payload = mock_send_callback.call_args[0]
assert callback_url == request["callback_url"]
assert payload["data"]["message"] == expected_message
assert payload["data"]["status"] == "success"
assert payload["data"]["thread_id"] == "thread_123"
assert payload["success"] is True
2 changes: 2 additions & 0 deletions backend/pyproject.toml
@@ -21,6 +21,8 @@ dependencies = [
"pydantic-settings<3.0.0,>=2.2.1",
"sentry-sdk[fastapi]<2.0.0,>=1.40.6",
"pyjwt<3.0.0,>=2.8.0",
"openai>=1.67.0",
"pytest>=7.4.4",
]

[tool.uv]