Classification : train and test data to s3 #343
Alembic migration adding the S3 URL columns:

```diff
@@ -0,0 +1,42 @@
+"""add data to s3 url columns
+
+Revision ID: 72896bcc94da
+Revises: e317d05f49e4
+Create Date: 2025-08-22 00:44:45.426211
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import sqlmodel.sql.sqltypes
+
+
+# revision identifiers, used by Alembic.
+revision = "72896bcc94da"
+down_revision = "e317d05f49e4"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.drop_column("fine_tuning", "testing_file_id")
+    op.add_column(
+        "fine_tuning",
+        sa.Column(
+            "train_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+        ),
+    )
+    op.add_column(
+        "fine_tuning",
+        sa.Column(
+            "test_data_s3_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
+        ),
+    )
+
+
+def downgrade():
+    op.drop_column("fine_tuning", "test_data_s3_url")
+    op.drop_column("fine_tuning", "train_data_s3_url")
+    op.add_column(
+        "fine_tuning",
+        sa.Column("testing_file_id", sa.VARCHAR(), autoincrement=False, nullable=True),
+    )
```
Fine-tuning job changes (`process_fine_tuning_job` / `fine_tune_from_CSV`):

```diff
@@ -54,7 +54,6 @@ def process_fine_tuning_job(
     ratio: float,
     current_user: CurrentUserOrgProject,
     request: FineTuningJobCreate,
-    client: OpenAI,
 ):
     start_time = time.time()
     project_id = current_user.project_id
@@ -67,28 +66,27 @@ def process_fine_tuning_job(
     try:
         fine_tune = fetch_by_id(session, job_id, project_id)

+        client = get_openai_client(
+            session, current_user.organization_id, project_id
+        )
         storage = AmazonCloudStorage(current_user)
         document_crud = DocumentCrud(session=session, owner_id=current_user.id)
         document = document_crud.read_one(request.document_id)
         preprocessor = DataPreprocessor(
             document, storage, ratio, request.system_prompt
         )
         result = preprocessor.process()
-        train_path = result["train_file"]
-        test_path = result["test_file"]
+        train_data_temp_filepath = result["train_jsonl_temp_filepath"]
+        train_data_s3_url = result["train_csv_s3_url"]
+        test_data_s3_url = result["test_csv_s3_url"]

         try:
-            with open(train_path, "rb") as train_f:
+            with open(train_data_temp_filepath, "rb") as train_f:
```
**Review comment:** what's `"rb"`?

**Reply:** `"rb"` means read the file in binary mode, which is required here since we're uploading the file object to OpenAI's API: the Files API specifically accepts a binary stream as input.
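A minimal sketch of that upload call (the path and client setup are placeholders; in the PR the client comes from `get_openai_client` inside the background task):

```python
from openai import OpenAI

# Assumes OPENAI_API_KEY is set in the environment; the PR instead builds
# the client with get_openai_client(...) inside the background task.
client = OpenAI()

# Binary mode ("rb") is required: the Files API takes a binary stream.
with open("/tmp/train_data_80_abc123.jsonl", "rb") as train_f:  # placeholder path
    uploaded_train = client.files.create(file=train_f, purpose="fine-tune")

print(uploaded_train.id)  # this id is what gets stored as training_file_id
```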
```diff
             uploaded_train = client.files.create(
                 file=train_f, purpose="fine-tune"
             )
-            with open(test_path, "rb") as test_f:
-                uploaded_test = client.files.create(
-                    file=test_f, purpose="fine-tune"
-                )

             logger.info(
-                f"[process_fine_tuning_job] Files uploaded to OpenAI successfully | "
+                f"[process_fine_tuning_job] File uploaded to OpenAI successfully | "
                 f"job_id={job_id}, project_id={project_id}|"
             )
         except openai.OpenAIError as e:
@@ -110,7 +108,6 @@ def process_fine_tuning_job(
             preprocessor.cleanup()

         training_file_id = uploaded_train.id
-        testing_file_id = uploaded_test.id

         try:
             job = client.fine_tuning.jobs.create(
@@ -141,7 +138,8 @@ def process_fine_tuning_job(
                 job=fine_tune,
                 update=FineTuningUpdate(
                     training_file_id=training_file_id,
-                    testing_file_id=testing_file_id,
+                    train_data_s3_url=train_data_s3_url,
+                    test_data_s3_url=test_data_s3_url,
                     split_ratio=ratio,
                     provider_job_id=job.id,
                     status=FineTuningStatus.running,
@@ -182,8 +180,11 @@ def fine_tune_from_CSV(
     request: FineTuningJobCreate,
     background_tasks: BackgroundTasks,
 ):
-    client = get_openai_client(
-        session, current_user.organization_id, current_user.project_id
+    client = get_openai_client(  # Used here only to validate the user's OpenAI key;
+        # the actual client is re-initialized separately inside the background task
+        session,
+        current_user.organization_id,
+        current_user.project_id,
     )
     results = []
@@ -199,12 +200,7 @@ def fine_tune_from_CSV(

         if created:
             background_tasks.add_task(
-                process_fine_tuning_job,
-                job.id,
-                ratio,
-                current_user,
-                request,
-                client,
+                process_fine_tuning_job, job.id, ratio, current_user, request
             )

     if not results:
```
DataPreprocessor changes:

```diff
@@ -3,6 +3,11 @@
 import uuid
 import tempfile
 import logging
+import io
+
+from pathlib import Path
+from fastapi import UploadFile
+from starlette.datastructures import Headers
 import pandas as pd
 from sklearn.model_selection import train_test_split
@@ -22,6 +27,34 @@ def __init__(self, document, storage, split_ratio: float, system_message: str):

         self.system_message = {"role": "system", "content": system_message.strip()}

+    def upload_csv_to_s3(self, df, filename: str) -> str:
```
**Review comment:** do we need a separate function to upload? why can't we use the existing function to upload the file?

**Reply:** We are already using the existing `put` function for the actual upload inside this function. The `upload_csv_to_s3` method is just a wrapper that converts a DataFrame into CSV bytes, wraps it in an `UploadFile` (the format that `put(source: UploadFile, basename: Path)` expects), and then delegates the upload to `put`.
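A quick usage sketch of the wrapper (hypothetical values; `preprocessor` stands for an already-constructed `DataPreprocessor`):

```python
import pandas as pd

# Hypothetical usage: the wrapper serializes the DataFrame to CSV bytes,
# wraps them in an UploadFile, and delegates to self.storage.put under
# the "datasets/" prefix.
df = pd.DataFrame({"query": ["hello"], "label": ["greeting"]})
s3_url = preprocessor.upload_csv_to_s3(df, "train_split_80_abc123.csv")
# The exact URL format depends on AmazonCloudStorage.put, which this
# diff does not show.
print(s3_url)
```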
```diff
+        logger.info(
+            f"[upload_csv_to_s3] Preparing to upload '{filename}' to s3 | rows={len(df)}, cols={len(df.columns)}"
+        )
+
+        buf = io.BytesIO(df.to_csv(index=False).encode("utf-8"))
+        buf.seek(0)
+
+        headers = Headers({"content-type": "text/csv"})
+        upload = UploadFile(
+            filename=filename,
+            file=buf,
+            headers=headers,
+        )
+
+        try:
+            dest = self.storage.put(upload, basename=Path("datasets") / filename)
+            logger.info(
+                f"[upload_csv_to_s3] Upload successful | filename='{filename}', s3_url='{dest}'"
+            )
+            return str(dest)
+        except Exception as err:
+            logger.error(
+                f"[upload_csv_to_s3] Upload failed | filename='{filename}', error='{err}'",
+                exc_info=True,
+            )
+            raise
+
     def _save_to_jsonl(self, data, filename):
         temp_dir = tempfile.gettempdir()
         file_path = os.path.join(temp_dir, filename)
@@ -65,14 +98,14 @@ def _load_dataframe(self):

             if not self.query_col or not self.label_col:
                 logger.error(
-                    f"[DataPreprocessor] Dataset does not contai a 'label' column and one of: {possible_query_columns} "
+                    f"[DataPreprocessor] Dataset does not contain a 'label' column and one of: {possible_query_columns}"
                 )
                 raise ValueError(
                     f"CSV must contain a 'label' column and one of: {possible_query_columns}"
                 )

             logger.info(
-                f"[DataPreprocessor]Identified columns - query_col={self.query_col}, label_col={self.label_col}"
+                f"[DataPreprocessor] Identified columns - query_col={self.query_col}, label_col={self.label_col}"
             )
             return df
         finally:
@@ -90,21 +123,29 @@ def process(self):
         )

         logger.info(
-            f"[DataPreprocessor]Data split complete: train_size={len(train_data)}, test_size={len(test_data)}"
+            f"[DataPreprocessor] Data split complete: train_size={len(train_data)}, test_size={len(test_data)}"
         )

         train_dict = train_data.to_dict(orient="records")
         test_dict = test_data.to_dict(orient="records")

         train_jsonl = self._modify_data_format(train_dict)
         test_jsonl = self._modify_data_format(test_dict)

-        train_file = (
-            f"train_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl"
-        )
-        test_file = f"test_data_{int(self.split_ratio * 100)}_{uuid.uuid4().hex}.jsonl"
+        unique_id = uuid.uuid4().hex
+        train_percentage = int(self.split_ratio * 100)  # train %
+        test_percentage = (
+            100 - train_percentage
+        )  # remaining % for test (since we used 1 - ratio earlier for test size)
+
+        train_csv_name = f"train_split_{train_percentage}_{unique_id}.csv"
+        test_csv_name = f"test_split_{test_percentage}_{unique_id}.csv"
+        train_jsonl_name = f"train_data_{train_percentage}_{unique_id}.jsonl"
+
+        train_csv_url = self.upload_csv_to_s3(train_data, train_csv_name)
+        test_csv_url = self.upload_csv_to_s3(test_data, test_csv_name)

-        train_path = self._save_to_jsonl(train_jsonl, train_file)
-        test_path = self._save_to_jsonl(test_jsonl, test_file)
+        train_jsonl_path = self._save_to_jsonl(train_jsonl, train_jsonl_name)

-        return {"train_file": train_path, "test_file": test_path}
+        return {
+            "train_csv_s3_url": train_csv_url,
+            "test_csv_s3_url": test_csv_url,
+            "train_jsonl_temp_filepath": train_jsonl_path,
+        }
```
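As a worked example of the naming scheme above (illustrative values only; the hex id differs on every run), a `split_ratio` of 0.8 yields:

```python
import uuid

# Illustrative walk-through of the file-naming scheme in process().
split_ratio = 0.8
unique_id = uuid.uuid4().hex                # e.g. "3f2a9c..." (random per run)
train_percentage = int(split_ratio * 100)   # 80
test_percentage = 100 - train_percentage    # 20

print(f"train_split_{train_percentage}_{unique_id}.csv")   # train_split_80_<hex>.csv
print(f"test_split_{test_percentage}_{unique_id}.csv")     # test_split_20_<hex>.csv
print(f"train_data_{train_percentage}_{unique_id}.jsonl")  # train_data_80_<hex>.jsonl
```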
**Review comment:** the `storage = AmazonCloudStorage(current_user)` will later change to using `project_id` instead. This is just FYI for now; this change need not be part of the current PR and can be part of a future PR.
**Reply:** Yes, as of now (Thu, 26 Aug, 12:10 PM), the PR that changes storage initialization from using `user_id` to `project_id` hasn't been merged yet, so I can't incorporate that change in my PR at this moment, or in any other PR, until that PR is merged to main.