In [None]:
from google.colab import drive
# Mount Google Drive: the FinBERT input JSON files and all checkpoint/result files
# used below live under /content/drive/MyDrive, so they survive a Colab runtime reset.
drive.mount('/content/drive')

import getpass
import glob
import json
import os
import time

import pandas as pd
from openai import OpenAI


Mounted at /content/drive


In [None]:
# Resolve the OpenAI API key without hard-coding it in the notebook (a literal key
# leaks whenever the notebook is shared). Set OPENAI_API_KEY in the environment
# (e.g. `%env OPENAI_API_KEY=...` or Colab secrets), otherwise you are prompted for it.
# Note: an empty string is *not* treated as "missing" by the OpenAI client, so we
# must never pass "" through -- every request would then fail authentication.
api_key = os.environ.get("OPENAI_API_KEY") or getpass.getpass("OpenAI API key: ")
client = OpenAI(api_key=api_key)


In [None]:
# Define the path to your processed JSON files
data_path = '/content/drive/MyDrive/IS450 Project/Historical Reddit Data/FinBERT_Data/'
file_paths = glob.glob(os.path.join(data_path, "finbert_r_*.json"))

# Maximum posts per JSONL file
max_requests = 1000
records = []
total_posts = 0

# Iterate through each file and accumulate records, encoding subreddit into custom_id
for file in file_paths:
    subreddit = os.path.basename(file).split('_')[-1].split('.')[0]
    with open(file, 'r') as f:
        data = json.load(f)
    for index, row in enumerate(data):
        total_posts += 1
        text = row.get("processed_text_finbert", "")
        prompt = (
            "Analyze the sentiment of the following financial post. "
            "Respond with a single number from 1 to 5, where: "
            "1 means very negative, 2 means negative, 3 means neutral, "
            "4 means positive, and 5 means very positive. "
            "Only output the number. Do not include any extra text, punctuation, or explanation.\n\n"
            f"{text}\n\n"
        )
        # Encode subreddit into custom_id using the '|' delimiter.
        record = {
            "custom_id": f"{row.get('id', f'post_{total_posts}')}" + "|" + f"{subreddit}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant. Only respond with a single word as specified."},
                    {"role": "user", "content": prompt}
                ]
            }
        }
        records.append(record)

print(f"Total posts loaded: {len(records)}")

# Split records into multiple JSONL files
num_posts = len(records)
num_files = (num_posts + max_requests - 1) // max_requests  # ceiling division

for i in range(num_files):
    start_idx = i * max_requests
    end_idx = min((i + 1) * max_requests, num_posts)
    chunk = records[start_idx:end_idx]
    output_file_path = f"input_part{i+1}.jsonl"
    with open(output_file_path, "w") as f:
        for record in chunk:
            f.write(json.dumps(record) + "\n")
    print(f"Input file for batch processing saved as {output_file_path}")


Total posts loaded: 49175
Input file for batch processing saved as input_part1.jsonl
Input file for batch processing saved as input_part2.jsonl
Input file for batch processing saved as input_part3.jsonl
Input file for batch processing saved as input_part4.jsonl
Input file for batch processing saved as input_part5.jsonl
Input file for batch processing saved as input_part6.jsonl
Input file for batch processing saved as input_part7.jsonl
Input file for batch processing saved as input_part8.jsonl
Input file for batch processing saved as input_part9.jsonl
Input file for batch processing saved as input_part10.jsonl
Input file for batch processing saved as input_part11.jsonl
Input file for batch processing saved as input_part12.jsonl
Input file for batch processing saved as input_part13.jsonl
Input file for batch processing saved as input_part14.jsonl
Input file for batch processing saved as input_part15.jsonl
Input file for batch processing saved as input_part16.jsonl
Input file for batch pr

In [None]:
# Checkpoints and results live on Drive (not the ephemeral Colab disk) so that
# progress survives a runtime reset.
output_dir = "/content/drive/MyDrive/IS450 Project/Historical Reddit Data/Processed Posts"
# Create the folder up front: every write below (CSV header, log, checkpoint JSON)
# fails if it does not exist.
os.makedirs(output_dir, exist_ok=True)

# File paths for checkpointing processed batches and in-progress batch jobs
processed_batches_file = os.path.join(output_dir, "processed_batches_2.log")
batch_jobs_checkpoint_file = os.path.join(output_dir, "batch_jobs_2.json")
output_csv_path = os.path.join(output_dir, "golden_dataset_sentiment_batch_2.csv")

# Names of input_partN.jsonl files whose results are already in the CSV
# (one name per line; blank lines are ignored).
if os.path.exists(processed_batches_file):
    with open(processed_batches_file, "r", encoding="utf-8") as f:
        processed_batches = {line.strip() for line in f if line.strip()}
else:
    processed_batches = set()

# Mapping of batch file name -> batch job id for jobs that were submitted but
# not yet written to the CSV, so a restarted runtime resumes them instead of
# submitting them again.
if os.path.exists(batch_jobs_checkpoint_file):
    with open(batch_jobs_checkpoint_file, "r", encoding="utf-8") as f:
        batch_jobs = json.load(f)
else:
    batch_jobs = {}

# Ensure the output CSV exists (header row only) so later batches can append to it
if not os.path.exists(output_csv_path):
    pd.DataFrame(columns=["id", "subreddit", "sentiment"]).to_csv(output_csv_path, index=False)

def create_batch_job(client, input_file_path, endpoint, completion_window):
    """Upload a JSONL request file and start a batch job over it.

    Args:
        client: OpenAI client used for both the upload and the batch creation.
        input_file_path: path of the JSONL file of batch requests.
        endpoint: API endpoint the requests in the file target.
        completion_window: completion window forwarded to ``batches.create``.

    Returns:
        The id of the newly created batch job.
    """
    with open(input_file_path, "rb") as request_stream:
        upload = client.files.create(file=request_stream, purpose="batch")
    print(f"Uploaded input file with id: {upload.id}")

    job = client.batches.create(
        completion_window=completion_window,
        endpoint=endpoint,
        input_file_id=upload.id,
    )
    job_id = job.id
    print(f"Batch job created with id: {job_id}")
    return job_id

def poll_batch_job(client, job_id, poll_interval=60):
    """Poll a batch job until it reaches a terminal state and return its results.

    Terminal states are "completed", "failed", "expired" and "cancelled".
    "expired" (job missed its completion window) must be included, otherwise
    such a job would be polled forever.

    Args:
        client: OpenAI client.
        job_id: id of the batch job to poll.
        poll_interval: seconds to sleep between status checks (default 60).

    Returns:
        A list of parsed result records (one dict per non-blank line of the
        job's output file) when the job completed with an output file. The raw
        output is also saved to ``batch_job_results_<job id>_<timestamp>.jsonl``
        in the working directory. Returns None for failed, expired or cancelled
        jobs, and for completed jobs without an output file, so the caller can
        drop the job from its checkpoint and retry the batch.
    """
    terminal_statuses = {"completed", "failed", "expired", "cancelled"}
    while True:
        batch_job = client.batches.retrieve(job_id)
        if batch_job.status in terminal_statuses:
            break
        print(f"Batch job status: {batch_job.status}... checking again in {poll_interval} seconds.")
        time.sleep(poll_interval)

    if batch_job.status != "completed":
        print(batch_job.errors)
        print(f"Batch job failed with status: {batch_job.status}")
        return None

    # Individually failed requests go to a separate error file, not the output
    # file; surface it so those posts are not silently treated as answered.
    error_file_id = getattr(batch_job, "error_file_id", None)
    if error_file_id:
        print(f"Batch job {batch_job.id} has per-request errors in file: {error_file_id}")

    result_file_id = batch_job.output_file_id
    if not result_file_id:
        # A job can complete with no output file when every request failed.
        print(f"Batch job {batch_job.id} completed without an output file.")
        return None

    result_data = client.files.content(result_file_id)
    # Ensure we get the actual bytes from the response.
    result_bytes = result_data.content if hasattr(result_data, "content") else bytes(result_data)

    # Keep a raw copy; the timestamp keeps repeated downloads of one job distinct.
    result_file_name = f"batch_job_results_{batch_job.id}_{int(time.time())}.jsonl"
    with open(result_file_name, "wb") as file:
        file.write(result_bytes)
    print(f"Batch job results saved as {result_file_name}")

    # bytes.splitlines() only splits on \n / \r (like reading the file in text
    # mode); blank lines are skipped because json.loads rejects them.
    return [json.loads(line) for line in result_bytes.splitlines() if line.strip()]


In [None]:
# Use a custom sort key to ensure numerical order
input_files = sorted(glob.glob("input_part*.jsonl"), key=lambda x: int(x.split("input_part")[1].split(".")[0]))

for input_file in input_files:
    batch_name = os.path.basename(input_file)  # e.g., "input_part1.jsonl"

    if batch_name in processed_batches:
        print(f"Skipping already processed batch: {batch_name}")
        continue

    # Check if a job has already been started for this batch
    if batch_name in batch_jobs:
        job_id = batch_jobs[batch_name]
        print(f"Resuming batch {batch_name} with existing job id {job_id}")
    else:
        # Create a new batch job and record its id
        job_id = create_batch_job(client, input_file, "/v1/chat/completions", "24h")
        batch_jobs[batch_name] = job_id
        # Save the updated in-progress batch jobs mapping
        with open(batch_jobs_checkpoint_file, "w") as f:
            json.dump(batch_jobs, f)

    # Poll the batch job until it finishes
    batch_results = poll_batch_job(client, job_id)

    # If the job failed, remove it from the checkpoint mapping
    if not batch_results:
        print(f"Batch job failed for {input_file}. Removing failed job from checkpoint mapping for retry.")
        if batch_name in batch_jobs:
            del batch_jobs[batch_name]
            with open(batch_jobs_checkpoint_file, "w") as f:
                json.dump(batch_jobs, f)
        continue

    # Process results
    results_mapping = {
        record.get("custom_id", record.get("id")):
            record["response"]["body"]["choices"][0]["message"]["content"].strip().lower()
        for record in batch_results
        if record.get("response") and record["response"].get("body") and record["response"]["body"].get("choices")
    }

    with open(input_file, "r") as f:
        input_data = [json.loads(line) for line in f]

    rows = []
    for record in input_data:
        full_id = record.get("custom_id", record.get("id"))
        actual_id, subreddit = full_id.split("|", 1) if "|" in full_id else (full_id, "unknown")
        sentiment = results_mapping.get(full_id, "neutral")
        rows.append({"id": actual_id, "subreddit": subreddit, "sentiment": sentiment})

    df_batch = pd.DataFrame(rows)
    df_batch.to_csv(output_csv_path, mode="a", header=not os.path.exists(output_csv_path), index=False)

    with open(processed_batches_file, "a") as f:
        f.write(batch_name + "\n")
    processed_batches.add(batch_name)

    del batch_jobs[batch_name]
    with open(batch_jobs_checkpoint_file, "w") as f:
        json.dump(batch_jobs, f)
    print(f"Processed and saved batch: {batch_name}")

print("All batches processed successfully!")


Skipping already processed batch: input_part1.jsonl
Skipping already processed batch: input_part2.jsonl
Skipping already processed batch: input_part3.jsonl
Skipping already processed batch: input_part4.jsonl
Skipping already processed batch: input_part5.jsonl
Skipping already processed batch: input_part6.jsonl
Skipping already processed batch: input_part7.jsonl
Skipping already processed batch: input_part8.jsonl
Skipping already processed batch: input_part9.jsonl
Skipping already processed batch: input_part10.jsonl
Skipping already processed batch: input_part11.jsonl
Skipping already processed batch: input_part12.jsonl
Skipping already processed batch: input_part13.jsonl
Skipping already processed batch: input_part14.jsonl
Skipping already processed batch: input_part15.jsonl
Skipping already processed batch: input_part16.jsonl
Skipping already processed batch: input_part17.jsonl
Skipping already processed batch: input_part18.jsonl
Skipping already processed batch: input_part19.jsonl
Sk