@@ -60,10 +60,10 @@ def upgrade():
"fine_tuned_model", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column(
"train_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
"train_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column(
"test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
"test_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column("error_message", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("project_id", sa.Integer(), nullable=False),
@@ -87,16 +87,20 @@ def upgrade():
sa.Column("fine_tuning_id", sa.Integer(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("document_id", sa.Uuid(), nullable=False),
sa.Column("model_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column(
"test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False
"fine_tuned_model", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column(
"test_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
sa.Column("base_model", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("split_ratio", sa.Float(), nullable=False),
sa.Column("system_prompt", sa.Text(), nullable=False),
sa.Column("score", postgresql.JSON(astext_type=sa.Text()), nullable=True),
sa.Column(
"prediction_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
"prediction_data_s3_object",
sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
),
sa.Column(
"status",
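The hunks above only change the create_table definitions inside upgrade(), so fresh databases are created with the new *_s3_object column names directly. A deployment already stamped with the old *_s3_url columns would need a separate rename migration. A minimal sketch, assuming hypothetical table names fine_tuning and model_evaluation (the real table names are not visible in this diff):

    # Hypothetical follow-up migration; table names are assumptions.
    from alembic import op

    def upgrade():
        op.alter_column("fine_tuning", "train_data_s3_url", new_column_name="train_data_s3_object")
        op.alter_column("fine_tuning", "test_data_s3_url", new_column_name="test_data_s3_object")
        op.alter_column("model_evaluation", "test_data_s3_url", new_column_name="test_data_s3_object")
        op.alter_column("model_evaluation", "prediction_data_s3_url", new_column_name="prediction_data_s3_object")

    def downgrade():
        op.alter_column("fine_tuning", "train_data_s3_object", new_column_name="train_data_s3_url")
        op.alter_column("fine_tuning", "test_data_s3_object", new_column_name="test_data_s3_url")
        op.alter_column("model_evaluation", "test_data_s3_object", new_column_name="test_data_s3_url")
        op.alter_column("model_evaluation", "prediction_data_s3_object", new_column_name="prediction_data_s3_url")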
76 changes: 57 additions & 19 deletions backend/app/api/routes/fine_tuning.py
@@ -14,7 +14,7 @@
FineTuningUpdate,
FineTuningStatus,
)
from app.core.cloud import AmazonCloudStorage
from app.core.cloud import get_cloud_storage, storage
from app.crud.document import DocumentCrud
from app.utils import get_openai_client, APIResponse, mask_string, load_description
from app.crud import (
@@ -69,16 +69,18 @@ def process_fine_tuning_job(
client = get_openai_client(
session, current_user.organization_id, project_id
)
storage = AmazonCloudStorage(current_user)
document_crud = DocumentCrud(session=session, owner_id=current_user.id)
storage = get_cloud_storage(
session=session, project_id=current_user.project_id
)
document_crud = DocumentCrud(session, current_user.project_id)
document = document_crud.read_one(request.document_id)
preprocessor = DataPreprocessor(
document, storage, ratio, request.system_prompt
)
result = preprocessor.process()
train_data_temp_filepath = result["train_jsonl_temp_filepath"]
train_data_s3_url = result["train_csv_s3_url"]
test_data_s3_url = result["test_csv_s3_url"]
train_data_s3_object = result["train_csv_s3_object"]
test_data_s3_object = result["test_csv_s3_object"]

try:
with open(train_data_temp_filepath, "rb") as train_f:
@@ -100,7 +102,8 @@ def process_fine_tuning_job(
job=fine_tune,
update=FineTuningUpdate(
status=FineTuningStatus.failed,
error_message="Failed during background job processing",
error_message="Error while uploading file to openai : "
+ error_msg,
),
)
return
@@ -128,7 +131,8 @@ def process_fine_tuning_job(
job=fine_tune,
update=FineTuningUpdate(
status=FineTuningStatus.failed,
error_message="Failed during background job processing",
error_message="Error while creating an openai fine tuning job : "
+ error_msg,
),
)
return
@@ -138,8 +142,8 @@
job=fine_tune,
update=FineTuningUpdate(
training_file_id=training_file_id,
train_data_s3_url=train_data_s3_url,
test_data_s3_url=test_data_s3_url,
train_data_s3_object=train_data_s3_object,
test_data_s3_object=test_data_s3_object,
split_ratio=ratio,
provider_job_id=job.id,
status=FineTuningStatus.running,
@@ -155,6 +159,7 @@ def process_fine_tuning_job(
)

except Exception as e:
error_msg = str(e)
logger.error(
f"[process_fine_tuning_job] Background job failure: {e} | "
f"job_id={job_id}, project_id={project_id}|"
@@ -164,7 +169,8 @@ def process_fine_tuning_job(
job=fine_tune,
update=FineTuningUpdate(
status=FineTuningStatus.failed,
error_message="Failed during background job processing",
error_message="Error while processing the background job : "
+ error_msg,
),
)

@@ -186,6 +192,7 @@ def fine_tune_from_CSV(
current_user.organization_id,
current_user.project_id,
)

results = []

for ratio in request.split_ratio:
@@ -235,29 +242,27 @@ def fine_tune_from_CSV(


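As the loop above shows, fine_tune_from_CSV accepts a list of split ratios and starts one background fine-tuning job per ratio. A hedged example of the request payload; the document_id, system_prompt, and split_ratio field names come from the visible request.* accesses, while the values themselves are purely illustrative:

    # Illustrative payload only; field names inferred from the diff above.
    payload = {
        "document_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",  # assumed example UUID
        "system_prompt": "You are a helpful assistant.",  # assumed example prompt
        "split_ratio": [0.7, 0.8, 0.9],  # one fine-tuning job is created per ratio
    }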
@router.get(
"/{job_id}/refresh",
"/{fine_tuning_id}/refresh",
description=load_description("fine_tuning/retrieve.md"),
response_model=APIResponse[FineTuningJobPublic],
)
def refresh_fine_tune_status(
job_id: int, session: SessionDep, current_user: CurrentUserOrgProject
fine_tuning_id: int, session: SessionDep, current_user: CurrentUserOrgProject
):
project_id = current_user.project_id
job = fetch_by_id(session, job_id, project_id)
job = fetch_by_id(session, fine_tuning_id, project_id)
client = get_openai_client(session, current_user.organization_id, project_id)
storage = get_cloud_storage(session=session, project_id=current_user.project_id)

if job.provider_job_id is None:
return APIResponse.success_response(job)

else:
if job.provider_job_id is not None:
try:
openai_job = client.fine_tuning.jobs.retrieve(job.provider_job_id)
except openai.OpenAIError as e:
error_msg = handle_openai_error(e)
logger.error(
f"[Retrieve_fine_tune_status] Failed to retrieve OpenAI job | "
f"provider_job_id={mask_string(job.provider_job_id)}, "
f"error={error_msg}, job_id={job_id}, project_id={project_id}"
f"error={error_msg}, fine_tuning_id={fine_tuning_id}, project_id={project_id}"
)
raise HTTPException(
status_code=502, detail=f"OpenAI API error: {error_msg}"
@@ -285,6 +290,17 @@ def refresh_fine_tune_status(
):
job = update_finetune_job(session=session, job=job, update=update_payload)

job = job.model_copy(
update={
"train_data_file_url": storage.get_signed_url(job.train_data_s3_object)
if job.train_data_s3_object
else None,
"test_data_file_url": storage.get_signed_url(job.test_data_s3_object)
if job.test_data_s3_object
else None,
}
)

return APIResponse.success_response(job)


@@ -296,6 +312,7 @@ def refresh_fine_tune_status(
def retrieve_jobs_by_document(
document_id: UUID, session: SessionDep, current_user: CurrentUserOrgProject
):
storage = get_cloud_storage(session=session, project_id=current_user.project_id)
project_id = current_user.project_id
jobs = fetch_by_document_id(session, document_id, project_id)
if not jobs:
@@ -306,4 +323,25 @@ def retrieve_jobs_by_document(
status_code=404,
detail="No fine-tuning jobs found for the given document ID",
)
return APIResponse.success_response(jobs)
updated_jobs = []
for job in jobs:
train_url = (
storage.get_signed_url(job.train_data_s3_object)
if job.train_data_s3_object
else None
)
test_url = (
storage.get_signed_url(job.test_data_s3_object)
if job.test_data_s3_object
else None
)

updated_job = job.model_copy(
update={
"train_data_file_url": train_url,
"test_data_file_url": test_url,
}
)
updated_jobs.append(updated_job)

return APIResponse.success_response(updated_jobs)
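This endpoint and refresh_fine_tune_status above now repeat the same signed-URL attachment pattern. One possible consolidation, mirroring the attach_prediction_file_url helper that model_evaluation.py introduces below; the helper name and placement are suggestions, not part of this PR:

    # Hypothetical helper (not in this PR): attach signed train/test data URLs
    # to a fine-tuning job before returning it in an API response.
    def attach_data_file_urls(job, storage):
        return job.model_copy(
            update={
                "train_data_file_url": storage.get_signed_url(job.train_data_s3_object)
                if job.train_data_s3_object
                else None,
                "test_data_file_url": storage.get_signed_url(job.test_data_s3_object)
                if job.test_data_s3_object
                else None,
            }
        )

With it, the loop above reduces to updated_jobs = [attach_data_file_urls(job, storage) for job in jobs].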
82 changes: 64 additions & 18 deletions backend/app/api/routes/model_evaluation.py
@@ -22,7 +22,7 @@
ModelEvaluationPublic,
)
from app.core.db import engine
from app.core.cloud import AmazonCloudStorage
from app.core.cloud import get_cloud_storage
from app.core.finetune.evaluation import ModelEvaluator
from app.utils import get_openai_client, APIResponse
from app.api.deps import CurrentUserOrgProject, SessionDep
@@ -33,6 +33,19 @@
router = APIRouter(prefix="/model_evaluation", tags=["model_evaluation"])


def attach_prediction_file_url(model_obj, storage):
"""
Given a model-like object and a storage client,
attach a signed prediction data file URL (if available).
"""
s3_key = getattr(model_obj, "prediction_data_s3_object", None)
prediction_data_file_url = storage.get_signed_url(s3_key) if s3_key else None
Review comment (Collaborator):

If the s3_key is None, what happens? Should we send an empty/null value in the response JSON?

Also, maybe include error handling there, something like this:

    try:
        prediction_data_file_url = storage.get_signed_url(s3_key)
    except Exception as e:
        logger.warning(f"Failed to generate signed URL for {s3_key}: {e}")
        prediction_data_file_url = None

Reply (Collaborator, author):

If the s3_key is None, then there is simply no prediction file to sign, so the function sets prediction_data_file_url to None. In the API response this shows up as null, which is fine since the ModelEvaluationPublic schema already defaults this field to None.

We don't need extra error handling inside attach_prediction_file_url, because any errors from generating the signed URL are already logged and handled in the get_signed_url method itself. This keeps the logic clean: attach_prediction_file_url just decides whether or not to attempt URL generation, while get_signed_url is responsible for managing AWS-specific failures.


return model_obj.model_copy(
update={"prediction_data_file_url": prediction_data_file_url}
)

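The author's reply in the thread above states that signed-URL failures are logged and handled inside get_signed_url itself, which is defined outside this diff. A minimal sketch of the behavior being described, assuming a boto3-backed storage client; the method body, attribute names, and failure policy are all assumptions:

    # Hypothetical sketch of a method on the storage client (not in this
    # diff): sign the object key, log AWS failures, and surface an HTTP error.
    from botocore.exceptions import ClientError
    from fastapi import HTTPException

    def get_signed_url(self, s3_object: str, expires_in: int = 3600) -> str:
        try:
            return self.client.generate_presigned_url(
                "get_object",
                Params={"Bucket": self.bucket, "Key": s3_object},
                ExpiresIn=expires_in,
            )
        except ClientError as e:
            logger.error(f"[get_signed_url] Failed to sign {s3_object}: {e}")
            raise HTTPException(status_code=500, detail="Failed to generate signed URL")

Under this split, attach_prediction_file_url only decides whether to attempt signing, which matches the author's reasoning.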

def run_model_evaluation(
eval_id: int,
current_user: CurrentUserOrgProject,
@@ -46,7 +59,7 @@ def run_model_evaluation(
client = get_openai_client(
db, current_user.organization_id, current_user.project_id
)
storage = AmazonCloudStorage(current_user)
storage = get_cloud_storage(session=db, project_id=current_user.project_id)

try:
model_eval = update_model_eval(
@@ -57,8 +70,8 @@ def run_model_evaluation(
)

evaluator = ModelEvaluator(
model_name=model_eval.model_name,
test_data_s3_url=model_eval.test_data_s3_url,
fine_tuned_model=model_eval.fine_tuned_model,
test_data_s3_object=model_eval.test_data_s3_object,
system_prompt=model_eval.system_prompt,
client=client,
storage=storage,
@@ -71,7 +84,7 @@ def run_model_evaluation(
project_id=current_user.project_id,
update=ModelEvaluationUpdate(
score=result["evaluation_score"],
prediction_data_s3_url=result["prediction_data_s3_url"],
prediction_data_s3_object=result["prediction_data_s3_object"],
status=ModelEvaluationStatus.completed,
),
)
@@ -82,6 +95,7 @@ def run_model_evaluation(
)

except Exception as e:
error_msg = str(e)
logger.error(
f"[run_model_evaluation] Failed | eval_id={eval_id}, project_id={current_user.project_id}: {e}"
)
@@ -92,7 +106,8 @@ def run_model_evaluation(
project_id=current_user.project_id,
update=ModelEvaluationUpdate(
status=ModelEvaluationStatus.failed,
error_message="failed during background job processing",
error_message="failed during background job processing:"
+ error_msg,
),
)

@@ -128,20 +143,20 @@ def evaluate_models(
)
raise HTTPException(status_code=400, detail="No fine-tuned job IDs provided")

evals: list[ModelEvaluationPublic] = []
evaluations: list[ModelEvaluationPublic] = []

for job_id in request.fine_tuning_ids:
fine_tuning_job = fetch_by_id(session, job_id, current_user.project_id)
active_evals = fetch_active_model_evals(
active_evaluations = fetch_active_model_evals(
session, job_id, current_user.project_id
)

if active_evals:
if active_evaluations:
logger.info(
f"[evaluate_model] Skipping creation for {job_id}. Active evaluation exists, project_id:{current_user.project_id}"
)
evals.extend(
ModelEvaluationPublic.model_validate(ev) for ev in active_evals
evaluations.extend(
ModelEvaluationPublic.model_validate(ev) for ev in active_evaluations
)
continue

@@ -153,22 +168,34 @@
status=ModelEvaluationStatus.pending,
)

evals.append(ModelEvaluationPublic.model_validate(model_eval))
evaluations.append(ModelEvaluationPublic.model_validate(model_eval))

logger.info(
f"[evaluate_model] Created evaluation for fine_tuning_id {job_id} with eval ID={model_eval.id}, project_id:{current_user.project_id}"
)

background_tasks.add_task(run_model_evaluation, model_eval.id, current_user)

response_data = [
{
"id": ev.id,
"fine_tuning_id": ev.fine_tuning_id,
"fine_tuned_model": getattr(ev, "fine_tuned_model", None),
"document_id": getattr(ev, "document_id", None),
"status": ev.status,
}
for ev in evaluations
]

return APIResponse.success_response(
{"message": "Model evaluation(s) started successfully", "data": evals}
{"message": "Model evaluation(s) started successfully", "data": response_data}
)


@router.get(
"/{document_id}/top_model",
response_model=APIResponse[ModelEvaluationPublic],
response_model_exclude_none=True,
)
def get_top_model_by_doc_id(
document_id: UUID,
@@ -183,19 +210,38 @@ def get_top_model_by_doc_id(
f"[get_top_model_by_doc_id] Fetching top model for document_id={document_id}, "
f"project_id={current_user.project_id}"
)

top_model = fetch_top_model_by_doc_id(session, document_id, current_user.project_id)
storage = get_cloud_storage(session=session, project_id=current_user.project_id)
Review comment (Collaborator):

Should we not handle errors here? What if get_signed_url fails?

Reply (Collaborator, author):

Error handling is already in place in the get_signed_url method; I will add it to the get_cloud_storage method as well, so that errors from this call are also handled.

Reply (Collaborator, author):

Added one more error-handling block to get_cloud_storage.


top_model = attach_prediction_file_url(top_model, storage)

return APIResponse.success_response(top_model)

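The thread above ends with error handling being added to get_cloud_storage, which lives in app/core/cloud and is not shown in this diff. A rough sketch of the shape that factory might take; everything beyond the get_cloud_storage name and its (session, project_id) parameters is an assumption:

    # Hypothetical sketch of get_cloud_storage (not in this diff): resolve the
    # project's storage backend and fail fast instead of leaking provider errors.
    def get_cloud_storage(session, project_id):
        try:
            # Assumes the AmazonCloudStorage backend seen elsewhere in this PR
            # can be constructed per project; its real signature is not shown.
            return AmazonCloudStorage(session=session, project_id=project_id)
        except Exception as e:
            logger.error(
                f"[get_cloud_storage] Failed to initialize storage for "
                f"project_id={project_id}: {e}"
            )
            raise HTTPException(status_code=500, detail="Cloud storage unavailable")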

@router.get("/{document_id}", response_model=APIResponse[list[ModelEvaluationPublic]])
def get_evals_by_doc_id(
document_id: UUID, session: SessionDep, current_user: CurrentUserOrgProject
@router.get(
"/{document_id}",
response_model=APIResponse[list[ModelEvaluationPublic]],
response_model_exclude_none=True,
)
def get_evaluations_by_doc_id(
document_id: UUID,
session: SessionDep,
current_user: CurrentUserOrgProject,
):
"""
Return all model evaluations for the given document_id within the current project.
"""
logger.info(
f"[get_evals_by_doc_id]Fetching evaluations for document_id: {document_id}, project_id: {current_user.project_id}"
f"[get_evaluations_by_doc_id] Fetching evaluations for document_id={document_id}, "
f"project_id={current_user.project_id}"
)

evaluations = fetch_eval_by_doc_id(session, document_id, current_user.project_id)
return APIResponse.success_response(evaluations)
storage = get_cloud_storage(session=session, project_id=current_user.project_id)

updated_evaluations = [
attach_prediction_file_url(ev, storage) for ev in evaluations
]

return APIResponse.success_response(updated_evaluations)