@@ -0,0 +1,42 @@
"""add additional columns to model evaluation table
Revision ID: 06a8bed9e7a6
Revises: 72896bcc94da
Create Date: 2025-08-25 22:36:58.959768
"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "06a8bed9e7a6"
down_revision = "72896bcc94da"
branch_labels = None
depends_on = None


def upgrade():
op.drop_column("model_evaluation", "testing_file_id")
op.add_column(
"model_evaluation",
sa.Column(
"test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False
),
)
op.add_column(
"model_evaluation",
sa.Column(
"prediction_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
Comment on lines +25 to +31
Collaborator
Keep this consistent with the other migrations; somewhere I saw it was just data_url, and here it is data_s3_url.

Collaborator Author
Yes, I have changed all occurrences of "data_url" to "data_s3_url" and made sure it is consistent.

),
)


def downgrade():
op.drop_column("model_evaluation", "prediction_data_s3_url")
op.drop_column("model_evaluation", "test_data_s3_url")
op.add_column(
"model_evaluation",
sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True),
)
21 changes: 14 additions & 7 deletions backend/app/api/routes/model_evaluation.py
@@ -22,6 +22,7 @@
ModelEvaluationPublic,
)
from app.core.db import engine
from app.core.cloud import AmazonCloudStorage
from app.core.finetune.evaluation import ModelEvaluator
from app.utils import get_openai_client, APIResponse
from app.api.deps import CurrentUserOrgProject, SessionDep
@@ -35,14 +36,18 @@
def run_model_evaluation(
eval_id: int,
current_user: CurrentUserOrgProject,
client: OpenAI,
):
start_time = time.time()
logger.info(
f"[run_model_evaluation] Starting | eval_id={eval_id}, project_id={current_user.project_id}"
)

with Session(engine) as db:
client = get_openai_client(
db, current_user.organization_id, current_user.project_id
)
storage = AmazonCloudStorage(current_user)

try:
model_eval = update_model_eval(
session=db,
@@ -53,9 +58,10 @@ def run_model_evaluation(

evaluator = ModelEvaluator(
model_name=model_eval.model_name,
testing_file_id=model_eval.testing_file_id,
test_data_s3_url=model_eval.test_data_s3_url,
system_prompt=model_eval.system_prompt,
client=client,
storage=storage,
)
result = evaluator.run()

@@ -64,7 +70,9 @@
eval_id=eval_id,
project_id=current_user.project_id,
update=ModelEvaluationUpdate(
score=result, status=ModelEvaluationStatus.completed
score=result["evaluation_score"],
prediction_data_s3_url=result["prediction_data_s3_url"],
status=ModelEvaluationStatus.completed,
),
)

@@ -111,7 +119,8 @@ def evaluate_models(
"""
client = get_openai_client(
session, current_user.organization_id, current_user.project_id
)
) # keeping this here for checking if the user's validated OpenAI key is present or not,
Collaborator
didn't understand this part

Collaborator Author
The client that actually carries out the OpenAI calls will be initialized inside the background task. The client created in the router isn't passed to the background task, because it's generally not good practice to pass such objects into background tasks; it can cause the same kind of error as passing the router's session to a background task. However, we still want to validate upfront that the user invoking this endpoint has a valid OpenAI key stored in our database. If we left that check to the background task alone, the user wouldn't immediately know whether their key is invalid or missing. That's why a client is initialized in the router purely for validation, while the client actually used to perform operations is re-initialized inside the background task.
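As a rough illustration of that pattern, here is a minimal, self-contained sketch; the `API_KEYS` dict and `build_client` helper are hypothetical stand-ins for the project's `get_openai_client` and database session, not its real code.

```python
from fastapi import BackgroundTasks, FastAPI, HTTPException

app = FastAPI()

# Hypothetical key store, standing in for the validated keys kept in the database.
API_KEYS: dict[int, str] = {1: "sk-example"}


def build_client(project_id: int) -> str:
    """Return the stored key for the project, raising if none is configured."""
    key = API_KEYS.get(project_id)
    if key is None:
        raise HTTPException(status_code=400, detail="No validated OpenAI key found")
    return key


def run_evaluation(eval_id: int, project_id: int) -> None:
    # The background task rebuilds what it needs from plain identifiers;
    # nothing request-scoped (session, client) crosses the task boundary.
    key = build_client(project_id)
    print(f"evaluating {eval_id} for project {project_id} using key {key}")


@app.post("/evaluations/{eval_id}")
def evaluate(eval_id: int, project_id: int, background_tasks: BackgroundTasks):
    build_client(project_id)  # fail fast if no validated key is stored
    background_tasks.add_task(run_evaluation, eval_id, project_id)
    return {"message": "evaluation started", "eval_id": eval_id}
```

The same separation shows up in this diff: the router calls get_openai_client only to fail fast, and run_model_evaluation re-creates the session, client, and storage inside the background task.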

# even though the client will be initialized separately inside the background task

if not request.fine_tuning_ids:
logger.error(
@@ -150,9 +159,7 @@ def evaluate_models(
f"[evaluate_model] Created evaluation for fine_tuning_id {job_id} with eval ID={model_eval.id}, project_id:{current_user.project_id}"
)

background_tasks.add_task(
run_model_evaluation, model_eval.id, current_user, client
)
background_tasks.add_task(run_model_evaluation, model_eval.id, current_user)

return APIResponse.success_response(
{"message": "Model evaluation(s) started successfully", "data": evals}
182 changes: 99 additions & 83 deletions backend/app/core/finetune/evaluation.py
@@ -1,18 +1,18 @@
import json
import difflib
import time
import logging
from typing import Set

import openai
import pandas as pd
from openai import OpenAI
import uuid
from sklearn.metrics import (
matthews_corrcoef,
accuracy_score,
f1_score,
confusion_matrix,
)
from app.core.cloud import AmazonCloudStorage
from app.api.routes.fine_tuning import handle_openai_error
from app.core.finetune.preprocessing import DataPreprocessor


logger = logging.getLogger(__name__)
@@ -26,12 +26,14 @@ class ModelEvaluator:
def __init__(
self,
model_name: str,
testing_file_id: str,
test_data_s3_url: str,
storage: AmazonCloudStorage,
system_prompt: str,
client: OpenAI,
):
self.model_name = model_name
self.testing_file_id = testing_file_id
self.test_data_s3_url = test_data_s3_url
self.storage = storage
self.system_instruction = system_prompt
self.client = client

@@ -43,83 +45,57 @@ def __init__(

def load_labels_and_prompts(self) -> None:
"""
Loads labels and prompts directly from OpenAI NDJSON file content using the testing file ID.

Example data format:
{
"messages": [
{"role": "system", "content": "You are an assistant that is good at categorizing if what user is saying is a query or non-query"},
{"role": "user", "content": "what is the colour of the apple"},
{"role": "assistant", "content": "query"}
]
}
{
"messages": [
{"role": "system", "content": "You are an assistant that is good at categorizing if what user is saying is a query or non-query"},
{"role": "user", "content": "i like apples"},
{"role": "assistant", "content": "non-query"}
]
}
Load prompts (X) and labels (y) from an S3-hosted CSV via storage.stream.
Expects:
- one of: 'query' | 'question' | 'message'
- 'label'
"""
logger.info(
f"[load_labels_and_prompts] Loading labels and prompts from file ID: {self.testing_file_id}"
f"[ModelEvaluator.load_labels_and_prompts] Loading CSV from: {self.test_data_s3_url}"
)
file_obj = self.storage.stream(self.test_data_s3_url)
try:
response = self.client.files.content(self.testing_file_id)
file_bytes = response.read()
lines = file_bytes.decode("utf-8").splitlines()

for ln, line in enumerate(lines, 1):
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
msgs = obj.get("messages", [])
if not isinstance(msgs, list) or not msgs:
logger.error(
f"[load_labels_and_prompts] Line {ln}: 'messages' missing or invalid"
)
raise ValueError(f"Line {ln}: 'messages' missing or invalid")

user_msgs = [
m for m in msgs if m.get("role") == "user" and "content" in m
]
model_msgs = [
m
for m in msgs
if m.get("role") == "assistant" and "content" in m
]
if not user_msgs or not model_msgs:
logger.error(
f"[load_labels_and_prompts] Line {ln}: missing user or assistant message"
)
raise ValueError(
f"Line {ln}: missing user or assistant message"
)
df = pd.read_csv(file_obj)
df.columns = [c.strip().lower() for c in df.columns]

possible_query_columns = ["query", "question", "message"]
query_col = next(
(c for c in possible_query_columns if c in df.columns), None
)
label_col = "label" if "label" in df.columns else None

prompt = user_msgs[0]["content"]
label = (model_msgs[0]["content"] or "").strip().lower()
if not query_col or not label_col:
logger.error(
"[ModelEvaluator.load_labels_and_prompts] CSV must contain a 'label' column "
f"and one of: {possible_query_columns}"
)
raise ValueError(
f"CSV must contain a 'label' column and one of: {possible_query_columns}"
)

self.prompts.append(prompt)
self.y_true.append(label)
self.allowed_labels.add(label)
prompts = df[query_col].astype(str).tolist()
labels = df[label_col].astype(str).str.strip().str.lower().tolist()

except Exception as e:
logger.error(
f"[load_labels_and_prompts] Error processing line {ln}: {str(e)}"
)
raise
self.prompts = prompts
self.y_true = labels
self.allowed_labels = set(labels)

self.query_col = query_col
self.label_col = label_col

logger.info(
f"[load_labels_and_prompts] Loaded {len(self.prompts)} prompts and {len(self.y_true)} labels."
"[ModelEvaluator.load_labels_and_prompts] "
f"Loaded {len(self.prompts)} prompts and {len(self.y_true)} labels; "
f"query_col={query_col}, label_col={label_col}, allowed_labels={self.allowed_labels}"
)

except Exception as e:
logger.error(
f"[load_labels_and_prompts] Failed to load file content: {str(e)}"
f"[ModelEvaluator.load_labels_and_prompts] Failed to load/parse test CSV: {e}",
exc_info=True,
)
raise
finally:
file_obj.close()

def normalize_prediction(self, text: str) -> str:
logger.debug(f"[normalize_prediction] Normalizing prediction: {text}")
@@ -139,7 +115,7 @@ def normalize_prediction(self, text: str) -> str:
)
return next(iter(self.allowed_labels))

def generate_predictions(self) -> list[str]:
def generate_predictions(self) -> tuple[list[str], str]:
logger.info(
f"[generate_predictions] Generating predictions for {len(self.prompts)} prompts."
)
@@ -192,34 +168,74 @@ def generate_predictions(self) -> list[str]:

total_elapsed = time.time() - start_preds
logger.info(
f"[generate_predictions] Finished {total_prompts} prompts in {total_elapsed:.2f}s | Generated {len(predictions)} predictions."
f"[generate_predictions] Finished {total_prompts} prompts in {total_elapsed:.2f}s | "
f"Generated {len(predictions)} predictions."
)

prediction_data = pd.DataFrame(
{
"prompt": self.prompts,
"true_label": self.y_true,
"prediction": predictions,
}
)

unique_id = uuid.uuid4().hex
filename = f"predictions_{self.model_name}_{unique_id}.csv"
prediction_data_s3_url = DataPreprocessor.upload_csv_to_s3(
self.storage, prediction_data, filename
)
self.prediction_data_s3_url = prediction_data_s3_url

logger.info(
f"[generate_predictions] Predictions CSV uploaded to S3 | url={prediction_data_s3_url}"
)
return predictions

def evaluate(self, y_pred: list[str]) -> dict:
"""Evaluate the predictions against the true labels."""
return predictions, prediction_data_s3_url

logger.info(f"[evaluate] Starting evaluation with {len(y_pred)} predictions.")
def evaluate(self) -> dict:
"""Evaluate using the predictions CSV previously uploaded to S3."""
if not getattr(self, "prediction_data_s3_url", None):
raise RuntimeError(
"[evaluate] predictions_s3_url not set. Call generate_predictions() first."
)

logger.info(
f"[evaluate] Streaming predictions CSV from: {self.prediction_data_s3_url}"
)
prediction_obj = self.storage.stream(self.prediction_data_s3_url)
try:
mcc_score = round(matthews_corrcoef(self.y_true, y_pred), 4)
df = pd.read_csv(prediction_obj)
finally:
prediction_obj.close()

return {
"mcc": mcc_score,
}
if "true_label" not in df.columns or "prediction" not in df.columns:
raise ValueError(
"[evaluate] prediction data CSV must contain 'true_label' and 'prediction' columns."
)

y_true = df["true_label"].astype(str).str.strip().str.lower().tolist()
y_pred = df["prediction"].astype(str).str.strip().str.lower().tolist()

try:
mcc_score = round(matthews_corrcoef(y_true, y_pred), 4)
logger.info(f"[evaluate] Computed MCC={mcc_score}")
return {"mcc_score": mcc_score}
except Exception as e:
logger.error(f"[evaluate] Evaluation failed: {e}")
logger.error(f"[evaluate] Evaluation failed: {e}", exc_info=True)
raise

def run(self) -> dict:
"""Run the full evaluation process: load data, generate predictions, evaluate results."""
try:
self.load_labels_and_prompts()
predictions = self.generate_predictions()
evaluation_results = self.evaluate(predictions)
predictions, prediction_data_s3_url = self.generate_predictions()
evaluation_results = self.evaluate()
logger.info("[evaluate] Model evaluation completed successfully.")
return evaluation_results
return {
"evaluation_score": evaluation_results,
"prediction_data_s3_url": prediction_data_s3_url,
}
except Exception as e:
logger.error(f"[evaluate] Error in running ModelEvaluator: {str(e)}")
raise