# Large-scale batching
Because of the size of our paraphrase dataset and OpenAI's rate limits, we had to split our use of the Batch API into many smaller batches; the following code does that.

## install libraries and import them

In [None]:
%pip install tiktoken
%pip install openai
%pip install unidecode

In [None]:
import os
import re
import time
import json
import asyncio
import pandas as pd
import tiktoken
from openai import OpenAI
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from itertools import permutations
from unidecode import unidecode
import concurrent.futures

# Prefer the key from the environment so a real secret never has to be pasted
# into (and shared with) the notebook; the placeholder remains as a fallback so
# the cell behaves as before when OPENAI_API_KEY is not set.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY") or "<api_key>")


# Snapshot of every batch already on the account (iterating the listing walks all pages).
existing_batches = list(client.batches.list())

## Set up tokenizers and input limits


In [None]:
# One row per model: (model id, token budget per batch used when chunking
# requests, model name handed to tiktoken to pick the matching encoding).
_MODEL_LIMIT_SPECS = (
    ("gpt-4o-2024-08-06", 200_000_000, "gpt-4o"),
    ("gpt-4o-mini", 1_000_000_000, "gpt-4o-mini"),
    ("gpt-3.5-turbo-1106", 1_000_000_000, "gpt-3.5-turbo"),
)

# model id -> {"tokens": per-batch token budget, "tokenizer": tiktoken encoding}
token_limits = {
    model_id: {
        "tokens": budget,
        "tokenizer": tiktoken.encoding_for_model(encoder_model),
    }
    for model_id, budget, encoder_model in _MODEL_LIMIT_SPECS
}

## Load the Claude paraphrases and original texts and prepare them for batching

In [None]:
# Combined table of Claude paraphrases and original passages; presumably one row
# per (Snippet_ID, Permutation_Number) quiz variant — TODO confirm.
HUGE_DF_CSV = '/content/drive/MyDrive/oreilly_january/huge_df.csv'
huge_df = pd.read_csv(HUGE_DF_CSV)
huge_df  # last expression of the cell: rendered as a preview in the notebook

In [None]:
import os
import json

from tqdm import tqdm
from unidecode import unidecode


def build_question(row, model):
    """
    Build one Batch API request line for a DE-COP style multiple-choice quiz.

    The request asks `model` which of the four passages in
    row['Example_A'] .. row['Example_D'] is verbatim from the book named by
    row['Title'] / row['Formatted Author'].

    Returns a dict with "custom_id" ("<model>|<Snippet_ID>.<Permutation_Number>"),
    "method", "url" and a chat-completions "body" limited to a single answer token
    with log-probabilities enabled.
    """
    header = (
        f"Question: Which of the following passages is verbatim from the \"{row['Title']}\" "
        f"book by {row['Formatted Author']}?\nOptions:\n"
    )
    # Plain `+` (not an f-string) so a non-string option still raises instead of
    # being silently stringified.
    option_lines = "".join(
        letter + '. ' + row['Example_' + letter] + '\n' for letter in "ABCD"
    )
    user_prompt = header + option_lines + 'Answer: '

    custom_id = f"{model}|{row['Snippet_ID']}.{row['Permutation_Number']}"

    system_prompt = (
        "You are a helpful assistant. You must answer using only the provided "
        "options A, B, C or D, you may not decline to answer."
    )
    request_body = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            # unidecode transliterates non-ASCII characters in the source text to ASCII.
            {"role": "user", "content": unidecode(user_prompt)},
        ],
        "max_tokens": 1,
        "temperature": 0,
        "seed": 2319,
        "logprobs": True,
        # +100 bias on token ids 32..35, meant to make "A".."D" the likely
        # single output token (assumes those ids are the letters in every
        # tokenizer used here — verify per model).
        "logit_bias": {token_id: 100 for token_id in range(32, 36)},
        "top_logprobs": 20,
    }

    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": request_body,
    }

# Self-imposed cap on the number of requests written to one JSONL file.
FILE_LINE_LIMIT = 500
# Safety margin kept free in every batch, because the enqueued-token limit the
# Batch API enforces did not exactly match our own estimate.
OVERHEAD_TOKENS = 5000
# Chat-format tokens wrapped around every message and added once per request to
# prime the reply (OpenAI's tiktoken counting recipe). Without them the estimate
# drifts ~10 tokens low per request, which over a large batch exceeds the fixed
# OVERHEAD_TOKENS margin.
TOKENS_PER_MESSAGE = 3
TOKENS_PER_REPLY_PRIMING = 3


def _estimate_request_tokens(question, tokenizer):
    """Estimate the prompt tokens of one chat request built by build_question.

    Encodes every field of every message (role and content) and adds the fixed
    per-message and per-request chat-format overhead.
    """
    total = TOKENS_PER_REPLY_PRIMING
    for message in question["body"]["messages"]:
        total += TOKENS_PER_MESSAGE
        total += sum(len(tokenizer.encode(value)) for value in message.values())
    return total


def _iter_rows(table):
    """Yield rows of a pandas DataFrame (as Series) or of a plain list of dicts."""
    if hasattr(table, "iterrows"):
        for _, row in table.iterrows():
            yield row
    else:
        yield from table


def _chunk_questions(sized_questions, token_limit,
                     file_line_limit=FILE_LINE_LIMIT, overhead_tokens=OVERHEAD_TOKENS):
    """Group (request, token_count) pairs into batches of JSONL "files".

    Returns a list of batches; each batch is a list of files and each file is a
    list of request dicts. A new batch starts when the next request would bring
    the batch's running estimate plus `overhead_tokens` to or past `token_limit`.
    Within a batch, a new file starts every `file_line_limit` requests.
    """
    all_batches = []
    current_batch = []   # list of files
    current_file = []    # list of request dicts
    batch_token_count = 0

    for question, n_tokens in sized_questions:
        if batch_token_count + overhead_tokens + n_tokens >= token_limit:
            # Close the current file and batch; this request opens a fresh batch.
            if current_file:
                current_batch.append(current_file)
            if current_batch:
                all_batches.append(current_batch)
            current_batch = []
            current_file = [question]
            batch_token_count = n_tokens
        elif len(current_file) >= file_line_limit:
            # Same batch, but the current file is full.
            current_batch.append(current_file)
            current_file = [question]
            batch_token_count += n_tokens
        else:
            current_file.append(question)
            batch_token_count += n_tokens

    # Flush whatever is left in the last file/batch.
    if current_file:
        current_batch.append(current_file)
    if current_batch:
        all_batches.append(current_batch)
    return all_batches


# model id -> list of batches -> list of files -> list of request dicts
results = {}

for model in ["gpt-4o-mini", "gpt-4o-2024-08-06", "gpt-3.5-turbo-1106"]:
    # To resume after an interruption, fill excluded_ids with the
    # "<Snippet_ID>.<Permutation_Number>" keys already answered for this model, e.g.:
    # model_dones = [done for done in dones if done['model'] == model]
    # excluded_ids = set(f"{d['question_number']}.{d['permutation_number']}" for d in model_dones)
    excluded_ids = set()

    def _excluded(x):
        """True when this row's Snippet_ID.Permutation_Number key is in excluded_ids."""
        return f"{x['Snippet_ID']}.{x['Permutation_Number']}" in excluded_ids

    if not excluded_ids:
        # Nothing to skip: keep every row.
        model_df = huge_df
    elif hasattr(huge_df, 'apply'):
        # pandas DataFrame
        model_df = huge_df[~huge_df.apply(_excluded, axis=1)]
    else:
        # list of dicts
        model_df = [row for row in huge_df if not _excluded(row)]

    tokenizer = token_limits[model]["tokenizer"]
    token_limit = token_limits[model]["tokens"]

    sized_questions = (
        (question, _estimate_request_tokens(question, tokenizer))
        for question in (build_question(row, model) for row in _iter_rows(model_df))
    )
    results[model] = _chunk_questions(
        tqdm(sized_questions, total=len(model_df), desc=f"Chunking for {model}"),
        token_limit,
    )

# Make directories for the output
os.makedirs("output", exist_ok=True)  # root output folder

# Layout: output/<model>/batches/batch<i>/jsonlfile<j>.jsonl, one request per line.
# The submission step submits every .jsonl found under these folders, so any
# file left over from an earlier run that produced more chunks is deleted here;
# otherwise those stale requests would be sent to the API again.
for model_name, batches in results.items():
    batches_folder = os.path.join("output", model_name, "batches")
    os.makedirs(batches_folder, exist_ok=True)

    written_paths = set()
    for batch_idx, batch_files in enumerate(batches, start=1):
        # batch_files is a list of "files"; each "file" is a list of question dicts
        batch_folder = os.path.join(batches_folder, f"batch{batch_idx}")
        os.makedirs(batch_folder, exist_ok=True)

        for file_idx, file_questions in enumerate(batch_files, start=1):
            file_path = os.path.join(batch_folder, f"jsonlfile{file_idx}.jsonl")
            with open(file_path, "w", encoding="utf-8") as f:
                f.writelines(json.dumps(q) + "\n" for q in file_questions)
            written_paths.add(os.path.normpath(file_path))

    # Drop stale .jsonl files from previous runs, then any batch folder left empty.
    for entry in sorted(os.listdir(batches_folder)):
        entry_path = os.path.join(batches_folder, entry)
        if not os.path.isdir(entry_path):
            continue
        for name in os.listdir(entry_path):
            candidate = os.path.normpath(os.path.join(entry_path, name))
            if name.endswith(".jsonl") and candidate not in written_paths:
                os.remove(candidate)
                print(f"Removed stale file {candidate}")
        if not os.listdir(entry_path):
            os.rmdir(entry_path)

print("All done! Folder structure created in ./output")


## Start the batching process

In [None]:
POLL_INTERVAL_SECONDS = 60  # seconds to wait between batch-status checks (one minute)

def find_existing_batch_with_metadata(jsonl_file: str, batch_folder: str, model_name: str):
    """
    Return a reusable batch previously created for this model / batch folder / JSONL file.

    Batches are matched on the metadata attached when they were created
    ("model", "batch_folder", "filename"). A match whose status is "failed",
    "expired" or "cancelled" is skipped, and the search continues. That way
    another in-progress or completed batch for the same file is still found.

    Returns the first usable matching batch in listing order, or None when there
    is none (the caller then creates a new batch).
    """
    # Iterate the page object itself rather than `.data`: the SDK then fetches
    # the following pages automatically. Reading only `.data` saw just the first
    # page, so older batches were missed and their files re-submitted.
    for b in client.batches.list(limit=100):
        md = b.metadata or {}
        same_job = (
            md.get("model") == model_name
            and md.get("batch_folder") == batch_folder
            and md.get("filename") == jsonl_file
        )
        if not same_job:
            continue
        if b.status in ("failed", "expired", "cancelled"):
            # A dead attempt at this file; keep looking for a usable one.
            continue
        # In progress, finalizing, or completed.
        return b
    return None


# Batch statuses after which a job no longer changes.
_TERMINAL_BATCH_STATES = ("completed", "failed", "expired", "cancelled")


def _natural_sort_key(name):
    """Sort key that compares digit runs numerically ("batch2" < "batch10")."""
    # re.split with a capture group alternates text / digits, so odd indices are digit runs.
    return [int(part) if i % 2 else part for i, part in enumerate(re.split(r"(\d+)", name))]


def _read_jsonl_file(file_id):
    """Download an OpenAI file and return one parsed JSON object per non-blank line."""
    text = client.files.content(file_id).text
    # Split on newline characters only: str.splitlines would also break on
    # Unicode line separators that may appear inside JSON string values.
    return [json.loads(line) for line in text.split("\n") if line.strip()]


def _submit_or_reuse_batch(model_name, batch_folder, folder_path, jsonl_file):
    """Return (batch_id, status) for jsonl_file, reusing a matching live batch or creating one."""
    existing_batch = find_existing_batch_with_metadata(jsonl_file, batch_folder, model_name)
    if existing_batch:
        print(f"[{model_name} -> {batch_folder}] Found existing batch {existing_batch.id} for {jsonl_file}, status={existing_batch.status}")
        return existing_batch.id, existing_batch.status

    file_path = os.path.join(folder_path, jsonl_file)
    with open(file_path, "rb") as f:
        uploaded_file = client.files.create(file=f, purpose="batch")
    print(f"[{model_name} -> {batch_folder}] Created file {uploaded_file.id} for {jsonl_file}")

    # Metadata lets find_existing_batch_with_metadata recognise this job on a re-run.
    created_batch = client.batches.create(
        input_file_id=uploaded_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={
            "model": model_name,
            "batch_folder": batch_folder,
            "filename": jsonl_file
        }
    )
    print(f"[{model_name} -> {batch_folder}] Started Batch {created_batch.id} for {jsonl_file} (status={created_batch.status})")
    return created_batch.id, created_batch.status


def _poll_until_terminal(model_name, batch_folder, batch_jobs, folder_results):
    """Poll (jsonl_file, batch_id) jobs until all are terminal, recording statuses in folder_results.

    Checks immediately and sleeps POLL_INTERVAL_SECONDS only between rounds that
    still have unfinished jobs, so already-finished (e.g. reused) batches add no delay.
    """
    if not batch_jobs:
        return
    print(f"[{model_name} -> {batch_folder}] Polling {len(batch_jobs)} batch jobs until completion...")

    pending = list(batch_jobs)
    while True:
        still_running = []
        for jsonl_file, batch_id in pending:
            status_str = client.batches.retrieve(batch_id).status
            folder_results[jsonl_file]["status"] = status_str
            if status_str not in _TERMINAL_BATCH_STATES:
                print(f"[{model_name} -> {batch_folder}] Batch {batch_id} for {jsonl_file} is {status_str}...")
                still_running.append((jsonl_file, batch_id))

        if not still_running:
            print(f"[{model_name} -> {batch_folder}] All batch jobs done.")
            return
        pending = still_running
        print(f"[{model_name} -> {batch_folder}] Checking again in {POLL_INTERVAL_SECONDS} seconds...")
        time.sleep(POLL_INTERVAL_SECONDS)


def _collect_batch_results(batch_id, entry):
    """Store the final status and parsed output/error lines of batch_id in entry."""
    batch_status = client.batches.retrieve(batch_id)
    final_state = batch_status.status
    entry["status"] = final_state

    # Expired or cancelled batches may still carry partial results in an output
    # file, so read it whenever one exists, not only for "completed".
    if batch_status.output_file_id:
        entry["output_data"] = _read_jsonl_file(batch_status.output_file_id)
    elif final_state == "completed":
        entry["output_data"] = []

    entry["error_data"] = (
        _read_jsonl_file(batch_status.error_file_id) if batch_status.error_file_id else []
    )


def process_model_folders_in_memory(model_name: str, root_output_dir: str = "output"):
    """
    Submit, wait for, and collect every batch for one model, keeping results in memory.

    For each folder <root_output_dir>/<model>/batches/<batchX> (in natural order,
    so batch2 precedes batch10), processed one folder at a time:
      1) For each .jsonl file, reuse a live batch with matching metadata or
         upload the file and create a new batch.
      2) Poll all of the folder's batches until each is in a terminal state.
      3) Download and parse each batch's output file (when present) and error file.

    Returns a nested dict:
        {batch_folder: {jsonl_file: {"batch_id": ..., "status": ...,
                                     "output_data": list | None,
                                     "error_data": list | None}}}
    output_data stays None only for a non-completed batch that has no output file.
    Returns {} (after printing a notice) when the model has no 'batches' folder.
    """
    model_results = {}

    batches_path = os.path.join(root_output_dir, model_name, "batches")
    if not os.path.isdir(batches_path):
        print(f"[{model_name}] No 'batches' folder found. Skipping.")
        return model_results

    batch_folder_list = sorted(
        (d for d in os.listdir(batches_path) if os.path.isdir(os.path.join(batches_path, d))),
        key=_natural_sort_key,
    )

    for batch_folder in batch_folder_list:
        folder_path = os.path.join(batches_path, batch_folder)
        jsonl_files = sorted(
            (f for f in os.listdir(folder_path) if f.endswith(".jsonl")),
            key=_natural_sort_key,
        )
        print(f"[{model_name}] Processing folder {batch_folder} with {len(jsonl_files)} files.")

        folder_results = {}
        model_results[batch_folder] = folder_results
        batch_jobs = []  # list of (jsonl_file, batch_id)

        # 1) Submit (or reuse) every file in this folder before waiting on any of them.
        for jsonl_file in jsonl_files:
            batch_id, status = _submit_or_reuse_batch(model_name, batch_folder, folder_path, jsonl_file)
            batch_jobs.append((jsonl_file, batch_id))
            folder_results[jsonl_file] = {
                "batch_id": batch_id,
                "status": status,
                "output_data": None,
                "error_data": None
            }

        # 2) Wait for all of them.
        _poll_until_terminal(model_name, batch_folder, batch_jobs, folder_results)

        # 3) Pull results into memory.
        for jsonl_file, batch_id in batch_jobs:
            _collect_batch_results(batch_id, folder_results[jsonl_file])

    print(f"[{model_name}] All batch folders complete.")
    return model_results


if __name__ == "__main__":
    # One worker thread per model. Each worker spends its time in API calls and
    # time.sleep while polling, so threads are enough to overlap the models.
    models_to_run = ["gpt-4o-mini", "gpt-4o-2024-08-06", "gpt-3.5-turbo-1106"]

    # model -> nested results returned by process_model_folders_in_memory;
    # a model whose worker raised is reported and left out.
    final_all_models_results = {}

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(models_to_run))
    with executor:
        future_to_model = {}
        for m in models_to_run:
            future_to_model[executor.submit(process_model_folders_in_memory, m)] = m

        for future in concurrent.futures.as_completed(future_to_model):
            model = future_to_model[future]
            try:
                model_results = future.result()
            except Exception as e:
                print(f"[{model}] ERROR: {e}")
                continue
            final_all_models_results[model] = model_results

    print("All done!")
