### Imports

In [None]:
# Native
import os
import json
import logging

# Third-party
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI

# Local
from shared.prompts import (
    CLAIM_EXTRACTION_SYSTEM_MESSAGE,
    CLAIM_NORMALIZATION_SYSTEM_MESSAGE,
)
from shared.utils import move_file_to_directory

### Setup

In [2]:
# Logging: INFO and above, each record prefixed with its timestamp and level
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Read key/value pairs from a local .env file into the process environment
load_dotenv()

# OpenAI client shared by the cells below; the key comes from OPENAI_API_KEY
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

### Constants

In [3]:
# Directory layout: a batch file is moved from one stage folder to the next
DATASET_PATH = "../data/faketweetbr"
_BATCHES_ROOT = DATASET_PATH + "/batches"
UNPROCESSED_BATCHES_DIR = _BATCHES_ROOT + "/unprocessed"
UPLOADED_BATCHES_DIR = _BATCHES_ROOT + "/uploaded"
PROCESSING_BATCHES_DIR = _BATCHES_ROOT + "/processing"
PROCESSED_BATCHES_DIR = _BATCHES_ROOT + "/processed"
RESULTS_BATCHES_DIR = _BATCHES_ROOT + "/results"
FAILED_BATCHES_DIR = _BATCHES_ROOT + "/failed"

# OpenAI batch request parameters
COMPLETION_ENDPOINT = "/v1/chat/completions"
MODEL = "gpt-5-nano"
# NOTE(review): reasoning tokens count toward max_completion_tokens; with
# REASONING_EFFORT = "high" confirm that 500 still leaves room for the visible answer.
MAX_COMPLETION_TOKENS = 500
TEMPERATURE = 1  # Note: gpt-5-nano does not support temperature=0
VERBOSITY = "low"  # Options: "low", "medium", "high"
REASONING_EFFORT = "high"  # Options: "low", "medium", "high"

# Dataset rows per batch. Every row yields two requests (claim extraction and
# claim normalization), so a batch file holds twice this many request lines.
ROWS_PER_BATCH = 5000

### Load Original Data

In [4]:
# Gather every CSV under <DATASET_PATH>/original into a single DataFrame
original_dataset_path = DATASET_PATH + "/original"

# Sort the listing: os.listdir returns entries in arbitrary order, and the row
# order decides which rows end up in which batch.
csv_files = sorted(
    file for file in os.listdir(original_dataset_path) if file.endswith(".csv")
)
if not csv_files:
    raise FileNotFoundError(f"No CSV files found in {original_dataset_path}")

# Read each file once and concatenate a single time; concatenating inside the
# loop would re-copy the accumulated frame on every iteration.
source_frames = [
    pd.read_csv(os.path.join(original_dataset_path, file)).assign(
        source=os.path.splitext(file)[0]  # Origin of each row, e.g. "train"
    )
    for file in csv_files
]
dataset_df = pd.concat(source_frames, ignore_index=True)

# Keep only relevant columns
# NOTE(review): the recorded output shows `id` parsed as float (1.124513e+18).
# If these are 64-bit tweet ids, float64 cannot represent them exactly; consider
# reading the column as str after checking what the downstream cells expect.
dataset_df = dataset_df[["id", "subject", "text", "classificacao", "source"]]

# Display the first few rows of the concatenated dataset
logging.info(f"Total records: {len(dataset_df)}")
dataset_df.head()

2025-09-26 11:54:50,631 - INFO - Total records: 279


Unnamed: 0,id,subject,text,classificacao,source
0,1.124513e+18,macaco marielle,Marielle >BANDIDOS Narco-traficantes-Milícias ...,fake,train
1,1.124049e+18,macaco marielle,"Bem, as últimas noticias a respeito disso que ...",fake,train
2,1.119295e+18,macaco marielle,@jornalnacional convivi com notícias da Mariel...,fake,train
3,1.114583e+18,macaco marielle,"O Cesari Battisti confessou seus crimes, a esq...",fake,train
4,1.113246e+18,macaco marielle,[Agência Lupa] Verificamos: É falso que Thiago...,true,train


### Create Batches

In [5]:
def _build_batch_request(row, task, custom_id_suffix, system_message, answer_label):
    """Build the request dict (one JSONL line) for a single dataset row and task.

    Args:
        row: Row exposing "id", "text", "source", "subject" and "classificacao".
        task: Task name stored in the request metadata.
        custom_id_suffix: Appended to the row id to form the request's custom_id.
        system_message: Content of the developer message.
        answer_label: Label that closes the user prompt, right before the colon.

    Returns:
        dict: Request with "custom_id", "method", "url" and "body" keys.
    """
    return {
        "custom_id": f"{row['id']}_{custom_id_suffix}",
        "method": "POST",
        "url": COMPLETION_ENDPOINT,
        "body": {
            "model": MODEL,
            "messages": [
                {"role": "developer", "content": system_message},
                {"role": "user", "content": f"Postagem: {row['text']}\n{answer_label}:"},
            ],
            "max_completion_tokens": MAX_COMPLETION_TOKENS,
            "metadata": {
                "source": row["source"],
                "subject": row["subject"],
                "classificacao": row["classificacao"],
                "task": task,
            },
            "verbosity": VERBOSITY,
            "reasoning_effort": REASONING_EFFORT,
            "temperature": TEMPERATURE,
        },
    }


def generate_batch_jsonl_rows(batch_df):
    """Generate the request dicts for every row of a batch.

    Each row produces two consecutive requests: claim extraction
    (custom_id "<id>_extr") followed by claim normalization ("<id>_norm").

    Args:
        batch_df: DataFrame with "id", "text", "source", "subject" and
            "classificacao" columns.

    Returns:
        list[dict]: Two request dicts per input row, in row order.
    """
    batch_jsonl_rows = []

    for _, row in batch_df.iterrows():
        batch_jsonl_rows.append(
            _build_batch_request(
                row,
                task="claim_extraction",
                custom_id_suffix="extr",
                system_message=CLAIM_EXTRACTION_SYSTEM_MESSAGE,
                answer_label="Declaração extraída",
            )
        )
        batch_jsonl_rows.append(
            _build_batch_request(
                row,
                task="claim_normalization",
                custom_id_suffix="norm",
                system_message=CLAIM_NORMALIZATION_SYSTEM_MESSAGE,
                answer_label="Declaração normalizada",
            )
        )

    return batch_jsonl_rows


def save_batch_jsonl_file(batch_jsonl_rows, batch_file_path):
    """Write the given request dicts to ``batch_file_path`` as JSON Lines.

    Each dict becomes one line. Any failure is logged instead of raised, so
    the function always returns None.
    """
    try:
        with open(batch_file_path, "w") as out:
            # json.dumps emits valid JSON (double-quoted keys and strings)
            out.writelines(json.dumps(entry) + "\n" for entry in batch_jsonl_rows)
        logging.info(f"Saved batch to {batch_file_path}")
    except Exception as err:
        logging.error(f"Error saving batch to {batch_file_path}: {err}")


def upload_batch_file_to_openai(batch_file_path):
    """Upload a batch JSONL file to OpenAI with purpose "batch".

    Args:
        batch_file_path: Path of the JSONL file to upload.

    Returns:
        The uploaded file's ID, or None if opening or uploading the file failed
        (the error is logged).
    """
    try:
        # The context manager closes the handle on success and on failure, so the
        # file is no longer held open when the caller moves it afterwards.
        with open(batch_file_path, "rb") as batch_file:
            batch_uploaded_file = client.files.create(file=batch_file, purpose="batch")
        logging.info(f"Uploaded batch to OpenAI successfully! File ID: {batch_uploaded_file.id}")
        return batch_uploaded_file.id
    except Exception as e:
        logging.error(f"Error uploading batch to OpenAI: {e}")
        return None

def create_openai_batch(batch_input_file_id):
    """Create an OpenAI batch job for a previously uploaded input file.

    Args:
        batch_input_file_id: ID of the uploaded batch input file.

    Returns:
        The created batch's ID, or None if the request failed (the error is logged).
    """
    try:
        batch_info = client.batches.create(
            input_file_id=batch_input_file_id,
            # Same constant as the "url" of every request line, so the batch
            # endpoint and the per-request URLs cannot drift apart.
            endpoint=COMPLETION_ENDPOINT,
            completion_window="24h",
            metadata={"description": f"Batch created from file ID {batch_input_file_id}"},
        )
        logging.info(f"Created batch successfully! Batch ID: {batch_info.id}")
        return batch_info.id
    except Exception as e:
        logging.error(f"Error creating batch: {e}")
        return None

# Submit the dataset to OpenAI in slices of ROWS_PER_BATCH rows.
# A batch file is renamed and moved as it advances: unprocessed -> uploaded -> processing.
timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
current_batch = 0

for i in range(0, len(dataset_df), ROWS_PER_BATCH):
    # Name the file for this slice and advance the counter right away, so the
    # early `continue` statements below cannot skip the increment
    batch_file_name = f"batch_{current_batch}_{timestamp}.jsonl"
    batch_file_path = f"{UNPROCESSED_BATCHES_DIR}/{batch_file_name}"
    current_batch += 1

    # Build the request lines for this slice and write them to disk
    batch_df = dataset_df.iloc[i:i + ROWS_PER_BATCH]
    batch_jsonl_rows = generate_batch_jsonl_rows(batch_df)
    os.makedirs(UNPROCESSED_BATCHES_DIR, exist_ok=True)
    save_batch_jsonl_file(batch_jsonl_rows, batch_file_path)

    # Upload; without a file ID the file stays in the unprocessed folder
    batch_input_file_id = upload_batch_file_to_openai(batch_file_path)
    if not batch_input_file_id:
        continue

    # Uploaded: move the file and prefix its name with the OpenAI file ID
    os.makedirs(UPLOADED_BATCHES_DIR, exist_ok=True)
    uploaded_file_path = f"{UPLOADED_BATCHES_DIR}/batch-file-id_{batch_input_file_id}_{batch_file_name}"
    move_file_to_directory(batch_file_path, uploaded_file_path)

    # Create the batch job; without a batch ID the file stays in the uploaded folder
    batch_id = create_openai_batch(batch_input_file_id)
    if not batch_id:
        continue

    # Batch created: move the file and prefix its name with the batch ID
    os.makedirs(PROCESSING_BATCHES_DIR, exist_ok=True)
    processing_file_path = f"{PROCESSING_BATCHES_DIR}/batch-id_{batch_id}_{batch_file_name}"
    move_file_to_directory(uploaded_file_path, processing_file_path)

2025-09-26 11:54:50,727 - INFO - Saved batch to ../data/faketweetbr/batches/unprocessed/batch_0_20250926_115450.jsonl
2025-09-26 11:54:54,108 - INFO - HTTP Request: POST https://api.openai.com/v1/files "HTTP/1.1 200 OK"
2025-09-26 11:54:54,110 - INFO - Uploaded batch to OpenAI successfully! File ID: file-GFkYDrePecZtf3pmSgpnfS
2025-09-26 11:54:54,111 - INFO - Moved file to ../data/faketweetbr/batches/uploaded/batch-file-id_file-GFkYDrePecZtf3pmSgpnfS_batch_0_20250926_115450.jsonl
2025-09-26 11:54:54,951 - INFO - HTTP Request: POST https://api.openai.com/v1/batches "HTTP/1.1 200 OK"
2025-09-26 11:54:54,957 - INFO - Created batch successfully! Batch ID: batch_68d6a93fb65881909cdd3c6a4975073f
2025-09-26 11:54:54,958 - INFO - Moved file to ../data/faketweetbr/batches/processing/batch-id_batch_68d6a93fb65881909cdd3c6a4975073f_batch_0_20250926_115450.jsonl


### Check Batches Results

In [8]:
def retrieve_batch_status(batch_id):
    """Fetch a batch from OpenAI and return it once it has reached a terminal state.

    Args:
        batch_id: ID of the batch to look up.

    Returns:
        The batch object when its status is "completed", "failed", "expired" or
        "cancelled"; None while the batch is still pending, when the status is
        not recognised, or when the lookup fails (warnings/errors are logged).
    """
    # Terminal states: nothing further will happen to the batch, so the caller
    # can collect whatever output/error files exist and stop polling it.
    terminal_statuses = {"completed", "failed", "expired", "cancelled"}
    # States that are expected while the batch has not finished yet. "created" is
    # kept only because earlier versions of this function accepted it.
    pending_statuses = {"created", "validating", "in_progress", "finalizing", "cancelling"}

    try:
        batch_object = client.batches.retrieve(batch_id)
        batch_status = batch_object.status

        logging.info(f"Batch {batch_id} status: {batch_status}")

        if batch_status in terminal_statuses:
            return batch_object
        if batch_status not in pending_statuses:
            logging.warning(f"Batch {batch_id} has unexpected status: {batch_status}")
        return None
    except Exception as e:
        logging.error(f"Error retrieving batch {batch_id}: {e}")
        return None

def process_completed_batch(batch_id, batch_info, batch_processing_file):
    """Download a finished batch's output/error files and archive its request file.

    Args:
        batch_id: ID of the batch, used in log messages.
        batch_info: Batch object exposing ``output_file_id`` and ``error_file_id``.
        batch_processing_file: Name of the batch's file inside PROCESSING_BATCHES_DIR.

    The output file is saved to RESULTS_BATCHES_DIR as "<name>_results.jsonl" and
    the error file to FAILED_BATCHES_DIR as "<name>_errors.jsonl". If no download
    raised, the request file is moved to PROCESSED_BATCHES_DIR; otherwise it is
    left where it is. Download errors are logged, not raised.
    """
    # (attribute holding the file ID, target folder, kind used in file name and
    #  success log, subject used in the failure log)
    downloads = (
        ("output_file_id", RESULTS_BATCHES_DIR, "results", "completed batch"),
        ("error_file_id", FAILED_BATCHES_DIR, "errors", "failed"),
    )

    all_saved = True
    for id_attribute, target_dir, kind, failure_subject in downloads:
        remote_file_id = getattr(batch_info, id_attribute)
        if not remote_file_id:
            continue

        try:
            response = client.files.content(remote_file_id)
            os.makedirs(target_dir, exist_ok=True)
            target_path = f"{target_dir}/{os.path.splitext(batch_processing_file)[0]}_{kind}.jsonl"

            # The response body is binary, hence "wb"
            with open(target_path, "wb") as target_file:
                target_file.write(response.read())

            logging.info(f"Saved {kind} for batch {batch_id} to {target_path}")
        except Exception as err:
            logging.error(f"Error processing {failure_subject} {batch_id}: {err}")
            all_saved = False

    if not all_saved:
        return

    # Every requested download succeeded: archive the request file
    os.makedirs(PROCESSED_BATCHES_DIR, exist_ok=True)
    move_file_to_directory(
        f"{PROCESSING_BATCHES_DIR}/{batch_processing_file}",
        f"{PROCESSED_BATCHES_DIR}/{batch_processing_file}",
    )
    logging.info(
        f"Moved processing file for batch {batch_id} to processed directory"
    )

def get_processing_batches():
    """List the entries of PROCESSING_BATCHES_DIR.

    Returns:
        list[str]: Entry names as returned by os.listdir, or an empty list when
        the folder does not exist (logged as a warning) or is empty (logged as info).
    """
    if os.path.exists(PROCESSING_BATCHES_DIR):
        entries = os.listdir(PROCESSING_BATCHES_DIR)
        if entries:
            return entries
        logging.info("No batches are currently being processed.")
    else:
        logging.warning(f"Processing directory {PROCESSING_BATCHES_DIR} does not exist.")

    return []

def check_batches_processing():
    """Poll every batch that has a file in the processing folder.

    File names are expected to look like "batch-id_batch_<hex>_<original name>";
    the batch ID "batch_<hex>" is rebuilt from the name. Entries that do not
    follow this pattern are skipped with a warning instead of being sent to the
    API with a made-up ID. Batches that have finished are handed to
    process_completed_batch. A failure on one file is logged and does not stop
    the remaining files from being checked.
    """
    # get_processing_batches already logs when the folder is missing or empty,
    # so an empty list needs no additional message here.
    for batch_file in get_processing_batches():
        try:
            name_parts = batch_file.split("_")
            has_batch_id = (
                len(name_parts) >= 3
                and name_parts[0] == "batch-id"
                and name_parts[1] == "batch"
                and name_parts[2] != ""
            )
            if not has_batch_id:
                logging.warning(f"Skipping {batch_file}: file name does not contain a batch ID")
                continue

            batch_id = f"batch_{name_parts[2]}"
            batch_info = retrieve_batch_status(batch_id)

            # None means the batch is still running or could not be retrieved
            if batch_info:
                process_completed_batch(batch_id, batch_info, batch_file)

        except Exception as e:
            logging.error(f"Error processing batch file {batch_file}: {e}")

# Run one polling pass over the files currently in the processing folder.
# Batches that have not finished are left in place and are checked again the next time this runs.
check_batches_processing()

2025-09-26 11:58:36,479 - INFO - HTTP Request: GET https://api.openai.com/v1/batches/batch_68d6a93fb65881909cdd3c6a4975073f "HTTP/1.1 200 OK"
2025-09-26 11:58:36,482 - INFO - Batch batch_68d6a93fb65881909cdd3c6a4975073f status: in_progress


### Save Dataset