Merged
30 commits
b36adff
db models and migration script
nishika26 Jul 28, 2025
41b1e82
Merge branch 'main' into feature/db_model
nishika26 Jul 28, 2025
a2dfc07
merge conflict fixes
nishika26 Jul 28, 2025
3b6e7c4
migration file error
nishika26 Jul 28, 2025
55a0760
small fixes
nishika26 Jul 28, 2025
7f40a6d
Merge branch 'feature/classification' into feature/db_model
nishika26 Jul 29, 2025
ed10deb
removing relationship in org and project
nishika26 Jul 29, 2025
a9596cc
added files id columns and changed column types
nishika26 Jul 30, 2025
b81e13c
new migration file
nishika26 Jul 30, 2025
f214991
added extra columns
nishika26 Jul 30, 2025
649991c
removed model eval table for now
nishika26 Jul 31, 2025
cd9a5e5
adding is_deleted column
nishika26 Jul 31, 2025
05fe06e
moving foreign key constraint line up
nishika26 Jul 31, 2025
d267284
added error message column
nishika26 Jul 31, 2025
34e9706
first push: routes,crud and data preprocessing core
nishika26 Jul 31, 2025
d6909b8
core preprocessing
nishika26 Jul 31, 2025
7c62fca
logging and adding crud test cases
nishika26 Jul 31, 2025
d93415d
test cases including seed data and utility functions
nishika26 Aug 4, 2025
79de38a
test cases including seed data and utility functions
nishika26 Aug 4, 2025
454209c
adding function docs and small changes
nishika26 Aug 4, 2025
f7ac5ed
adding scikit-learn library
nishika26 Aug 4, 2025
5754112
review changes
nishika26 Aug 4, 2025
3bebf99
new migration file
nishika26 Aug 4, 2025
5341f3e
small fix
nishika26 Aug 4, 2025
d8e1e9d
small fix on migration file
nishika26 Aug 4, 2025
04c7269
changes because of db model change and adding pandas
nishika26 Aug 5, 2025
85ecc87
pep 8 and removing unused imports
nishika26 Aug 6, 2025
77644e6
Merge branch 'feature/classification' into feature/fine_tuning_endpoint
nishika26 Aug 6, 2025
40d4e08
pr review changes
nishika26 Aug 6, 2025
42bfcc2
removed optional from project id and org id
nishika26 Aug 6, 2025
@@ -0,0 +1,25 @@
"""add system prompt column to fine tuning table

Revision ID: db9b5413d3ce
Revises: e3c74fab4356
Create Date: 2025-08-06 20:32:32.454567

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes


# revision identifiers, used by Alembic.
revision = "db9b5413d3ce"
down_revision = "e3c74fab4356"
branch_labels = None
depends_on = None


def upgrade():
    op.add_column("fine_tuning", sa.Column("system_prompt", sa.Text(), nullable=False))


def downgrade():
    op.drop_column("fine_tuning", "system_prompt")
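One caveat worth noting for this migration: on a table that already contains rows, adding a `NOT NULL` column with no default will fail in PostgreSQL. A common workaround, sketched below as a hypothetical variant of the migration above (not the PR's actual code), is to add the column with a `server_default` so existing rows get backfilled, then drop the default so new inserts must supply the value explicitly:

```python
# Hypothetical variant of the migration above. Adding a NOT NULL column
# to a populated table fails without a default, so we add the column with
# a server_default (backfilling existing rows with ""), then drop the
# default. In Alembic, passing server_default=None to alter_column
# removes the server default.
from alembic import op
import sqlalchemy as sa


def upgrade():
    op.add_column(
        "fine_tuning",
        sa.Column("system_prompt", sa.Text(), nullable=False, server_default=""),
    )
    op.alter_column("fine_tuning", "system_prompt", server_default=None)


def downgrade():
    op.drop_column("fine_tuning", "system_prompt")
```

This only matters if `fine_tuning` can already hold rows when the migration runs; on a fresh table the simpler `add_column` shown above is sufficient.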
21 changes: 21 additions & 0 deletions backend/app/api/docs/fine_tuning/create.md
@@ -0,0 +1,21 @@
This endpoint initiates fine-tuning of an OpenAI model on a custom dataset previously uploaded via the upload document endpoint. The uploaded dataset must include:

- A column named `query`, `question`, or `message` containing user inputs or messages.
- A column named `label` indicating whether a given message is a genuine query or not (e.g., casual conversation or small talk).

The `split_ratio` in the request body determines how your data is divided between training and testing. For example, a split ratio of 0.5 means 50% of your data is used for training and the remaining 50% for testing. You can also provide multiple split ratios, for instance `[0.7, 0.9]`; this triggers multiple fine-tuning jobs, one per ratio, effectively training multiple models on different portions of your dataset. You must also specify the base model you want to fine-tune.

The `system_prompt` field in the request body lets you define an initial instruction or context-setting message that is included in the training data. It teaches the model how it is expected to behave when responding to user inputs, and it is prepended as the first message in each training example during fine-tuning.

The system handles the fine-tuning process by calling OpenAI's APIs under the hood. These include:

- [OpenAI File create to upload your training and testing files](https://platform.openai.com/docs/api-reference/files/create)

- [OpenAI Fine-tuning Job create to initiate each fine-tuning job](https://platform.openai.com/docs/api-reference/fine_tuning/create)

If successful, the response includes a message along with a list of the fine-tuning jobs that were initiated. Each job object includes:

- `id`: the internal ID of the fine-tuning job
- `document_id`: the ID of the document used for fine-tuning
- `split_ratio`: the data split used for that job
- `status`: the initial status of the job (usually "pending")
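To make the request shape concrete, here is a hypothetical payload for this endpoint; the `document_id` is a placeholder UUID, and the base model and prompt text are examples, not values mandated by the API:

```python
# Hypothetical request body for the fine-tune endpoint described above.
# Two split ratios are given, so two fine-tuning jobs would be created.
payload = {
    "document_id": "123e4567-e89b-12d3-a456-426614174000",  # placeholder UUID
    "split_ratio": [0.7, 0.9],  # one fine-tuning job per ratio
    "base_model": "gpt-4o-mini-2024-07-18",  # example base model name
    "system_prompt": (
        "You are a classifier that labels each message as a genuine "
        "query or small talk."
    ),
}
```

With `split_ratio` set to `[0.7, 0.9]`, the response's `jobs` list would contain one entry per ratio, each starting in the `pending` status.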
5 changes: 5 additions & 0 deletions backend/app/api/docs/fine_tuning/retrieve.md
@@ -0,0 +1,5 @@
Refreshes the status of a fine-tuning job by retrieving the latest information from OpenAI.
If the status, fine-tuned model, or error message has changed, the local job record is updated accordingly.
Returns the latest state of the job.

OpenAI’s job status is retrieved using their [Fine-tuning Job Retrieve API](https://platform.openai.com/docs/api-reference/fine_tuning/retrieve).
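The change-detection rule described above (only rewrite the local record when something actually differs) can be sketched as a small standalone predicate; this is an illustrative simplification over plain dicts, not the route's actual implementation:

```python
# Minimal sketch of the refresh endpoint's change-detection rule:
# the local job record is updated only when the remote state differs
# in status, fine_tuned_model, or error_message.
def needs_update(local: dict, remote: dict) -> bool:
    """Return True if any tracked field differs between local and remote."""
    return any(
        local.get(field) != remote.get(field)
        for field in ("status", "fine_tuned_model", "error_message")
    )
```

Skipping no-op writes keeps the refresh endpoint idempotent and avoids churning updated-at timestamps on every poll.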
3 changes: 3 additions & 0 deletions backend/app/api/main.py
@@ -17,6 +17,7 @@
    utils,
    onboarding,
    credentials,
    fine_tuning,
)
from app.core.config import settings

@@ -36,6 +37,8 @@
api_router.include_router(threads.router)
api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(fine_tuning.router)


if settings.ENVIRONMENT == "local":
    api_router.include_router(private.router)
308 changes: 308 additions & 0 deletions backend/app/api/routes/fine_tuning.py
@@ -0,0 +1,308 @@
from typing import Optional
import logging
import time
from uuid import UUID

import openai
from openai import OpenAI
from sqlmodel import Session
from fastapi import APIRouter, HTTPException, BackgroundTasks

from app.models import (
    FineTuningJobCreate,
    FineTuningJobPublic,
    FineTuningUpdate,
    FineTuningStatus,
)
from app.core.cloud import AmazonCloudStorage
from app.crud.document import DocumentCrud
from app.utils import get_openai_client, APIResponse, mask_string, load_description
from app.crud import (
    create_fine_tuning_job,
    fetch_by_id,
    update_finetune_job,
    fetch_by_document_id,
)
from app.api.deps import CurrentUserOrgProject, SessionDep
from app.core.finetune.preprocessing import DataPreprocessor


logger = logging.getLogger(__name__)

router = APIRouter(prefix="/fine_tuning", tags=["fine_tuning"])


OPENAI_TO_INTERNAL_STATUS = {
    "validating_files": FineTuningStatus.running,
    "queued": FineTuningStatus.running,
    "running": FineTuningStatus.running,
    "succeeded": FineTuningStatus.completed,
    "failed": FineTuningStatus.failed,
}


def handle_openai_error(e: openai.OpenAIError) -> str:
    """Extract a human-readable error message from an OpenAI error."""
    if isinstance(e.body, dict) and "message" in e.body:
        return e.body["message"]
    return str(e)


def process_fine_tuning_job(
    job_id: int,
    ratio: float,
    session: Session,
    current_user: CurrentUserOrgProject,
    request: FineTuningJobCreate,
    client: OpenAI,
):
    start_time = time.time()
    project_id = current_user.project_id
    fine_tune = None

    logger.info(
        f"[process_fine_tuning_job] Starting fine-tuning job processing | job_id={job_id}, project_id={project_id}|"
    )

    try:
        fine_tune = fetch_by_id(session, job_id, project_id)

        storage = AmazonCloudStorage(current_user)
        document_crud = DocumentCrud(session=session, owner_id=current_user.id)
        document = document_crud.read_one(request.document_id)
        preprocessor = DataPreprocessor(document, storage, ratio, request.system_prompt)
        result = preprocessor.process()
        train_path = result["train_file"]
        test_path = result["test_file"]

        try:
            with open(train_path, "rb") as train_f:
                uploaded_train = client.files.create(file=train_f, purpose="fine-tune")
            with open(test_path, "rb") as test_f:
                uploaded_test = client.files.create(file=test_f, purpose="fine-tune")

            logger.info(
                f"[process_fine_tuning_job] Files uploaded to OpenAI successfully | "
                f"job_id={job_id}, project_id={project_id}|"
            )
        except openai.OpenAIError as e:
            error_msg = handle_openai_error(e)
            logger.error(
                f"[process_fine_tuning_job] Failed to upload to OpenAI: {error_msg} | "
                f"job_id={job_id}, project_id={project_id}|"
            )
            update_finetune_job(
                session=session,
                job=fine_tune,
                update=FineTuningUpdate(
                    status=FineTuningStatus.failed,
                    error_message="Failed during background job processing",
                ),
            )
            return
        finally:
            preprocessor.cleanup()

        training_file_id = uploaded_train.id
        testing_file_id = uploaded_test.id

        try:
            job = client.fine_tuning.jobs.create(
                training_file=training_file_id, model=request.base_model
            )
            logger.info(
                f"[process_fine_tuning_job] OpenAI fine-tuning job created | "
                f"provider_job_id={mask_string(job.id)}, job_id={job_id}, project_id={project_id}|"
            )
        except openai.OpenAIError as e:
            error_msg = handle_openai_error(e)
            logger.error(
                f"[process_fine_tuning_job] Failed to create OpenAI fine-tuning job: {error_msg} | "
                f"job_id={job_id}, project_id={project_id}|"
            )
            update_finetune_job(
                session=session,
                job=fine_tune,
                update=FineTuningUpdate(
                    status=FineTuningStatus.failed,
                    error_message="Failed during background job processing",
                ),
            )
            return

        update_finetune_job(
            session=session,
            job=fine_tune,
            update=FineTuningUpdate(
                training_file_id=training_file_id,
                testing_file_id=testing_file_id,
                split_ratio=ratio,
                provider_job_id=job.id,
                status=FineTuningStatus.running,
            ),
        )

        end_time = time.time()
        duration = end_time - start_time

        logger.info(
            f"[process_fine_tuning_job] Fine-tuning job processed successfully | "
            f"time_taken={duration:.2f}s, job_id={job_id}, project_id={project_id}|"
        )

    except Exception as e:
        logger.error(
            f"[process_fine_tuning_job] Background job failure: {e} | "
            f"job_id={job_id}, project_id={project_id}|"
        )
        update_finetune_job(
            session=session,
            job=fine_tune,
            update=FineTuningUpdate(
                status=FineTuningStatus.failed,
                error_message="Failed during background job processing",
            ),
        )


@router.post(
    "/fine-tune",
    description=load_description("fine_tuning/create.md"),
    response_model=APIResponse,
)
def fine_tune_from_CSV(
    session: SessionDep,
    current_user: CurrentUserOrgProject,
    request: FineTuningJobCreate,
    background_tasks: BackgroundTasks,
):
    client = get_openai_client(
        session, current_user.organization_id, current_user.project_id
    )
    results = []

    for ratio in request.split_ratio:
        job, created = create_fine_tuning_job(
            session=session,
            request=request,
            split_ratio=ratio,
            organization_id=current_user.organization_id,
            project_id=current_user.project_id,
        )
        results.append((job, created))

        if created:
            background_tasks.add_task(
                process_fine_tuning_job,
                job.id,
                ratio,
                session,
                current_user,
                request,
                client,
            )

    if not results:
        logger.error(
            f"[fine_tune_from_CSV] All fine-tuning job creations failed for document_id={request.document_id}, project_id={current_user.project_id}"
        )
        raise HTTPException(
            status_code=500, detail="Failed to create or fetch any fine-tuning jobs."
        )

    job_infos = [
        {
            "id": job.id,
            "document_id": job.document_id,
            "split_ratio": job.split_ratio,
            "status": job.status,
        }
        for job, _ in results
    ]

    created_count = sum(c for _, c in results)
    total = len(results)
    message = (
        "Fine-tuning job(s) started."
        if created_count == total
        else "Active fine-tuning job(s) already exist."
        if created_count == 0
        else f"Started {created_count} job(s); {total - created_count} active fine-tuning job(s) already exist."
    )

    return APIResponse.success_response({"message": message, "jobs": job_infos})


@router.get(
    "/{job_id}/refresh",
    description=load_description("fine_tuning/retrieve.md"),
    response_model=APIResponse[FineTuningJobPublic],
)
def refresh_fine_tune_status(
    job_id: int, session: SessionDep, current_user: CurrentUserOrgProject
):
    project_id = current_user.project_id
    job = fetch_by_id(session, job_id, project_id)
    client = get_openai_client(session, current_user.organization_id, project_id)

    if job.provider_job_id is None:
        return APIResponse.success_response(job)

    try:
        openai_job = client.fine_tuning.jobs.retrieve(job.provider_job_id)
    except openai.OpenAIError as e:
        error_msg = handle_openai_error(e)
        logger.error(
            f"[refresh_fine_tune_status] Failed to retrieve OpenAI job | "
            f"provider_job_id={mask_string(job.provider_job_id)}, "
            f"error={error_msg}, job_id={job_id}, project_id={project_id}"
        )
        raise HTTPException(status_code=502, detail=f"OpenAI API error: {error_msg}")

    mapped_status: Optional[str] = OPENAI_TO_INTERNAL_STATUS.get(
        getattr(openai_job, "status", None)
    )

    openai_error = getattr(openai_job, "error", None)
    openai_error_msg = getattr(openai_error, "message", None) if openai_error else None

    update_payload = FineTuningUpdate(
        status=mapped_status or job.status,
        fine_tuned_model=getattr(openai_job, "fine_tuned_model", None),
        error_message=openai_error_msg,
    )

    if (
        job.status != update_payload.status
        or job.fine_tuned_model != update_payload.fine_tuned_model
        or job.error_message != update_payload.error_message
    ):
        job = update_finetune_job(session=session, job=job, update=update_payload)

    return APIResponse.success_response(job)


@router.get(
    "/{document_id}",
    description="Retrieves all fine-tuning jobs associated with the given document ID for the current project",
    response_model=APIResponse[list[FineTuningJobPublic]],
)
def retrieve_jobs_by_document(
    document_id: UUID, session: SessionDep, current_user: CurrentUserOrgProject
):
    project_id = current_user.project_id
    jobs = fetch_by_document_id(session, document_id, project_id)
    if not jobs:
        logger.warning(
            f"[retrieve_jobs_by_document] No fine-tuning jobs found for document_id={document_id}, project_id={project_id}"
        )
        raise HTTPException(
            status_code=404,
            detail="No fine-tuning jobs found for the given document ID",
        )
    return APIResponse.success_response(jobs)
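The response message in `fine_tune_from_CSV` distinguishes three cases: all jobs newly created, all already active, or a mix. Extracted as a standalone helper (an illustrative sketch mirroring the conditional expression in the route, not code from the PR), the selection logic looks like this:

```python
# Sketch of the response-message selection used by the fine-tune route:
# created_count is how many jobs were newly created, total is how many
# split ratios were requested.
def build_message(created_count: int, total: int) -> str:
    if created_count == total:
        return "Fine-tuning job(s) started."
    if created_count == 0:
        return "Active fine-tuning job(s) already exist."
    return (
        f"Started {created_count} job(s); "
        f"{total - created_count} active fine-tuning job(s) already exist."
    )
```

Keeping this as pure logic over two integers makes the three branches easy to unit-test without touching the database or OpenAI client.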