diff --git a/backend/app/alembic/versions/6ed6ed401847_add_fine_tuning_and_model_evaluation_.py b/backend/app/alembic/versions/6ed6ed401847_add_fine_tuning_and_model_evaluation_.py
index a1add3e0..dc925366 100644
--- a/backend/app/alembic/versions/6ed6ed401847_add_fine_tuning_and_model_evaluation_.py
+++ b/backend/app/alembic/versions/6ed6ed401847_add_fine_tuning_and_model_evaluation_.py
@@ -60,10 +60,10 @@ def upgrade():
             "fine_tuned_model", sqlmodel.sql.sqltypes.AutoString(), nullable=True
         ),
         sa.Column(
-            "train_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+            "train_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=True
         ),
         sa.Column(
-            "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+            "test_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=True
         ),
         sa.Column("error_message", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
         sa.Column("project_id", sa.Integer(), nullable=False),
@@ -87,16 +87,20 @@ def upgrade():
         sa.Column("fine_tuning_id", sa.Integer(), nullable=False),
         sa.Column("id", sa.Integer(), nullable=False),
         sa.Column("document_id", sa.Uuid(), nullable=False),
-        sa.Column("model_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
         sa.Column(
-            "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=False
+            "fine_tuned_model", sqlmodel.sql.sqltypes.AutoString(), nullable=False
+        ),
+        sa.Column(
+            "test_data_s3_object", sqlmodel.sql.sqltypes.AutoString(), nullable=False
         ),
         sa.Column("base_model", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
         sa.Column("split_ratio", sa.Float(), nullable=False),
         sa.Column("system_prompt", sa.Text(), nullable=False),
         sa.Column("score", postgresql.JSON(astext_type=sa.Text()), nullable=True),
         sa.Column(
-            "prediction_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+            "prediction_data_s3_object",
+            sqlmodel.sql.sqltypes.AutoString(),
+            nullable=True,
         ),
         sa.Column(
             "status",
diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py
index c5e4ac44..23d47a00 100644
--- a/backend/app/api/routes/fine_tuning.py
+++ b/backend/app/api/routes/fine_tuning.py
@@ -14,7 +14,7 @@
     FineTuningUpdate,
     FineTuningStatus,
 )
-from app.core.cloud import AmazonCloudStorage
+from app.core.cloud import get_cloud_storage, storage
 from app.crud.document import DocumentCrud
 from app.utils import get_openai_client, APIResponse, mask_string, load_description
 from app.crud import (
@@ -69,16 +69,18 @@ def process_fine_tuning_job(
             client = get_openai_client(
                 session, current_user.organization_id, project_id
             )
-            storage = AmazonCloudStorage(current_user)
-            document_crud = DocumentCrud(session=session, owner_id=current_user.id)
+            storage = get_cloud_storage(
+                session=session, project_id=current_user.project_id
+            )
+            document_crud = DocumentCrud(session, current_user.project_id)
             document = document_crud.read_one(request.document_id)
             preprocessor = DataPreprocessor(
                 document, storage, ratio, request.system_prompt
             )
             result = preprocessor.process()
             train_data_temp_filepath = result["train_jsonl_temp_filepath"]
-            train_data_s3_url = result["train_csv_s3_url"]
-            test_data_s3_url = result["test_csv_s3_url"]
+            train_data_s3_object = result["train_csv_s3_object"]
+            test_data_s3_object = result["test_csv_s3_object"]

             try:
                 with open(train_data_temp_filepath, "rb") as train_f:
@@ -100,7 +102,8 @@ def process_fine_tuning_job(
                     job=fine_tune,
                     update=FineTuningUpdate(
                         status=FineTuningStatus.failed,
-                        error_message="Failed during background job processing",
+                        error_message="Error while uploading file to openai : "
+                        + error_msg,
                     ),
                 )
                 return
@@ -128,7 +131,8 @@ def process_fine_tuning_job(
                     job=fine_tune,
                     update=FineTuningUpdate(
                         status=FineTuningStatus.failed,
-                        error_message="Failed during background job processing",
+                        error_message="Error while creating an openai fine tuning job : "
+                        + error_msg,
                     ),
                 )
                 return
@@ -138,8 +142,8 @@ def process_fine_tuning_job(
                 job=fine_tune,
                 update=FineTuningUpdate(
                     training_file_id=training_file_id,
-                    train_data_s3_url=train_data_s3_url,
-                    test_data_s3_url=test_data_s3_url,
+                    train_data_s3_object=train_data_s3_object,
+                    test_data_s3_object=test_data_s3_object,
                     split_ratio=ratio,
                     provider_job_id=job.id,
                     status=FineTuningStatus.running,
@@ -155,6 +159,7 @@ def process_fine_tuning_job(
             )

     except Exception as e:
+        error_msg = str(e)
         logger.error(
             f"[process_fine_tuning_job] Background job failure: {e} | "
             f"job_id={job_id}, project_id={project_id}|"
         )
@@ -164,7 +169,8 @@ def process_fine_tuning_job(
             job=fine_tune,
             update=FineTuningUpdate(
                 status=FineTuningStatus.failed,
-                error_message="Failed during background job processing",
+                error_message="Error while processing the background job : "
+                + error_msg,
             ),
         )
@@ -186,6 +192,7 @@ def fine_tune_from_CSV(
         current_user.organization_id,
         current_user.project_id,
     )
+
     results = []

     for ratio in request.split_ratio:
@@ -235,21 +242,19 @@
 @router.get(
-    "/{job_id}/refresh",
+    "/{fine_tuning_id}/refresh",
     description=load_description("fine_tuning/retrieve.md"),
     response_model=APIResponse[FineTuningJobPublic],
 )
 def refresh_fine_tune_status(
-    job_id: int, session: SessionDep, current_user: CurrentUserOrgProject
+    fine_tuning_id: int, session: SessionDep, current_user: CurrentUserOrgProject
 ):
     project_id = current_user.project_id
-    job = fetch_by_id(session, job_id, project_id)
+    job = fetch_by_id(session, fine_tuning_id, project_id)
     client = get_openai_client(session, current_user.organization_id, project_id)
+    storage = get_cloud_storage(session=session, project_id=current_user.project_id)

-    if job.provider_job_id is None:
-        return APIResponse.success_response(job)
-
-    else:
+    if job.provider_job_id is not None:
         try:
             openai_job = client.fine_tuning.jobs.retrieve(job.provider_job_id)
         except openai.OpenAIError as e:
@@ -257,7 +262,7 @@ def refresh_fine_tune_status(
             logger.error(
                 f"[Retrieve_fine_tune_status] Failed to retrieve OpenAI job | "
                 f"provider_job_id={mask_string(job.provider_job_id)}, "
-                f"error={error_msg}, job_id={job_id}, project_id={project_id}"
+                f"error={error_msg}, fine_tuning_id={fine_tuning_id}, project_id={project_id}"
             )
             raise HTTPException(
                 status_code=502, detail=f"OpenAI API error: {error_msg}"
             )
@@ -285,6 +290,17 @@ def refresh_fine_tune_status(
         ):
             job = update_finetune_job(session=session, job=job, update=update_payload)

+    job = job.model_copy(
+        update={
+            "train_data_file_url": storage.get_signed_url(job.train_data_s3_object)
+            if job.train_data_s3_object
+            else None,
+            "test_data_file_url": storage.get_signed_url(job.test_data_s3_object)
+            if job.test_data_s3_object
+            else None,
+        }
+    )
+
     return APIResponse.success_response(job)
@@ -296,6 +312,7 @@ def refresh_fine_tune_status(
 def retrieve_jobs_by_document(
     document_id: UUID, session: SessionDep, current_user: CurrentUserOrgProject
 ):
+    storage = get_cloud_storage(session=session, project_id=current_user.project_id)
     project_id = current_user.project_id
     jobs = fetch_by_document_id(session, document_id, project_id)
     if not jobs:
@@ -306,4 +323,25 @@
             status_code=404,
             detail="No fine-tuning jobs found for the given document ID",
         )
-    return APIResponse.success_response(jobs)
+    updated_jobs = []
+    for job in jobs:
+        train_url = (
+            storage.get_signed_url(job.train_data_s3_object)
+            if job.train_data_s3_object
+            else None
+        )
+        test_url = (
+            storage.get_signed_url(job.test_data_s3_object)
+            if job.test_data_s3_object
+            else None
+        )
+
+        updated_job = job.model_copy(
+            update={
+                "train_data_file_url": train_url,
+                "test_data_file_url": test_url,
+            }
+        )
+        updated_jobs.append(updated_job)
+
+    return APIResponse.success_response(updated_jobs)
diff --git a/backend/app/api/routes/model_evaluation.py b/backend/app/api/routes/model_evaluation.py
index 3394a344..b38172dd 100644
--- a/backend/app/api/routes/model_evaluation.py
+++ b/backend/app/api/routes/model_evaluation.py
@@ -22,7 +22,7 @@
     ModelEvaluationPublic,
 )
 from app.core.db import engine
-from app.core.cloud import AmazonCloudStorage
+from app.core.cloud import get_cloud_storage
 from app.core.finetune.evaluation import ModelEvaluator
 from app.utils import get_openai_client, APIResponse
 from app.api.deps import CurrentUserOrgProject, SessionDep
@@ -33,6 +33,19 @@
 router = APIRouter(prefix="/model_evaluation", tags=["model_evaluation"])


+def attach_prediction_file_url(model_obj, storage):
+    """
+    Given a model-like object and a storage client,
+    attach a signed prediction data file URL (if available).
+    """
+    s3_key = getattr(model_obj, "prediction_data_s3_object", None)
+    prediction_data_file_url = storage.get_signed_url(s3_key) if s3_key else None
+
+    return model_obj.model_copy(
+        update={"prediction_data_file_url": prediction_data_file_url}
+    )
+
+
 def run_model_evaluation(
     eval_id: int,
     current_user: CurrentUserOrgProject,
@@ -46,7 +59,7 @@ def run_model_evaluation(
         client = get_openai_client(
             db, current_user.organization_id, current_user.project_id
         )
-        storage = AmazonCloudStorage(current_user)
+        storage = get_cloud_storage(session=db, project_id=current_user.project_id)

         try:
             model_eval = update_model_eval(
@@ -57,8 +70,8 @@
             )

             evaluator = ModelEvaluator(
-                model_name=model_eval.model_name,
-                test_data_s3_url=model_eval.test_data_s3_url,
+                fine_tuned_model=model_eval.fine_tuned_model,
+                test_data_s3_object=model_eval.test_data_s3_object,
                 system_prompt=model_eval.system_prompt,
                 client=client,
                 storage=storage,
@@ -71,7 +84,7 @@
                 project_id=current_user.project_id,
                 update=ModelEvaluationUpdate(
                     score=result["evaluation_score"],
-                    prediction_data_s3_url=result["prediction_data_s3_url"],
+                    prediction_data_s3_object=result["prediction_data_s3_object"],
                     status=ModelEvaluationStatus.completed,
                 ),
             )
@@ -82,6 +95,7 @@
             )

         except Exception as e:
+            error_msg = str(e)
             logger.error(
                 f"[run_model_evaluation] Failed | eval_id={eval_id}, project_id={current_user.project_id}: {e}"
             )
@@ -92,7 +106,8 @@
                 project_id=current_user.project_id,
                 update=ModelEvaluationUpdate(
                     status=ModelEvaluationStatus.failed,
-                    error_message="failed during background job processing",
+                    error_message="failed during background job processing:"
+                    + error_msg,
                 ),
             )
@@ -128,20 +143,20 @@ def evaluate_models(
         )
         raise HTTPException(status_code=400, detail="No fine-tuned job IDs provided")

-    evals: list[ModelEvaluationPublic] = []
+    evaluations: list[ModelEvaluationPublic] = []

     for job_id in request.fine_tuning_ids:
         fine_tuning_job = fetch_by_id(session, job_id, current_user.project_id)

-        active_evals = fetch_active_model_evals(
+        active_evaluations = fetch_active_model_evals(
             session, job_id, current_user.project_id
         )
-        if active_evals:
+        if active_evaluations:
             logger.info(
                 f"[evaluate_model] Skipping creation for {job_id}. Active evaluation exists, project_id:{current_user.project_id}"
             )
-            evals.extend(
-                ModelEvaluationPublic.model_validate(ev) for ev in active_evals
+            evaluations.extend(
+                ModelEvaluationPublic.model_validate(ev) for ev in active_evaluations
             )
             continue
@@ -153,7 +168,7 @@ def evaluate_models(
             status=ModelEvaluationStatus.pending,
         )

-        evals.append(ModelEvaluationPublic.model_validate(model_eval))
+        evaluations.append(ModelEvaluationPublic.model_validate(model_eval))

         logger.info(
             f"[evaluate_model] Created evaluation for fine_tuning_id {job_id} with eval ID={model_eval.id}, project_id:{current_user.project_id}"
         )
@@ -161,14 +176,26 @@
         background_tasks.add_task(run_model_evaluation, model_eval.id, current_user)

+    response_data = [
+        {
+            "id": ev.id,
+            "fine_tuning_id": ev.fine_tuning_id,
+            "fine_tuned_model": getattr(ev, "fine_tuned_model", None),
+            "document_id": getattr(ev, "document_id", None),
+            "status": ev.status,
+        }
+        for ev in evaluations
+    ]
+
     return APIResponse.success_response(
-        {"message": "Model evaluation(s) started successfully", "data": evals}
+        {"message": "Model evaluation(s) started successfully", "data": response_data}
     )


 @router.get(
     "/{document_id}/top_model",
     response_model=APIResponse[ModelEvaluationPublic],
+    response_model_exclude_none=True,
 )
 def get_top_model_by_doc_id(
     document_id: UUID,
@@ -183,19 +210,38 @@ def get_top_model_by_doc_id(
         f"[get_top_model_by_doc_id] Fetching top model for document_id={document_id}, "
         f"project_id={current_user.project_id}"
     )
+
     top_model = fetch_top_model_by_doc_id(session, document_id, current_user.project_id)
+    storage = get_cloud_storage(session=session, project_id=current_user.project_id)
+
+    top_model = attach_prediction_file_url(top_model, storage)
+
     return APIResponse.success_response(top_model)


-@router.get("/{document_id}", response_model=APIResponse[list[ModelEvaluationPublic]])
-def get_evals_by_doc_id(
-    document_id: UUID, session: SessionDep, current_user: CurrentUserOrgProject
+@router.get(
+    "/{document_id}",
+    response_model=APIResponse[list[ModelEvaluationPublic]],
+    response_model_exclude_none=True,
+)
+def get_evaluations_by_doc_id(
+    document_id: UUID,
+    session: SessionDep,
+    current_user: CurrentUserOrgProject,
 ):
     """
     Return all model evaluations for the given document_id within the current project.
     """
     logger.info(
-        f"[get_evals_by_doc_id]Fetching evaluations for document_id: {document_id}, project_id: {current_user.project_id}"
+        f"[get_evaluations_by_doc_id] Fetching evaluations for document_id={document_id}, "
+        f"project_id={current_user.project_id}"
     )
+
     evaluations = fetch_eval_by_doc_id(session, document_id, current_user.project_id)
-    return APIResponse.success_response(evaluations)
+    storage = get_cloud_storage(session=session, project_id=current_user.project_id)
+
+    updated_evaluations = [
+        attach_prediction_file_url(ev, storage) for ev in evaluations
+    ]
+
+    return APIResponse.success_response(updated_evaluations)
diff --git a/backend/app/core/cloud/storage.py b/backend/app/core/cloud/storage.py
index a3248a01..95c7f0dd 100644
--- a/backend/app/core/cloud/storage.py
+++ b/backend/app/core/cloud/storage.py
@@ -269,4 +269,11 @@ def get_cloud_storage(session: Session, project_id: int) -> CloudStorage:

     storage_path = project.storage_path

-    return AmazonCloudStorage(project_id=project_id, storage_path=storage_path)
+    try:
+        return AmazonCloudStorage(project_id=project_id, storage_path=storage_path)
+    except Exception as err:
+        logger.error(
+            f"[get_cloud_storage] Failed to initialize storage for project_id={project_id}: {err}",
+            exc_info=True,
+        )
+        raise
diff --git a/backend/app/core/finetune/evaluation.py b/backend/app/core/finetune/evaluation.py
index 1f34fddb..527087eb 100644
--- a/backend/app/core/finetune/evaluation.py
+++ b/backend/app/core/finetune/evaluation.py
@@ -25,14 +25,14 @@ class ModelEvaluator:
     def __init__(
         self,
-        model_name: str,
-        test_data_s3_url: str,
+        fine_tuned_model: str,
+        test_data_s3_object: str,
         storage: AmazonCloudStorage,
         system_prompt: str,
         client: OpenAI,
     ):
-        self.model_name = model_name
-        self.test_data_s3_url = test_data_s3_url
+        self.fine_tuned_model = fine_tuned_model
+        self.test_data_s3_object = test_data_s3_object
         self.storage = storage
         self.system_instruction = system_prompt
         self.client = client
@@ -41,7 +41,7 @@ def __init__(
         self.y_true: list[str] = []
         self.prompts: list[str] = []

-        logger.info(f"ModelEvaluator initialized with model: {model_name}")
+        logger.info(f"ModelEvaluator initialized with model: {fine_tuned_model}")

     def load_labels_and_prompts(self) -> None:
         """
@@ -51,9 +51,9 @@ def load_labels_and_prompts(self) -> None:
         - 'label'
         """
         logger.info(
-            f"[ModelEvaluator.load_labels_and_prompts] Loading CSV from: {self.test_data_s3_url}"
+            f"[ModelEvaluator.load_labels_and_prompts] Loading CSV from: {self.test_data_s3_object}"
         )
-        file_obj = self.storage.stream(self.test_data_s3_url)
+        file_obj = self.storage.stream(self.test_data_s3_object)
         try:
             df = pd.read_csv(file_obj)
             df.columns = [c.strip().lower() for c in df.columns]
@@ -133,7 +133,7 @@ def generate_predictions(self) -> tuple[list[str], str]:

             try:
                 response = self.client.responses.create(
-                    model=self.model_name,
+                    model=self.fine_tuned_model,
                     instructions=self.system_instruction,
                     input=prompt,
                 )
@@ -181,29 +181,29 @@ def generate_predictions(self) -> tuple[list[str], str]:
         )

         unique_id = uuid.uuid4().hex
-        filename = f"predictions_{self.model_name}_{unique_id}.csv"
-        prediction_data_s3_url = DataPreprocessor.upload_csv_to_s3(
+        filename = f"predictions_{self.fine_tuned_model}_{unique_id}.csv"
+        prediction_data_s3_object = DataPreprocessor.upload_csv_to_s3(
             self.storage, prediction_data, filename
         )
-        self.prediction_data_s3_url = prediction_data_s3_url
+        self.prediction_data_s3_object = prediction_data_s3_object

         logger.info(
-            f"[generate_predictions] Predictions CSV uploaded to S3 | url={prediction_data_s3_url}"
+            f"[generate_predictions] Predictions CSV uploaded to S3 | url={prediction_data_s3_object}"
         )
-        return predictions, prediction_data_s3_url
+        return predictions, prediction_data_s3_object

     def evaluate(self) -> dict:
         """Evaluate using the predictions CSV previously uploaded to S3."""
-        if not getattr(self, "prediction_data_s3_url", None):
+        if not getattr(self, "prediction_data_s3_object", None):
             raise RuntimeError(
-                "[evaluate] predictions_s3_url not set. Call generate_predictions() first."
+                "[evaluate] predictions_s3_object not set. Call generate_predictions() first."
             )

         logger.info(
-            f"[evaluate] Streaming predictions CSV from: {self.prediction_data_s3_url}"
+            f"[evaluate] Streaming predictions CSV from: {self.prediction_data_s3_object}"
         )
-        prediction_obj = self.storage.stream(self.prediction_data_s3_url)
+        prediction_obj = self.storage.stream(self.prediction_data_s3_object)
         try:
             df = pd.read_csv(prediction_obj)
         finally:
@@ -229,12 +229,12 @@ def run(self) -> dict:
         """Run the full evaluation process: load data, generate predictions, evaluate results."""
         try:
             self.load_labels_and_prompts()
-            predictions, prediction_data_s3_url = self.generate_predictions()
+            predictions, prediction_data_s3_object = self.generate_predictions()
             evaluation_results = self.evaluate()
             logger.info("[evaluate] Model evaluation completed successfully.")
             return {
                 "evaluation_score": evaluation_results,
-                "prediction_data_s3_url": prediction_data_s3_url,
+                "prediction_data_s3_object": prediction_data_s3_object,
             }
         except Exception as e:
             logger.error(f"[evaluate] Error in running ModelEvaluator: {str(e)}")
diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py
index 28fe9b23..52d652ee 100644
--- a/backend/app/core/finetune/preprocessing.py
+++ b/backend/app/core/finetune/preprocessing.py
@@ -47,9 +47,9 @@ def upload_csv_to_s3(storage, df, filename: str) -> str:
         )

         try:
-            dest = storage.put(upload, basename=Path("datasets") / filename)
+            dest = storage.put(upload, file_path=Path("datasets") / filename)
             logger.info(
-                f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_url='{dest}'"
+                f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_object='{dest}'"
             )
             return str(dest)
         except Exception as err:
@@ -143,13 +143,15 @@ def process(self):
         test_csv_name = f"test_split_{test_percentage}_{unique_id}.csv"
         train_jsonl_name = f"train_data_{train_percentage}_{unique_id}.jsonl"

-        train_csv_url = self.upload_csv_to_s3(self.storage, train_data, train_csv_name)
-        test_csv_url = self.upload_csv_to_s3(self.storage, test_data, test_csv_name)
+        train_csv_object = self.upload_csv_to_s3(
+            self.storage, train_data, train_csv_name
+        )
+        test_csv_object = self.upload_csv_to_s3(self.storage, test_data, test_csv_name)
         train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name)

         return {
-            "train_csv_s3_url": train_csv_url,
-            "test_csv_s3_url": test_csv_url,
+            "train_csv_s3_object": train_csv_object,
+            "test_csv_s3_object": test_csv_object,
             "train_jsonl_temp_filepath": train_jsonl_path,
         }
diff --git a/backend/app/crud/fine_tuning.py b/backend/app/crud/fine_tuning.py
index 1890665b..2b80e968 100644
--- a/backend/app/crud/fine_tuning.py
+++ b/backend/app/crud/fine_tuning.py
@@ -12,6 +12,7 @@
     FineTuningUpdate,
     FineTuningStatus,
 )
+from app.crud import DocumentCrud

 logger = logging.getLogger(__name__)
@@ -38,6 +39,11 @@ def create_fine_tuning_job(
         )
         return existing, False

+    document_crud = DocumentCrud(
+        session, project_id
+    )  # to check if the given document is present in the document table or not
+    document = document_crud.read_one(request.document_id)
+
     fine_tune_data = request.model_dump(exclude_unset=True)
     base_data = {
         **fine_tune_data,
diff --git a/backend/app/crud/model_evaluation.py b/backend/app/crud/model_evaluation.py
index 7a1e2b3b..d93bcce5 100644
--- a/backend/app/crud/model_evaluation.py
+++ b/backend/app/crud/model_evaluation.py
@@ -26,7 +26,7 @@ def create_model_evaluation(
 ) -> ModelEvaluation:
     fine_tuning_job = fetch_by_id(session, request.fine_tuning_id, project_id)

-    if fine_tuning_job.fine_tuned_model and fine_tuning_job.test_data_s3_url is None:
+    if fine_tuning_job.fine_tuned_model and fine_tuning_job.test_data_s3_object is None:
         logger.error(
             f"[create_model_evaluation] No fine tuned model or test data found for the given fine tuning ID | fine_tuning_id={request.fine_tuning_id}, project_id={project_id}"
         )
@@ -37,9 +37,9 @@ def create_model_evaluation(
         "system_prompt": fine_tuning_job.system_prompt,
         "base_model": fine_tuning_job.base_model,
         "split_ratio": fine_tuning_job.split_ratio,
-        "model_name": fine_tuning_job.fine_tuned_model,
+        "fine_tuned_model": fine_tuning_job.fine_tuned_model,
         "document_id": fine_tuning_job.document_id,
-        "test_data_s3_url": fine_tuning_job.test_data_s3_url,
+        "test_data_s3_object": fine_tuning_job.test_data_s3_object,
         "project_id": project_id,
         "organization_id": organization_id,
         "status": status,
@@ -128,7 +128,7 @@ def fetch_top_model_by_doc_id(

     for model_eval in model_evals:
         if model_eval.score is not None:
-            mcc = model_eval.score.get("mcc", None)
+            mcc = model_eval.score.get("mcc_score", None)
             if mcc is not None and mcc > highest_mcc:
                 highest_mcc = mcc
                 top_model = model_eval
diff --git a/backend/app/models/fine_tuning.py b/backend/app/models/fine_tuning.py
index 17bc1ccd..a3b0e866 100644
--- a/backend/app/models/fine_tuning.py
+++ b/backend/app/models/fine_tuning.py
@@ -64,11 +64,11 @@ class Fine_Tuning(FineTuningJobBase, table=True):
     fine_tuned_model: str | None = Field(
         default=None, description="Final fine tuned model name from OpenAI"
     )
-    train_data_s3_url: str | None = Field(
-        default=None, description="S3 url of the training data stored ins S3"
+    train_data_s3_object: str | None = Field(
+        default=None, description="S3 URI of the training data stored ins S3"
     )
-    test_data_s3_url: str | None = Field(
-        default=None, description="S3 url of the testing data stored ins S3"
+    test_data_s3_object: str | None = Field(
+        default=None, description="S3 URI of the testing data stored ins S3"
     )
     error_message: str | None = Field(
         default=None, description="error message for when something failed"
     )
@@ -91,8 +91,8 @@ class Fine_Tuning(FineTuningJobBase, table=True):
 class FineTuningUpdate(SQLModel):
     training_file_id: Optional[str] = None
-    train_data_s3_url: Optional[str] = None
-    test_data_s3_url: Optional[str] = None
+    train_data_s3_object: Optional[str] = None
+    test_data_s3_object: Optional[str] = None
     split_ratio: Optional[float] = None
     provider_job_id: Optional[str] = None
     fine_tuned_model: Optional[str] = None
@@ -108,13 +108,12 @@ class FineTuningJobPublic(SQLModel):
     base_model: str
     document_id: UUID
     provider_job_id: str | None = None
+    train_data_file_url: str | None = None
+    test_data_file_url: str | None = None
     status: str
     error_message: str | None = None
     fine_tuned_model: str | None = None
     training_file_id: str | None = None
-    train_data_s3_url: str | None = None
-    test_data_s3_url: str | None = None
     inserted_at: datetime
     updated_at: datetime
-    deleted_at: datetime | None = None
diff --git a/backend/app/models/model_evaluation.py b/backend/app/models/model_evaluation.py
index da24cdcd..900b57b6 100644
--- a/backend/app/models/model_evaluation.py
+++ b/backend/app/models/model_evaluation.py
@@ -46,8 +46,10 @@ class ModelEvaluation(ModelEvaluationBase, table=True):
         foreign_key="document.id",
         nullable=False,
     )
-    model_name: str = Field(description="fine tuned model name from OpenAI")
-    test_data_s3_url: str = Field(description="S3 url of the testing data stored in S3")
+    fine_tuned_model: str = Field(description="fine tuned model name from OpenAI")
+    test_data_s3_object: str = Field(
+        description="S3 URI of the testing data stored in S3"
+    )
     base_model: str = Field(nullable=False, description="Base model for fine-tuning")
     split_ratio: float = Field(
         nullable=False, description="the ratio the dataset was divided in"
     )
@@ -57,7 +59,7 @@ class ModelEvaluation(ModelEvaluationBase, table=True):
         sa_column=Column(JSON, nullable=True),
         description="Evaluation scores per metric (e.g., {'mcc': 0.85})",
     )
-    prediction_data_s3_url: str | None = Field(
+    prediction_data_s3_object: str | None = Field(
         default=None,
         description="S3 URL where the prediction data generated by the fine-tuned model is stored",
     )
@@ -87,7 +89,7 @@ class ModelEvaluationUpdate(SQLModel):
     score: Optional[dict[str, float]] = None
     status: Optional[ModelEvaluationStatus] = None
     error_message: Optional[str] = None
-    prediction_data_s3_url: Optional[str] = None
+    prediction_data_s3_object: Optional[str] = None


 class ModelEvaluationPublic(ModelEvaluationBase):
@@ -95,13 +97,12 @@ class ModelEvaluationPublic(ModelEvaluationBase):
     id: int
     document_id: UUID
-    model_name: str
+    fine_tuned_model: str
     split_ratio: float
     base_model: str
-    prediction_data_s3_url: str | None
+    prediction_data_file_url: str | None = None
     score: dict[str, float] | None = None
     status: ModelEvaluationStatus
     inserted_at: datetime
     updated_at: datetime
-    deleted_at: datetime | None = None
diff --git a/backend/app/tests/api/routes/test_fine_tuning.py b/backend/app/tests/api/routes/test_fine_tuning.py
index 477aea54..5582b73f 100644
--- a/backend/app/tests/api/routes/test_fine_tuning.py
+++ b/backend/app/tests/api/routes/test_fine_tuning.py
@@ -1,8 +1,10 @@
 import pytest
+
 from unittest.mock import patch, MagicMock

 from app.tests.utils.test_data import create_test_fine_tuning_jobs
 from app.tests.utils.utils import get_document
+from app.models import Fine_Tuning


 def create_file_mock(file_type):
@@ -32,20 +34,20 @@ def test_finetune_from_csv_multiple_split_ratio(
     db,
     user_api_key_header,
 ):
-    document = get_document(db)
+    document = get_document(db, "dalgo_sample.json")

     for path in ["/tmp/train.jsonl", "/tmp/test.jsonl"]:
         with open(path, "w") as f:
             f.write("{}")

-    mock_preprocessor_cls.return_value = MagicMock(
-        process=MagicMock(
-            return_value={
-                "train_file": "/tmp/train.jsonl",
-                "test_file": "/tmp/test.jsonl",
-            }
-        )
-    )
+    mock_preprocessor = MagicMock()
+    mock_preprocessor.process.return_value = {
+        "train_jsonl_temp_filepath": "/tmp/train.jsonl",
+        "train_csv_s3_object": "s3://bucket/train.csv",
+        "test_csv_s3_object": "s3://bucket/test.csv",
+    }
+    mock_preprocessor.cleanup = MagicMock()
+    mock_preprocessor_cls.return_value = mock_preprocessor

     mock_openai = MagicMock()
     mock_openai.files.create.side_effect = create_file_mock("fine-tune")
@@ -61,16 +63,34 @@ def test_finetune_from_csv_multiple_split_ratio(
         "system_prompt": "you are a model able to classify",
     }

-    response = client.post(
-        "/api/v1/fine_tuning/fine_tune", json=body, headers=user_api_key_header
-    )
-    assert response.status_code == 200
+    with patch("app.api.routes.fine_tuning.Session") as SessionMock:
+        SessionMock.return_value.__enter__.return_value = db
+        SessionMock.return_value.__exit__.return_value = None
+        response = client.post(
+            "/api/v1/fine_tuning/fine_tune",
+            json=body,
+            headers=user_api_key_header,
+        )
+
+    assert response.status_code == 200
     json_data = response.json()
     assert json_data["success"] is True
     assert json_data["data"]["message"] == "Fine-tuning job(s) started."
     assert json_data["metadata"] is None

+    jobs = db.query(Fine_Tuning).all()
+    assert len(jobs) == 3
+
+    for i, job in enumerate(jobs, start=1):
+        db.refresh(job)
+        assert job.status == "running"
+        assert job.provider_job_id == f"ft_mock_job_{i}"
+        assert job.training_file_id is not None
+        assert job.train_data_s3_object == "s3://bucket/train.csv"
+        assert job.test_data_s3_object == "s3://bucket/test.csv"
+        assert job.split_ratio in [0.5, 0.7, 0.9]
+

 @pytest.mark.usefixtures("client", "db", "user_api_key_header")
 @patch("app.api.routes.fine_tuning.get_openai_client")
@@ -140,7 +160,7 @@ def test_retrieve_fine_tuning_job_failed(
 class TestFetchJob:
     def test_fetch_jobs_document(self, client, db, user_api_key_header):
         jobs, _ = create_test_fine_tuning_jobs(db, [0.3, 0.4])
-        document = get_document(db)
+        document = get_document(db, "dalgo_sample.json")

         response = client.get(
             f"/api/v1/fine_tuning/{document.id}", headers=user_api_key_header
         )
diff --git a/backend/app/tests/api/routes/test_model_evaluation.py b/backend/app/tests/api/routes/test_model_evaluation.py
index 9b969e47..605775f6 100644
--- a/backend/app/tests/api/routes/test_model_evaluation.py
+++ b/backend/app/tests/api/routes/test_model_evaluation.py
@@ -48,7 +48,7 @@ def test_top_model_by_doc(client, db, user_api_key_header):
     model_eval = model_evals[0]

     model_eval.score = {
-        "mcc": 0.85,
+        "mcc_score": 0.85,
     }
     db.flush()
@@ -61,9 +61,9 @@ def test_top_model_by_doc(client, db, user_api_key_header):
     json_data = response.json()

     assert json_data["data"]["score"] == {
-        "mcc": 0.85,
+        "mcc_score": 0.85,
     }
-    assert json_data["data"]["model_name"] == model_eval.model_name
+    assert json_data["data"]["fine_tuned_model"] == model_eval.fine_tuned_model
     assert json_data["data"]["document_id"] == str(model_eval.document_id)
     assert json_data["data"]["id"] == model_eval.id
diff --git a/backend/app/tests/crud/test_fine_tuning.py b/backend/app/tests/crud/test_fine_tuning.py
index 387f6d55..d71a000b 100644
--- a/backend/app/tests/crud/test_fine_tuning.py
+++ b/backend/app/tests/crud/test_fine_tuning.py
@@ -17,7 +17,7 @@
 def test_create_fine_tuning_job(db: Session):
     project = get_project(db, "Dalgo")
-    document = get_document(db)
+    document = get_document(db, "dalgo_sample.json")

     job_request = FineTuningJobCreate(
         document_id=document.id,
@@ -100,7 +100,7 @@ def test_update_finetune_job(db: Session):

 def test_fetch_active_jobs_by_document_id(db: Session):
     project = get_project(db, "Dalgo")
-    document = get_document(db)
+    document = get_document(db, "dalgo_sample.json")

     job_request = FineTuningJobCreate(
         document_id=document.id,
diff --git a/backend/app/tests/crud/test_model_evaluation.py b/backend/app/tests/crud/test_model_evaluation.py
index 0eaa72f7..e0c6dba2 100644
--- a/backend/app/tests/crud/test_model_evaluation.py
+++ b/backend/app/tests/crud/test_model_evaluation.py
@@ -35,9 +35,9 @@ def test_create_model_evaluation(db: Session):
         fine_tuning_id=fine_tune.id,
         system_prompt=fine_tune.system_prompt,
         base_model=fine_tune.base_model,
-        model_name=fine_tune.fine_tuned_model,
+        fine_tuned_model=fine_tune.fine_tuned_model,
         document_id=fine_tune.document_id,
-        test_data_s3_url=fine_tune.test_data_s3_url,
+        test_data_s3_object=fine_tune.test_data_s3_object,
         status="pending",
     )
@@ -51,8 +51,8 @@ def test_create_model_evaluation(db: Session):
     assert created_eval.id is not None
     assert created_eval.status == "pending"
     assert created_eval.document_id == fine_tune.document_id
-    assert created_eval.model_name == fine_tune.fine_tuned_model
-    assert created_eval.test_data_s3_url == fine_tune.test_data_s3_url
+    assert created_eval.fine_tuned_model == fine_tune.fine_tuned_model
+    assert created_eval.test_data_s3_object == fine_tune.test_data_s3_object


 def test_fetch_by_eval_id_success(db: Session):
@@ -92,7 +92,7 @@ def test_fetch_eval_by_doc_id_not_found(db: Session):

 def test_fetch_top_model_by_doc_id_success(db: Session):
     model_evals = create_test_model_evaluation(db)
     model_eval = model_evals[0]
-    model_eval.score = {"mcc": 0.8}
+    model_eval.score = {"mcc_score": 0.8}
     db.flush()

     doc_id = model_eval.document_id
diff --git a/backend/app/tests/utils/test_data.py b/backend/app/tests/utils/test_data.py
index edce345e..616904f2 100644
--- a/backend/app/tests/utils/test_data.py
+++ b/backend/app/tests/utils/test_data.py
@@ -135,7 +135,7 @@ def create_test_fine_tuning_jobs(
     ratios: list[float],
 ) -> tuple[list[Fine_Tuning], bool]:
     project = get_project(db, "Dalgo")
-    document = get_document(db)
+    document = get_document(db, "dalgo_sample.json")

     jobs = []
     any_created = False
@@ -168,7 +168,7 @@ def create_test_finetuning_job_with_extra_fields(

     if jobs:
         for job in jobs:
-            job.test_data_s3_url = "test_data_s3_url_example"
+            job.test_data_s3_object = "test_data_s3_object_example"
             job.fine_tuned_model = "fine_tuned_model_name"

     return jobs, True
@@ -184,9 +184,9 @@ def create_test_model_evaluation(db) -> list[ModelEvaluation]:
         fine_tuning_id=fine_tune.id,
         system_prompt=fine_tune.system_prompt,
         base_model=fine_tune.base_model,
-        model_name=fine_tune.fine_tuned_model,
+        fine_tuned_model=fine_tune.fine_tuned_model,
         document_id=fine_tune.document_id,
-        test_data_s3_url=fine_tune.test_data_s3_url,
+        test_data_s3_object=fine_tune.test_data_s3_object,
     )

     model_eval = create_model_evaluation(