## Skeleton code for OpenAI batch processing. For more technical details, see the [official documentation](https://platform.openai.com/docs/guides/batch).


In [7]:
import pandas as pd
from tqdm import tqdm
import json
from datetime import datetime
import time
import matplotlib.pyplot as plt
import random
import tiktoken
import requests
import ast
import logging
from dotenv import load_dotenv
import os
from pathlib import Path
from glob import glob

from openai import OpenAI
from openai import RateLimitError, APIError, APIConnectionError, APITimeoutError, InternalServerError

# API keys
- Create a `.env` file and store the API keys there instead of hard-coding them in the notebook

In [None]:
def add_or_update_token(env_file, token_name, token_value):
    """Add a ``NAME=value`` entry to a .env file, or overwrite it if already present.

    The file is created (empty) when it does not exist. Only the first line that
    starts with ``token_name=`` is replaced; all other lines are written back
    unchanged.

    Args:
        env_file: Path to the .env file.
        token_name: Variable name to set, e.g. ``'OPENAI_API_KEY'``.
        token_value: Value to store for that variable.
    """
    env_file_path = Path(env_file)
    if env_file_path.exists():
        print(f"{env_file} exists.")
    else:
        print(f"{env_file} does not exist.")
        # create an .env file
        env_file_path.touch()
        print(f".env file created at {env_file_path}")

    # Read lines from the .env file
    with open(env_file, 'r') as file:
        lines = file.readlines()

    # A hand-edited file may not end with a newline; without this, an appended
    # token would be glued onto the previous entry ("A=1B=2").
    if lines and not lines[-1].endswith('\n'):
        lines[-1] += '\n'

    new_entry = f"{token_name}={token_value}\n"
    prefix = f"{token_name}="

    # Replace the first existing entry for this token, if any
    token_exists = False
    for i, line in enumerate(lines):
        if line.startswith(prefix):
            print(f"{token_name} already exists.")
            lines[i] = new_entry
            token_exists = True
            break

    # If the token does not exist, append it
    if not token_exists:
        print(f"Add {token_name}.")
        lines.append(new_entry)

    # Write the lines back to the .env file
    with open(env_file, 'w') as file:
        file.writelines(lines)

    print(f"Token {token_name} has been {'updated' if token_exists else 'added'} successfully.")

In [None]:
# add_or_update_token('your .env path', 'OPENAI_API_KEY', 'your openai api key')

In [8]:
# Locate the .env file that holds the API credentials
folder_path = 'your_folder_path'
env_file_path = os.path.join(folder_path, '.env')

# Opening the file is only an existence check (a missing file raises FileNotFoundError).
# The contents are deliberately never printed so secrets do not end up in the notebook output.
with open(env_file_path, 'r') as file:
    print('.env file exists')

# Load the .env file
load_dotenv(dotenv_path=env_file_path)
openai_api_key = os.getenv('OPENAI_API_KEY')
openai_org_id = os.getenv('OPENAI_ORG_ID')

# Fail with a clear message instead of the opaque TypeError that
# `os.environ[...] = None` would raise further down.
if not openai_api_key:
    raise RuntimeError(f"OPENAI_API_KEY is not set in {env_file_path} or in the environment.")

# Set the environment variables
os.environ['OPENAI_API_KEY'] = openai_api_key
# The organization ID is optional; only export it when one was provided.
if openai_org_id:
    os.environ['OPENAI_ORG_ID'] = openai_org_id

.env file exists


# Prepare the dataset
-  Make sure the **id** column is of type *string*
-  If the input is an image, encode it as *base64*

In [22]:
# Load the raw table from disk
raw_data = pd.read_csv('your_csv_path')

# Keep only the two columns the batch requests are built from
data = raw_data.loc[:, ['id', 'text']]

# Setup model
- For available models, see [Model Availability](https://platform.openai.com/docs/guides/batch/model-availability).

In [25]:
# Initialize the OpenAI client used by every upload/batch/download call below.
# No credentials are passed explicitly: the client is expected to pick up OPENAI_API_KEY
# (and OPENAI_ORG_ID, if set) from the environment populated by the .env cell above.
client = OpenAI()

In [142]:
# System prompt sent with every request (see create_task).
# It asks the model for a JSON object with ten 0/1 indicators; adapt the task
# description and the key list to your own labelling scheme.
# NOTE(review): requests use response_format "json_object", which presumably requires
# the word "JSON" to appear in the prompt — keep it when editing; confirm against the API docs.
label_system_prompt = '''
As a political researcher analyzing U.S. elections, your goal is to evaluate the political standpoint expressed in a provided post. 
Each indicator should represent whether the post supports or opposes various political figures or parties.
Based on the post's content, you will need to output a JSON object containing various binary indicators (0 or 1) reflecting specific conditions:

{
    pro_democrat: int, // Set to 1 if the post supports the Democratic party, otherwise 0,
    against_democrat: int, // Set to 1 if the post opposes the Democratic party, otherwise 0,
    pro_republican: int, // Set to 1 if the post supports the Republican party, otherwise 0,
    against_republican: int, // Set to 1 if the post opposes the Republican party, otherwise 0,
    pro_biden: int, // Set to 1 if the post supports Joe Biden, otherwise 0,
    against_biden: int, // Set to 1 if the post opposes Joe Biden, otherwise 0,
    pro_trump: int, // Set to 1 if the post supports Donald Trump, otherwise 0,
    against_trump: int, // Set to 1 if the post opposes Donald Trump, otherwise 0,
    pro_kamala: int, // Set to 1 if the post supports Kamala Harris, otherwise 0,
    against_kamala: int // Set to 1 if the post opposes Kamala Harris, otherwise 0
}
'''

# Create batch file

In [143]:
# Model used for every request in the batch (read by create_task).
# Alternative: model_name = "gpt-4o"
model_name = "gpt-4o-mini"

In [145]:
def create_task(row, model=None, system_prompt=None, temperature=0.1, max_tokens=120):
    """Build one Batch API request (a JSONL line, as a dict) for a single data row.

    Args:
        row: Object exposing ``id`` and ``text`` attributes (e.g. a DataFrame row).
        model: Model name; defaults to the notebook-level ``model_name``.
        system_prompt: System message; defaults to the notebook-level
            ``label_system_prompt``.
        temperature: Sampling temperature placed in the request body.
        max_tokens: Completion token limit placed in the request body.

    Returns:
        dict with ``custom_id``, ``method``, ``url`` and ``body`` keys targeting
        ``/v1/chat/completions`` with a JSON-object response format.
    """
    text = row.text
    # An empty CSV cell arrives as float NaN (the only float where x != x). json.dumps would
    # emit it as the bare token NaN, which is not valid JSON, so send an empty string instead.
    if text is None or (isinstance(text, float) and text != text):
        text = ""
    elif not isinstance(text, str):
        # Message content must be a string (e.g. a purely numeric post parsed as int).
        text = str(text)

    return {
        "custom_id": str(row.id), #must ensure the id is string and unique
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model_name if model is None else model,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "response_format": {
                "type": "json_object"
            },
            "messages": [
                {
                    "role": "system",
                    "content": label_system_prompt if system_prompt is None else system_prompt
                },
                {
                    "role": "user",
                    "content": text
                }
            ],
        }
    }

In [146]:
# Build one request dict per row of `data`, keeping the row order
tasks = [create_task(record) for _, record in data.iterrows()]

print(f'num of tasks: {len(tasks)}')

num of tasks: 199316


In [148]:
# Sanity-check the container and element types; index 0 so this also works with a single task
print(type(tasks), type(tasks[0]))

<class 'list'> <class 'dict'>


In [149]:
# Display the first request to verify the payload structure before writing any batch file
tasks[0]

{'custom_id': '7325221162672508206',
 'method': 'POST',
 'url': '/v1/chat/completions',
 'body': {'model': 'gpt-4o-mini',
  'temperature': 0.1,
  'max_tokens': 120,
  'response_format': {'type': 'json_object'},
  'messages': [{'role': 'system',
    'content': "\nAs a political researcher analyzing U.S. elections, your goal is to evaluate the political standpoint expressed in a provided post. \nEach indicator should represent whether the post supports or opposes various political figures or parties.\nBased on the post's content, you will need to output a JSON object containing various binary indicators (0 or 1) reflecting specific conditions:\n\n{\n    pro_democrat: int, // Set to 1 if the post supports the Democratic party, otherwise 0,\n    against_democrat: int, // Set to 1 if the post opposes the Democratic party, otherwise 0,\n    pro_republican: int, // Set to 1 if the post supports the Republican party, otherwise 0,\n    against_republican: int, // Set to 1 if the post opposes th

# Batch functions
- Each line of the batch file (JSONL) must be a single JSON *object*; do not write the tasks as one JSON list

In [150]:
# Dump every request into one JSONL file: one JSON object per line
file_name = 'batch_task_file.jsonl'

with open(file_name, 'w') as jsonl_out:
    jsonl_out.writelines(json.dumps(task) + '\n' for task in tasks)

In [151]:
def writeBatchTasks(batch_data, file_name, directory='batch_tasks'):
    """Write a chunk of batch requests to ``directory/file_name`` as JSONL.

    Args:
        batch_data: Iterable of JSON-serialisable request dicts (any iterable,
            not only a list).
        file_name: Name of the JSONL file to create.
        directory: Target directory; created (with parents) when missing.

    Returns:
        The ``Path`` of the written file, or ``None`` if writing failed with an
        I/O error (the error is printed), so callers can detect the failure.
    """
    # Create the directory if it doesn't exist
    path = Path(directory)
    path.mkdir(parents=True, exist_ok=True)

    # Construct the full file path
    file_path = path / file_name

    written = 0
    try:
        # Explicit encoding keeps the file independent of the platform's locale default.
        with file_path.open('w', encoding='utf-8') as f:
            for obj in batch_data:
                f.write(json.dumps(obj) + '\n')
                written += 1  # counted here so unsized iterables (generators) work too
    except OSError as e:
        print(f"Error writing to file {file_path}: {e}")
        return None

    print(f'File {file_path} created with {written} requests.', end='\n\n')
    return file_path

In [152]:
# Upload batch file
def uploadBatchFile(file_name, directory='batch_tasks'):
    """Upload ``directory/file_name`` to OpenAI with purpose "batch".

    Returns the file object returned by ``client.files.create``, or ``None``
    when the local file is missing or the upload raises.
    """
    file_path = Path(directory) / file_name

    if file_path.exists():
        print('Uploading batch file...')
        try:
            with file_path.open('rb') as handle:
                uploaded = client.files.create(file=handle, purpose="batch")
            print('Batch file name:', uploaded.filename)
            print('Batch file ID:', uploaded.id)
            print('Batch file status:', uploaded.status, end='\n\n')
        except Exception as exc:
            print(f"Error uploading file: {exc}")
            return None
        return uploaded

    # Nothing to upload: report it and signal failure to the caller
    print(f"Error: File {file_path} does not exist.")
    return None

In [153]:
# Create batch job
def createBatchJob(batch_file, endpoint="/v1/chat/completions", completion_window="24h"):
    """Start a batch job for an already-uploaded input file.

    Returns the job object from ``client.batches.create``; any exception is
    printed and turned into a ``None`` return value.
    """
    print('Creating batch job...')
    try:
        created = client.batches.create(
            input_file_id=batch_file.id,
            endpoint=endpoint,
            completion_window=completion_window,
        )
        print('Batch job ID:', created.id)
    except Exception as exc:
        print(f"Error creating batch job: {exc}")
        created = None

    return created

In [154]:
def checkBatchJobStatus(batch_job_id, check_interval=3, max_consecutive_errors=None):
    """Poll a batch job until it reaches a final status and return that status.

    Args:
        batch_job_id: ID of the batch job to watch.
        check_interval: Minutes to wait between polls (a job that is
            'finalizing' is always re-checked after 2 minutes).
        max_consecutive_errors: Give up and re-raise after this many polling
            errors in a row. ``None`` (default) retries indefinitely, as before.

    Returns:
        One of 'completed', 'failed', 'expired', 'cancelled'.

    Raises:
        Whatever the first lookup raises (not retried), or the last polling
        error once ``max_consecutive_errors`` is reached.
    """
    final_statuses = {'completed', 'failed', 'expired', 'cancelled'}

    # The first lookup is deliberately unguarded so a wrong ID or bad credentials fails
    # immediately instead of being retried. Its result is consumed by the first loop pass
    # rather than being thrown away and fetched a second time.
    batch_job = client.batches.retrieve(batch_job_id)
    consecutive_errors = 0

    while True:
        try:
            job = batch_job if batch_job is not None else client.batches.retrieve(batch_job_id)
            batch_job = None  # every later pass must fetch a fresh snapshot
            current_status = job.status.lower()
            consecutive_errors = 0

            print(f"Current status of job {batch_job_id}: {current_status}")

            if current_status in final_statuses:
                print(f"Job {batch_job_id} has reached a final status: {current_status}")
                return current_status

            if current_status == 'finalizing':
                print(f"Job {batch_job_id} is finalizing. Checking again in 2 minutes...")
                time.sleep(2 * 60)

            else:
                print(f"Job {batch_job_id} is still {current_status}. Checking again in {check_interval} minutes...")
                time.sleep(check_interval * 60)  # Convert minutes to seconds

        except Exception as e:
            consecutive_errors += 1
            print(f"An error occurred while checking job {batch_job_id}: {str(e)}")
            if max_consecutive_errors is not None and consecutive_errors >= max_consecutive_errors:
                print(f"Giving up after {consecutive_errors} consecutive errors.")
                raise
            print(f"Retrying in {check_interval} minutes...")
            time.sleep(check_interval * 60)

        print('===========================', end='\n\n')

- Adjust `batch_size` based on the API rate limits and the size of your data
- Test with a single file and check the result before looping over all the files

In [None]:
# Set the maximum number of requests per batch file
batch_size = 2500
num_files = (len(tasks) + batch_size - 1) // batch_size
print(f'Total requests: {len(tasks)}. Batch size: {batch_size}. Separated in {num_files} files.', end='\n\n')

# One record per chunk. Each record tracks the uploaded file ID, the batch ID and the
# last known status, so outputs can later be matched to chunks without guessing from
# the order of the batch listing.
batch_job_records = []

try:
    # Submit the chunks one at a time, waiting for each job to finish before the next
    for i in range(num_files):
        start_index = i * batch_size
        # Clamp so the recorded (exclusive) end never exceeds the number of tasks
        end_index = min(start_index + batch_size, len(tasks))

        # Slice the list to get the current batch
        batch_data = tasks[start_index:end_index]

        # Create a filename
        file_name = f'batch_task_file{i+1}.jsonl'

        # record the batch job (updated in place as the chunk progresses)
        record = {
            'file_name': file_name,
            'start_index': start_index,
            'end_index': end_index,
            'input_file_id': None,
            'batch_id': None,
            'status': 'not_submitted',
        }
        batch_job_records.append(record)

        # Write batch tasks
        writeBatchTasks(batch_data, file_name)

        # Upload batch file
        batch_file = uploadBatchFile(file_name)
        if not batch_file:
            print(f"Failed to upload file {file_name}. Skipping this batch.")
            record['status'] = 'upload_failed'
            continue
        record['input_file_id'] = batch_file.id

        # Create batch job
        batch_job = createBatchJob(batch_file)
        if not batch_job:
            # batch_job is None here, so there is no `.errors` to inspect;
            # createBatchJob has already printed the underlying exception.
            print(f"Failed to create batch job for file {file_name}.")
            record['status'] = 'create_failed'
            continue
        record['batch_id'] = batch_job.id
        record['status'] = 'submitted'

        # Check batch job status, until it reaches a final status
        final_status = checkBatchJobStatus(batch_job.id)
        record['status'] = final_status
        print(f"Batch job {batch_job.id} is {final_status}.")

    print("Batch processing completed.")

finally:
    # save batch job records as a jsonl file — in `finally` so the bookkeeping
    # survives an interruption or crash part-way through the loop
    with open('batch_job_records.jsonl', 'w') as f:
        for obj in batch_job_records:
            f.write(json.dumps(obj) + '\n')
    print('Batch job records saved.')

# Retrieve the results

In [None]:
# Collect [batch id, creation time, output file id] for each listed batch.
# Replace the two placeholder IDs: listing starts after the "head" batch and
# stops (exclusive) when the "tail" batch is reached.
output_files = []

batch_listing = client.batches.list(after='the head batch id')
for job in batch_listing:
    reached_tail = job.id == 'the tail batch id'
    if reached_tail:
        break
    print(job.id, job.status)
    output_files.append([job.id, job.created_at, job.output_file_id])

print(len(output_files))

In [54]:
# Order the entries by their second element (the batch's created_at value), ascending,
# so the position in the list can be used to number the downloaded files.
output_files = sorted(output_files, key=lambda x: x[1])

In [56]:
# Number of batches collected; compare with the number of batch files that were submitted
len(output_files)

80

In [57]:
# Download the output file of every collected batch into `directory`
directory = 'batch_output'
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)  # once is enough; no need to repeat per file

for idx, (batch_id, _created_at, output_file_id) in enumerate(output_files):
    # No output file ID was recorded for this batch (e.g. it did not complete),
    # so there is nothing to download; requesting content for None would raise.
    if not output_file_id:
        print(f'Batch {batch_id} has no output file. Skipping.')
        continue

    # idx comes from the full list, so file numbers stay aligned with batch order
    # even when some batches are skipped
    output_file_name = 'batch_task_file{}.json'.format(idx+1)
    file_path = path / output_file_name

    result = client.files.content(output_file_id).content
    with open(file_path, 'wb') as file:
        file.write(result)

# Read results

In [None]:
# Indicator keys requested in the system prompt, in the column order of each result row
LABEL_KEYS = [
    'pro_democrat', 'against_democrat',
    'pro_republican', 'against_republican',
    'pro_biden', 'against_biden',
    'pro_trump', 'against_trump',
    'pro_kamala', 'against_kamala',
]


def parse_result_line(line, keys=LABEL_KEYS):
    """Turn one line of a batch output file into ``[custom_id, label_1, ..., label_n]``.

    Labels are looked up by name in ``keys`` order, so the columns stay aligned even
    if the model reorders, omits (-> ``None``) or adds keys. Returns ``None`` when the
    line holds no usable answer: a failed request (``response`` is null), a missing
    field, or message content that is not a valid JSON object (e.g. cut off by the
    token limit).
    """
    try:
        record = json.loads(line.strip())
        custom_id = record['custom_id']
        content = record['response']['body']['choices'][0]['message']['content']
        labels = json.loads(content)
    except (KeyError, IndexError, TypeError, ValueError):
        return None

    if not isinstance(labels, dict):
        return None

    return [custom_id] + [labels.get(key) for key in keys]


results = []
failed_lines = []  # (file path, line number) of every line that could not be parsed

directory = 'batch_output'
output_files = glob(f'{directory}/*.json')

for file_path in output_files:
    print(file_path)

    # Each downloaded file is JSONL: one response object per line
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_no, line in enumerate(file, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            row = parse_result_line(line)
            if row is None:
                failed_lines.append((file_path, line_no))
            else:
                results.append(row)

print(f'Parsed {len(results)} results; {len(failed_lines)} lines could not be parsed.')
            # print(results)


In [73]:
# map the results to the original data