In [80]:
import os
import random
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

import torch
from torch.utils.data import Dataset

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback,
)


In [81]:
from google.colab import files
uploaded = files.upload()  # choose reviews_cleaned.csv

import pandas as pd
fname = next(iter(uploaded))  # first uploaded file
# Ensure the correct columns are read
df = pd.read_csv(fname, encoding="latin-1")

# Drop the rating column as requested
if 'rating' in df.columns:
    df = df.drop('rating', axis=1)

# Handle missing values in critical columns ('text' and 'label') by dropping rows
df.dropna(subset=['text', 'label'], inplace=True)


print("Shape:", df.shape)
print("Columns:", list(df.columns))
df.head(10)

Saving finalllll_dataset.csv to finalllll_dataset (4).csv
Shape: (4962, 3)
Columns: ['name', 'text', 'label']


Unnamed: 0,name,text,label
0,Will Payne,Poor selection of shingles.,Good Review
1,ben franklin,"You have to be kidding me, 2 and a half hours ...",Rant
2,Courtney Lumpkin,This place is terrible. They had a patient tha...,Rant
3,Cathy Smith,The movers were very polite and on time. Howev...,Good Review
4,Jade Kleeschulte,Never used them because an estimate to move it...,Rant
5,Chuck Blue,"I'm a life member of the VFW, a past post comm...",Rant
6,Mikemike Fuqua,You take product working and get product back ...,Rant
7,chip linton,They are only out for your money not to help a...,Rant
8,Gun Wench,They won't let me give less than 1 star. Sign ...,Rant
9,GiGi,"Decided to look into this, they recommended pr...",Rant


In [82]:
def set_seed(seed: int = 42):
    random.seed(seed); np.random.seed(seed)
    torch.manual_seed(seed); torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [83]:
class ReviewsDataset(Dataset):
    """Torch dataset for text classification with Hugging Face tokenizers."""
    def __init__(
        self,
        df: pd.DataFrame,
        tokenizer: AutoTokenizer,
        text_col: str,
        label_col: Optional[str],
        max_len: int,
        label2id: Optional[Dict[str, int]] = None,
        is_train: bool = True,
    ):
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.text_col = text_col
        self.label_col = label_col if label_col in df.columns else None
        self.max_len = max_len
        self.is_train = is_train
        self.label2id = label2id

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        text = "" if pd.isna(row[self.text_col]) else str(row[self.text_col])

        enc = self.tokenizer(
            text, # Use only text
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )

        item = {k: v.squeeze(0) for k, v in enc.items()}
        if self.is_train and self.label_col is not None:
            y = row[self.label_col]
            # Check for NaN values and skip them
            if pd.isna(y):
                # Use first label as default for NaN values
                y = list(self.label2id.keys())[0] if self.label2id else 0
            if self.label2id is not None:
                y = self.label2id[str(y)]
            item["labels"] = torch.tensor(int(y), dtype=torch.long)
        return item

In [84]:
def build_label_maps(df: pd.DataFrame, label_col: str, label_list: Optional[List[str]] = None):
    """Create label <-> id maps. If label_list supplied, follow that order."""
    if label_list is None:
        # Filter out NaN values and auto from data (sorted for reproducibility)
        label_list = sorted(df[label_col].dropna().astype(str).unique().tolist())
    label2id = {lbl: i for i, lbl in enumerate(label_list)}
    id2label = {i: lbl for lbl, i in label2id.items()}
    return label2id, id2label, label_list

def compute_metrics_fn(id2label: Dict[int, str]):
    """Returns a compute_metrics callable that Trainer will use."""
    def _compute_metrics(eval_pred):
        logits, labels = eval_pred
        preds = logits.argmax(axis=1)
        acc = accuracy_score(labels, preds)
        f1_macro = f1_score(labels, preds, average="macro")
        return {"accuracy": acc, "f1_macro": f1_macro}
    return _compute_metrics

In [85]:
class WeightedTrainer(Trainer):
    """class-weighted loss to handle imbalance."""
    def __init__(self, class_weights: Optional[torch.Tensor] = None, **kwargs):
        super().__init__(**kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None): # Add num_items_in_batch
        labels = inputs.get("labels")
        outputs = model(**{k: v for k, v in inputs.items() if k != "labels"})
        logits = outputs.get("logits")
        if labels is not None:
            if self.class_weights is not None:
                self.class_weights = self.class_weights.to(logits.device)
                loss_fct = torch.nn.CrossEntropyLoss(weight=self.class_weights)
            else:
                loss_fct = torch.nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1)) # Use logits.size(-1) to get the number of classes
        else:
            loss = outputs["loss"] if "loss" in outputs else None
        return (loss, outputs) if return_outputs else loss

In [86]:
def predict_proba_texts(
    tokenizer: AutoTokenizer, # Add tokenizer as an explicit argument
    trainer: Trainer,
    texts: List[str],
    max_len: int = 128,
) -> np.ndarray:
    """Return softmax probabilities (N x C) for a list of raw texts."""
    model = trainer.model
    model.eval()

    all_probs = []
    with torch.no_grad():
        for i in range(0, len(texts), 64):
            batch_texts = texts[i:i+64]
            enc = tokenizer(
                batch_texts, truncation=True, padding=True, max_length=max_len, return_tensors="pt"
            )
            # Get the model's device
            device = next(model.parameters()).device
            enc = {k: v.to(device) for k, v in enc.items()}
            # Access logits from the dictionary returned by the model
            outputs = model(**enc)
            logits = outputs['logits']
            probs = torch.softmax(logits, dim=-1).cpu().numpy()
            all_probs.append(probs)
    return np.vstack(all_probs)

def predict_proba_df(
    tokenizer: AutoTokenizer, # Add tokenizer as an explicit argument
    trainer: Trainer,
    df: pd.DataFrame,
    id2label: Dict[int, str], # Add id2label as an argument
    text_col: str = "text",
    max_len: int = 128,
) -> pd.DataFrame:
    """Attach softmax probabilities and predicted label names to a dataframe."""
    probs = predict_proba_texts(tokenizer, trainer, df[text_col].astype(str).fillna("").tolist(), max_len=max_len) # Pass tokenizer to predict_proba_texts
    # Use the passed id2label dictionary
    pred_ids = probs.argmax(axis=1)
    pred_labels = [id2label[int(i)] for i in pred_ids]
    out = df.copy()
    out["pred_label"] = pred_labels
    for i in range(probs.shape[1]):
        out[f"p_{id2label[i]}"] = probs[:, i]
    return out

In [87]:
# Define column names first
TEXT_COL  = "text"                             # change if your text column is different
LABEL_COL = "label" if "label" in df.columns else "label_rule"
# RATING_COL = "rating" # Define rating column name - removed as rating is dropped

# Split the data into train, test, val
from sklearn.model_selection import train_test_split

# Filter out rows with NaN labels before splitting
df_clean = df.dropna(subset=[LABEL_COL])

# First split: separate out test set (20%)
train_val, test = train_test_split(df_clean, test_size=0.2, random_state=42)

# Second split: split remaining data into train (64%) and val (16%)
train, val = train_test_split(train_val, test_size=0.2, random_state=42)

# maps, tokenizer, datasets
label2id, id2label, _ = build_label_maps(train, LABEL_COL)
tok     = AutoTokenizer.from_pretrained("roberta-base", use_fast=True)
# Update ReviewsDataset initialization to not include rating_col
train_ds = ReviewsDataset(train, tok, TEXT_COL, LABEL_COL, max_len=128, label2id=label2id, is_train=True)
val_ds   = ReviewsDataset(val,   tok, TEXT_COL, LABEL_COL, max_len=128, label2id=label2id, is_train=True)

In [88]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import torch

# Calculate class weights
class_counts = train[LABEL_COL].value_counts()
labels = sorted(class_counts.index.tolist())
class_weights = compute_class_weight('balanced', classes=np.array(labels), y=train[LABEL_COL].dropna().values)

# Convert to dictionary for easier mapping
weight_dict = dict(zip(labels, class_weights))
print("Class weights:", weight_dict)

# Convert weights to a tensor in the order of label2id
weights_tensor = torch.tensor([weight_dict[label] for label in label2id.keys()], dtype=torch.float32)
print("Weights tensor:", weights_tensor)

Class weights: {'Advertisement': np.float64(8.246753246753247), 'Good Review': np.float64(0.25594518339379285), 'Irrelevant Content': np.float64(3.097560975609756), 'Rant': np.float64(1.8513119533527698), 'Spam': np.float64(9.202898550724637)}
Weights tensor: tensor([8.2468, 0.2559, 3.0976, 1.8513, 9.2029])


In [89]:
import torch.nn as nn
from transformers import RobertaModel # Import RobertaModel

class RobertaWithRating(nn.Module):
    def __init__(self, pretrained_model_name="roberta-base", num_labels=2):
        super().__init__()
        # Load the base RoBERTa model without the classification head
        self.roberta = RobertaModel.from_pretrained(pretrained_model_name)
        # Get the config for the model to access hidden size
        config = self.roberta.config
        # Create a new classification head
        self.classifier = nn.Sequential(
            nn.Linear(config.hidden_size, config.hidden_size),
            nn.GELU(),
            nn.Dropout(config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, num_labels)
        )


    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None):
        # Pass inputs through the RoBERTa model
        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Get the pooled output (usually the representation of the [CLS] token)
        sequence_output = outputs[0]
        logits = self.classifier(sequence_output[:, 0, :]) # Use the representation of the first token ([CLS])

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.roberta.config.num_labels), labels.view(-1))

        return  {"loss": loss, "logits": logits} # Return a dictionary with loss and logits

# model + training args
model = RobertaWithRating(pretrained_model_name="roberta-base", num_labels=len(label2id))


args = TrainingArguments(
    output_dir="roberta_run", per_device_train_batch_size=64, per_device_eval_batch_size=32,
    num_train_epochs=10, learning_rate=5e-5, weight_decay=0.01, warmup_ratio=0.06,
     save_strategy="epoch", load_best_model_at_end=True,
    metric_for_best_model="f1_macro", fp16=True, report_to="none", eval_strategy="epoch",
    logging_steps=50 # set the logging steps
)

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [95]:


# Initialize trainer
trainer = WeightedTrainer( # Use WeightedTrainer
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    compute_metrics=compute_metrics_fn(id2label),
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
    class_weights=weights_tensor # Pass class weights
)

In [96]:

# Train the model
print("Starting training...")
trainer.train()

Starting training...


Epoch,Training Loss,Validation Loss,Accuracy,F1 Macro
1,0.1569,0.780464,0.938287,0.85233
2,0.0642,1.214657,0.93073,0.80382
3,0.0601,1.22672,0.920655,0.791271


TrainOutput(global_step=150, training_loss=0.09372870127360027, metrics={'train_runtime': 31.4406, 'train_samples_per_second': 1009.84, 'train_steps_per_second': 15.903, 'total_flos': 0.0, 'train_loss': 0.09372870127360027, 'epoch': 3.0})

In [97]:

# Make predictions on validation set
print("\nMaking predictions on validation set...")
val_predictions = predict_proba_df(tok, trainer, val, id2label=id2label, text_col=TEXT_COL, max_len=128) # Pass tok and id2label here

# Get detailed classification report
val_true_labels = val[LABEL_COL].astype(str).tolist()
val_pred_labels = val_predictions["pred_label"].tolist()

print("\nClassification Report (Validation Set):")
print(classification_report(val_true_labels, val_pred_labels))

print("\nConfusion Matrix (Validation Set):")
print(confusion_matrix(val_true_labels, val_pred_labels))


Making predictions on validation set...

Classification Report (Validation Set):
                    precision    recall  f1-score   support

     Advertisement       0.87      0.81      0.84        16
       Good Review       0.97      0.97      0.97       638
Irrelevant Content       0.84      0.73      0.78        52
              Rant       0.77      0.86      0.81        78
              Spam       0.82      0.90      0.86        10

          accuracy                           0.94       794
         macro avg       0.85      0.85      0.85       794
      weighted avg       0.94      0.94      0.94       794


Confusion Matrix (Validation Set):
[[ 13   0   1   1   1]
 [  1 618   2  17   0]
 [  0  11  38   2   1]
 [  0   7   4  67   0]
 [  1   0   0   0   9]]


In [98]:
# Make predictions on test set
print("\nMaking predictions on test set...")
test_predictions = predict_proba_df(tok, trainer, test, id2label=id2label, text_col=TEXT_COL, max_len=128)

# Get detailed classification report
test_true_labels = test[LABEL_COL].astype(str).tolist()
test_pred_labels = test_predictions["pred_label"].tolist()

print("\nClassification Report (Test Set):")
print(classification_report(test_true_labels, test_pred_labels))

print("\nConfusion Matrix (Test Set):")
print(confusion_matrix(test_true_labels, test_pred_labels))


Making predictions on test set...

Classification Report (Test Set):
                    precision    recall  f1-score   support

     Advertisement       0.85      0.68      0.76        25
       Good Review       0.96      0.96      0.96       774
Irrelevant Content       0.80      0.72      0.76        72
              Rant       0.74      0.82      0.78       101
              Spam       0.84      0.76      0.80        21

          accuracy                           0.92       993
         macro avg       0.84      0.79      0.81       993
      weighted avg       0.92      0.92      0.92       993


Confusion Matrix (Test Set):
[[ 17   7   0   0   1]
 [  0 746   3  24   1]
 [  0  14  52   5   1]
 [  0   9   9  83   0]
 [  3   1   1   0  16]]


In [99]:

import torch
import joblib
from transformers import AutoTokenizer

# Save the model
model_path = "roberta_model_with_labels.pth"
torch.save(model.state_dict(), model_path)

# Save the tokenizer (Hugging Face tokenizers can be saved with save_pretrained)
tokenizer_path = "roberta_tokenizer"
tok.save_pretrained(tokenizer_path)

# Save label mappings (label2id and id2label)
label_mappings = {
    "label2id": label2id,
    "id2label": id2label
}
with open("label_mappings.pkl", "wb") as f:
    joblib.dump(label_mappings, f)

# Optionally, save class weights if needed
class_weights_path = "class_weights_tensor.pth"
torch.save(weights_tensor, class_weights_path)

print(f"Model saved at {model_path}")
print(f"Tokenizer saved at {tokenizer_path}")
print(f"Label mappings saved at label_mappings.pkl")
print(f"Class weights saved at {class_weights_path}")

Model saved at roberta_model_with_labels.pth
Tokenizer saved at roberta_tokenizer
Label mappings saved at label_mappings.pkl
Class weights saved at class_weights_tensor.pth
