In [26]:
import os
import time
import pandas as pd
import numpy as np
import json
import io 
from openai import OpenAI
from pydantic import BaseModel, Field, ConfigDict
from typing import List, Literal
from dotenv import load_dotenv

In [27]:
# Populate os.environ from a local .env file. Variables that are already set in
# the environment take precedence (override=False is python-dotenv's default).
load_dotenv(override=False)

True

In [28]:
# Dedicated labelling key, read from the environment (presumably populated by
# the .env file loaded in the previous cell).
_labelling_api_key = os.getenv("grecotel_data_labelling_key")
client = OpenAI(api_key=_labelling_api_key)

In [29]:
# Label space for the labelling task. These lists are the single source of
# truth: the system prompt interpolates them and the pydantic Literal types
# below are derived from them, so the prompt and the JSON schema cannot drift.
VALID_ASPECTS = [
    "ROOM", "FOOD", "SERVICE", "FACILITIES",
    "LOCATION", "VALUE_FOR_MONEY", "CLEANLINESS", "COMFORT"
]
VALID_SENTIMENTS = ["positive", "negative"]

# Subscripting Literal with a tuple is equivalent to listing the values, so
# these are identical to the previously hand-written Literal[...] annotations.
AspectLabel = Literal[tuple(VALID_ASPECTS)]
SentimentLabel = Literal[tuple(VALID_SENTIMENTS)]

# --- Pydantic models describing the structured output we request ---
# NOTE(review): pydantic v2 copies class docstrings and Field descriptions into
# the generated JSON schema, which is sent to the model with every request.
# They are kept verbatim here; editing them changes what the model sees.


class AspectSentimentPair(BaseModel):
    """Represents a single aspect and its sentiment found in a sentence."""
    # extra='forbid' makes the schema emit "additionalProperties": false,
    # presumably needed because the batch requests use strict json_schema mode.
    model_config = ConfigDict(extra='forbid')
    aspect: AspectLabel
    sentiment: SentimentLabel


class SentenceAnalysis(BaseModel):
    """Represents the complete analysis of a single sentence."""
    model_config = ConfigDict(extra='forbid')
    results: List[AspectSentimentPair] = Field(
        description="A list of aspect-sentiment pairs identified in the sentence. Empty if no relevant aspects are found."
    )


# JSON schema embedded in every Batch API request body (response_format).
RESPONSE_SCHEMA = SentenceAnalysis.model_json_schema()

In [30]:
# System prompt sent as the "system" message of every Batch API request body.
# It is an f-string so the allowed labels are interpolated from VALID_ASPECTS /
# VALID_SENTIMENTS; the literal JSON braces in the few-shot examples are
# therefore escaped as {{ and }}.
# NOTE(review): the rules and examples talk about "a sentence", but the user
# message carries a whole review ("Analyze the review: ..."). Consider aligning
# the wording, but any change alters model behaviour, so re-validate labels.
SYSTEM_PROMPT = f"""
You are an expert in Aspect-Based Sentiment Analysis (ABSA) for hotel reviews.
Your task is to extract Aspect Categories and their Sentiments from the review text.

Allowed Aspect Categories: {", ".join(VALID_ASPECTS)}
Allowed Sentiments: {", ".join(VALID_SENTIMENTS)}

Rules:
- If a sentence mentions multiple aspects, extract all of them.
- If a sentence has no relevant aspects from the list, return an empty list.
- Be precise. Implicit aspects are CRITICAL. You must infer the aspect from context.

Output Format: JSON object with a "results" key containing a list of aspect-sentiment pairs.

--- EXAMPLES ---

Example 1 (Simple):
Input: "The breakfast was delicious and fresh."
Output: {{ "results": [ {{ "aspect": "FOOD", "sentiment": "positive" }} ] }}

Example 2 (Mixed Sentiment & Multiple Aspects):
Input: "Great location near the metro, but the bathroom was dirty and the staff was unhelpful."
Output: {{ "results": [ 
    {{ "aspect": "LOCATION", "sentiment": "positive" }},
    {{ "aspect": "CLEANLINESS", "sentiment": "negative" }},
    {{ "aspect": "SERVICE", "sentiment": "negative" }}
] }}

Example 3 (Implicit Aspects):
Input: "The walls are paper thin, I could hear the neighbors talking all night. Also, it costs too much for what you get."
Output: {{ "results": [ 
    {{ "aspect": "COMFORT", "sentiment": "negative" }},
    {{ "aspect": "VALUE_FOR_MONEY", "sentiment": "negative" }}
] }}

Example 4 (No relevant info):
Input: "I traveled with my family in July."
Output: {{ "results": [] }}

--- END EXAMPLES ---
"""

In [31]:
def process_full_dataset_batch(
    df: pd.DataFrame,
    sample_size: int = None,
    random_state: int = None,
    poll_interval: float = 10,
) -> pd.DataFrame:
    """
    Label English reviews with aspect/sentiment pairs via the OpenAI Batch API.

    Filters ``df`` to ``lang == 'en'``, optionally samples it, writes one
    ``/v1/chat/completions`` request per review to ``batch_input.jsonl`` in the
    current working directory, uploads it, submits a batch job, polls until the
    job reaches a terminal state, then downloads and parses the result file(s).

    Args:
        df: Reviews with at least ``id``, ``lang`` and ``review_text`` columns.
        sample_size: If truthy, label only a random sample of this many English
            reviews.
        random_state: Seed forwarded to ``DataFrame.sample`` so a sample run is
            reproducible. ``None`` keeps the unseeded behaviour.
        poll_interval: Seconds to wait between batch status checks.

    Returns:
        One row per (review, aspect) pair with columns ``id``, ``review_text``,
        ``aspect`` and ``sentiment``. A review for which the model found no
        aspect gets a single row with ``aspect``/``sentiment`` set to ``None``.
        Failed or unparseable requests are reported and skipped. An empty
        DataFrame is returned when the batch does not complete or produced no
        result file.
    """
    # Batch statuses after which the job never changes again. "expired" must be
    # included, otherwise a job that misses its 24h window is polled forever.
    terminal_statuses = {"completed", "failed", "cancelled", "expired"}

    # 1. Filter and sample data
    df = df[df['lang'] == 'en'].copy()
    if sample_size:
        df = df.sample(n=sample_size, random_state=random_state)

    print(f"Preparing {len(df)} reviews for batch processing...")

    # 2. Prepare JSONL input: one /v1/chat/completions request per review.
    #    rows_by_custom_id maps each custom_id back to its review in O(1),
    #    replacing a full-frame scan per output line.
    rows_by_custom_id = {}
    input_records = []
    for index, row in df.iterrows():
        custom_id = f"review-{row['id']}-{index}"
        rows_by_custom_id[custom_id] = (row['id'], row['review_text'])

        request_body = {
            "model": "gpt-4o-mini",
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": f"Analyze the review: {row['review_text']}"}
            ],
            "response_format": {
                "type": "json_schema",
                "json_schema": {
                    "name": "review_analysis",
                    "strict": True,
                    "schema": RESPONSE_SCHEMA
                }
            }
        }
        input_records.append({
            "custom_id": custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": request_body
        })

    # 3. Create JSONL file
    input_filename = "batch_input.jsonl"
    with open(input_filename, "w", encoding="utf-8") as f:
        f.write("\n".join(json.dumps(record) for record in input_records))

    # 4. Upload input file (the handle is closed as soon as the upload returns)
    print("Uploading input file...")
    with open(input_filename, "rb") as fh:
        input_file = client.files.create(file=fh, purpose="batch")

    # 5. Create and submit batch job
    print("Submitting batch job...")
    batch_job = client.batches.create(
        input_file_id=input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )
    print(f"Batch Job Started! ID: {batch_job.id}")

    # 6. Poll until the job reaches a terminal state
    print("Waiting for batch job to complete...")
    while batch_job.status not in terminal_statuses:
        time.sleep(poll_interval)
        batch_job = client.batches.retrieve(batch_job.id)
        counts = batch_job.request_counts
        # Defensive: do not assume request_counts is populated on every poll.
        done = counts.completed if counts else 0
        failed = counts.failed if counts else 0
        print(f"Current status: {batch_job.status} | Processed: {done} | Failed: {failed}")

    if batch_job.status != "completed":
        print(f"Batch job ended with status: {batch_job.status}")
        return pd.DataFrame()

    # 7. Download and parse results. Successes are in the output file and
    #    failed requests in the error file; read whichever exist so failures
    #    are reported even when some requests succeeded.
    if not batch_job.output_file_id:
        print("No output file generated. Checking for error file...")
    result_file_ids = [
        fid for fid in (batch_job.output_file_id, batch_job.error_file_id) if fid
    ]
    if not result_file_ids:
        print("CRITICAL: No output or error file found. The batch may have failed completely.")
        return pd.DataFrame()

    labeled_data = []
    for result_file_id in result_file_ids:
        print(f"Downloading results from file ID: {result_file_id}...")
        output_content = client.files.content(result_file_id).content.decode('utf-8')

        for line in output_content.splitlines():
            if not line.strip():
                continue

            result = json.loads(line)
            custom_id = result.get('custom_id')

            # A failed request can carry "response": null; .get's default only
            # applies when the key is missing, so fall back to {} explicitly.
            response_data = result.get('response') or {}
            if response_data.get('status_code') != 200:
                print(f"Request failed for {custom_id}: {result.get('error') or response_data}")
                continue

            if custom_id not in rows_by_custom_id:
                continue
            r_id, text = rows_by_custom_id[custom_id]

            try:
                analysis_results = response_data['body']['choices'][0]['message']['content']
                aspect_sentiment_pairs = json.loads(analysis_results).get('results', [])
                rows = [
                    {
                        "id": r_id,
                        "review_text": text,
                        "aspect": item['aspect'],
                        "sentiment": item['sentiment'],
                    }
                    for item in aspect_sentiment_pairs
                ]
            except Exception as e:
                # Best effort per review: one malformed answer must not abort the run.
                print(f"Error parsing result for {custom_id}: {e}")
                continue

            # Reviews with no detected aspect are kept as one placeholder row.
            labeled_data.extend(
                rows or [{"id": r_id, "review_text": text, "aspect": None, "sentiment": None}]
            )

    return pd.DataFrame(labeled_data)

In [32]:

def process_dataset_in_safe_chunks(
    df: pd.DataFrame,
    chunk_size: int = 2500,
    poll_interval: float = 30,
    keep_unlabeled: bool = False,
):
    """
    Label English reviews with the OpenAI Batch API, one chunk at a time.

    The English subset of ``df`` is split into consecutive slices of at most
    ``chunk_size`` rows. Each slice is submitted as its own batch job, and the
    function blocks until that job reaches a terminal state before moving on
    (Submit -> Wait -> Download -> Next), so only one chunk is queued at a
    time. The intent is to stay under a 2M enqueued-token limit; ``chunk_size``
    has to be tuned to that limit by hand.

    Args:
        df: Reviews with at least ``id``, ``lang`` and ``review_text`` columns.
        chunk_size: Maximum number of reviews per batch job. Must be >= 1.
        poll_interval: Seconds to wait between batch status checks.
        keep_unlabeled: If True, a review for which the model returned no
            aspects is kept as one row with ``aspect``/``sentiment`` set to
            ``None`` (as ``process_full_dataset_batch`` does). The default,
            False, drops such reviews.

    Returns:
        DataFrame with one row per (review, aspect) pair and columns ``id``,
        ``review_text``, ``aspect`` and ``sentiment``. Chunks whose job does not
        complete, and individual requests that fail, are reported and skipped.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # "expired" is terminal too; without it a chunk that misses its 24h window
    # would be polled forever and block every later chunk.
    terminal_statuses = {"completed", "failed", "cancelled", "expired"}

    # Filter for English
    df = df[df['lang'] == 'en'].copy()

    # Positional slices, so every chunk is itself a DataFrame.
    chunks = [df.iloc[start:start + chunk_size] for start in range(0, len(df), chunk_size)]
    num_chunks = len(chunks)

    print(f"Dataset too large for Tier 1 limit. Split into {num_chunks} chunks of max {chunk_size} reviews.")

    all_labeled_data = []

    for chunk_no, chunk_df in enumerate(chunks, start=1):
        print(f"\n--- Processing Chunk {chunk_no}/{num_chunks} ({len(chunk_df)} reviews) ---")

        # 1. Prepare JSONL. Remember which review each custom_id belongs to so
        #    results are matched back without re-parsing the id string.
        rows_by_custom_id = {}
        input_records = []
        for index, row in chunk_df.iterrows():
            custom_id = f"review-{row['id']}-{index}"
            rows_by_custom_id[custom_id] = (row['id'], row['review_text'])
            request_body = {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": f"Analyze the review: {row['review_text']}"}
                ],
                "response_format": {
                    "type": "json_schema",
                    "json_schema": {
                        "name": "review_analysis",
                        "strict": True,
                        "schema": RESPONSE_SCHEMA
                    }
                }
            }
            input_records.append({
                "custom_id": custom_id,
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": request_body
            })

        input_filename = f"batch_input_chunk_{chunk_no}.jsonl"
        with open(input_filename, "w", encoding="utf-8") as f:
            f.write("\n".join(json.dumps(r) for r in input_records))

        input_file = None
        batch_job = None
        try:
            # 2. Upload & submit (the handle is closed right after the upload)
            print(f"Uploading chunk {chunk_no}...")
            with open(input_filename, "rb") as fh:
                input_file = client.files.create(file=fh, purpose="batch")

            print(f"Submitting chunk {chunk_no}...")
            batch_job = client.batches.create(
                input_file_id=input_file.id,
                endpoint="/v1/chat/completions",
                completion_window="24h"
            )
            print(f"Chunk {chunk_no} Started! Job ID: {batch_job.id}")

            # 3. Wait loop (BLOCKING): the next chunk is only submitted after
            #    this one has finished.
            while batch_job.status not in terminal_statuses:
                time.sleep(poll_interval)
                batch_job = client.batches.retrieve(batch_job.id)
                print(f"Chunk {chunk_no} Status: {batch_job.status}...")

            if batch_job.status != "completed":
                print(f"Chunk {chunk_no} Failed! Skipping...")
                continue  # the finally block still cleans up this chunk

            # 4. Download & parse. The error file (if any) is read too, so that
            #    failed requests are reported instead of silently vanishing.
            print(f"Downloading results for Chunk {chunk_no}...")
            for file_id in (batch_job.output_file_id, batch_job.error_file_id):
                if not file_id:
                    continue
                content = client.files.content(file_id).content.decode('utf-8')
                for line in content.splitlines():
                    if not line.strip():
                        continue
                    result = json.loads(line)
                    custom_id = result.get('custom_id')

                    # A failed request can carry "response": null; fall back
                    # to {} explicitly rather than relying on .get's default.
                    response_data = result.get('response') or {}
                    if response_data.get('status_code') != 200:
                        print(f"Request failed for {custom_id} in chunk {chunk_no}: {result.get('error') or response_data}")
                        continue

                    if custom_id not in rows_by_custom_id:
                        continue
                    orig_id, text = rows_by_custom_id[custom_id]

                    try:
                        analysis_results = response_data['body']['choices'][0]['message']['content']
                        parsed_analysis = json.loads(analysis_results)
                        rows = [
                            {
                                "id": orig_id,
                                "review_text": text,
                                "aspect": item['aspect'],
                                "sentiment": item['sentiment'],
                            }
                            for item in parsed_analysis.get('results', [])
                        ]
                    except Exception as e:
                        # Best effort per review: one malformed answer must not abort the run.
                        print(f"Error parsing item in chunk {chunk_no}: {e}")
                        continue

                    if not rows and keep_unlabeled:
                        rows = [{"id": orig_id, "review_text": text, "aspect": None, "sentiment": None}]
                    all_labeled_data.extend(rows)
        finally:
            # Best-effort cleanup. This runs even when the chunk failed or an
            # exception propagates, so uploaded files do not pile up remotely.
            remote_ids = []
            if input_file is not None:
                remote_ids.append(input_file.id)
            if batch_job is not None:
                remote_ids.extend(
                    fid for fid in (batch_job.output_file_id, batch_job.error_file_id) if fid
                )
            for file_id in remote_ids:
                try:
                    client.files.delete(file_id)
                except Exception as e:
                    print(f"Cleanup warning: {e}")
            try:
                os.remove(input_filename)
            except OSError as e:
                print(f"Cleanup warning: {e}")

    return pd.DataFrame(all_labeled_data)

In [33]:
# The cleaned TripAdvisor export lives under <parent of cwd>/data/cleaned
# (presumably the project root when the notebook runs from its own folder).
parent_dir = os.path.dirname(os.getcwd())
cleaned_dir = os.path.join(parent_dir, 'data', 'cleaned')
data_path = os.path.join(cleaned_dir, 'cleaned_dataset_tripadvisor-reviews_2025-11-01_14-21-09-431.json')

df = pd.read_json(data_path, orient='records')

In [37]:
# Size of the English subset that the labelling functions will process.
df.loc[df['lang'].eq('en')].shape

(11989, 8)

In [None]:
# Smoke test: label a small random sample before launching the full run.
labeled_df_sample = process_full_dataset_batch(df=df, sample_size=5)
labeled_df_sample.head(n=5)

Preparing 5 reviews for batch processing...
Uploading input file...
Submitting batch job...
Batch Job Started! ID: batch_693da8821f7c819092c617e8523d859a
Waiting for batch job to complete...
Current status: in_progress | Processed: 0 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 2 | Failed: 0
Current status: in_progress | Processed: 4 | Failed: 0
Current status: finalizing | Processed: 5 | Failed: 0
Current status: completed | Processed: 5

Unnamed: 0,id,review_text,aspect,sentiment
0,609653550,Terrible smell. I am writing this review from ...,CLEANLINESS,negative
1,609653550,Terrible smell. I am writing this review from ...,COMFORT,negative
2,609653550,Terrible smell. I am writing this review from ...,VALUE_FOR_MONEY,negative
3,1015557492,Relaxing natural environment & superb staff. W...,LOCATION,positive
4,1015557492,Relaxing natural environment & superb staff. W...,SERVICE,positive


In [34]:

# Full labelling run, chunked to respect the batch token limit; expensive.
final_df = process_dataset_in_safe_chunks(df=df, chunk_size=2500)
final_df.to_json(
    "labeled_hotel_reviews_full.json",
    orient='records',
    indent=2,
)
print("Done! Saved to labeled_hotel_reviews_full.json")

Dataset too large for Tier 1 limit. Split into 5 chunks of max 2500 reviews.

--- Processing Chunk 1/5 (2500 reviews) ---
Uploading chunk 1...
Submitting chunk 1...
Chunk 1 Started! Job ID: batch_693dafebfa8c8190806a60181126af56
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress...
Chunk 1 Status: in_progress

In [35]:
# Distinct review ids present in the labeled output (compare with the English
# review count computed earlier to see how many reviews produced no rows).
final_df['id'].nunique(dropna=True)

11961