diff --git a/backend/app/alembic/versions/06a8bed9e7a6_add_additional_columns_to_model_eval.py b/backend/app/alembic/versions/06a8bed9e7a6_add_additional_columns_to_model_eval.py new file mode 100644 index 00000000..f00052df --- /dev/null +++ b/backend/app/alembic/versions/06a8bed9e7a6_add_additional_columns_to_model_eval.py @@ -0,0 +1,42 @@ +"""add additional columns to model evaluation table + +Revision ID: 06a8bed9e7a6 +Revises: 72896bcc94da +Create Date: 2025-08-25 22:36:58.959768 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "06a8bed9e7a6" +down_revision = "72896bcc94da" +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_column("model_evaluation", "testing_file_id") + op.add_column( + "model_evaluation", + sa.Column( + "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + ) + op.add_column( + "model_evaluation", + sa.Column( + "prediction_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ), + ) + + +def downgrade(): + op.drop_column("model_evaluation", "prediction_data_s3_url") + op.drop_column("model_evaluation", "test_data_s3_url") + op.add_column( + "model_evaluation", + sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True), + ) diff --git a/backend/app/api/routes/model_evaluation.py b/backend/app/api/routes/model_evaluation.py index 23859f65..3394a344 100644 --- a/backend/app/api/routes/model_evaluation.py +++ b/backend/app/api/routes/model_evaluation.py @@ -22,6 +22,7 @@ ModelEvaluationPublic, ) from app.core.db import engine +from app.core.cloud import AmazonCloudStorage from app.core.finetune.evaluation import ModelEvaluator from app.utils import get_openai_client, APIResponse from app.api.deps import CurrentUserOrgProject, SessionDep @@ -35,7 +36,6 @@ def run_model_evaluation( eval_id: int, current_user: CurrentUserOrgProject, - client: OpenAI, ): start_time = time.time() logger.info( @@ -43,6 +43,11 @@ def run_model_evaluation( ) with Session(engine) as db: + client = get_openai_client( + db, current_user.organization_id, current_user.project_id + ) + storage = AmazonCloudStorage(current_user) + try: model_eval = update_model_eval( session=db, @@ -53,9 +58,10 @@ def run_model_evaluation( evaluator = ModelEvaluator( model_name=model_eval.model_name, - testing_file_id=model_eval.testing_file_id, + test_data_s3_url=model_eval.test_data_s3_url, system_prompt=model_eval.system_prompt, client=client, + storage=storage, ) result = evaluator.run() @@ -64,7 +70,9 @@ def run_model_evaluation( eval_id=eval_id, project_id=current_user.project_id, update=ModelEvaluationUpdate( - score=result, status=ModelEvaluationStatus.completed + score=result["evaluation_score"], + prediction_data_s3_url=result["prediction_data_s3_url"], + status=ModelEvaluationStatus.completed, ), ) @@ -111,7 +119,8 @@ def evaluate_models( """ client = get_openai_client( session, current_user.organization_id, current_user.project_id - ) + ) # keeping this here for checking if the user's validated OpenAI key is present or not, + # even though the client will be initialized separately inside the background task if not request.fine_tuning_ids: logger.error( @@ -150,9 +159,7 @@ def evaluate_models( f"[evaluate_model] Created evaluation for fine_tuning_id {job_id} with eval ID={model_eval.id}, project_id:{current_user.project_id}" ) - background_tasks.add_task( - run_model_evaluation, model_eval.id, 
current_user, client - ) + background_tasks.add_task(run_model_evaluation, model_eval.id, current_user) return APIResponse.success_response( {"message": "Model evaluation(s) started successfully", "data": evals} diff --git a/backend/app/core/finetune/evaluation.py b/backend/app/core/finetune/evaluation.py index fb83c133..1f34fddb 100644 --- a/backend/app/core/finetune/evaluation.py +++ b/backend/app/core/finetune/evaluation.py @@ -1,18 +1,18 @@ -import json import difflib import time import logging from typing import Set import openai +import pandas as pd from openai import OpenAI +import uuid from sklearn.metrics import ( matthews_corrcoef, - accuracy_score, - f1_score, - confusion_matrix, ) +from app.core.cloud import AmazonCloudStorage from app.api.routes.fine_tuning import handle_openai_error +from app.core.finetune.preprocessing import DataPreprocessor logger = logging.getLogger(__name__) @@ -26,12 +26,14 @@ class ModelEvaluator: def __init__( self, model_name: str, - testing_file_id: str, + test_data_s3_url: str, + storage: AmazonCloudStorage, system_prompt: str, client: OpenAI, ): self.model_name = model_name - self.testing_file_id = testing_file_id + self.test_data_s3_url = test_data_s3_url + self.storage = storage self.system_instruction = system_prompt self.client = client @@ -43,83 +45,57 @@ def __init__( def load_labels_and_prompts(self) -> None: """ - Loads labels and prompts directly from OpenAI NDJSON file content using the testing file ID. - - Example data format: - { - "messages": [ - {"role": "system", "content": "You are an assistant that is good at categorizing if what user is saying is a query or non-query"}, - {"role": "user", "content": "what is the colour of the apple"}, - {"role": "assistant", "content": "query"} - ] - } - { - "messages": [ - {"role": "system", "content": "You are an assistant that is good at categorizing if what user is saying is a query or non-query"}, - {"role": "user", "content": "i like apples"}, - {"role": "assistant", "content": "non-query"} - ] - } + Load prompts (X) and labels (y) from an S3-hosted CSV via storage.stream. 
+ Expects: + - one of: 'query' | 'question' | 'message' + - 'label' """ logger.info( - f"[load_labels_and_prompts] Loading labels and prompts from file ID: {self.testing_file_id}" + f"[ModelEvaluator.load_labels_and_prompts] Loading CSV from: {self.test_data_s3_url}" ) + file_obj = self.storage.stream(self.test_data_s3_url) try: - response = self.client.files.content(self.testing_file_id) - file_bytes = response.read() - lines = file_bytes.decode("utf-8").splitlines() - - for ln, line in enumerate(lines, 1): - line = line.strip() - if not line: - continue - try: - obj = json.loads(line) - msgs = obj.get("messages", []) - if not isinstance(msgs, list) or not msgs: - logger.error( - f"[load_labels_and_prompts] Line {ln}: 'messages' missing or invalid" - ) - raise ValueError(f"Line {ln}: 'messages' missing or invalid") - - user_msgs = [ - m for m in msgs if m.get("role") == "user" and "content" in m - ] - model_msgs = [ - m - for m in msgs - if m.get("role") == "assistant" and "content" in m - ] - if not user_msgs or not model_msgs: - logger.error( - f"[load_labels_and_prompts] Line {ln}: missing user or assistant message" - ) - raise ValueError( - f"Line {ln}: missing user or assistant message" - ) + df = pd.read_csv(file_obj) + df.columns = [c.strip().lower() for c in df.columns] + + possible_query_columns = ["query", "question", "message"] + query_col = next( + (c for c in possible_query_columns if c in df.columns), None + ) + label_col = "label" if "label" in df.columns else None - prompt = user_msgs[0]["content"] - label = (model_msgs[0]["content"] or "").strip().lower() + if not query_col or not label_col: + logger.error( + "[ModelEvaluator.load_labels_and_prompts] CSV must contain a 'label' column " + f"and one of: {possible_query_columns}" + ) + raise ValueError( + f"CSV must contain a 'label' column and one of: {possible_query_columns}" + ) - self.prompts.append(prompt) - self.y_true.append(label) - self.allowed_labels.add(label) + prompts = df[query_col].astype(str).tolist() + labels = df[label_col].astype(str).str.strip().str.lower().tolist() - except Exception as e: - logger.error( - f"[load_labels_and_prompts] Error processing line {ln}: {str(e)}" - ) - raise + self.prompts = prompts + self.y_true = labels + self.allowed_labels = set(labels) + + self.query_col = query_col + self.label_col = label_col logger.info( - f"[load_labels_and_prompts] Loaded {len(self.prompts)} prompts and {len(self.y_true)} labels." + "[ModelEvaluator.load_labels_and_prompts] " + f"Loaded {len(self.prompts)} prompts and {len(self.y_true)} labels; " + f"query_col={query_col}, label_col={label_col}, allowed_labels={self.allowed_labels}" ) - except Exception as e: logger.error( - f"[load_labels_and_prompts] Failed to load file content: {str(e)}" + f"[ModelEvaluator.load_labels_and_prompts] Failed to load/parse test CSV: {e}", + exc_info=True, ) raise + finally: + file_obj.close() def normalize_prediction(self, text: str) -> str: logger.debug(f"[normalize_prediction] Normalizing prediction: {text}") @@ -139,7 +115,7 @@ def normalize_prediction(self, text: str) -> str: ) return next(iter(self.allowed_labels)) - def generate_predictions(self) -> list[str]: + def generate_predictions(self) -> tuple[list[str], str]: logger.info( f"[generate_predictions] Generating predictions for {len(self.prompts)} prompts." 
) @@ -192,34 +168,74 @@ def generate_predictions(self) -> list[str]: total_elapsed = time.time() - start_preds logger.info( - f"[generate_predictions] Finished {total_prompts} prompts in {total_elapsed:.2f}s | Generated {len(predictions)} predictions." + f"[generate_predictions] Finished {total_prompts} prompts in {total_elapsed:.2f}s | " + f"Generated {len(predictions)} predictions." + ) + + prediction_data = pd.DataFrame( + { + "prompt": self.prompts, + "true_label": self.y_true, + "prediction": predictions, + } + ) + + unique_id = uuid.uuid4().hex + filename = f"predictions_{self.model_name}_{unique_id}.csv" + prediction_data_s3_url = DataPreprocessor.upload_csv_to_s3( + self.storage, prediction_data, filename + ) + self.prediction_data_s3_url = prediction_data_s3_url + + logger.info( + f"[generate_predictions] Predictions CSV uploaded to S3 | url={prediction_data_s3_url}" ) - return predictions - def evaluate(self, y_pred: list[str]) -> dict: - """Evaluate the predictions against the true labels.""" + return predictions, prediction_data_s3_url - logger.info(f"[evaluate] Starting evaluation with {len(y_pred)} predictions.") + def evaluate(self) -> dict: + """Evaluate using the predictions CSV previously uploaded to S3.""" + if not getattr(self, "prediction_data_s3_url", None): + raise RuntimeError( + "[evaluate] predictions_s3_url not set. Call generate_predictions() first." + ) + logger.info( + f"[evaluate] Streaming predictions CSV from: {self.prediction_data_s3_url}" + ) + prediction_obj = self.storage.stream(self.prediction_data_s3_url) try: - mcc_score = round(matthews_corrcoef(self.y_true, y_pred), 4) + df = pd.read_csv(prediction_obj) + finally: + prediction_obj.close() - return { - "mcc": mcc_score, - } + if "true_label" not in df.columns or "prediction" not in df.columns: + raise ValueError( + "[evaluate] prediction data CSV must contain 'true_label' and 'prediction' columns." 
+ ) + y_true = df["true_label"].astype(str).str.strip().str.lower().tolist() + y_pred = df["prediction"].astype(str).str.strip().str.lower().tolist() + + try: + mcc_score = round(matthews_corrcoef(y_true, y_pred), 4) + logger.info(f"[evaluate] Computed MCC={mcc_score}") + return {"mcc_score": mcc_score} except Exception as e: - logger.error(f"[evaluate] Evaluation failed: {e}") + logger.error(f"[evaluate] Evaluation failed: {e}", exc_info=True) raise def run(self) -> dict: """Run the full evaluation process: load data, generate predictions, evaluate results.""" try: self.load_labels_and_prompts() - predictions = self.generate_predictions() - evaluation_results = self.evaluate(predictions) + predictions, prediction_data_s3_url = self.generate_predictions() + evaluation_results = self.evaluate() logger.info("[evaluate] Model evaluation completed successfully.") - return evaluation_results + return { + "evaluation_score": evaluation_results, + "prediction_data_s3_url": prediction_data_s3_url, + } except Exception as e: logger.error(f"[evaluate] Error in running ModelEvaluator: {str(e)}") raise diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py index 12120258..28fe9b23 100644 --- a/backend/app/core/finetune/preprocessing.py +++ b/backend/app/core/finetune/preprocessing.py @@ -27,7 +27,11 @@ def __init__(self, document, storage, split_ratio: float, system_message: str): self.system_message = {"role": "system", "content": system_message.strip()} - def upload_csv_to_s3(self, df, filename: str) -> str: + @staticmethod + def upload_csv_to_s3(storage, df, filename: str) -> str: + """ + Uploads a DataFrame as CSV to S3 using the provided storage instance. + """ logger.info( f"[upload_csv_to_s3] Preparing to upload '{filename}' to s3 | rows={len(df)}, cols={len(df.columns)}" ) @@ -43,7 +47,7 @@ def upload_csv_to_s3(self, df, filename: str) -> str: ) try: - dest = self.storage.put(upload, basename=Path("datasets") / filename) + dest = storage.put(upload, basename=Path("datasets") / filename) logger.info( f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_url='{dest}'" ) @@ -139,8 +143,8 @@ def process(self): test_csv_name = f"test_split_{test_percentage}_{unique_id}.csv" train_jsonl_name = f"train_data_{train_percentage}_{unique_id}.jsonl" - train_csv_url = self.upload_csv_to_s3(train_data, train_csv_name) - test_csv_url = self.upload_csv_to_s3(test_data, test_csv_name) + train_csv_url = self.upload_csv_to_s3(self.storage, train_data, train_csv_name) + test_csv_url = self.upload_csv_to_s3(self.storage, test_data, test_csv_name) train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name) diff --git a/backend/app/crud/model_evaluation.py b/backend/app/crud/model_evaluation.py index 96b91477..7a1e2b3b 100644 --- a/backend/app/crud/model_evaluation.py +++ b/backend/app/crud/model_evaluation.py @@ -26,9 +26,9 @@ def create_model_evaluation( ) -> ModelEvaluation: fine_tuning_job = fetch_by_id(session, request.fine_tuning_id, project_id) - if fine_tuning_job.fine_tuned_model and fine_tuning_job.testing_file_id is None: + if fine_tuning_job.fine_tuned_model and fine_tuning_job.test_data_s3_url is None: logger.error( - f"[create_model_evaluation] No fine tuned model found for the given fine tuning ID | fine_tuning_id={request.fine_tuning_id}, project_id={project_id}" + f"[create_model_evaluation] No fine tuned model or test data found for the given fine tuning ID | fine_tuning_id={request.fine_tuning_id}, project_id={project_id}" ) 
raise HTTPException(404, "Fine tuned model not found") @@ -39,7 +39,7 @@ def create_model_evaluation( "split_ratio": fine_tuning_job.split_ratio, "model_name": fine_tuning_job.fine_tuned_model, "document_id": fine_tuning_job.document_id, - "testing_file_id": fine_tuning_job.testing_file_id, + "test_data_s3_url": fine_tuning_job.test_data_s3_url, "project_id": project_id, "organization_id": organization_id, "status": status, diff --git a/backend/app/models/model_evaluation.py b/backend/app/models/model_evaluation.py index 9251d2fb..da24cdcd 100644 --- a/backend/app/models/model_evaluation.py +++ b/backend/app/models/model_evaluation.py @@ -47,18 +47,20 @@ class ModelEvaluation(ModelEvaluationBase, table=True): nullable=False, ) model_name: str = Field(description="fine tuned model name from OpenAI") - testing_file_id: str = Field( - description="File ID of the testing file uploaded to OpenAI" - ) + test_data_s3_url: str = Field(description="S3 url of the testing data stored in S3") base_model: str = Field(nullable=False, description="Base model for fine-tuning") split_ratio: float = Field( nullable=False, description="the ratio the dataset was divided in" ) system_prompt: str = Field(sa_column=Column(Text, nullable=False)) - score: Optional[dict[str, float]] = Field( + score: dict[str, float] | None = Field( sa_column=Column(JSON, nullable=True), description="Evaluation scores per metric (e.g., {'mcc': 0.85})", ) + prediction_data_s3_url: str | None = Field( + default=None, + description="S3 URL where the prediction data generated by the fine-tuned model is stored", + ) status: ModelEvaluationStatus = ( Field(default=ModelEvaluationStatus.pending, description="Evaluation status"), ) @@ -85,6 +87,7 @@ class ModelEvaluationUpdate(SQLModel): score: Optional[dict[str, float]] = None status: Optional[ModelEvaluationStatus] = None error_message: Optional[str] = None + prediction_data_s3_url: Optional[str] = None class ModelEvaluationPublic(ModelEvaluationBase): @@ -95,8 +98,10 @@ class ModelEvaluationPublic(ModelEvaluationBase): model_name: str split_ratio: float base_model: str + prediction_data_s3_url: str | None score: dict[str, float] | None = None status: ModelEvaluationStatus + inserted_at: datetime updated_at: datetime deleted_at: datetime | None = None diff --git a/backend/app/tests/crud/test_model_evaluation.py b/backend/app/tests/crud/test_model_evaluation.py index 2b71ce4f..0eaa72f7 100644 --- a/backend/app/tests/crud/test_model_evaluation.py +++ b/backend/app/tests/crud/test_model_evaluation.py @@ -37,7 +37,7 @@ def test_create_model_evaluation(db: Session): base_model=fine_tune.base_model, model_name=fine_tune.fine_tuned_model, document_id=fine_tune.document_id, - testing_file_id=fine_tune.testing_file_id, + test_data_s3_url=fine_tune.test_data_s3_url, status="pending", ) @@ -52,7 +52,7 @@ def test_create_model_evaluation(db: Session): assert created_eval.status == "pending" assert created_eval.document_id == fine_tune.document_id assert created_eval.model_name == fine_tune.fine_tuned_model - assert created_eval.testing_file_id == fine_tune.testing_file_id + assert created_eval.test_data_s3_url == fine_tune.test_data_s3_url def test_fetch_by_eval_id_success(db: Session): diff --git a/backend/app/tests/utils/test_data.py b/backend/app/tests/utils/test_data.py index 680a2a87..edce345e 100644 --- a/backend/app/tests/utils/test_data.py +++ b/backend/app/tests/utils/test_data.py @@ -164,11 +164,11 @@ def create_test_finetuning_job_with_extra_fields( db: Session, ratios: list[float], ) -> 
tuple[list[Fine_Tuning], bool]: - jobs, _ = create_test_fine_tuning_jobs(db, [0.5, 0.7]) + jobs, _ = create_test_fine_tuning_jobs(db, ratios) if jobs: for job in jobs: - job.testing_file_id = "testing_file_id_example" + job.test_data_s3_url = "test_data_s3_url_example" job.fine_tuned_model = "fine_tuned_model_name" return jobs, True @@ -186,9 +186,7 @@ def create_test_model_evaluation(db) -> list[ModelEvaluation]: base_model=fine_tune.base_model, model_name=fine_tune.fine_tuned_model, document_id=fine_tune.document_id, - testing_file_id=fine_tune.testing_file_id - if fine_tune.testing_file_id - else None, + test_data_s3_url=fine_tune.test_data_s3_url, ) model_eval = create_model_evaluation(
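A minimal usage sketch of the refactored `ModelEvaluator` flow introduced above, assuming the module path shown in this diff (`app.core.finetune.evaluation`). It only exercises `load_labels_and_prompts()`, so the OpenAI client can be omitted; `_FakeStorage` is a hypothetical stand-in for `AmazonCloudStorage` that fakes just the `stream()` call, and the model name and S3 URL are placeholders, not values from this PR.

```python
import io

import pandas as pd

from app.core.finetune.evaluation import ModelEvaluator


class _FakeStorage:
    """Hypothetical stand-in for AmazonCloudStorage: stream() returns a file-like object."""

    def __init__(self, csv_bytes: bytes):
        self._csv_bytes = csv_bytes

    def stream(self, url: str) -> io.BytesIO:
        # The real storage would fetch the object at `url` from S3.
        return io.BytesIO(self._csv_bytes)


# The test CSV must contain a 'label' column and one of: 'query', 'question', 'message'.
test_df = pd.DataFrame(
    {
        "query": ["what is the colour of the apple", "i like apples"],
        "label": ["query", "non-query"],
    }
)

evaluator = ModelEvaluator(
    model_name="ft:base-model:example",  # placeholder fine-tuned model name
    test_data_s3_url="s3://bucket/datasets/test_split_50_abc123.csv",  # placeholder URL
    storage=_FakeStorage(test_df.to_csv(index=False).encode("utf-8")),
    system_prompt="Categorize the user message as query or non-query.",
    client=None,  # not used by load_labels_and_prompts()
)
evaluator.load_labels_and_prompts()
assert evaluator.prompts == list(test_df["query"])
assert evaluator.allowed_labels == {"query", "non-query"}

# In the API path, evaluator.run() returns
#   {"evaluation_score": {"mcc_score": <float>}, "prediction_data_s3_url": "<s3 url>"}
# which run_model_evaluation() persists via ModelEvaluationUpdate.
```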