In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
import sys
sys.path.append(r'C:\Users\geass\Hierarchical_Attention_Networks_for_Document_Classification')
from models.HAN import HAN
from utils import preprocessing as pp

import pandas as pd
from matplotlib import pyplot as plt

In [None]:
# Read the raw training CSV into a DataFrame and preview its first ten rows.
# NOTE(review): read_csv treats the first row as a header by default, and the
# column names are overwritten right after loading -- confirm the file really
# has a header row, otherwise the first review is consumed as column names.
train_csv_path = r'C:\Users\geass\Hierarchical_Attention_Networks_for_Document_Classification\data\Amazon_reviews_polarity\train.csv'
df = pd.read_csv(train_csv_path)
df.head(10)

In [None]:
# Give the three columns meaningful names.
df.columns = ['polarity', 'title', 'review']

# Work on a reproducible 10000-row subsample, fold the title into the review
# text (joined by '. ') and drop the now-redundant title column.
df = (
    df.sample(n=10000, random_state=42)
      .assign(review=lambda frame: frame['title'].astype(str) + '. ' + frame['review'].astype(str))
      .drop(columns='title')
)

In [None]:
# --- Train / validation / test split ---------------------------------------
# Carve 8000 rows out of the sub-sampled reviews for training.
df_train = df.sample(n=8000, random_state=42)

df_train_index = df_train.index
df = df.drop(df_train_index)  # `df` now holds only the held-out rows

# Only 2000 rows remain at this point (10000 sampled upstream minus the 8000
# training rows). DataFrame.sample draws without replacement by default and
# raises ValueError when n exceeds the population, so the validation set takes
# half of the remainder and the test set gets the other half.
df_valid = df.sample(n=len(df) // 2, random_state=42)

df_valid_index = df_valid.index
df_test = df.drop(df_valid_index)


def _polarity_to_labels(frame):
    """Turn the ``polarity`` column of ``frame`` into a ``torch.long`` tensor.

    A polarity value of 1 becomes class 0; every other value becomes class 1.
    """
    labels = frame['polarity'].map(lambda x: 0 if x == 1 else 1).to_numpy()
    return torch.tensor(labels, dtype=torch.long)


y_train = _polarity_to_labels(df_train)
y_valid = _polarity_to_labels(df_valid)
y_test = _polarity_to_labels(df_test)

# Number of distinct classes observed in the training labels.
num_classes = len(torch.unique(y_train))

In [None]:
# Tokenize each split with the project's preprocessing helper.
# The training tokens must come from `df_train`: by this point `df` has been
# reduced to the held-out rows, so passing it would feed validation/test text
# into training and misalign the documents with `y_train`.
tokenized_train = pp.tokenize_docs(df_train)
tokenized_valid = pp.tokenize_docs(df_valid)
tokenized_test = pp.tokenize_docs(df_test)

In [None]:
# Vocabulary and word counts are derived from the training split only.
vocabulary, word_count = pp.build_vocabulary(tokenized_train)

# Run replace_unk over every split using the training word counts.
# NOTE(review): presumably this swaps rare/unseen words for an unknown token --
# confirm against utils.preprocessing.
tokenized_train, tokenized_valid, tokenized_test = (
    pp.replace_unk(split, word_count)
    for split in (tokenized_train, tokenized_valid, tokenized_test)
)

# Sentence/document length limits are measured on the training split.
max_sentence_len = pp.max_sentence_length(tokenized_train)
max_document_len = pp.max_document_length(tokenized_train)
target_lengths = (max_sentence_len, max_document_len)

# Training data only needs padding (the limits are its own maxima); the other
# splits are truncated to the same limits first and then padded.
tokenized_train = pp.insert_padding(tokenized_train, *target_lengths)
tokenized_valid = pp.insert_padding(pp.truncate(tokenized_valid, *target_lengths), *target_lengths)
tokenized_test = pp.insert_padding(pp.truncate(tokenized_test, *target_lengths), *target_lengths)

# Embedding matrix is built from the training tokens; every split is then
# converted from words to vocabulary indices.
embedding_matrix = pp.make_embedding_matrix(tokenized_train, vocabulary, embedding_size=100)
x_train, x_valid, x_test = (
    pp.word_to_indices(split, vocabulary)
    for split in (tokenized_train, tokenized_valid, tokenized_test)
)

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
class ReviewDataset(Dataset):
    """Map-style dataset that pairs each document with its label by position."""

    def __init__(self, docs, labels):
        """Keep references to the two parallel containers (no copy, no validation)."""
        self.docs, self.labels = docs, labels

    def __len__(self):
        """Return the number of samples, taken from the document container."""
        n_docs = len(self.docs)
        return n_docs

    def __getitem__(self, idx):
        """Return the ``(document, label)`` pair stored at position ``idx``."""
        document = self.docs[idx]
        label = self.labels[idx]
        return document, label

In [None]:
# Wrap every split in a Dataset + DataLoader. Only the training loader
# shuffles; validation and test are iterated in their stored order.
BATCH_SIZE = 256

train_set = ReviewDataset(x_train, y_train)
train_dataloader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)

valid_set = ReviewDataset(x_valid, y_valid)
valid_dataloader = DataLoader(valid_set, batch_size=BATCH_SIZE, pin_memory=True)

# Test documents must be paired with the test labels; pairing them with
# y_valid would score predictions against labels of unrelated reviews.
test_set = ReviewDataset(x_test, y_test)
test_dataloader = DataLoader(test_set, batch_size=BATCH_SIZE, pin_memory=True)

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [22]:
def train_model(model_class, model_kwargs, dataloader, criterion, optimizer_class, lr, device, epochs):
    """Instantiate a model, train it, plot the per-epoch loss and return the model.

    Args:
        model_class: Callable that builds the model; called as ``model_class(**model_kwargs)``.
        model_kwargs: Keyword arguments forwarded to ``model_class``.
        dataloader: Iterable of ``(inputs, targets)`` batches; must support ``len()``.
        criterion: Loss callable invoked as ``criterion(output, targets)``.
        optimizer_class: Optimizer constructor called with the model parameters and ``lr=lr``.
        lr: Learning rate handed to the optimizer.
        device: Device the model and every batch are moved to.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        The trained model, still on ``device``.
    """
    net = model_class(**model_kwargs).to(device)
    opt = optimizer_class(net.parameters(), lr=lr)

    loss_history = []
    for epoch in range(epochs):
        net.train()
        running_loss = 0

        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            # Standard step: clear stale gradients, forward, backward, update.
            opt.zero_grad()
            batch_loss = criterion(net(inputs), targets)
            batch_loss.backward()
            opt.step()

            running_loss += batch_loss.item()

        # Mean of the per-batch losses for this epoch.
        avg_loss = running_loss / len(dataloader)
        loss_history.append(avg_loss)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    # Loss curve over all epochs.
    plt.plot(loss_history)
    plt.xlabel("Epoch")
    plt.ylabel("Average Loss")
    plt.title("Training Loss per Epoch")
    plt.show()

    return net

In [18]:
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, ConfusionMatrixDisplay

In [20]:
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate a binary classifier on ``dataloader`` and report its metrics.

    Puts the model in eval mode, runs every batch without gradient tracking,
    then prints accuracy, mean per-batch loss and F1 score and shows a
    confusion matrix labelled "Negative" / "Positive".

    Args:
        model: Model called as ``model(x)``; the predicted class is the argmax
            of its output along dimension 1.
        dataloader: Iterable of ``(inputs, targets)`` batches; must support ``len()``.
        criterion: Loss callable invoked as ``criterion(output, targets)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        None. Results are printed and plotted only.
    """
    model.eval()
    total_loss = 0

    y_true = []
    y_pred = []

    # No gradients are needed while scoring.
    with torch.no_grad():
        for x, y in dataloader:
            x = x.to(device)
            y = y.to(device)

            output = model(x)
            y_pred.extend(output.argmax(dim=1).cpu().tolist())
            y_true.extend(y.cpu().tolist())
            loss = criterion(output, y)

            total_loss += loss.item()

    # Mean of the per-batch losses.
    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    print(f"Accuracy: {accuracy:.4f}, Loss: {avg_loss:.4f}, F1 Score: {f1:.4f}")

    # Pin the label set so the matrix is always 2x2. Without it the labels are
    # inferred from the data, and a run where only one class occurs yields a
    # 1x1 matrix that no longer matches the two display labels below.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Negative", "Positive"])
    cm_display.plot()
    plt.title("Confusion Matrix")
    plt.show()

    return

In [23]:
def lr_grid_search(model_class, model_kwargs, learning_rates, dataloader, criterion, optimizer_class, device, epochs):

    results = []
    for lr in learning_rates:
        print(f"=============Learning rate: {lr}===========")
        model = train_model(model_class, model_kwargs, dataloader, criterion, optimizer_class, lr, device, epochs)
        evaluate_model(model, test_dataloader, criterion, device)