# 3-Class Transformer Inference & Evaluation on USS Reviews

**High-level summary:**  
Loads a saved Transformer checkpoint, preprocesses USS review text with an enhanced tokenizer, runs batched inference to produce 3-class sentiment predictions and probability scores, performs detailed evaluation (accuracy, F1, confusion matrix, per-class stats), saves results and examples to CSV, and provides a single-review prediction utility.

In [None]:
# Mount Google Drive so the project files stored there are visible to this
# Colab runtime.

from google.colab import drive
drive.mount('/content/drive')

# Switch the working directory to the project folder: the relative
# 'model/...' and 'datastore/...' paths used later in this notebook are
# resolved against the current working directory.

import os

os.chdir('/content/drive/My Drive/CS605-NLP-Project')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Install
#!pip install --upgrade numpy gensim --no-cache-dir


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
import re
import math
import time
from collections import Counter

# Pick the compute device once: the GPU when PyTorch can see one, else the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

class ImprovedYelpDataset:
    """Text preprocessor used at inference time (not a ``torch`` ``Dataset``).

    Turns a raw review string into a fixed-length tensor of vocabulary ids:
    lowercase -> regex normalisation -> whitespace split -> id lookup ->
    pad/truncate to ``max_length``.
    """

    # Ordered (compiled pattern, replacement) pairs applied to the lowercased
    # text. The order is significant: specific contractions must run before
    # the generic "n't" rule, and the punctuation markers are inserted before
    # the character filter.
    _SUBSTITUTIONS = tuple(
        (re.compile(pattern), replacement)
        for pattern, replacement in (
            # Contractions and common patterns
            (r"won't", "will not"),
            (r"can't", "cannot"),
            (r"n't", " not"),
            (r"'re", " are"),
            (r"'ve", " have"),
            (r"'ll", " will"),
            (r"'d", " would"),
            (r"'m", " am"),
            # Sentiment-relevant punctuation runs become marker words.
            (r'[!]{2,}', ' very_excited '),   # Multiple exclamations
            (r'[?]{2,}', ' very_confused '),  # Multiple questions
            (r'[.]{3,}', ' continuation '),   # Ellipsis
            # Everything except letters, digits, whitespace and "!?." becomes
            # a space. NOTE(review): this also replaces the underscore in the
            # markers above, so they end up as two tokens ("very", "excited").
            # Left untouched because the tokenizer has to match whatever the
            # model was trained with - confirm against the training code.
            (r'[^a-zA-Z0-9\s!?.]', ' '),
            # Runs of 3+ identical characters are squeezed to exactly two
            # (e.g. "soooo" -> "soo"); this applies to digits as well.
            (r'(.)\1{2,}', r'\1\1'),
            # Collapse whitespace runs.
            (r'\s+', ' '),
        )
    )

    def __init__(self, max_length=384):
        """Store the fixed sequence length every text is padded/truncated to."""
        self.max_length = max_length

    def enhanced_tokenize(self, text):
        """Normalise ``text`` and split it into whitespace-delimited tokens.

        Args:
            text: Raw review text. Missing values (``None``/NaN) and the empty
                string are accepted; anything else is converted with ``str``.

        Returns:
            A non-empty list of token strings. ``["<UNK>"]`` is returned when
            the input is missing/empty or when nothing is left after cleaning
            (e.g. whitespace-only or symbol-only text), so callers never get
            an all-padding sequence for such inputs.
        """
        if pd.isna(text) or text == "":
            return ["<UNK>"]

        cleaned = str(text).lower()
        for pattern, replacement in self._SUBSTITUTIONS:
            cleaned = pattern.sub(replacement, cleaned)

        # split() with no arguments already ignores leading/trailing blanks.
        tokens = cleaned.split()
        return tokens if tokens else ["<UNK>"]

    def preprocess_text(self, text, vocab):
        """Encode a single text as a ``max_length`` tensor of vocabulary ids.

        Args:
            text: Raw review text (see ``enhanced_tokenize``).
            vocab: Mapping from token string to integer id. ``'<UNK>'`` and
                ``'<PAD>'`` entries are used when present; otherwise ids 1 and
                0 are used respectively.

        Returns:
            A 1-D ``torch.long`` tensor of length ``self.max_length``;
            longer inputs are truncated, shorter ones right-padded.
        """
        # Resolve the special ids once instead of once per token.
        unk_id = vocab.get('<UNK>', 1)
        pad_id = vocab.get('<PAD>', 0)

        token_ids = [
            vocab.get(token, unk_id)
            for token in self.enhanced_tokenize(text)
        ][:self.max_length]
        token_ids += [pad_id] * (self.max_length - len(token_ids))

        return torch.tensor(token_ids, dtype=torch.long)

# Model Architecture (same as training)
class ImprovedMultiHeadAttention(nn.Module):
    """Pre-norm multi-head self-attention with a residual connection.

    The input is layer-normalised, projected to queries/keys/values, attended
    per head with scaled dot-product attention, projected back to ``d_model``
    and added to the un-normalised input.

    Args:
        d_model: Feature size of the input and output.
        n_heads: Number of attention heads; must divide ``d_model`` evenly.
        dropout: Dropout probability applied to the attention weights and to
            the output projection.

    Raises:
        ValueError: If ``n_heads`` is not a positive divisor of ``d_model``.
    """

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        # Fail early: otherwise d_k is silently floored and the reshape in
        # forward() fails later with an opaque shape error.
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                "n_heads must be a positive divisor of d_model "
                f"(got d_model={d_model}, n_heads={n_heads})"
            )

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads  # per-head feature size

        # Attribute names are part of the checkpoint's state_dict keys.
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of size ``(batch_size, seq_len, d_model)``.
            mask: Optional tensor broadcastable to the attention scores of
                size ``(batch_size, n_heads, seq_len, seq_len)``; positions
                where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor with the same size as ``x``.
        """
        batch_size, seq_len, d_model = x.size()
        normed = self.layer_norm(x)

        def split_heads(projected):
            # (batch, seq, d_model) -> (batch, heads, seq, d_k)
            return projected.view(
                batch_size, seq_len, self.n_heads, self.d_k
            ).transpose(1, 2)

        Q = split_heads(self.w_q(normed))
        K = split_heads(self.w_k(normed))
        V = split_heads(self.w_v(normed))

        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # A large negative score drives the softmax weight to zero.
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9)

        attention_weights = self.dropout(torch.softmax(attention_scores, dim=-1))

        # (batch, heads, seq, d_k) -> (batch, seq, d_model)
        context = torch.matmul(attention_weights, V)
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

        return x + self.dropout(self.w_o(context))

class ImprovedTransformerBlock(nn.Module):
    """One encoder layer: self-attention followed by a pre-norm feed-forward net.

    Both sub-layers are residual. The attention module adds its own residual
    internally; the feed-forward residual is added here.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = ImprovedMultiHeadAttention(d_model, n_heads, dropout)

        # The position of each layer inside the Sequential determines its
        # state_dict key, so the order below must not change.
        ff_layers = [
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        ]
        self.feed_forward = nn.Sequential(*ff_layers)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run attention then the feed-forward sub-layer; ``mask`` is passed to attention."""
        attended = self.attention(x, mask)
        transformed = self.feed_forward(attended)
        return attended + self.dropout(transformed)

class ImprovedTransformer(nn.Module):
    """Transformer encoder classifier over token-id sequences.

    Token and learned position embeddings are summed, normalised and passed
    through ``n_layers`` transformer blocks; the final hidden states are
    pooled (mean and max, concatenated) and fed to an MLP classifier.

    Args:
        vocab_size: Number of rows in the token embedding table.
        d_model: Hidden size.
        n_heads: Attention heads per block.
        n_layers: Number of transformer blocks.
        d_ff: Inner size of each block's feed-forward network.
        max_length: Number of learned positions, i.e. the longest sequence
            the model accepts.
        num_classes: Number of output logits.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, vocab_size, d_model=128, n_heads=8, n_layers=4, d_ff=512, max_length=384,
                 num_classes=3, dropout=0.15):
        super().__init__()
        self.d_model = d_model
        self.max_length = max_length

        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(max_length, d_model)
        self.embedding_dropout = nn.Dropout(dropout)
        self.embedding_norm = nn.LayerNorm(d_model)

        self.transformer_blocks = nn.ModuleList([
            ImprovedTransformerBlock(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])

        self.final_norm = nn.LayerNorm(d_model)
        self.classifier = nn.Sequential(
            nn.Linear(d_model * 2, d_model),  # *2 for concatenated pooling
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, d_model // 2),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model // 2, num_classes)
        )

        self.init_weights()

    def init_weights(self):
        """Xavier-initialise linear weights (zero biases); N(0, 0.02) for embeddings."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                torch.nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    torch.nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Embedding):
                torch.nn.init.normal_(module.weight, mean=0, std=0.02)

    def forward(self, x):
        """Compute class logits for a batch of token-id sequences.

        Args:
            x: Integer tensor of size ``(batch_size, seq_len)``. Id ``0`` is
                treated as padding when building the attention mask.

        Returns:
            Tensor of size ``(batch_size, num_classes)`` with unnormalised
            logits.

        Raises:
            ValueError: If ``seq_len`` exceeds ``max_length``; there are no
                position embeddings for the extra positions.
        """
        batch_size, seq_len = x.size()
        # Fail with a readable message instead of an out-of-range embedding
        # lookup (which surfaces as a device-side assert on CUDA).
        if seq_len > self.max_length:
            raise ValueError(
                f"Input sequence length {seq_len} exceeds the model's "
                f"max_length {self.max_length}"
            )

        positions = torch.arange(0, seq_len, device=x.device).unsqueeze(0).expand(batch_size, seq_len)

        # Token embeddings are scaled by sqrt(d_model) before the position
        # embeddings are added.
        token_emb = self.token_embedding(x) * math.sqrt(self.d_model)
        pos_emb = self.position_embedding(positions)
        embeddings = token_emb + pos_emb
        embeddings = self.embedding_norm(embeddings)
        embeddings = self.embedding_dropout(embeddings)

        # (batch, 1, 1, seq): broadcasts over heads and query positions so
        # padded key positions (id 0) receive no attention.
        pad_mask = (x != 0).unsqueeze(1).unsqueeze(1)

        x = embeddings
        for transformer in self.transformer_blocks:
            x = transformer(x, pad_mask)

        x = self.final_norm(x)

        # Dual pooling
        # NOTE(review): this mask is derived from the normalised hidden states
        # rather than from the pad positions of the input ids, so it does not
        # appear to exclude padding from the pooling. It is deliberately left
        # unchanged: the saved weights were produced with this exact
        # computation and altering it would change the model's predictions.
        mask = (x.sum(dim=-1) != 0).float().unsqueeze(-1)
        x_mean = (x * mask).sum(dim=1) / (mask.sum(dim=1) + 1e-9)
        x_max, _ = (x + (1 - mask) * (-1e9)).max(dim=1)
        x_pooled = torch.cat([x_mean, x_max], dim=-1)

        logits = self.classifier(x_pooled)
        return logits

def load_trained_model(model_path='model/3class_transformer_v2.pth'):
    """Restore the saved transformer and its vocabulary from a checkpoint.

    The checkpoint is expected to hold the keys ``'model_config'`` (keyword
    arguments for ``ImprovedTransformer``), ``'vocab'`` and
    ``'model_state_dict'``.

    Returns:
        ``(model, vocab)`` with the model moved to ``device`` and switched to
        evaluation mode.
    """
    print(f"Loading model from {model_path}...")

    # NOTE(security): torch.load unpickles the file; only open checkpoints
    # that come from a trusted source.
    ckpt = torch.load(model_path, map_location=device)
    architecture_kwargs = ckpt['model_config']
    vocab = ckpt['vocab']

    # Rebuild the network with the stored hyper-parameters, then restore the
    # trained weights.
    model = ImprovedTransformer(**architecture_kwargs)
    model.load_state_dict(ckpt['model_state_dict'])
    model.to(device)
    model.eval()

    parameter_count = sum(p.numel() for p in model.parameters())
    print(f"✓ Model loaded successfully!")
    print(f"✓ Vocabulary size: {len(vocab)}")
    print(f"✓ Model parameters: {parameter_count:,}")

    return model, vocab

def convert_stars_to_3class(stars):
    """Map a star rating to a 3-class sentiment label.

    Args:
        stars: Numeric rating (whole stars map exactly as before; fractional
            ratings are bucketed by the same thresholds).

    Returns:
        ``0`` (Negative) for ratings of 2 or below, ``1`` (Neutral) for
        ratings above 2 and below 4, ``2`` (Positive) for 4 and above.
    """
    if stars <= 2:
        return 0  # Negative
    if stars < 4:
        # Anything strictly between the negative and positive thresholds is
        # neutral; previously only an exact 3 was, so 2.5 counted as Positive.
        return 1  # Neutral
    return 2  # Positive (stars >= 4)

def predict_batch(model, texts, vocab, batch_size=64, max_length=384):
    """Classify ``texts`` in mini-batches.

    Each text is encoded with ``ImprovedYelpDataset.preprocess_text``, the
    encoded batch is moved to ``device`` and run through ``model`` without
    gradient tracking.

    Returns:
        ``(predictions, probabilities)`` as NumPy arrays: the arg-max class
        index per text and the softmax probabilities per text.
    """
    model.eval()
    preprocessor = ImprovedYelpDataset(max_length=max_length)

    predicted_labels = []
    class_probabilities = []

    for start in range(0, len(texts), batch_size):
        # Encode this slice of texts and stack them into one input tensor.
        encoded = [
            preprocessor.preprocess_text(text, vocab)
            for text in texts[start:start + batch_size]
        ]
        batch_input = torch.stack(encoded).to(device)

        with torch.no_grad():
            logits = model(batch_input)
            batch_probs = torch.softmax(logits, dim=1)
            batch_labels = logits.argmax(dim=1)

            predicted_labels.extend(batch_labels.cpu().numpy())
            class_probabilities.extend(batch_probs.cpu().numpy())

    return np.array(predicted_labels), np.array(class_probabilities)

def _print_prediction_examples(rows):
    """Print review excerpt, stars, true/predicted class and confidence per row."""
    for _, row in rows.iterrows():
        # str() guards against non-string review values.
        print(f"Review: {str(row['review'])[:100]}...")
        print(f"Stars: {row['stars']} → True: {row['true_class']} | Predicted: {row['predicted_class']} | Confidence: {row['confidence']:.3f}")
        print()


def evaluate_on_uss_reviews(model_path='model/3class_transformer_v2.pth'):
    """Main inference function for USS Reviews dataset.

    Reads ``datastore/USS_Reviews_Silver.csv``, derives 3-class labels from
    the ``stars`` column, predicts a class for every ``review`` with the
    checkpoint at ``model_path``, prints accuracy / F1 / classification
    report / confusion matrix / per-class accuracy, writes the annotated
    table to ``uss_reviews_predictions.csv`` and prints a few examples.

    Args:
        model_path: Checkpoint path passed to ``load_trained_model``.

    Returns:
        ``(results_df, accuracy, macro_f1)`` on success. ``None`` (after
        printing an error) when the dataset file or the model file is
        missing, the required columns are absent, or no reviews remain after
        cleaning - callers must check for ``None`` before unpacking.
    """
    target_names = ['Negative', 'Neutral', 'Positive']
    # Fixed label ids so every report keeps three classes even when one of
    # them does not occur in the data or in the predictions.
    label_ids = list(range(len(target_names)))

    print("="*60)
    print("🎬 USS REVIEWS SENTIMENT ANALYSIS")
    print("="*60)

    # Load the dataset
    print("Loading USS Reviews dataset...")
    try:
        uss_reviews = pd.read_csv("datastore/USS_Reviews_Silver.csv", parse_dates=["publishedAtDate"])
        print(f"✓ Loaded {len(uss_reviews)} reviews")
    except FileNotFoundError:
        print("❌ Error: Could not find 'datastore/USS_Reviews_Silver.csv'")
        print("Please check the file path and try again.")
        return

    # Check required columns
    if 'review' not in uss_reviews.columns or 'stars' not in uss_reviews.columns:
        print("❌ Error: Required columns 'review' and 'stars' not found")
        print(f"Available columns: {list(uss_reviews.columns)}")
        return

    # Data cleaning
    print("Preprocessing data...")

    # Remove rows with missing reviews or stars, then blank reviews.
    initial_count = len(uss_reviews)
    uss_reviews = uss_reviews.dropna(subset=['review', 'stars'])
    non_blank = uss_reviews['review'].astype(str).str.strip() != ''
    # .copy() so the columns added below are written to an independent frame
    # rather than to a slice of the original.
    uss_reviews = uss_reviews[non_blank].copy()

    print(f"✓ Cleaned data: {len(uss_reviews)} reviews ({initial_count - len(uss_reviews)} removed)")

    if uss_reviews.empty:
        # Nothing to score; the percentage and timing maths below would
        # divide by zero.
        print("❌ Error: No reviews left to evaluate after cleaning")
        return

    # Convert stars to 3-class labels
    uss_reviews['true_label'] = uss_reviews['stars'].apply(convert_stars_to_3class)

    # Print distribution
    print("\nUSS Reviews Star Distribution:")
    star_dist = uss_reviews['stars'].value_counts().sort_index()
    print(star_dist)

    print("\nUSS Reviews 3-Class Distribution:")
    # Reindex so each count lines up with its class name even if a class is
    # missing; the star ranges mirror convert_stars_to_3class.
    class_dist = uss_reviews['true_label'].value_counts().reindex(label_ids, fill_value=0)
    class_names = ['Negative (≤2★)', 'Neutral (3★)', 'Positive (≥4★)']
    for name, count in zip(class_names, class_dist):
        print(f"  {name}: {count} ({count/len(uss_reviews)*100:.1f}%)")

    # Load trained model
    try:
        model, vocab = load_trained_model(model_path)
    except FileNotFoundError:
        print(f"❌ Error: Could not find model file '{model_path}'")
        print("Please make sure the model has been trained and saved.")
        return

    # Make predictions
    print(f"\nMaking predictions on {len(uss_reviews)} reviews...")
    start_time = time.time()

    texts = uss_reviews['review'].tolist()
    true_labels = uss_reviews['true_label'].tolist()

    predictions, probabilities = predict_batch(model, texts, vocab)

    inference_time = time.time() - start_time
    print(f"✓ Inference completed in {inference_time:.2f} seconds")
    print(f"✓ Average time per review: {inference_time/len(texts)*1000:.2f}ms")

    # Calculate metrics
    accuracy = accuracy_score(true_labels, predictions)
    macro_f1 = f1_score(true_labels, predictions, average='macro')
    weighted_f1 = f1_score(true_labels, predictions, average='weighted')

    # Print results
    print("\n" + "="*60)
    print("🎯 EVALUATION RESULTS")
    print("="*60)
    print(f"Overall Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
    print(f"Macro F1-Score: {macro_f1:.4f}")
    print(f"Weighted F1-Score: {weighted_f1:.4f}")

    # Detailed classification report
    print(f"\n📊 DETAILED CLASSIFICATION REPORT:")
    print("-" * 50)
    report = classification_report(
        true_labels,
        predictions,
        labels=label_ids,
        target_names=target_names,
        digits=4,
        zero_division=0,
    )
    print(report)

    # Confusion matrix (always 3x3 thanks to the explicit labels)
    print(f"\n🔄 CONFUSION MATRIX:")
    print("-" * 30)
    cm = confusion_matrix(true_labels, predictions, labels=label_ids)
    print("         Predicted")
    print("       Neg  Neu  Pos")
    print("Actual")
    for class_name, row in zip(['Neg', 'Neu', 'Pos'], cm):
        print(f"  {class_name}  {row[0]:4d} {row[1]:4d} {row[2]:4d}")

    # Per-class accuracy
    print(f"\n📈 PER-CLASS PERFORMANCE:")
    print("-" * 40)
    true_array = np.asarray(true_labels)
    predicted_array = np.asarray(predictions)
    for i, class_name in enumerate(target_names):
        class_mask = true_array == i
        class_count = class_mask.sum()
        if class_count > 0:
            class_acc = (predicted_array[class_mask] == i).mean()
            print(f"{class_name:>8}: {class_acc:.4f} accuracy ({class_count:,} samples)")

    # Save detailed results
    print(f"\n💾 SAVING DETAILED RESULTS...")

    # Add predictions to dataframe
    uss_reviews['predicted_label'] = predictions
    uss_reviews['predicted_class'] = [target_names[pred] for pred in predictions]
    uss_reviews['true_class'] = [target_names[true] for true in true_labels]
    uss_reviews['confidence'] = probabilities.max(axis=1)
    uss_reviews['prob_negative'] = probabilities[:, 0]
    uss_reviews['prob_neutral'] = probabilities[:, 1]
    uss_reviews['prob_positive'] = probabilities[:, 2]
    uss_reviews['correct_prediction'] = uss_reviews['true_label'] == uss_reviews['predicted_label']

    # Save results
    output_file = 'uss_reviews_predictions.csv'
    uss_reviews.to_csv(output_file, index=False)
    print(f"✓ Results saved to '{output_file}'")

    # Show some example predictions
    print(f"\n🔍 EXAMPLE PREDICTIONS:")
    print("-" * 50)

    # Sample sizes are capped by the size of each subset (not of the whole
    # table), so sampling cannot fail when there are few correct/incorrect rows.
    is_correct = uss_reviews['correct_prediction']
    correct_rows = uss_reviews[is_correct]
    incorrect_rows = uss_reviews[~is_correct]
    correct_preds = correct_rows.sample(n=min(3, len(correct_rows)), random_state=42)
    incorrect_preds = incorrect_rows.sample(n=min(2, len(incorrect_rows)), random_state=42)

    print("✅ CORRECT PREDICTIONS:")
    _print_prediction_examples(correct_preds)

    if len(incorrect_preds) > 0:
        print("❌ INCORRECT PREDICTIONS:")
        _print_prediction_examples(incorrect_preds)

    print("="*60)
    print("🎉 INFERENCE COMPLETE!")
    print("="*60)

    return uss_reviews, accuracy, macro_f1

# Custom prediction function for new reviews
def predict_new_review(review_text, model_path='model/3class_transformer_v2.pth',
                       model=None, vocab=None):
    """Predict sentiment for a single new review.

    Args:
        review_text: Raw review text to classify.
        model_path: Checkpoint to load when ``model``/``vocab`` are not given.
        model: Optional already-loaded model. Pass it together with ``vocab``
            to avoid re-reading the checkpoint from disk on every call.
        vocab: Optional vocabulary belonging to ``model``.

    Returns:
        ``(class_name, confidence, probabilities)``: the predicted class name,
        the probability assigned to it, and the per-class probabilities
        ordered Negative, Neutral, Positive.
    """
    # Only hit the disk when the caller did not supply a ready model + vocab.
    if model is None or vocab is None:
        model, vocab = load_trained_model(model_path)

    predictions, probabilities = predict_batch(model, [review_text], vocab, batch_size=1)

    class_names = ['Negative', 'Neutral', 'Positive']
    review_probs = probabilities[0]
    predicted_class = int(predictions[0])
    confidence = review_probs[predicted_class]

    print(f"Review: {review_text}")
    print(f"Predicted: {class_names[predicted_class]} (confidence: {confidence:.3f})")
    print(f"Probabilities - Neg: {review_probs[0]:.3f}, Neu: {review_probs[1]:.3f}, Pos: {review_probs[2]:.3f}")

    return class_names[predicted_class], confidence, review_probs

if __name__ == "__main__":
    # Run inference on USS Reviews. The evaluation returns None (after
    # printing the reason) when the data or the model could not be loaded,
    # so check before unpacking instead of failing with a TypeError.
    evaluation = evaluate_on_uss_reviews()

    if evaluation is None:
        print("Evaluation did not complete; skipping the sample prediction.")
    else:
        results_df, accuracy, macro_f1 = evaluation

        # Example of predicting a new review
        print(f"\n" + "="*60)
        print("🔮 TESTING ON NEW REVIEW:")
        print("="*60)

        sample_review = "The rides were amazing and the staff was super friendly! Had a great time with family."
        predict_new_review(sample_review)

Using device: cuda
🎬 USS REVIEWS SENTIMENT ANALYSIS
Loading USS Reviews dataset...
✓ Loaded 29412 reviews
Preprocessing data...
✓ Cleaned data: 29412 reviews (0 removed)

USS Reviews Star Distribution:
stars
1     1370
2      836
3     2133
4     5558
5    19515
Name: count, dtype: int64

USS Reviews 3-Class Distribution:
  Negative (≤1★): 2206 (7.5%)
  Neutral (2★): 2133 (7.3%)
  Positive (≥3★): 25073 (85.2%)
Loading model from model/3class_transformer_v2.pth...
✓ Model loaded successfully!
✓ Vocabulary size: 12000
✓ Model parameters: 2,420,099

Making predictions on 29412 reviews...
✓ Inference completed in 37.11 seconds
✓ Average time per review: 1.26ms

🎯 EVALUATION RESULTS
Overall Accuracy: 0.6868 (68.68%)
Macro F1-Score: 0.4998
Weighted F1-Score: 0.7451

📊 DETAILED CLASSIFICATION REPORT:
--------------------------------------------------
              precision    recall  f1-score   support

    Negative     0.3380    0.8096    0.4769      2206
     Neutral     0.1405    0.4004  