From 6fb4e5fe9a0789e8bafc513b34660694492315e3 Mon Sep 17 00:00:00 2001 From: nishika26 Date: Sun, 24 Aug 2025 22:51:06 +0530 Subject: [PATCH 1/5] alembic file for adding and removing columns --- ...72896bcc94da_add_data_to_s3_url_columns.py | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py diff --git a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py new file mode 100644 index 00000000..6b842d36 --- /dev/null +++ b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py @@ -0,0 +1,64 @@ +"""add data to s3 url columns + +Revision ID: 72896bcc94da +Revises: e317d05f49e4 +Create Date: 2025-08-22 00:44:45.426211 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes + + +# revision identifiers, used by Alembic. +revision = "72896bcc94da" +down_revision = "e317d05f49e4" +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_column("fine_tuning", "testing_file_id") + op.add_column( + "fine_tuning", + sa.Column("train_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + ) + op.add_column( + "fine_tuning", + sa.Column("test_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + ) + op.add_column( + "model_evaluation", + sa.Column( + "test_data_url", + sqlmodel.sql.sqltypes.AutoString(), + nullable=False, + server_default="", + ), + ) + op.add_column( + "model_evaluation", + sa.Column( + "prediction_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ), + ) + op.alter_column( + "model_evaluation", "testing_file_id", existing_type=sa.VARCHAR(), nullable=True + ) + + +def downgrade(): + op.drop_column("fine_tuning", "test_data_url") + op.drop_column("fine_tuning", "train_data_url") + op.add_column( + "fine_tuning", + sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + op.drop_column("model_evaluation", "prediction_data_url") + op.drop_column("model_evaluation", "test_data_url") + op.alter_column( + "model_evaluation", + "testing_file_id", + existing_type=sa.VARCHAR(), + nullable=False, + ) From 6f6fadb61684dd763df6f55f3ec3f5fdd9734d95 Mon Sep 17 00:00:00 2001 From: nishika26 Date: Sun, 24 Aug 2025 22:54:30 +0530 Subject: [PATCH 2/5] train and test s3 url column --- backend/app/api/routes/fine_tuning.py | 16 +++--- backend/app/core/finetune/preprocessing.py | 63 +++++++++++++++++----- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py index 5a876736..7027a072 100644 --- a/backend/app/api/routes/fine_tuning.py +++ b/backend/app/api/routes/fine_tuning.py @@ -74,21 +74,17 @@ def process_fine_tuning_job( document, storage, ratio, request.system_prompt ) result = preprocessor.process() - train_path = result["train_file"] - test_path = result["test_file"] + train_path = result["train_jsonl_path"] + train_data_url = result["train_csv_url"] + test_data_url = result["test_csv_url"] try: with open(train_path, "rb") as train_f: uploaded_train = client.files.create( file=train_f, purpose="fine-tune" ) - with open(test_path, "rb") as test_f: - uploaded_test = client.files.create( - file=test_f, purpose="fine-tune" - ) - logger.info( - f"[process_fine_tuning_job] Files uploaded to OpenAI successfully | " + f"[process_fine_tuning_job] File uploaded to OpenAI successfully | " f"job_id={job_id}, project_id={project_id}|" ) except openai.OpenAIError as e: @@ -110,7 +106,6 @@ def process_fine_tuning_job( preprocessor.cleanup() training_file_id = uploaded_train.id - testing_file_id = uploaded_test.id try: job = client.fine_tuning.jobs.create( @@ -141,7 +136,8 @@ def process_fine_tuning_job( job=fine_tune, update=FineTuningUpdate( training_file_id=training_file_id, - testing_file_id=testing_file_id, + train_data_url=train_data_url, + test_data_url=test_data_url, split_ratio=ratio, provider_job_id=job.id, status=FineTuningStatus.running, diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py index a936daf7..6985c7fe 100644 --- a/backend/app/core/finetune/preprocessing.py +++ b/backend/app/core/finetune/preprocessing.py @@ -3,6 +3,11 @@ import uuid import tempfile import logging +import io + +from pathlib import Path +from fastapi import UploadFile +from starlette.datastructures import Headers import pandas as pd from sklearn.model_selection import train_test_split @@ -22,6 +27,34 @@ def __init__(self, document, storage, split_ratio: float, system_message: str): self.system_message = {"role": "system", "content": system_message.strip()} + def upload_csv_to_s3(self, df, filename: str) -> str: + logger.info( + f"[upload_csv_to_s3] Preparing to upload '{filename}' to s3 | rows={len(df)}, cols={len(df.columns)}" + ) + + buf = io.BytesIO(df.to_csv(index=False).encode("utf-8")) + buf.seek(0) + + headers = Headers({"content-type": "text/csv"}) + upload = UploadFile( + filename=filename, + file=buf, + headers=headers, + ) + + try: + dest = self.storage.put(upload, basename=Path("datasets") / filename) + logger.info( + f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_url='{dest}'" + ) + return str(dest) + except Exception as err: + logger.error( + f"[upload_csv_to_s3] Upload failed | filename='{filename}', error='{err}'", + exc_info=True, + ) + raise + def _save_to_jsonl(self, data, filename): temp_dir = tempfile.gettempdir() file_path = os.path.join(temp_dir, filename) @@ -65,14 +98,14 @@ def _load_dataframe(self): if not self.query_col or not self.label_col: logger.error( - f"[DataPreprocessor] Dataset does not contai a 'label' column and one of: {possible_query_columns} " + f"[DataPreprocessor] Dataset does not contain a 'label' column and one of: {possible_query_columns}" ) raise ValueError( f"CSV must contain a 'label' column and one of: {possible_query_columns}" ) logger.info( - f"[DataPreprocessor]Identified columns - query_col={self.query_col}, label_col={self.label_col}" + f"[DataPreprocessor] Identified columns - query_col={self.query_col}, label_col={self.label_col}" ) return df finally: @@ -90,21 +123,25 @@ def process(self): ) logger.info( - f"[DataPreprocessor]Data split complete: train_size={len(train_data)}, test_size={len(test_data)}" + f"[DataPreprocessor] Data split complete: train_size={len(train_data)}, test_size={len(test_data)}" ) train_dict = train_data.to_dict(orient="records") - test_dict = test_data.to_dict(orient="records") - train_jsonl = self._modify_data_format(train_dict) - test_jsonl = self._modify_data_format(test_dict) - train_file = ( - f"train_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl" - ) - test_file = f"test_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl" + uuids = uuid.uuid4().hex + pct = int(self.split_ratio * 100) # percentage of data + train_csv_name = f"train_split_{pct}_{uuids}.csv" + test_csv_name = f"test_split_{pct}_{uuids}.csv" + train_jsonl_name = f"train_data_{pct}_{uuids}.jsonl" + + train_csv_url = self.upload_csv_to_s3(train_data, train_csv_name) + test_csv_url = self.upload_csv_to_s3(test_data, test_csv_name) - train_path = self._save_to_jsonl(train_jsonl, train_file) - test_path = self._save_to_jsonl(test_jsonl, test_file) + train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name) - return {"train_file": train_path, "test_file": test_path} + return { + "train_csv_url": train_csv_url, + "test_csv_url": test_csv_url, + "train_jsonl_path": train_jsonl_path, + } From c7a0f2fa7faf8ad0dbdc14f3bb4b7f8556fcf7a9 Mon Sep 17 00:00:00 2001 From: nishika26 Date: Mon, 25 Aug 2025 14:23:11 +0530 Subject: [PATCH 3/5] only keeping fine tuning table changes in alembic migration --- ...72896bcc94da_add_data_to_s3_url_columns.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py index 6b842d36..245a6d35 100644 --- a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py +++ b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py @@ -27,24 +27,6 @@ def upgrade(): "fine_tuning", sa.Column("test_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True), ) - op.add_column( - "model_evaluation", - sa.Column( - "test_data_url", - sqlmodel.sql.sqltypes.AutoString(), - nullable=False, - server_default="", - ), - ) - op.add_column( - "model_evaluation", - sa.Column( - "prediction_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True - ), - ) - op.alter_column( - "model_evaluation", "testing_file_id", existing_type=sa.VARCHAR(), nullable=True - ) def downgrade(): @@ -54,11 +36,3 @@ def downgrade(): "fine_tuning", sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True), ) - op.drop_column("model_evaluation", "prediction_data_url") - op.drop_column("model_evaluation", "test_data_url") - op.alter_column( - "model_evaluation", - "testing_file_id", - existing_type=sa.VARCHAR(), - nullable=False, - ) From 77ce4c4a095bd6839309989518d7478a14150f08 Mon Sep 17 00:00:00 2001 From: nishika26 Date: Thu, 28 Aug 2025 13:39:26 +0530 Subject: [PATCH 4/5] PR review fixes --- backend/app/api/routes/fine_tuning.py | 30 +++++++++++----------- backend/app/core/finetune/preprocessing.py | 20 +++++++++------ backend/app/models/fine_tuning.py | 13 +++++++--- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py index 7027a072..e81d81f0 100644 --- a/backend/app/api/routes/fine_tuning.py +++ b/backend/app/api/routes/fine_tuning.py @@ -54,7 +54,6 @@ def process_fine_tuning_job( ratio: float, current_user: CurrentUserOrgProject, request: FineTuningJobCreate, - client: OpenAI, ): start_time = time.time() project_id = current_user.project_id @@ -67,6 +66,9 @@ def process_fine_tuning_job( try: fine_tune = fetch_by_id(session, job_id, project_id) + client = get_openai_client( + session, current_user.organization_id, project_id + ) storage = AmazonCloudStorage(current_user) document_crud = DocumentCrud(session=session, owner_id=current_user.id) document = document_crud.read_one(request.document_id) @@ -74,12 +76,12 @@ def process_fine_tuning_job( document, storage, ratio, request.system_prompt ) result = preprocessor.process() - train_path = result["train_jsonl_path"] - train_data_url = result["train_csv_url"] - test_data_url = result["test_csv_url"] + train_data_temp_filepath = result["train_jsonl_temp_filepath"] + train_data_s3_url = result["train_csv_s3_url"] + test_data_s3_url = result["test_csv_s3_url"] try: - with open(train_path, "rb") as train_f: + with open(train_data_temp_filepath, "rb") as train_f: uploaded_train = client.files.create( file=train_f, purpose="fine-tune" ) @@ -136,8 +138,8 @@ def process_fine_tuning_job( job=fine_tune, update=FineTuningUpdate( training_file_id=training_file_id, - train_data_url=train_data_url, - test_data_url=test_data_url, + train_data_url=train_data_s3_url, + test_data_url=test_data_s3_url, split_ratio=ratio, provider_job_id=job.id, status=FineTuningStatus.running, @@ -178,8 +180,11 @@ def fine_tune_from_CSV( request: FineTuningJobCreate, background_tasks: BackgroundTasks, ): - client = get_openai_client( - session, current_user.organization_id, current_user.project_id + client = get_openai_client( # Used here only to validate the user's OpenAI key; + # the actual client is re-initialized separately inside the background task + session, + current_user.organization_id, + current_user.project_id, ) results = [] @@ -195,12 +200,7 @@ def fine_tune_from_CSV( if created: background_tasks.add_task( - process_fine_tuning_job, - job.id, - ratio, - current_user, - request, - client, + process_fine_tuning_job, job.id, ratio, current_user, request ) if not results: diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py index 6985c7fe..12120258 100644 --- a/backend/app/core/finetune/preprocessing.py +++ b/backend/app/core/finetune/preprocessing.py @@ -129,11 +129,15 @@ def process(self): train_dict = train_data.to_dict(orient="records") train_jsonl = self._modify_data_format(train_dict) - uuids = uuid.uuid4().hex - pct = int(self.split_ratio * 100) # percentage of data - train_csv_name = f"train_split_{pct}_{uuids}.csv" - test_csv_name = f"test_split_{pct}_{uuids}.csv" - train_jsonl_name = f"train_data_{pct}_{uuids}.jsonl" + unique_id = uuid.uuid4().hex + train_percentage = int(self.split_ratio * 100) # train % + test_percentage = ( + 100 - train_percentage + ) # remaining % for test (since we used 1 - ratio earlier for test size) + + train_csv_name = f"train_split_{train_percentage}_{unique_id}.csv" + test_csv_name = f"test_split_{test_percentage}_{unique_id}.csv" + train_jsonl_name = f"train_data_{train_percentage}_{unique_id}.jsonl" train_csv_url = self.upload_csv_to_s3(train_data, train_csv_name) test_csv_url = self.upload_csv_to_s3(test_data, test_csv_name) @@ -141,7 +145,7 @@ def process(self): train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name) return { - "train_csv_url": train_csv_url, - "test_csv_url": test_csv_url, - "train_jsonl_path": train_jsonl_path, + "train_csv_s3_url": train_csv_url, + "test_csv_s3_url": test_csv_url, + "train_jsonl_temp_filepath": train_jsonl_path, } diff --git a/backend/app/models/fine_tuning.py b/backend/app/models/fine_tuning.py index 7e2a0f2a..1535e595 100644 --- a/backend/app/models/fine_tuning.py +++ b/backend/app/models/fine_tuning.py @@ -22,7 +22,6 @@ class FineTuningJobBase(SQLModel): split_ratio: float = Field(nullable=False) document_id: UUID = Field(foreign_key="document.id", nullable=False) training_file_id: Optional[str] = Field(default=None) - testing_file_id: Optional[str] = Field(default=None) system_prompt: str = Field(sa_column=Column(Text, nullable=False)) @@ -65,6 +64,12 @@ class Fine_Tuning(FineTuningJobBase, table=True): fine_tuned_model: str | None = Field( default=None, description="Final fine tuned model name from OpenAI" ) + train_data_url: str | None = Field( + default=None, description="S3 url of the training data stored ins S3" + ) + test_data_url: str | None = Field( + default=None, description="S3 url of the testing data stored ins S3" + ) error_message: str | None = Field( default=None, description="error message for when something failed" ) @@ -86,7 +91,8 @@ class Fine_Tuning(FineTuningJobBase, table=True): class FineTuningUpdate(SQLModel): training_file_id: Optional[str] = None - testing_file_id: Optional[str] = None + train_data_url: Optional[str] = None + test_data_url: Optional[str] = None split_ratio: Optional[float] = None provider_job_id: Optional[str] = None fine_tuned_model: Optional[str] = None @@ -106,7 +112,8 @@ class FineTuningJobPublic(SQLModel): error_message: str | None = None fine_tuned_model: str | None = None training_file_id: str | None = None - testing_file_id: str | None = None + train_data_url: str | None = None + test_data_url: str | None = None inserted_at: datetime updated_at: datetime From 6278d343bcfe02af2c25221fc3fef255ff348f26 Mon Sep 17 00:00:00 2001 From: nishika26 Date: Thu, 28 Aug 2025 13:43:01 +0530 Subject: [PATCH 5/5] making column name descriptive --- .../72896bcc94da_add_data_to_s3_url_columns.py | 12 ++++++++---- backend/app/api/routes/fine_tuning.py | 4 ++-- backend/app/models/fine_tuning.py | 12 ++++++------ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py index 245a6d35..ff432f31 100644 --- a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py +++ b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py @@ -21,17 +21,21 @@ def upgrade(): op.drop_column("fine_tuning", "testing_file_id") op.add_column( "fine_tuning", - sa.Column("train_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column( + "train_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ), ) op.add_column( "fine_tuning", - sa.Column("test_data_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column( + "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ), ) def downgrade(): - op.drop_column("fine_tuning", "test_data_url") - op.drop_column("fine_tuning", "train_data_url") + op.drop_column("fine_tuning", "test_data_s3_url") + op.drop_column("fine_tuning", "train_data_s3_url") op.add_column( "fine_tuning", sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True), diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py index e81d81f0..c5e4ac44 100644 --- a/backend/app/api/routes/fine_tuning.py +++ b/backend/app/api/routes/fine_tuning.py @@ -138,8 +138,8 @@ def process_fine_tuning_job( job=fine_tune, update=FineTuningUpdate( training_file_id=training_file_id, - train_data_url=train_data_s3_url, - test_data_url=test_data_s3_url, + train_data_s3_url=train_data_s3_url, + test_data_s3_url=test_data_s3_url, split_ratio=ratio, provider_job_id=job.id, status=FineTuningStatus.running, diff --git a/backend/app/models/fine_tuning.py b/backend/app/models/fine_tuning.py index 1535e595..17bc1ccd 100644 --- a/backend/app/models/fine_tuning.py +++ b/backend/app/models/fine_tuning.py @@ -64,10 +64,10 @@ class Fine_Tuning(FineTuningJobBase, table=True): fine_tuned_model: str | None = Field( default=None, description="Final fine tuned model name from OpenAI" ) - train_data_url: str | None = Field( + train_data_s3_url: str | None = Field( default=None, description="S3 url of the training data stored ins S3" ) - test_data_url: str | None = Field( + test_data_s3_url: str | None = Field( default=None, description="S3 url of the testing data stored ins S3" ) error_message: str | None = Field( @@ -91,8 +91,8 @@ class Fine_Tuning(FineTuningJobBase, table=True): class FineTuningUpdate(SQLModel): training_file_id: Optional[str] = None - train_data_url: Optional[str] = None - test_data_url: Optional[str] = None + train_data_s3_url: Optional[str] = None + test_data_s3_url: Optional[str] = None split_ratio: Optional[float] = None provider_job_id: Optional[str] = None fine_tuned_model: Optional[str] = None @@ -112,8 +112,8 @@ class FineTuningJobPublic(SQLModel): error_message: str | None = None fine_tuned_model: str | None = None training_file_id: str | None = None - train_data_url: str | None = None - test_data_url: str | None = None + train_data_s3_url: str | None = None + test_data_s3_url: str | None = None inserted_at: datetime updated_at: datetime