# WQF7007 Group 5

In [None]:
!pip install transformers

In [None]:
# Pin NumPy to an exact release.
# NOTE(review): the reason for this pin is not recorded in the notebook — confirm it is still required.
!pip install numpy==1.25.2

# Data Extraction and Preprocessing

In [None]:
import pandas as pd

# Read the raw Amazon review export into a DataFrame and preview the first rows
df = pd.read_csv(filepath_or_buffer="amazon_reviews.csv")
df.head(n=5)

## Initial Inspection


In [None]:
# Column dtypes and non-null counts, followed by the number of missing values per column
df.info()
df.isna().sum()


In [None]:
# Discard rows that have no review text, then renumber the index from 0
df = df.dropna(subset=['reviewText']).reset_index(drop=True)


## Sentiment Labelling

In [None]:
# Pull the review text and the star rating out of the raw frame as plain lists
reviews, ratings = (df[column].tolist() for column in ('reviewText', 'overall'))

# Assemble the two-column working frame used by the following cells
reviews_df = pd.DataFrame(dict(review=reviews, rating=ratings))

# Add a new column of sentiment based on ratings
def sentiment(rating):
    """Translate a numeric rating into a sentiment name.

    Returns 'neutral' for a rating equal to 3, 'positive' for a rating of
    4 or more, and 'negative' for every other value.
    """
    if rating == 3:
        return 'neutral'
    return 'positive' if rating >= 4 else 'negative'

# Integer class id for each sentiment name
label_map = dict(zip(('negative', 'neutral', 'positive'), range(3)))

# Derive the sentiment name from the rating, then look up its integer label
reviews_df['sentiment'] = reviews_df['rating'].map(sentiment)
reviews_df['label'] = reviews_df['sentiment'].map(label_map)



## Text Cleaning

In [None]:
import re

def clean_text(text):
    """Normalise a review string.

    The text is lower-cased, then three substitutions are applied in order,
    each replacing its matches with a single space: HTML tags, any character
    that is neither a-z nor whitespace, and runs of whitespace. Leading and
    trailing whitespace is stripped from the result.
    """
    cleaned = text.lower()
    for pattern in (r'<[^>]+>', r'[^a-z\s]', r'\s+'):
        cleaned = re.sub(pattern, ' ', cleaned)
    return cleaned.strip()

# Replace every review with its cleaned form
reviews_df['review'] = reviews_df['review'].map(clean_text)


In [None]:
# Persist the cleaned, labelled reviews (pandas also writes the index as the first column by default)
reviews_df.to_csv(path_or_buf="reviews.csv")

## Measure Tokenization Length

In [None]:
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Longest review, measured in tokens with the special tokens included
max_length = (
    reviews_df['review']
    .map(lambda text: len(tokenizer.encode(text, add_special_tokens=True)))
    .max()
)
print(f'Max length of reviews: {max_length}')

In [None]:
# Keep only reviews that encode to at most 256 tokens (special tokens included),
# then renumber the index from 0
reviews_df = (
    reviews_df.loc[
        reviews_df['review'].map(
            lambda text: len(tokenizer.encode(text, add_special_tokens=True))
        ) <= 256
    ]
    .reset_index(drop=True)
)
display(reviews_df)

## Tokenization

In [None]:
# Tokenize function
def tokenize_function(example):
    """Tokenize the ``review`` field of ``example`` with the notebook-level ``tokenizer``.

    Sequences are padded to, and truncated at, a fixed length of 256 tokens.
    ``example`` must support ``example["review"]``.
    """
    encode_options = dict(padding="max_length", truncation=True, max_length=256)
    return tokenizer(example["review"], **encode_options)

## Stop Words Removal & Lemmatization

In [None]:
# Stop words removal and lemmatization not necessary for transformer models

## Conversion to HuggingFace dataset

In [None]:
from datasets import Dataset

# Keep only the model input text and its integer label
reviews_df = reviews_df.loc[:, ['review', 'label']]

# Wrap the frame in a Hugging Face Dataset and tokenize it in batches
dataset = Dataset.from_pandas(reviews_df)
tokenized_datasets = dataset.map(tokenize_function, batched=True)

# Return these three columns as PyTorch tensors when the dataset is indexed
tokenized_datasets.set_format(
    type="torch",
    columns=["input_ids", "attention_mask", "label"],
)

display(tokenized_datasets)

In [None]:
# Convert to pandas once and reuse the preview: the original called `to_pandas()`
# on the full tokenized dataset separately for each of the five columns.
preview = tokenized_datasets.to_pandas().head()
for column in ['review', 'input_ids', 'token_type_ids', 'attention_mask', 'label']:
    print(preview[column])

# Model training using DistilBert

## Train-test splitting

In [None]:
from datasets import DatasetDict
from sklearn.model_selection import train_test_split

# Hold out 20% of the examples as the test split (fixed seed so the split is repeatable)
dataset_dict = tokenized_datasets.train_test_split(test_size=0.2, seed=42)
dataset_dict = DatasetDict(
    {split_name: dataset_dict[split_name] for split_name in ("train", "test")}
)

## 1. Trainer only

In [None]:
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, set_seed

# Seed the RNGs *before* the model is built: the sequence-classification head placed on
# top of the pretrained encoder has no pretrained weights and is initialised randomly,
# so seeding only afterwards leaves the starting point different on every run.
set_seed(42)

# Load DistilBERT tokenizer and model for 3-class classification.
# Note: this rebinds the notebook-level `tokenizer`, which was the BERT tokenizer until here.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=3)

In [None]:
from sklearn.metrics import classification_report
import numpy as np

def compute_metrics(eval_pred):
    """Compute evaluation metrics from a ``(predictions, labels)`` pair.

    Args:
        eval_pred: two-item iterable ``(predictions, labels)``. ``predictions``
            holds per-class scores with the classes along axis 1; if it is a
            tuple, its first element is used (assumes the logits come first —
            TODO confirm for models that return extra outputs).

    Returns:
        dict with ``accuracy``, ``macro_f1``, ``neutral_f1`` (F1 of class 1),
        and macro-averaged ``precision`` and ``recall``.
    """
    preds, labels = eval_pred
    if isinstance(preds, tuple):
        preds = preds[0]
    preds = np.argmax(np.asarray(preds), axis=1)
    report = classification_report(labels, preds, output_dict=True)
    # The report only has a "1" entry when class 1 occurs in the labels or the
    # predictions; fall back to 0.0 instead of raising KeyError mid-training.
    neutral_f1 = report.get("1", {}).get("f1-score", 0.0)
    return {
        "accuracy": report["accuracy"],
        "macro_f1": report["macro avg"]["f1-score"],
        "neutral_f1": neutral_f1,
        "precision": report["macro avg"]["precision"],
        "recall": report["macro avg"]["recall"]
    }

In [None]:
from transformers import set_seed
# Fix the random seed through the transformers helper so repeated runs are comparable
set_seed(42)

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir="./results",
    logging_dir="./logs",
    # Evaluate and checkpoint once per epoch; reload the checkpoint with the
    # highest neutral-class F1 when training finishes
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="neutral_f1",
    greater_is_better=True,
    # Log every 10 optimisation steps
    logging_strategy="steps",
    logging_steps=10,
    # Optimisation settings
    num_train_epochs=4,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    seed=42,
)

In [None]:
from transformers import Trainer

# Baseline run: the unmodified Hugging Face Trainer
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    eval_dataset=dataset_dict["test"],
    train_dataset=dataset_dict["train"],
)
trainer.train()

In [None]:
from sklearn.metrics import classification_report
import numpy as np

# Score the test split with the trained model
predictions_output = trainer.predict(dataset_dict["test"])

# Reference labels, and the highest-scoring class for each row of predictions
y_true = predictions_output.label_ids
y_pred = np.argmax(predictions_output.predictions, axis=1)

# Per-class precision / recall / F1 table
target_names = ["negative", "neutral", "positive"]
print(
    classification_report(
        y_true=y_true,
        y_pred=y_pred,
        target_names=target_names,
    )
)

## 2. Weighted Loss with L1/L2 regularization

In [None]:
import torch
from torch.nn import CrossEntropyLoss


class WeightedLossTrainer(Trainer):
    """Trainer whose loss is cross-entropy (optionally class-weighted) plus an L1 penalty.

    Args:
        class_weights: optional per-class weights (tensor or sequence, one value
            per class). ``None`` (the default) gives unweighted cross-entropy.
        l1_lambda: coefficient of the L1 penalty over all model parameters;
            a value <= 0 disables the penalty.

    All other positional and keyword arguments are forwarded to ``Trainer``.
    """

    def __init__(self, *args, class_weights=None, l1_lambda=0.00000001, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights
        self.l1_lambda = l1_lambda

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return the loss (and the model outputs when ``return_outputs`` is true)."""
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Apply the class weights when they were supplied; they must live on the
        # same device and have the same dtype as the logits.
        weight = None
        if self.class_weights is not None:
            weight = torch.as_tensor(
                self.class_weights, dtype=logits.dtype, device=logits.device
            )
        loss = CrossEntropyLoss(weight=weight)(logits, labels)

        # L1 penalty: sum of absolute values of every model parameter
        if self.l1_lambda > 0:
            l1_penalty = sum(param.abs().sum() for param in model.parameters())
            loss = loss + self.l1_lambda * l1_penalty

        return (loss, outputs) if return_outputs else loss

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    # Experiment-specific directory: the baseline run writes its checkpoints to
    # "./results", and with save_total_limit=1 the checkpoint rotation would
    # otherwise mix checkpoints from different experiments in one folder.
    output_dir="./results/weighted_l1_l2",
    eval_strategy="epoch",
    save_strategy="epoch",
    seed = 42,
    load_best_model_at_end=True,
    metric_for_best_model="neutral_f1",
    greater_is_better=True,
    logging_strategy="steps",
    logging_steps=10,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    learning_rate=2e-5,
    num_train_epochs=4,
    weight_decay=0.00000001,  # L2-style regularization via the optimizer's weight decay
    save_total_limit=1,
    logging_dir="./logs"
)

# Initialize class weights tensor
#class_weights_tensor = torch.tensor([1, 1, 1], dtype=torch.float)

In [None]:
from transformers import Trainer

# Start this experiment from the pretrained checkpoint instead of the notebook-level
# `model`, which the baseline run has already fine-tuned; reusing it would make the
# results of the two setups incomparable. With `model_init` the Trainer builds the
# model itself (after seeding from `args.seed`) when training starts.
trainer = WeightedLossTrainer(
    model_init=lambda: DistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=3
    ),
    args=training_args,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

trainer.train()

In [None]:
# Evaluate the current `trainer` on its eval_dataset; the returned metrics are shown as the cell output
trainer.evaluate()

In [None]:
# Evaluation metrics: epochs = 4 with L1/L2 regularization
from sklearn.metrics import classification_report
import numpy as np

# Score the test split with the trained model
predictions_output = trainer.predict(dataset_dict["test"])

# Reference labels, and the highest-scoring class for each row of predictions
y_true = predictions_output.label_ids
y_pred = np.argmax(predictions_output.predictions, axis=1)

# Per-class precision / recall / F1 table
target_names = ["negative", "neutral", "positive"]
print(
    classification_report(
        y_true=y_true,
        y_pred=y_pred,
        target_names=target_names,
    )
)

## 3. Focal Loss

In [None]:
from transformers import Trainer

import torch
import torch.nn.functional as F
from torch import nn


class FocalLoss(nn.Module):
    """Multi-class focal loss: ``-(1 - p_t) ** gamma * log(p_t)``.

    ``p_t`` is the softmax probability assigned to the true class.

    Args:
        gamma: focusing exponent; ``0`` reduces the loss to plain cross-entropy.
        alpha: optional per-class weights (tensor or sequence, one value per class).
        reduction: ``"mean"``, ``"sum"``, or anything else for the per-example losses.
    """

    def __init__(self, gamma=2.0, alpha=None, reduction="mean"):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction

    def forward(self, logits, targets):
        # Log-probability of the true class for every example
        log_probs = F.log_softmax(logits, dim=-1)
        log_pt = log_probs.gather(-1, targets.unsqueeze(-1)).squeeze(-1)

        # Down-weight examples the model already classifies confidently
        loss = -((1.0 - log_pt.exp()) ** self.gamma) * log_pt

        if self.alpha is not None:
            alpha = torch.as_tensor(self.alpha, dtype=logits.dtype, device=logits.device)
            loss = loss * alpha[targets]

        if self.reduction == "mean":
            return loss.mean()
        if self.reduction == "sum":
            return loss.sum()
        return loss


class PlainFocalTrainer(Trainer):
    """Trainer that replaces the default loss with an unweighted focal loss."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return the focal loss (and the model outputs when ``return_outputs`` is true)."""
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Use FocalLoss without weights
        loss_fct = FocalLoss()
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss

In [None]:
training_args = TrainingArguments(
    # Experiment-specific directory: earlier runs in this notebook write their
    # checkpoints to "./results", and with save_total_limit=1 the checkpoint rotation
    # would otherwise mix checkpoints from different experiments in one folder.
    output_dir="./results/focal",
    eval_strategy="epoch",
    save_strategy="epoch",
    seed = 42,
    load_best_model_at_end=True,
    metric_for_best_model="neutral_f1",
    greater_is_better=True,
    logging_strategy="steps",
    logging_steps=10,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    learning_rate=2e-5,
    num_train_epochs=4,
    weight_decay=0.01,
    save_total_limit=1,
    logging_dir="./logs"
)

In [None]:
# Start this experiment from the pretrained checkpoint instead of the notebook-level
# `model`, which earlier experiments have already fine-tuned; reusing it would make
# the focal-loss results incomparable with the other setups. With `model_init` the
# Trainer builds the model itself (after seeding from `args.seed`) when training starts.
trainer = PlainFocalTrainer(
    model_init=lambda: DistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=3
    ),
    args=training_args,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)
trainer.train()

In [None]:
# Evaluate the current `trainer` on its eval_dataset; the returned metrics are shown as the cell output
trainer.evaluate()

In [None]:
# Evaluation metrics: epochs = 4 and PlainFocalLoss

from sklearn.metrics import classification_report
import numpy as np

# Score the test split with the trained model
predictions_output = trainer.predict(dataset_dict["test"])

# Reference labels, and the highest-scoring class for each row of predictions
y_true = predictions_output.label_ids
y_pred = np.argmax(predictions_output.predictions, axis=1)

# Per-class precision / recall / F1 table
target_names = ["negative", "neutral", "positive"]
print(
    classification_report(
        y_true=y_true,
        y_pred=y_pred,
        target_names=target_names,
    )
)