# ⚠️ IMPORTANT

**Please read the README.MD before running this notebook.**
README.MD


In [None]:
import os
import pandas as pd
from openai import OpenAI
from dotenv import load_dotenv
from src.data_gen_pipelines import CharGenBatchJob, CharEnvBatchJob, ConversationBatchJob, RawBatchJob

# Init


In [None]:
# Pull variables from a local .env file into the process environment, then
# create the single OpenAI client shared by the cells below.
load_dotenv()
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)

# Generate Character-Environment-Context


In [None]:
# 🔴 Run only once!
# char_gen_job = CharGenBatchJob(OPENAI_API_KEY,"data/temp/char_names.csv", "data/temp/char.csv", "data/prompt/char_gen.md")
# char_gen_job.invoke_job()

In [None]:
# 🟢 Safe to run any time, even after a restart; re-running it does not affect anything.
# char_gen_job = CharGenBatchJob(OPENAI_API_KEY,"data/temp/char_names.csv", "data/temp/char.csv", "data/prompt/char_gen.md")
# char_gen_job.job_id = "Run the cell above first, **THEN PASTE THE GENERATED JOB ID HERE**"
# char_gen_job.invoke_job()

# Generate Multiple Character-Environment-Context


In [None]:
# 🔴 Run only once!
# char_envs_job = CharEnvBatchJob(OPENAI_API_KEY,"data/temp/char.csv", "data/temp/char_envs.csv", "data/prompt/char_envs_gen.md")
# char_envs_job.invoke_job()

In [None]:
# 🟢 Safe to run any time, even after a restart; re-running it does not affect anything.
# char_envs_job = CharEnvBatchJob(OPENAI_API_KEY,"data/temp/char.csv", "data/temp/char_envs.csv", "data/prompt/char_envs_gen.md")
# char_envs_job.job_id = "Run the cell above first, **THEN PASTE THE GENERATED JOB ID HERE**"
# char_envs_job.invoke_job()

Here’s a reusable function that takes any DataFrame and:
- Saves the full DataFrame to `data/temp/char_envs.csv` (configurable)
- Splits it into chunks of at most 7,000 rows (configurable)
- Writes each chunk to `data/temp/char_envs/char_envs_part<N>.csv`, with N starting at 1


In [None]:
def save_and_split_df(
    df: pd.DataFrame,
    out_dir: str = "data/temp",
    filename: str = "char_envs.csv",
    subfolder: str = "char_envs",
    chunk_size: int = 7000
) -> None:
    """
    Save the full DataFrame and split it into smaller CSV chunks.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to save and split.
    out_dir : str, default "data/temp"
        Base directory where files will be written.
    filename : str, default "char_envs.csv"
        Name of the full-DataFrame CSV. Its stem (the name without the
        extension) is also used to name the chunk files.
    subfolder : str, default "char_envs"
        Subdirectory under out_dir for chunked files.
    chunk_size : int, default 7000
        Maximum number of rows per chunk. Must be at least 1.

    Raises
    ------
    ValueError
        If ``chunk_size`` is less than 1. This is checked before anything
        is written.

    Writes
    ------
    - {out_dir}/{filename}
    - {out_dir}/{subfolder}/{stem}_part1.csv
    - {out_dir}/{subfolder}/{stem}_part2.csv
    - ...

    Part numbers start at 1. An empty DataFrame produces the full CSV but
    no chunk files. Chunk files left over from an earlier, larger split are
    NOT removed, so anything that later scans the subfolder will see them.
    """
    # Validate up front. range() with a step of 0 would raise only after
    # the full CSV was written, and a negative step would silently write
    # no chunks at all.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # makedirs on the nested path also creates out_dir itself.
    chunks_dir = os.path.join(out_dir, subfolder)
    os.makedirs(chunks_dir, exist_ok=True)

    # save the complete DataFrame
    full_path = os.path.join(out_dir, filename)
    df.to_csv(full_path, index=False)
    print(f"Saved full DataFrame ({len(df)} rows) to {full_path}")

    # split into chunks, numbering parts from 1
    stem = os.path.splitext(filename)[0]
    for part_num, start in enumerate(range(0, len(df), chunk_size), start=1):
        chunk = df.iloc[start : start + chunk_size]
        part_path = os.path.join(chunks_dir, f"{stem}_part{part_num}.csv")
        chunk.to_csv(part_path, index=False)
        print(f"  • Chunk {part_num}: {len(chunk)} rows → {part_path}")


In [None]:
# 🔴 Run only once!
# save_and_split_df(pd.read_csv("data/temp/char_envs.csv"))

# Generate Conversations


In [None]:
def start_all_jobs(
    folder_path: str = "data/temp/char_envs",
    *,
    output_path: str = "data/conversation.csv",
    prompt_path: str = "data/prompt/conversation_gen.md",
) -> list[str]:
    """
    Start one ConversationBatchJob for every .csv file in ``folder_path``.

    Files are processed in natural name order (``..._part2.csv`` before
    ``..._part10.csv``) rather than the arbitrary ``os.listdir`` order. The
    returned values therefore map one-to-one, in order, onto the sorted
    files. Keep them in this order.

    Parameters
    ----------
    folder_path : str
        Directory to scan (non-recursive) for input CSV chunks.
    output_path : str
        Output CSV path passed to every ConversationBatchJob.
    prompt_path : str
        Prompt file path passed to every ConversationBatchJob.

    Returns
    -------
    list[str]
        The value returned by ``invoke_job()`` for each file, in processing
        order (expected to be batch job IDs; confirm in ConversationBatchJob).
        Returns an empty list if the folder is missing or unreadable.
    """
    import re

    def natural_key(name: str) -> list:
        # re.split with a capturing group alternates text/digit runs, so odd
        # positions are always digit runs; compare those numerically.
        return [int(tok) if i % 2 else tok
                for i, tok in enumerate(re.split(r"(\d+)", name))]

    # Only the directory listing is guarded; job-start failures propagate
    # instead of being misreported as a folder problem.
    try:
        csv_files = sorted(
            (f for f in os.listdir(folder_path) if f.lower().endswith(".csv")),
            key=natural_key,
        )
    except FileNotFoundError:
        print(f"Folder not found: {folder_path}")
        return []
    except PermissionError:
        print(f"Permission denied: {folder_path}")
        return []

    job_ids = []
    for fname in csv_files:
        # Use the file actually found. The original rebuilt the name from a
        # 0-based counter, but chunk files are numbered from 1.
        input_path = os.path.join(folder_path, fname)
        char_conv_job = ConversationBatchJob(OPENAI_API_KEY, input_path, output_path, prompt_path)
        job_id = char_conv_job.invoke_job()
        # Print each ID right away so it is not lost if a later job fails to start.
        print(f"Started job for {fname}: {job_id}")
        job_ids.append(job_id)
    return job_ids

def check_and_download(
    job_ids: list[str],
    folder_path: str = "data/temp/char_envs",
    *,
    output_path: str = "data/conversation.csv",
    prompt_path: str = "data/prompt/conversation_gen.md",
):
    """
    Re-attach to previously started conversation batch jobs and invoke each one.

    Each .csv file in ``folder_path`` is paired with the job ID at the same
    position. Files are taken in natural name order (``..._part2.csv``
    before ``..._part10.csv``), so ``job_ids`` must be in that same order,
    with one ID per file. For each pair, a ConversationBatchJob is built,
    its ``job_id`` is set, and ``invoke_job()`` is called. Per this
    notebook's usage, that is expected to check status and download results
    when ready; the behavior lives in ConversationBatchJob.

    Parameters
    ----------
    job_ids : list[str]
        Job IDs, one per CSV file, in natural file-name order.
    folder_path : str
        Directory (non-recursive) containing the input CSV chunks.
    output_path : str
        Output CSV path passed to every ConversationBatchJob.
    prompt_path : str
        Prompt file path passed to every ConversationBatchJob.

    Raises
    ------
    ValueError
        If the number of CSV files differs from ``len(job_ids)``. This is
        checked before any job is touched.
    """
    import re

    def natural_key(name: str) -> list:
        # re.split with a capturing group alternates text/digit runs, so odd
        # positions are always digit runs; compare those numerically.
        return [int(tok) if i % 2 else tok
                for i, tok in enumerate(re.split(r"(\d+)", name))]

    try:
        csv_files = sorted(
            (f for f in os.listdir(folder_path) if f.lower().endswith(".csv")),
            key=natural_key,
        )
    except FileNotFoundError:
        print(f"Folder not found: {folder_path}")
        return
    except PermissionError:
        print(f"Permission denied: {folder_path}")
        return

    # A count mismatch means IDs cannot be paired with files reliably, so
    # fail before touching any job rather than mixing up files and IDs.
    if len(csv_files) != len(job_ids):
        raise ValueError(
            f"Found {len(csv_files)} CSV file(s) in {folder_path} but got "
            f"{len(job_ids)} job ID(s); they must match one-to-one."
        )

    for fname, job_id in zip(csv_files, job_ids):
        input_path = os.path.join(folder_path, fname)
        char_conv_job = ConversationBatchJob(OPENAI_API_KEY, input_path, output_path, prompt_path)
        char_conv_job.job_id = job_id
        char_conv_job.invoke_job()

**RUN only ONCE!!**


In [None]:
# 🔴 Run only once!
# all_job_ids = start_all_jobs()
# print("save them in order",all_job_ids)

In [None]:
# 🟢 Safe to run any time, even after a restart; re-running it does not affect anything.
# Check job status and download results when ready (can be run in a loop or manually)
# check_and_download(all_job_ids)

# RawDF


In [None]:
# 🔴 Run only once!
# raw_job = RawBatchJob(OPENAI_API_KEY,"data/raw_df/raw_conv.csv", "data/conversations.csv", "data/prompt/conv_expand.md")
# raw_job.invoke_job()

In [None]:
# 🟢 Safe to run any time, even after a restart; re-running it does not affect anything.
# raw_job = RawBatchJob(OPENAI_API_KEY,"data/raw_df/raw_conv.csv", "data/conversations.csv", "data/prompt/conv_expand.md")
# raw_job.job_id = "Run the cell above first, **THEN PASTE THE GENERATED JOB ID HERE**"
# raw_job.invoke_job()

# Check ALL the Batch Jobs


In [None]:
# List every batch job on the account. `.data` on the returned page object
# holds only the first page of results; iterating the object auto-fetches
# the remaining pages.
jobs = client.batches.list()
all_batches = list(jobs)
# Only a cell's last expression is echoed, so print the count explicitly
# (the original bare `len(jobs.data)` was silently discarded).
print(f"Total batch jobs: {len(all_batches)}")
all_batches

# Testing (SKIP)


In [None]:
# from concurrent.futures import ThreadPoolExecutor, as_completed
# import pandas as pd
# import json
# import language_tool_python

# def correct_grammar(text:str) -> str:
#     text = text.replace(" ,", ",").replace(" .", ".").replace(" ?", "?").replace(" !", "!")     
#     return tool.correct(text)


# def convert_dialogues(df:pd.DataFrame):
#     output_list = []
    
#     for _ , row in df.iterrows():
#         dialogue = [] 
#         turn = False
#         dialogue.append({"role": "system", "content": correct_grammar(row['Persona'])})
#         cg = correct_grammar(row['chat'])
#         chat_list = cg.split("\n")

#         for chat_item in chat_list:
#             turn = not turn 
#             if turn and chat_item !=  "":
#                 dialogue.append({"role": "user", "content": chat_item})
#             elif not turn and chat_item != "":
#                 dialogue.append({"role": "assistant", "content": chat_item})
                
#         output_list.append(json.dumps({"conversation": dialogue}, indent=4))
    

#     return output_list


# # Single shared Tool instance for all threads


# tool = language_tool_python.LanguageTool('en-US')

# def preprocess_text_series(series: pd.Series) -> pd.Series:
#     return (series
#         .str.replace(r"\s+,", ",", regex=True)
#         .str.replace(r"\s+\.", ".", regex=True)
#         .str.replace(r"\s+\?", "?", regex=True)
#         .str.replace(r"\s+!", "!", regex=True)
#     )

# def _process_row(persona: str, chat: str) -> str:
#     sys_text  = tool.correct(persona)
#     chat_text = tool.correct(chat)

#     dialogue = [{"role": "system", "content": sys_text}]
#     turn = True
#     for line in chat_text.splitlines():
#         if not line.strip():
#             continue
#         role = "user" if turn else "assistant"
#         dialogue.append({"role": role, "content": line})
#         turn = not turn
    
#     return json.dumps({"conversation": dialogue}, indent=4)

# def convert_dialogues_parallel(df: pd.DataFrame, max_workers: int = 16):
#     # 1) cleanup
#     df = df.copy()
#     df['Persona_clean'] = preprocess_text_series(df['Persona'])
#     df['Chat_clean']   = preprocess_text_series(df['chat'])

#     results = [None] * len(df)
#     with ThreadPoolExecutor(max_workers=max_workers) as pool:
#         futures = {
#             pool.submit(_process_row, persona, chat): idx
#             for idx, (persona, chat) in enumerate(
#                 zip(df['Persona_clean'], df['Chat_clean'])
#             )
#         }
#         for fut in as_completed(futures):
#             idx = futures[fut]
#             results[idx] = fut.result()
#     tool.close()
#     return results


# out = convert_dialogues_parallel(df)
# new_df = pd.DataFrame({"conversation": out})
# dff = pd.read_csv("conversations.csv")
# pd.concat([dff, new_df], axis=0).to_csv("conversation.csv", index=False)