# Sentiment Analysis with Hugging Face Transformers

This notebook demonstrates how to fine-tune several pre-trained transformer models for sentiment analysis on a custom dataset. It covers data loading, preprocessing, model training with class weights, and evaluation of the models.

## Imports

In [None]:
# Core Libraries
import json
import pickle
import re
import string
import warnings
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import nltk

# NLTK Components
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Scikit-learn Tools
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight

# Hugging Face Transformers
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)

# Suppress all warnings for cleaner output.
# NOTE: this filter is process-wide, so deprecation warnings from every imported
# library are hidden as well; comment it out when debugging.
warnings.filterwarnings("ignore")

# Download NLTK data for text processing: the stop-word list plus the tokenizer
# models behind `word_tokenize`. Recent NLTK releases load the "punkt_tab"
# resource instead of the pickled "punkt" models, so both are requested; with
# quiet=True a resource id unknown to an older NLTK is skipped without raising.
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
nltk.download("stopwords", quiet=True)

## Data Preparation and Preprocessing

The `DataProcessor` class handles loading the raw data, cleaning the text, and sampling a roughly class-balanced dataset (a capped number of reviews per sentiment class) for training.

In [None]:
class DataProcessor:
    """
    Handles data loading, cleaning, and preparation for model training.

    The CSV at ``filepath`` is read eagerly in ``__init__``. ``prepare_data``
    requires a ``star_sentiment`` column and uses the ``title`` and ``text``
    columns when they are present.
    """

    # Values expected in the ``star_sentiment`` column and their integer class ids.
    SENTIMENT_MAP = {"Negative": 0, "Neutral": 1, "Positive": 2}
    # Default upper bound on the number of rows sampled for each class.
    DEFAULT_SAMPLES_PER_CLASS = {"Negative": 2400, "Neutral": 2800, "Positive": 3000}

    # Built once instead of on every `_clean_text` call.
    _PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)
    _DIGITS_RE = re.compile(r"\d+")
    _WHITESPACE_RE = re.compile(r"\s+")

    def __init__(self, filepath: str):
        """
        Initializes the DataProcessor with a dataset file path.

        Args:
            filepath (str): Path to the CSV file containing the dataset.
        """
        self.filepath = filepath
        self.stop_words = set(stopwords.words("english"))
        self.df = self._load_data()

    def _load_data(self) -> pd.DataFrame:
        """
        Loads the CSV file at ``self.filepath`` into a DataFrame.
        """
        return pd.read_csv(self.filepath)

    def _clean_text(self, text: str) -> str:
        """
        Cleans and tokenizes text by removing punctuation, numbers, and stop words.

        Args:
            text (str): The input text to be cleaned. Any non-string value
                (``None``, NaN, numbers) is treated as missing.

        Returns:
            str: The lower-cased tokens that are not stop words and are longer
            than one character, joined by single spaces; "" for missing input.
        """
        if not isinstance(text, str):
            return ""

        text = text.lower().translate(self._PUNCTUATION_TABLE)
        text = self._DIGITS_RE.sub("", text)
        text = self._WHITESPACE_RE.sub(" ", text)

        return " ".join(
            token
            for token in word_tokenize(text)
            if token not in self.stop_words and len(token) > 1
        )

    @staticmethod
    def _combine_review_fields(df: pd.DataFrame) -> pd.Series:
        """
        Joins the ``title`` and ``text`` columns into one review string per row.

        Missing columns and missing (NaN/None) cells contribute an empty string,
        so the literal word "nan" never leaks into the review text.
        """
        parts = []
        for column in ("title", "text"):
            if column in df.columns:
                parts.append(
                    df[column].map(lambda v: "" if pd.isna(v) else str(v)).astype(str)
                )
            else:
                parts.append(pd.Series("", index=df.index))
        return (parts[0] + " " + parts[1]).str.strip()

    def prepare_data(
        self,
        samples_per_class: Optional[Dict[str, int]] = None,
        random_state: int = 42,
    ) -> pd.DataFrame:
        """
        Combines and cleans text, maps sentiment labels, and samples each class.

        Side effect: ``self.df`` is replaced by the filtered frame with the
        added ``full_review``, ``cleaned_review`` and ``label`` columns.

        Args:
            samples_per_class: Maximum number of rows to sample per sentiment
                name. Defaults to ``DEFAULT_SAMPLES_PER_CLASS``. A sentiment
                missing from the mapping is not capped.
            random_state: Seed passed to ``DataFrame.sample`` for every class.

        Returns:
            pd.DataFrame: The sampled rows of all classes concatenated in
            ``SENTIMENT_MAP`` order, with an integer ``label`` column. Classes
            are capped, not upsampled, so they may still differ in size.
        """
        caps = self.DEFAULT_SAMPLES_PER_CLASS if samples_per_class is None else samples_per_class

        # Work on a copy so column assignment never targets a slice of another frame.
        df = self.df.copy()
        df["full_review"] = self._combine_review_fields(df)
        df["cleaned_review"] = df["full_review"].apply(self._clean_text)
        df["label"] = df["star_sentiment"].map(self.SENTIMENT_MAP)

        # Rows whose sentiment is missing or not in SENTIMENT_MAP end up with a NaN
        # label; dropping them here lets the label column be cast to int.
        keep = (df["cleaned_review"].str.len() > 10) & df["label"].notna()
        df = df.loc[keep].copy()
        df["label"] = df["label"].astype(int)
        self.df = df

        sampled_frames = []
        for sentiment, class_id in self.SENTIMENT_MAP.items():
            class_df = df[df["label"] == class_id]
            n_samples = min(caps.get(sentiment, len(class_df)), len(class_df))
            sampled_frames.append(class_df.sample(n=n_samples, random_state=random_state))

        return pd.concat(sampled_frames, ignore_index=True)

## Model Training and Evaluation

The `ModelTrainer` class orchestrates the fine-tuning process. It handles dataset tokenization, creates a custom `Trainer` that applies class weights to compensate for class imbalance, and evaluates each model's performance.

In [None]:
class ModelTrainer:
    """
    Manages the training and evaluation of Hugging Face transformer models.

    Both DataFrames must provide a ``cleaned_review`` text column and an integer
    ``label`` column with class ids 0..NUM_LABELS-1. Per-model outcomes are kept
    in ``self.results``, keyed by model name.
    """

    ID2LABEL = {0: "Negative", 1: "Neutral", 2: "Positive"}
    LABEL2ID = {"Negative": 0, "Neutral": 1, "Positive": 2}
    NUM_LABELS = 3
    MAX_LENGTH = 256
    # Checkpoints trained by `run_all_models` when no list is supplied.
    DEFAULT_MODELS = (
        "distilbert-base-uncased",
        "bert-base-uncased",
        "roberta-base",
        "cardiffnlp/twitter-roberta-base-sentiment",
    )

    def __init__(self, train_df: pd.DataFrame, eval_df: pd.DataFrame):
        """
        Initializes the trainer with training and evaluation data.

        Args:
            train_df (pd.DataFrame): DataFrame for training.
            eval_df (pd.DataFrame): DataFrame for evaluation.
        """
        self.train_df = train_df
        self.eval_df = eval_df
        self.results = {}

        train_labels = train_df["label"].to_numpy()
        present_classes = np.unique(train_labels)
        balanced_weights = compute_class_weight(
            class_weight="balanced",
            classes=present_classes,
            y=train_labels,
        )
        # CrossEntropyLoss needs one weight per model output class. A class that is
        # absent from the training data keeps a neutral weight of 1.0 instead of
        # making the weight vector shorter than NUM_LABELS.
        weights = np.ones(self.NUM_LABELS, dtype=np.float64)
        weights[present_classes.astype(int)] = balanced_weights
        self.class_weights = torch.tensor(weights, dtype=torch.float)

    def _prepare_datasets(self, tokenizer) -> Tuple[Dataset, Dataset]:
        """
        Tokenizes and formats the data for the Hugging Face Trainer.

        Every review is padded/truncated to ``MAX_LENGTH`` tokens and the
        datasets expose ``input_ids``, ``attention_mask`` and ``label`` as
        torch tensors.
        """
        def tokenize_function(examples):
            return tokenizer(
                examples["cleaned_review"],
                padding="max_length",
                truncation=True,
                max_length=self.MAX_LENGTH,
            )

        tensor_columns = ["input_ids", "attention_mask", "label"]
        prepared = []
        for frame in (self.train_df, self.eval_df):
            dataset = Dataset.from_pandas(frame[["cleaned_review", "label"]])
            dataset = dataset.map(tokenize_function, batched=True)
            dataset.set_format(type="torch", columns=tensor_columns)
            prepared.append(dataset)

        return prepared[0], prepared[1]

    def _create_trainer(
        self,
        model,
        tokenizer,
        train_dataset,
        eval_dataset,
        training_args: TrainingArguments,
    ):
        """
        Creates a custom Trainer class that uses a weighted cross-entropy loss function.
        """
        import inspect

        class WeightedTrainer(Trainer):
            def __init__(self, class_weights=None, **kwargs):
                super().__init__(**kwargs)
                self.class_weights = (
                    class_weights.to(self.args.device) if class_weights is not None else None
                )

            # `num_items_in_batch` is passed by newer transformers releases; it is
            # accepted for compatibility and not used by this loss.
            def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
                labels = inputs.pop("labels")
                outputs = model(**inputs)
                logits = outputs.logits

                loss_fct = torch.nn.CrossEntropyLoss(weight=self.class_weights)
                loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
                return (loss, outputs) if return_outputs else loss

        # Newer transformers releases take the tokenizer as `processing_class` and
        # deprecate `tokenizer`; use whichever name the installed Trainer accepts.
        trainer_params = inspect.signature(Trainer.__init__).parameters
        tokenizer_kwarg = "processing_class" if "processing_class" in trainer_params else "tokenizer"

        return WeightedTrainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            class_weights=self.class_weights,
            **{tokenizer_kwarg: tokenizer},
        )

    def train_and_evaluate_model(self, model_name: str, training_args: TrainingArguments) -> Dict:
        """
        Trains and evaluates a single model, returning its performance metrics.

        Returns:
            Dict: On success, a dict with ``model_name``, ``f1_score`` (weighted),
            ``predictions``, ``true_labels`` and ``classification_report``. On
            failure, a dict with ``model_name`` and ``error``; no exception is
            raised. The outcome is stored in ``self.results[model_name]``,
            except that a failure never replaces an earlier successful result.
        """
        model = trainer = tokenizer = None
        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                num_labels=self.NUM_LABELS,
                id2label=dict(self.ID2LABEL),
                label2id=dict(self.LABEL2ID),
            )

            train_dataset, eval_dataset = self._prepare_datasets(tokenizer)

            trainer = self._create_trainer(
                model, tokenizer, train_dataset, eval_dataset, training_args
            )

            trainer.train()

            predictions = trainer.predict(eval_dataset)
            pred_labels = predictions.predictions.argmax(axis=1)
            true_labels = predictions.label_ids

            f1 = f1_score(true_labels, pred_labels, average="weighted")
            # Explicit `labels` keeps the report aligned with the three class names
            # even if a class is absent from the evaluation data.
            report = classification_report(
                true_labels,
                pred_labels,
                labels=list(self.ID2LABEL),
                target_names=list(self.ID2LABEL.values()),
                output_dict=True,
                zero_division=0,
            )

            result = {
                "model_name": model_name,
                "f1_score": f1,
                "predictions": pred_labels.tolist(),
                "true_labels": true_labels.tolist(),
                "classification_report": report,
            }
        except Exception as e:
            # Best-effort: one failing checkpoint must not abort the whole comparison,
            # but the failure is reported and recorded instead of vanishing.
            result = {"model_name": model_name, "error": f"{type(e).__name__}: {e}"}
            print(f"Training/evaluation failed for {model_name}: {result['error']}")
        finally:
            # Release model memory on both the success and the failure path.
            del model, trainer, tokenizer
            torch.cuda.empty_cache()

        previous = self.results.get(model_name)
        if "error" not in result or previous is None or "error" in previous:
            self.results[model_name] = result

        return result

    def save_results(
        self,
        summary_path: str = "model_results.json",
        detailed_path: str = "detailed_results.pkl",
    ) -> None:
        """
        Writes ``self.results`` to disk.

        The JSON file holds only the model name and F1 score of successful runs;
        the pickle file holds the complete ``self.results`` dict, including
        failed runs.
        """
        serializable_results = {
            model: {
                "model_name": result["model_name"],
                "f1_score": float(result["f1_score"]),
            }
            for model, result in self.results.items()
            if "error" not in result
        }
        with open(summary_path, "w") as f:
            json.dump(serializable_results, f, indent=2)

        with open(detailed_path, "wb") as f:
            pickle.dump(self.results, f)

    def run_all_models(
        self,
        models: Optional[List[str]] = None,
        training_args: Optional[TrainingArguments] = None,
    ) -> Dict:
        """
        Trains and evaluates a list of models and saves the results.

        Args:
            models: Checkpoint names to train. Defaults to ``DEFAULT_MODELS``.
            training_args: Arguments shared by all runs. Defaults to the
                configuration built below.

        Returns:
            Dict: ``self.results`` after all runs, also written to
            ``model_results.json`` and ``detailed_results.pkl``.
        """
        if models is None:
            models = list(self.DEFAULT_MODELS)

        if training_args is None:
            training_args = TrainingArguments(
                output_dir="./results",
                learning_rate=2e-5,
                per_device_train_batch_size=8,
                per_device_eval_batch_size=16,
                num_train_epochs=3,
                weight_decay=0.01,
                eval_strategy="epoch",
                save_strategy="no",
                logging_steps=500,
                fp16=torch.cuda.is_available(),
                report_to="none",
                disable_tqdm=False,
                seed=42,
                dataloader_pin_memory=True,
                gradient_checkpointing=True,
                lr_scheduler_type="linear",
                warmup_steps=50,
            )

        for model_name in models:
            self.train_and_evaluate_model(model_name, training_args)

        self.save_results()
        return self.results

## Execution

This section initializes the data processing and model training pipelines.

In [None]:
# Read the raw CSV and build the cleaned, labelled, per-class-sampled DataFrame
FILE_PATH = "Datafiniti_Amazon_Consumer_Reviews_of_Amazon_Products_cleaned.csv"
processor = DataProcessor(filepath=FILE_PATH)
processed_df = processor.prepare_data()

# Hold out 20% of the rows for evaluation; stratifying on the label keeps the
# class proportions the same in both splits
train_df, eval_df = train_test_split(
    processed_df,
    stratify=processed_df["label"],
    test_size=0.2,
    random_state=42,
)

# Fine-tune every configured model; `trainer` and `all_results` are reused by later cells
trainer = ModelTrainer(train_df=train_df, eval_df=eval_df)
all_results = trainer.run_all_models()

## Results

A summary of the performance for each model is displayed here. Summary F1 scores are saved in `model_results.json`; detailed results (predictions, true labels, and classification reports) are saved in `detailed_results.pkl`.

In [None]:
# Print a fixed-width table: one header block, then one row per model in `all_results`
print(
    "\n" + "=" * 60,
    "SUMMARY OF ALL MODEL RESULTS",
    "=" * 60,
    f"{'Model':<45} {'F1 Score':<10}",
    "-" * 60,
    sep="\n",
)

for model_name, result in all_results.items():
    # A result carrying an "error" key has no score to show
    score_column = f"{'ERROR':<10}" if "error" in result else f"{result['f1_score']:<10.4f}"
    print(f"{model_name:<45} {score_column}")

In [None]:
# Visualization Class
class ModelVisualizer:
    """
    Generates various plots and analyses to compare the performance
    of the trained models.

    Plotting methods return ``None`` (the ranking summary returns
    ``(None, empty DataFrame)``) when there are no usable model results.
    Return annotations are written as strings so the class can be defined
    before matplotlib is imported.
    """

    def __init__(self, results_json_path: str = 'model_results.json',
                 detailed_results_pkl_path: str = 'detailed_results.pkl'):
        """
        Initializes the visualizer by loading model results.

        Args:
            results_json_path (str): Path to the JSON file with summary results.
            detailed_results_pkl_path (str): Path to the pickle file with detailed results.

        Raises:
            FileNotFoundError: If either results file does not exist.
        """
        try:
            with open(results_json_path, 'r') as f:
                self.results = json.load(f)

            # NOTE(security): unpickling can execute arbitrary code; only load a
            # results file that this notebook produced itself.
            with open(detailed_results_pkl_path, 'rb') as f:
                self.detailed_results = pickle.load(f)
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Required results files not found. "
                f"Please ensure the training script ran successfully and generated '{e.filename}'."
            ) from e

        self.label_names = ["Negative", "Neutral", "Positive"]
        # Keep only models that have a usable entry in BOTH files, so a stale or
        # partial pickle cannot cause a KeyError in the plotting methods.
        self.model_names = [
            name for name in self.results
            if "error" not in self.results.get(name, {})
            and name in self.detailed_results
            and "error" not in self.detailed_results[name]
        ]

    def _clean_model_name(self, name: str) -> str:
        """Helper to clean up model names for plot titles."""
        clean_name = name.replace('cardiffnlp/', '').replace('nlptown/', '')
        clean_name = clean_name.replace('-base-uncased', '').replace('-base', '')
        # Title-case first: applying the special casing afterwards keeps "RoBERTa"
        # from being flattened to "Roberta" by str.title().
        clean_name = clean_name.title()
        return clean_name.replace('Twitter-Roberta', 'Twitter RoBERTa')

    def _labels_for(self, model: str):
        """Returns (true_labels, pred_labels) of one model as NumPy arrays."""
        detail = self.detailed_results[model]
        return np.array(detail['true_labels']), np.array(detail['predictions'])

    def create_confusion_matrices(self) -> "Optional[plt.Figure]":
        """
        Generates and displays normalized confusion matrices for all models.

        Each matrix is normalized per true class (rows sum to 1) and always
        covers every class in ``self.label_names``.
        """
        n_models = len(self.model_names)
        if n_models == 0:
            return None

        cols = 3
        rows = (n_models + cols - 1) // cols
        # squeeze=False always yields a 2-D array of axes, so flattening works for
        # any number of models, including a single one.
        fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows), squeeze=False)
        axes = axes.flatten()
        class_ids = list(range(len(self.label_names)))

        for ax, model in zip(axes, self.model_names):
            true_labels, pred_labels = self._labels_for(model)

            # Explicit `labels` keeps the matrix aligned with the tick labels even if
            # a class never occurs; normalize='true' handles empty rows.
            cm_normalized = confusion_matrix(
                true_labels, pred_labels, labels=class_ids, normalize='true'
            )

            sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
                        xticklabels=self.label_names, yticklabels=self.label_names,
                        ax=ax, cbar_kws={'shrink': 0.8})

            clean_name = self._clean_model_name(model)
            f1_score_val = self.detailed_results[model]['f1_score']
            ax.set_title(f'{clean_name}\nF1: {f1_score_val:.3f}', fontweight='bold')
            ax.set_ylabel('True Label')
            ax.set_xlabel('Predicted Label')

        # Hide the unused cells of the grid.
        for ax in axes[n_models:]:
            ax.set_visible(False)

        fig.suptitle('Normalized Confusion Matrices', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.show()
        return fig

    def create_class_performance_comparison(self) -> "Optional[plt.Figure]":
        """
        Creates heatmaps comparing precision, recall, and F1-score across models.
        """
        if not self.model_names:
            return None

        metrics_data = []
        for model in self.model_names:
            report = self.detailed_results[model]['classification_report']
            for class_name in self.label_names:
                if class_name in report:
                    metrics_data.append({
                        'Model': self._clean_model_name(model),
                        'Class': class_name,
                        'Precision': report[class_name]['precision'],
                        'Recall': report[class_name]['recall'],
                        'F1-Score': report[class_name]['f1-score']
                    })

        df = pd.DataFrame(metrics_data)
        metrics = ['Precision', 'Recall', 'F1-Score']
        fig, axes = plt.subplots(1, len(metrics), figsize=(18, 6))

        for idx, (ax, metric) in enumerate(zip(axes, metrics)):
            pivot_df = df.pivot(index='Model', columns='Class', values=metric)
            sns.heatmap(pivot_df, annot=True, fmt='.3f', cmap='RdYlBu_r', ax=ax,
                        cbar_kws={'shrink': 0.8}, vmin=0.5, vmax=1.0)
            ax.set_title(f'{metric} by Class', fontweight='bold', fontsize=14)
            ax.set_xlabel('')
            ax.set_ylabel('Model' if idx == 0 else '')
            ax.set_yticklabels(ax.get_yticklabels(), rotation=0)

        fig.suptitle('Per-Class Performance Metrics Across Models', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.show()
        return fig

    def create_error_analysis(self) -> "Optional[plt.Figure]":
        """
        Analyzes and visualizes error rates and overall accuracy.

        The per-class error rate is the share of samples of a true class that
        were predicted as any other class.
        """
        if not self.model_names:
            return None

        error_data, accuracies = [], []

        for model in self.model_names:
            true_labels, pred_labels = self._labels_for(model)
            display_name = self._clean_model_name(model)

            accuracies.append((display_name, (pred_labels == true_labels).mean()))

            for class_idx, class_name in enumerate(self.label_names):
                class_mask = (true_labels == class_idx)
                if np.any(class_mask):
                    error_data.append({
                        'Model': display_name,
                        'Class': class_name,
                        'Error_Rate': (pred_labels[class_mask] != class_idx).mean()
                    })

        df_errors = pd.DataFrame(error_data)
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

        pivot_errors = df_errors.pivot(index='Model', columns='Class', values='Error_Rate')
        sns.heatmap(pivot_errors, annot=True, fmt='.3f', cmap='Reds', ax=ax1, cbar_kws={'shrink': 0.8})
        ax1.set_title('Error Rates by Class', fontweight='bold', fontsize=14)
        ax1.set_ylabel('Model')

        model_names_clean, accuracy_vals = zip(*accuracies)
        sns.barplot(x=list(accuracy_vals), y=list(model_names_clean), palette='viridis', ax=ax2)
        ax2.set_title('Overall Accuracy Comparison', fontweight='bold', fontsize=14)
        ax2.set_xlabel('Accuracy')
        ax2.set_xlim(0, 1.0)
        ax2.grid(axis='x', alpha=0.3)

        fig.suptitle('Error Analysis Across Models', fontsize=16, fontweight='bold')
        plt.tight_layout()
        plt.show()
        return fig

    def create_model_ranking_summary(self) -> "Tuple[Optional[plt.Figure], pd.DataFrame]":
        """
        Generates a summary table and radar chart for the top models.

        Returns:
            The figure and the summary DataFrame sorted by weighted F1
            (descending); ``(None, empty DataFrame)`` when there are no models.
        """
        categories = ['Accuracy', 'Weighted F1', 'Negative F1', 'Neutral F1', 'Positive F1']
        if not self.model_names:
            return None, pd.DataFrame(columns=['Model'] + categories)

        summary_data = []
        for model in self.model_names:
            true_labels, pred_labels = self._labels_for(model)
            report = self.detailed_results[model]['classification_report']

            summary_data.append({
                'Model': self._clean_model_name(model),
                'Accuracy': (pred_labels == true_labels).mean(),
                'Weighted F1': self.detailed_results[model]['f1_score'],
                'Negative F1': report.get('Negative', {}).get('f1-score', 0),
                'Neutral F1': report.get('Neutral', {}).get('f1-score', 0),
                'Positive F1': report.get('Positive', {}).get('f1-score', 0),
            })

        df_summary = pd.DataFrame(summary_data).sort_values('Weighted F1', ascending=False)

        # Create the polar axes directly rather than stacking a polar subplot on top
        # of an already existing cartesian one in the same grid cell.
        fig = plt.figure(figsize=(16, 8))
        ax1 = fig.add_subplot(1, 2, 1)
        ax2 = fig.add_subplot(1, 2, 2, projection='polar')

        ax1.axis('tight')
        ax1.axis('off')

        table = ax1.table(cellText=df_summary.round(3).values, colLabels=df_summary.columns,
                          cellLoc='center', loc='center')
        table.auto_set_font_size(False)
        table.set_fontsize(10)
        table.scale(1.2, 2)
        ax1.set_title('Model Performance Ranking\n(Sorted by Weighted F1)', fontweight='bold', fontsize=14, pad=20)

        # Radar chart for top 3 models
        top_3 = df_summary.head(3)
        angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
        angles += angles[:1]  # repeat the first angle to close each polygon

        colors = ['#FF6B6B', '#4ECDC4', '#45B7D1']

        for color, (_, row) in zip(colors, top_3.iterrows()):
            values = [row[cat] for cat in categories] + [row[categories[0]]]
            ax2.plot(angles, values, 'o-', linewidth=2, label=row['Model'], color=color, alpha=0.8)
            ax2.fill(angles, values, alpha=0.15, color=color)

        ax2.set_xticks(angles[:-1])
        ax2.set_xticklabels(categories)
        ax2.set_ylim(0, 1)
        ax2.set_title('Top 3 Models - Performance Radar', fontweight='bold', fontsize=14, pad=30)
        ax2.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
        ax2.grid(True)

        plt.tight_layout()
        plt.show()
        return fig, df_summary

    def generate_all_visualizations(self):
        """Generates and displays all analysis plots, then prints a short summary."""
        if not self.model_names:
            print("No successful model results available to visualize.")
            return

        print("Generating Model Comparison Visualizations...")
        self.create_confusion_matrices()
        self.create_class_performance_comparison()
        self.create_error_analysis()
        fig, summary_df = self.create_model_ranking_summary()

        print("=" * 60)
        print("FINAL RECOMMENDATIONS")
        print("=" * 60)
        best_model = summary_df.iloc[0]['Model']
        best_f1 = summary_df.iloc[0]['Weighted F1']
        print(f"Best Overall Model: {best_model}")
        print(f"Best F1 Score: {best_f1:.4f}")
        print("\n Key Insights:")
        print(f"• {best_model} shows the best overall performance")
        print(f"• Accuracy range: {summary_df['Accuracy'].min():.3f} - {summary_df['Accuracy'].max():.3f}")
        print(f"• F1 range: {summary_df['Weighted F1'].min():.3f} - {summary_df['Weighted F1'].max():.3f}")

## Hyperparameter Tuning and Single Model Analysis

This section demonstrates how to fine-tune a single model with custom hyperparameters and then visualizes its individual performance.

In [None]:
# Define custom hyperparameters for RoBERTa
roberta_hyperparameters = TrainingArguments(
    output_dir="./roberta_results",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=5,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="no",
    logging_steps=500,
    fp16=torch.cuda.is_available(),
    report_to="none",
    disable_tqdm=False,
    seed=42,
    dataloader_pin_memory=True,
    gradient_checkpointing=True,
    lr_scheduler_type="cosine",
    warmup_steps=100,
)

# Re-run a specific model with the new hyperparameters
print("\n" + "="*50)
print("RE-RUNNING ROBERTA MODEL WITH EXTERNAL HYPERPARAMETERS")
print("="*50)

# The 'trainer' object must be available from the previous cell's execution.
# This part assumes a `ModelTrainer` instance `trainer` exists.
try:
    roberta_results = trainer.train_and_evaluate_model("roberta-base", roberta_hyperparameters)
except NameError:
    print("Error: `trainer` object not found. Please run the Model Training cell first.")
else:
    # train_and_evaluate_model reports failures through an "error" key instead of
    # raising, so the outcome has to be checked explicitly.
    if "error" in roberta_results:
        print(f"RoBERTa re-run failed: {roberta_results['error']}")
    else:
        print(f"RoBERTa re-run weighted F1: {roberta_results['f1_score']:.4f}")
        # The next cell reads 'detailed_results.pkl'; write the updated results back
        # so it visualizes this run rather than the earlier roberta-base run.
        with open("detailed_results.pkl", "wb") as f:
            pickle.dump(trainer.results, f)

In [None]:
# Single model visualization
# Loads the saved detailed results and, for one model, shows a row-normalized
# confusion matrix followed by overall accuracy and per-class error rates.
try:
    # Load the results from the saved files
    # NOTE(security): unpickling can execute arbitrary code; only load a results
    # file that this notebook produced itself.
    with open('detailed_results.pkl', 'rb') as f:
        all_results = pickle.load(f)

    model_name = "roberta-base"
    label_names = ["Negative", "Neutral", "Positive"]

    if model_name in all_results and "error" not in all_results[model_name]:
        print(f"\n Generating single-model visualization for {model_name}...")

        true_labels = np.array(all_results[model_name]['true_labels'])
        pred_labels = np.array(all_results[model_name]['predictions'])

        # --- Confusion Matrix Visualization ---
        fig, ax = plt.subplots(1, 1, figsize=(8, 6))
        # Explicit `labels` keeps the matrix 3x3 and aligned with the tick labels
        # even if a class never appears in the true or predicted labels.
        cm_normalized = confusion_matrix(
            true_labels,
            pred_labels,
            labels=list(range(len(label_names))),
            normalize='true',
        )

        sns.heatmap(
            cm_normalized,
            annot=True,
            fmt='.2f',
            cmap='Blues',
            xticklabels=label_names,
            yticklabels=label_names,
            ax=ax
        )

        ax.set_title(f'Confusion Matrix for {model_name} (Normalized)', fontweight='bold')
        ax.set_ylabel('True Label')
        ax.set_xlabel('Predicted Label')
        plt.tight_layout()
        plt.show()

        # --- Error Rate Analysis ---
        print("\n📈 Error Analysis:")
        accuracy = (pred_labels == true_labels).mean()
        print(f"• Overall Accuracy: {accuracy:.4f}")

        # Error rate of a class = share of its true samples predicted as another class;
        # classes with no true samples are skipped.
        per_class_errors = {}
        for class_idx, class_name in enumerate(label_names):
            class_mask = (true_labels == class_idx)
            if np.any(class_mask):
                error_rate = (pred_labels[class_mask] != class_idx).mean()
                per_class_errors[class_name] = error_rate

        print("• Per-Class Error Rates:")
        for class_name, rate in per_class_errors.items():
            print(f"  - {class_name}: {rate:.4f}")
    else:
        print(f"Error: Results for model '{model_name}' not found or contain errors. Please ensure the model was trained successfully.")
except FileNotFoundError:
    print("Error: Could not find 'detailed_results.pkl'. Please ensure the model training and evaluation step was completed successfully.")
except Exception as e:
    # Deliberate best-effort for a notebook cell: report the problem instead of raising.
    print(f"An error occurred: {e}")