In [13]:
import json
import os
import shutil
import glob
import time

import pandas as pd

from io import StringIO
from openai import OpenAI

In [14]:
credential_loc = "../../credentials.json"
batch_loc = "../../../../datasets/blogger/batch/"
batch_sent_loc = "../../../../datasets/blogger/batch_sent/"
batch_complete_loc = "../../../../datasets/blogger/batch_complete/"

In [15]:
with open(credential_loc, 'r') as f:
    data = json.load(f)
    
os.environ["OPENAI_API_KEY"] = data['OPENAI_API_KEY']

client = OpenAI()

In [16]:
# List all .jsonl files in the batch_loc directory
files_to_be_processed = [
    f for f in os.listdir(batch_loc)
    if os.path.isfile(os.path.join(batch_loc, f)) and f.endswith('.jsonl')
]

In [17]:
len(files_to_be_processed)

100

In [18]:
# Function to process a single file with a custom description
def process_file(file_path, description):
    # Open the file and create a batch input file
    with open(file_path, "rb") as f:
        batch_input_file = client.files.create(file=f, purpose="batch")
    
    batch_input_file_id = batch_input_file.id

    # Create a batch job with the batch input file and custom description
    client.batches.create(
        input_file_id=batch_input_file_id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={
            "description": description
        }
    )
    print(f"Processed file: {file_path} with description: {description}")

In [19]:
def process_all_files(batch_loc, batch_sent_loc, files, batch_size=5, wait_time=600):

    total_files = len(files)
    
    for i in range(0, total_files, batch_size):
        batch_files = files[i:i+batch_size]
        
        for file_name in batch_files:
            file_path = os.path.join(batch_loc, file_name)
            description = os.path.splitext(file_name)[0]
            process_file(file_path, description)
            
            # Move the processed file to batch_sent_loc directory
            shutil.move(file_path, os.path.join(batch_sent_loc, file_name))
            print(f"Moved file: {file_name} to {batch_sent_loc}")
        
        print(f"Processed {len(batch_files)} files. Waiting for {wait_time} seconds...")
        time.sleep(wait_time)  # Wait for the specified time before processing the next batch

    print("All files processed and moved successfully.")

In [20]:
def save_as_jsonl(data, output_file_path):
    with open(output_file_path, 'w') as file:
        for _, row in data.iterrows():
            json.dump(row.to_dict(), file)
            file.write('\n')

In [None]:
process_all_files(batch_loc, batch_sent_loc, files_to_be_processed)

Processed file: ../../../../datasets/blogger/batch/batch_617540.jsonl with description: batch_617540
Moved file: batch_617540.jsonl to ../../../../datasets/blogger/batch_sent/
Processed file: ../../../../datasets/blogger/batch/batch_579188.jsonl with description: batch_579188
Moved file: batch_579188.jsonl to ../../../../datasets/blogger/batch_sent/
Processed file: ../../../../datasets/blogger/batch/batch_448015.jsonl with description: batch_448015
Moved file: batch_448015.jsonl to ../../../../datasets/blogger/batch_sent/
Processed file: ../../../../datasets/blogger/batch/batch_671397.jsonl with description: batch_671397
Moved file: batch_671397.jsonl to ../../../../datasets/blogger/batch_sent/
Processed file: ../../../../datasets/blogger/batch/batch_543206.jsonl with description: batch_543206
Moved file: batch_543206.jsonl to ../../../../datasets/blogger/batch_sent/
Processed 5 files. Waiting for 600 seconds...
Processed file: ../../../../datasets/blogger/batch/batch_17850.jsonl with 

In [6]:
def fetch_batches(client, after=None):
    """
    Fetch batches from the client, optionally using an 'after' parameter to paginate.
    
    :param client: API client instance.
    :param after: The ID to paginate after.
    :return: List of batch data.
    """
    response = client.batches.list(limit=100, after=after)
    return response.data

def process_batches(client):
    """
    Process batches by fetching data, sorting, and paginating until no new data is retrieved.
    
    :param client: API client instance.
    :return: DataFrame containing all batch data.
    """
    batch_data = []
    after = None

    while True:
        # Fetch the current batch of data
        current_batches = fetch_batches(client, after)
        if not current_batches:
            print("No more batches to fetch.")
            break
        
        # Extract attributes and convert to DataFrame
        for batch in current_batches:
            batch_dict = {
                "id": batch.id,
                "completion_window": batch.completion_window,
                "created_at": batch.created_at,
                "endpoint": batch.endpoint,
                "input_file_id": batch.input_file_id,
                "object": batch.object,
                "status": batch.status,
                "cancelled_at": batch.cancelled_at,
                "cancelling_at": batch.cancelling_at,
                "completed_at": batch.completed_at,
                "error_file_id": batch.error_file_id,
                "errors": batch.errors,
                "expired_at": batch.expired_at,
                "expires_at": batch.expires_at,
                "failed_at": batch.failed_at,
                "finalizing_at": batch.finalizing_at,
                "in_progress_at": batch.in_progress_at,
                "metadata_description": batch.metadata.get('description', ''),
                "output_file_id": batch.output_file_id,
                "request_counts_completed": batch.request_counts.completed,
                "request_counts_failed": batch.request_counts.failed,
                "request_counts_total": batch.request_counts.total
            }
            batch_data.append(batch_dict)
        
        # Create a DataFrame from the current batch data
        batch_df = pd.DataFrame(batch_data)

        # Remove duplicate rows based on 'id'
        batch_df.drop_duplicates(subset='id', keep='last', inplace=True)
        
        # Sort DataFrame by 'created_at' column
        batch_df.sort_values(by='created_at', ascending=True, inplace=True)
        
        # Print the current DataFrame state for debugging
        print(f"Current DataFrame shape: {batch_df.shape}")
        
        # Update the 'after' parameter with the last batch ID for pagination
        last_batch_id = batch_df['id'].iloc[-1] if not batch_df.empty else None
        if last_batch_id == after:
            print("No new batches found.")
            break
        after = last_batch_id
    
    return batch_df

In [7]:
fetch_batches(client)

[Batch(id='batch_jRRojB26n3wM9pkna1Tamd69', completion_window='24h', created_at=1721670154, endpoint='/v1/chat/completions', input_file_id='file-mmClbX7uN1plVQhK4BmFT6sN', object='batch', status='completed', cancelled_at=None, cancelling_at=None, completed_at=1721673102, error_file_id=None, errors=None, expired_at=None, expires_at=1721756554, failed_at=None, finalizing_at=1721673082, in_progress_at=1721670155, metadata={'description': 'batch_175880'}, output_file_id='file-QYD7udnx7EQvDBuKYqKnVGvl', request_counts=BatchRequestCounts(completed=280, failed=0, total=280)),
 Batch(id='batch_dMm2cYSOrJPyxHNx6bipGaHT', completion_window='24h', created_at=1721670152, endpoint='/v1/chat/completions', input_file_id='file-QSRermrzFeyAJW3cT90zjCty', object='batch', status='completed', cancelled_at=None, cancelling_at=None, completed_at=1721673047, error_file_id=None, errors=None, expired_at=None, expires_at=1721756552, failed_at=None, finalizing_at=1721673032, in_progress_at=1721670153, metadata={

In [12]:
batch_df = process_batches(client)

Current DataFrame shape: (100, 22)
Current DataFrame shape: (101, 22)
No new batches found.


In [396]:
failed_df = batch_df[batch_df['status']== 'failed']

In [397]:
completed_df = batch_df[batch_df['status']== 'completed']

In [398]:
print(f"Failed: {len(failed_df)} - Completed: {len(completed_df)}")

Failed: 4 - Completed: 92


In [399]:
completed_df

Unnamed: 0,id,completion_window,created_at,endpoint,input_file_id,object,status,cancelled_at,cancelling_at,completed_at,...,expired_at,expires_at,failed_at,finalizing_at,in_progress_at,metadata_description,output_file_id,request_counts_completed,request_counts_failed,request_counts_total
195,batch_013eSRVWLKWmtIMpBWhHhSLl,24h,1721602034,/v1/chat/completions,file-hZsyYqcUisNUpHkbFrER9i88,batch,completed,,,1.721602e+09,...,,1721688434,,1.721602e+09,1.721602e+09,batch_445957,file-Dlhu1lHh1AT5NHYFoKsjcuB9,750,0,750
194,batch_pW9H6GloJ22shGWUxFYtmGIW,24h,1721602035,/v1/chat/completions,file-RkS8ygO9eHRH1TpOpIsi4FB3,batch,completed,,,1.721602e+09,...,,1721688435,,1.721602e+09,1.721602e+09,batch_323518,file-4hI5UGHDptcr7cSD2MCpnOik,170,0,170
193,batch_dbkjOb0dIxRYJCCb4zMLvwT6,24h,1721602037,/v1/chat/completions,file-UkEg0oDACYUvMxXC4Mj02zb9,batch,completed,,,1.721602e+09,...,,1721688437,,1.721602e+09,1.721602e+09,batch_214228,file-2Ax4kPbzFqTcW7wbTLmEO9fD,340,0,340
192,batch_rfoWSbvzrYMfQFRmg3jKirRp,24h,1721602038,/v1/chat/completions,file-riAtghuDfJNv6aU8WYDiWTxQ,batch,completed,,,1.721602e+09,...,,1721688438,,1.721602e+09,1.721602e+09,batch_526719,file-ZFFJ9DYMUMTrYwzJnKH3qjJi,410,0,410
191,batch_PHEFfCuaFSYQcbcG2cqSjt10,24h,1721602040,/v1/chat/completions,file-EtN4NLEbkmEPuT1MDUHLj6Ih,batch,completed,,,1.721602e+09,...,,1721688440,,1.721602e+09,1.721602e+09,batch_350617,file-aakhASlKcAfZMnFQlywE0x1D,150,0,150
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
108,batch_s5QkFr2x6SKdt7RhXBQefBfR,24h,1721610541,/v1/chat/completions,file-bo6sV9FD8ZwWaadPQLtDJ4Ov,batch,completed,,,1.721611e+09,...,,1721696941,,1.721611e+09,1.721611e+09,batch_585356,file-in1an4RiYXLWZyWPA6p1VXWh,300,0,300
107,batch_7PT7lqoPlGqx04jUhGPW0EsK,24h,1721610543,/v1/chat/completions,file-UWQF3oJm7HYJcLqQhStoc7U4,batch,completed,,,1.721611e+09,...,,1721696943,,1.721611e+09,1.721611e+09,batch_551652,file-wZOAeKNCkOvxNcIE5UbBHnW4,250,0,250
106,batch_T1Ol5j2QePjqV0xNGoH0c7pM,24h,1721610545,/v1/chat/completions,file-1Uli9XRn7Ai9CJUdoHiu2vZW,batch,completed,,,1.721611e+09,...,,1721696945,,1.721611e+09,1.721611e+09,batch_491687,file-0CiPpS2fmDbuJYCHpSbcJOe6,480,0,480
105,batch_F3HkDhT9AIP5BXPY58MASKNJ,24h,1721611047,/v1/chat/completions,file-y76MpaM9qjCGnI4hvTxA1MnJ,batch,completed,,,1.721611e+09,...,,1721697447,,1.721611e+09,1.721611e+09,batch_463289,file-iKTMJJmMFWEv59M3CQk6Dd09,150,0,150


In [400]:
# Define the function to process files
def process_batch_files(batch_complete_loc, batch_sent_loc, completed_df):
    # Define the function to extract the content from the response
    def extract_content(response):
        try:
            content = response['body']['choices'][0]['message']['content']
            return content
        except (KeyError, TypeError):
            return None

    # Get the list of files already in the batch_complete_loc
    existing_files = [f.replace('.jsonl', '') for f in os.listdir(batch_complete_loc) if f.endswith('.jsonl')]

    # Filter out rows in completed_df where metadata_description matches existing files
    df_to_process = completed_df[~completed_df['metadata_description'].isin(existing_files)]

    # Loop through the rows in the filtered DataFrame
    for index, row in df_to_process.iterrows():
        metadata_description = row['metadata_description']
        output_file_id = row['output_file_id']

        # Call the API to get the file content
        file_response = client.files.content(output_file_id)
        jsonl_io = StringIO(file_response.text)
        df = pd.read_json(jsonl_io, lines=True)

        # Apply the function to extract the 'content' from the 'response' column
        df['response'] = df['response'].apply(extract_content)

        # Select only the required columns
        df = df[['id', 'custom_id', 'response']]

        # Save the DataFrame as a jsonl file in batch_complete_loc
        output_filepath = os.path.join(batch_complete_loc, f"{metadata_description}.jsonl")
        df.to_json(output_filepath, orient='records', lines=True)

        # Move the file from batch_sent_loc to batch_complete_loc
        sent_filepath = os.path.join(batch_sent_loc, f"{metadata_description}.jsonl")
        if os.path.exists(sent_filepath):
            os.remove(sent_filepath)
            print(f"File {sent_filepath} moved to {batch_complete_loc}")

In [401]:
process_batch_files(batch_complete_loc, batch_sent_loc, completed_df)

In [402]:
failed_df

Unnamed: 0,id,completion_window,created_at,endpoint,input_file_id,object,status,cancelled_at,cancelling_at,completed_at,...,expired_at,expires_at,failed_at,finalizing_at,in_progress_at,metadata_description,output_file_id,request_counts_completed,request_counts_failed,request_counts_total
199,batch_zibO1jsIbRslkYvbS3vThvdl,24h,1721599575,/v1/chat/completions,file-vodz9cJrx83pXL50FHufIhyp,batch,failed,,,,...,,1721685975,1721600000.0,,,batch_551652,,0,0,0
198,batch_aSvmbAxRriIh6vAjwjuG2Waj,24h,1721599576,/v1/chat/completions,file-2iEYlEqTzEY8UUGqgh9HnL1f,batch,failed,,,,...,,1721685976,1721600000.0,,,batch_491687,,0,0,0
197,batch_hzJOTPbYOPSDYyBPiNjbPkrR,24h,1721599577,/v1/chat/completions,file-yAT2QpBBOElAx0LjdjwscxFb,batch,failed,,,,...,,1721685977,1721600000.0,,,batch_463289,,0,0,0
196,batch_uKMKE6DVqPchT5vmgQcJ1XmN,24h,1721599579,/v1/chat/completions,file-HJKxnBpyXKTrKj3tMxrc0ayW,batch,failed,,,,...,,1721685979,1721600000.0,,,batch_171105,,0,0,0


In [403]:
# Define the function to handle failed files
def handle_failed_files(batch_loc, batch_sent, failed_df):
    # Loop through the rows in the failed_df DataFrame
    for index, row in failed_df.iterrows():
        metadata_description = row['metadata_description']
        filename = f"{metadata_description}.jsonl"
        
        # Define the source and destination file paths
        sent_filepath = os.path.join(batch_sent, filename)
        loc_filepath = os.path.join(batch_loc, filename)
        
        # Check if the file exists in batch_sent
        if os.path.exists(sent_filepath):
            # Move the file to batch_loc
            shutil.move(sent_filepath, loc_filepath)
            print(f"File {filename} moved from {batch_sent} to {batch_loc}")

In [404]:
handle_failed_files(batch_loc, batch_sent_loc, failed_df)