diff --git a/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py
new file mode 100644
index 00000000..ff432f31
--- /dev/null
+++ b/backend/app/alembic/versions/72896bcc94da_add_data_to_s3_url_columns.py
@@ -0,0 +1,42 @@
+"""add data to s3 url columns
+
+Revision ID: 72896bcc94da
+Revises: e317d05f49e4
+Create Date: 2025-08-22 00:44:45.426211
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import sqlmodel.sql.sqltypes
+
+
+# revision identifiers, used by Alembic.
+revision = "72896bcc94da"
+down_revision = "e317d05f49e4"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.drop_column("fine_tuning", "testing_file_id")
+    op.add_column(
+        "fine_tuning",
+        sa.Column(
+            "train_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+        ),
+    )
+    op.add_column(
+        "fine_tuning",
+        sa.Column(
+            "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+        ),
+    )
+
+
+def downgrade():
+    op.drop_column("fine_tuning", "test_data_s3_url")
+    op.drop_column("fine_tuning", "train_data_s3_url")
+    op.add_column(
+        "fine_tuning",
+        sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True),
+    )
diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py
index 5a876736..c5e4ac44 100644
--- a/backend/app/api/routes/fine_tuning.py
+++ b/backend/app/api/routes/fine_tuning.py
@@ -54,7 +54,6 @@ def process_fine_tuning_job(
     ratio: float,
     current_user: CurrentUserOrgProject,
     request: FineTuningJobCreate,
-    client: OpenAI,
 ):
     start_time = time.time()
     project_id = current_user.project_id
@@ -67,6 +66,9 @@ def process_fine_tuning_job(
 
     try:
         fine_tune = fetch_by_id(session, job_id, project_id)
+        client = get_openai_client(
+            session, current_user.organization_id, project_id
+        )
         storage = AmazonCloudStorage(current_user)
         document_crud = DocumentCrud(session=session, owner_id=current_user.id)
         document = document_crud.read_one(request.document_id)
@@ -74,21 +76,17 @@ def process_fine_tuning_job(
             document, storage, ratio, request.system_prompt
         )
         result = preprocessor.process()
-        train_path = result["train_file"]
-        test_path = result["test_file"]
+        train_data_temp_filepath = result["train_jsonl_temp_filepath"]
+        train_data_s3_url = result["train_csv_s3_url"]
+        test_data_s3_url = result["test_csv_s3_url"]
 
         try:
-            with open(train_path, "rb") as train_f:
+            with open(train_data_temp_filepath, "rb") as train_f:
                 uploaded_train = client.files.create(
                     file=train_f, purpose="fine-tune"
                 )
-            with open(test_path, "rb") as test_f:
-                uploaded_test = client.files.create(
-                    file=test_f, purpose="fine-tune"
-                )
-
             logger.info(
-                f"[process_fine_tuning_job] Files uploaded to OpenAI successfully | "
+                f"[process_fine_tuning_job] File uploaded to OpenAI successfully | "
                 f"job_id={job_id}, project_id={project_id}|"
             )
         except openai.OpenAIError as e:
@@ -110,7 +108,6 @@ def process_fine_tuning_job(
             preprocessor.cleanup()
 
         training_file_id = uploaded_train.id
-        testing_file_id = uploaded_test.id
 
         try:
             job = client.fine_tuning.jobs.create(
@@ -141,7 +138,8 @@ def process_fine_tuning_job(
                 job=fine_tune,
                 update=FineTuningUpdate(
                     training_file_id=training_file_id,
-                    testing_file_id=testing_file_id,
+                    train_data_s3_url=train_data_s3_url,
+                    test_data_s3_url=test_data_s3_url,
                     split_ratio=ratio,
                     provider_job_id=job.id,
                     status=FineTuningStatus.running,
@@ -182,8 +180,11 @@ def fine_tune_from_CSV(
     request: FineTuningJobCreate,
     background_tasks: BackgroundTasks,
 ):
-    client = get_openai_client(
-        session, current_user.organization_id, current_user.project_id
+    client = get_openai_client(  # Used here only to validate the user's OpenAI key;
+        # the actual client is re-initialized separately inside the background task
+        session,
+        current_user.organization_id,
+        current_user.project_id,
     )
 
     results = []
@@ -199,12 +200,7 @@ def fine_tune_from_CSV(
 
         if created:
             background_tasks.add_task(
-                process_fine_tuning_job,
-                job.id,
-                ratio,
-                current_user,
-                request,
-                client,
+                process_fine_tuning_job, job.id, ratio, current_user, request
             )
 
     if not results:
diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py
index a936daf7..12120258 100644
--- a/backend/app/core/finetune/preprocessing.py
+++ b/backend/app/core/finetune/preprocessing.py
@@ -3,6 +3,11 @@
 import uuid
 import tempfile
 import logging
+import io
+
+from pathlib import Path
+from fastapi import UploadFile
+from starlette.datastructures import Headers
 
 import pandas as pd
 from sklearn.model_selection import train_test_split
@@ -22,6 +27,34 @@ def __init__(self, document, storage, split_ratio: float, system_message: str):
 
         self.system_message = {"role": "system", "content": system_message.strip()}
 
+    def upload_csv_to_s3(self, df, filename: str) -> str:
+        logger.info(
+            f"[upload_csv_to_s3] Preparing to upload '{filename}' to s3 | rows={len(df)}, cols={len(df.columns)}"
+        )
+
+        buf = io.BytesIO(df.to_csv(index=False).encode("utf-8"))
+        buf.seek(0)
+
+        headers = Headers({"content-type": "text/csv"})
+        upload = UploadFile(
+            filename=filename,
+            file=buf,
+            headers=headers,
+        )
+
+        try:
+            dest = self.storage.put(upload, basename=Path("datasets") / filename)
+            logger.info(
+                f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_url='{dest}'"
+            )
+            return str(dest)
+        except Exception as err:
+            logger.error(
+                f"[upload_csv_to_s3] Upload failed | filename='{filename}', error='{err}'",
+                exc_info=True,
+            )
+            raise
+
     def _save_to_jsonl(self, data, filename):
         temp_dir = tempfile.gettempdir()
         file_path = os.path.join(temp_dir, filename)
@@ -65,14 +98,14 @@ def _load_dataframe(self):
 
             if not self.query_col or not self.label_col:
                 logger.error(
-                    f"[DataPreprocessor] Dataset does not contai a 'label' column and one of: {possible_query_columns} "
+                    f"[DataPreprocessor] Dataset does not contain a 'label' column and one of: {possible_query_columns}"
                 )
                 raise ValueError(
                     f"CSV must contain a 'label' column and one of: {possible_query_columns}"
                 )
 
             logger.info(
-                f"[DataPreprocessor]Identified columns - query_col={self.query_col}, label_col={self.label_col}"
+                f"[DataPreprocessor] Identified columns - query_col={self.query_col}, label_col={self.label_col}"
             )
             return df
         finally:
@@ -90,21 +123,29 @@ def process(self):
         )
 
         logger.info(
-            f"[DataPreprocessor]Data split complete: train_size={len(train_data)}, test_size={len(test_data)}"
+            f"[DataPreprocessor] Data split complete: train_size={len(train_data)}, test_size={len(test_data)}"
         )
 
         train_dict = train_data.to_dict(orient="records")
-        test_dict = test_data.to_dict(orient="records")
         train_jsonl = self._modify_data_format(train_dict)
-        test_jsonl = self._modify_data_format(test_dict)
 
-        train_file = (
-            f"train_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl"
-        )
-        test_file = f"test_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl"
+        unique_id = uuid.uuid4().hex
+        train_percentage = int(self.split_ratio * 100)  # train %
+        test_percentage = (
+            100 - train_percentage
+        )  # remaining % for test (since we used 1 - ratio earlier for test size)
+
+        train_csv_name = f"train_split_{train_percentage}_{unique_id}.csv"
+        test_csv_name = f"test_split_{test_percentage}_{unique_id}.csv"
+        train_jsonl_name = f"train_data_{train_percentage}_{unique_id}.jsonl"
+
+        train_csv_url = self.upload_csv_to_s3(train_data, train_csv_name)
+        test_csv_url = self.upload_csv_to_s3(test_data, test_csv_name)
 
-        train_path = self._save_to_jsonl(train_jsonl, train_file)
-        test_path = self._save_to_jsonl(test_jsonl, test_file)
+        train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name)
 
-        return {"train_file": train_path, "test_file": test_path}
+        return {
+            "train_csv_s3_url": train_csv_url,
+            "test_csv_s3_url": test_csv_url,
+            "train_jsonl_temp_filepath": train_jsonl_path,
+        }
diff --git a/backend/app/models/fine_tuning.py b/backend/app/models/fine_tuning.py
index 7e2a0f2a..17bc1ccd 100644
--- a/backend/app/models/fine_tuning.py
+++ b/backend/app/models/fine_tuning.py
@@ -22,7 +22,6 @@ class FineTuningJobBase(SQLModel):
     split_ratio: float = Field(nullable=False)
     document_id: UUID = Field(foreign_key="document.id", nullable=False)
     training_file_id: Optional[str] = Field(default=None)
-    testing_file_id: Optional[str] = Field(default=None)
     system_prompt: str = Field(sa_column=Column(Text, nullable=False))
 
 
@@ -65,6 +64,12 @@ class Fine_Tuning(FineTuningJobBase, table=True):
     fine_tuned_model: str | None = Field(
         default=None, description="Final fine tuned model name from OpenAI"
     )
+    train_data_s3_url: str | None = Field(
+        default=None, description="S3 URL of the training data stored in S3"
+    )
+    test_data_s3_url: str | None = Field(
+        default=None, description="S3 URL of the testing data stored in S3"
+    )
     error_message: str | None = Field(
         default=None, description="error message for when something failed"
     )
@@ -86,7 +91,8 @@ class Fine_Tuning(FineTuningJobBase, table=True):
 
 class FineTuningUpdate(SQLModel):
     training_file_id: Optional[str] = None
-    testing_file_id: Optional[str] = None
+    train_data_s3_url: Optional[str] = None
+    test_data_s3_url: Optional[str] = None
     split_ratio: Optional[float] = None
     provider_job_id: Optional[str] = None
     fine_tuned_model: Optional[str] = None
@@ -106,7 +112,8 @@ class FineTuningJobPublic(SQLModel):
     error_message: str | None = None
     fine_tuned_model: str | None = None
     training_file_id: str | None = None
-    testing_file_id: str | None = None
+    train_data_s3_url: str | None = None
+    test_data_s3_url: str | None = None
     inserted_at: datetime
     updated_at: datetime
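
Reviewer note (not part of the patch): a minimal sketch of the new contract this diff establishes between DataPreprocessor.process() and the background task. The function name upload_train_split and the preprocessor/client parameters are stand-ins for the objects built inside process_fine_tuning_job above; only the dict keys and the OpenAI files.create call mirror the patch itself.

# Sketch only: process() now returns S3 URLs for the CSV splits (kept for
# auditability) plus a local temp path for the train JSONL, which is the
# only artifact still uploaded to OpenAI.
def upload_train_split(preprocessor, client) -> dict:
    result = preprocessor.process()

    # The JSONL-formatted train split is the sole file sent to OpenAI;
    # the test split no longer produces an OpenAI file id.
    with open(result["train_jsonl_temp_filepath"], "rb") as train_f:
        uploaded_train = client.files.create(file=train_f, purpose="fine-tune")

    # These values are what FineTuningUpdate persists on the job record.
    return {
        "training_file_id": uploaded_train.id,
        "train_data_s3_url": result["train_csv_s3_url"],
        "test_data_s3_url": result["test_csv_s3_url"],
    }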