#### This code is based on my previously written code [here](https://www.kaggle.com/code/lizhecheng/96-accuracy-bert-model-nlp-classification)

In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import torch 
import spacy
import nltk 
import string
import regex as re 
from torch import nn
from sklearn.metrics import f1_score
from transformers import BertModel, BertTokenizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from torch.optim.lr_scheduler import CosineAnnealingLR
from tqdm import tqdm
from transformers import logging
# Set the transformers library's log verbosity to the WARNING level.
logging.set_verbosity_warning()
# Apply the "fivethirtyeight" matplotlib style to the plots drawn below.
plt.style.use("fivethirtyeight")

In [None]:
# Choose the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
# (The original run used a single RTX4090 GPU for training.)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")
use_cuda, device

In [None]:
# Load the dataset from CSV; the path is relative to the working directory.
df = pd.read_csv("../HomeWork1/nyt.csv")
# Print (rows, columns), then preview the first rows as the cell output.
print(df.shape)
df.head()

In [None]:
# Horizontal bar chart of the number of rows per label value.
plt.figure(figsize=(5, 3))
df["label"].value_counts().plot(kind="barh", color="green")
plt.show()

In [None]:
def label_to_number(label):
    """Translate a category name into its integer class id.

    "business" -> 0, "politics" -> 1, "sports" -> 2. Any other value
    (including differently-cased names) yields the sentinel -1.
    """
    class_ids = dict(business=0, politics=1, sports=2)
    try:
        return class_ids[label]
    except KeyError:
        # Unknown category: signal it with -1 rather than raising.
        return -1

# Overwrite the string labels with their integer class ids (-1 for unknown names).
df["label"] = df["label"].apply(label_to_number)

In [None]:
# Load spaCy's "en_core_web_md" pipeline; clean() calls it to lemmatize text.
sp = spacy.load("en_core_web_md")

# Download the NLTK "stopwords" and "punkt" resources.
nltk.download("stopwords")
nltk.download("punkt")

# Stop-word collections from both libraries. Only nltk_st is referenced by the
# code visible in this notebook; spacy_st is defined but not used below.
spacy_st = sp.Defaults.stop_words
nltk_st = stopwords.words("english")

In [None]:
# Contraction rewrites, applied in order to lower-cased text. Specific rules
# must precede the generic ones that would otherwise consume them first:
# "'scuse" before "'s", and "can't" before "n't".
_CONTRACTION_RULES = (
    (r"what's", "what is"),
    (r"'scuse", " excuse"),
    (r"'s", " "),
    (r"'ve", " have"),
    (r"can't", "cannot"),
    (r"n't", " not"),
    (r"\bi'm\b", "i am"),
    # Word boundaries are essential: a bare "im" pattern would also rewrite
    # the inside of ordinary words ("time" -> "ti ame").
    (r"\bim\b", "i am"),
    (r"'re", " are"),
    (r"'d", " would"),
    (r"'ll", " will"),
)

# Translation table that deletes every ASCII punctuation character.
_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)


def clean(text, http=True, punc=True, lem=True, stop_w=True):
    """Normalize a raw text string for tokenization.

    Steps, in order:
      1. optionally remove ``t.co`` short links,
      2. optionally drop NLTK English stop words,
      3. optionally lemmatize with the spaCy pipeline ``sp``,
      4. lower-case and expand common English contractions,
      5. optionally delete punctuation,
      6. replace remaining non-word characters with spaces and collapse
         whitespace.

    Contractions are expanded *before* punctuation is deleted; otherwise the
    apostrophes the contraction rules rely on would already be gone.

    Args:
        text: The input string.
        http: Remove ``http(s)://t.co/...`` links when truthy.
        punc: Delete ``string.punctuation`` characters when truthy.
        lem: Replace each token with its spaCy lemma when truthy.
        stop_w: Remove tokens found in ``nltk_st`` when truthy.

    Returns:
        The cleaned, lower-cased, single-spaced string.
    """
    if http:
        text = re.sub(r"https?://t\.co/[A-Za-z0-9]*", "", text)
    if stop_w:
        text = " ".join(
            word for word in word_tokenize(text) if word.lower() not in nltk_st
        )
    if lem:
        text = " ".join(token.lemma_ for token in sp(text))

    text = text.lower()
    for pattern, replacement in _CONTRACTION_RULES:
        text = re.sub(pattern, replacement, text)

    if punc:
        text = text.translate(_PUNCTUATION_TABLE)

    text = re.sub(r"\W", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip(" ")

In [None]:
# Run every raw text through clean() with all steps enabled, drop the raw
# column so only the cleaned text remains, and preview the result.
df["cleaned_text"] = df["text"].apply(clean, http=True, punc=True, lem=True, stop_w=True)
df.drop(columns=["text"], inplace=True)
df.head()

In [None]:
# Checkpoint name shared by the tokenizer here and the BERT encoder in BertClassifier.
PRE_TRAINED_MODEL_NAME = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Shuffle once with a fixed seed, then cut 80% / 10% / 10% into
# train / validation / test. Positional slicing is used instead of np.split,
# which on a DataFrame goes through the deprecated DataFrame.swapaxes.
_shuffled_df = df.sample(frac=1, random_state=42)
_train_end = int(0.8 * len(df))
_val_end = int(0.9 * len(df))
df_train = _shuffled_df.iloc[:_train_end]
df_val = _shuffled_df.iloc[_train_end:_val_end]
df_test = _shuffled_df.iloc[_val_end:]
print(len(df_train), len(df_val), len(df_test))

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenized texts with their integer labels.

    Every text in ``df["cleaned_text"]`` is tokenized up front with the
    module-level ``tokenizer`` (padded/truncated to ``max_length``), so
    ``__getitem__`` only performs lookups.
    """

    def __init__(self, df, max_length=64):
        """Tokenize all texts and store labels.

        Args:
            df: Table with a ``"label"`` column and a ``"cleaned_text"`` column.
            max_length: Token length every text is padded/truncated to.
        """
        # Convert once to a positional array. The frames passed in are
        # shuffled, so a pandas Series would be indexed by label rather than
        # by position; converting here also avoids rebuilding the array on
        # every item access.
        self.labels = np.asarray(df["label"])
        self.texts = [
            tokenizer(
                text,
                padding="max_length",
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )
            for text in df["cleaned_text"]
        ]

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label stored at position ``idx``."""
        return self.labels[idx]

    def get_batch_texts(self, idx):
        """Return the tokenizer output stored at position ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return ``(tokenized_text, label)`` for position ``idx``."""
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)
        return batch_texts, batch_y

In [None]:
class BertClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    ``forward`` returns raw (unnormalized) class logits, which is what
    ``CrossEntropyLoss`` expects.
    """

    def __init__(self, dropout=0.5, num_classes=3):
        """Build the model.

        Args:
            dropout: Dropout probability applied to BERT's pooled output.
            num_classes: Number of output classes (size of the logit vector).
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.dropout = nn.Dropout(dropout)
        # Size the head from the encoder config instead of hard-coding 768.
        self.linear = nn.Linear(self.bert.config.hidden_size, num_classes)
        # Kept only so code that inspects `model.relu` keeps working; it has
        # no parameters and is intentionally not applied in forward().
        self.relu = nn.ReLU()

    def forward(self, input_id, mask):
        """Return class logits for a batch.

        Args:
            input_id: Token ids passed to BERT as ``input_ids``.
            mask: Attention mask passed to BERT as ``attention_mask``.

        Returns:
            The linear head's output, one score per class for each sample.
        """
        # With return_dict=False, BERT returns a tuple whose second element is
        # the pooled output.
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)
        # No activation on the logits: a ReLU here would clamp every negative
        # score to zero, block its gradient, and create argmax ties.
        return self.linear(dropout_output)

In [None]:
def train(model, train_data, val_data, learning_rate=3e-6, epochs=3, T_max=3, batch_size=16):
    """Fine-tune ``model`` and print per-epoch train/validation metrics.

    Uses Adam with a cosine-annealed learning rate (stepped once per epoch)
    and a class-weighted cross-entropy loss. The model is put in training
    mode for the optimization pass and in evaluation mode for validation, so
    dropout does not distort the validation metrics. On return the model is
    left in evaluation mode.

    Args:
        model: Classifier called as ``model(input_id, mask)`` returning logits.
        train_data: Table with ``"cleaned_text"`` and ``"label"`` columns.
        val_data: Validation table with the same columns.
        learning_rate: Initial Adam learning rate.
        epochs: Number of passes over ``train_data``.
        T_max: ``T_max`` argument of ``CosineAnnealingLR``.
        batch_size: Mini-batch size for both data loaders.

    Returns:
        None. Metrics are printed once per epoch; the reported losses are the
        mean of the per-batch losses.
    """
    train_dataset = Dataset(train_data)
    val_dataset = Dataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    # Classes 0 and 1 are weighted twice as heavily as class 2 in the loss.
    class_weights = torch.tensor([2.0, 2.0, 1.0], dtype=torch.float32)
    criterion = torch.nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = CosineAnnealingLR(optimizer, T_max=T_max)

    if use_cuda:
        model = model.cuda()
        criterion = criterion.cuda()

    for epoch_num in range(epochs):
        # ---- training pass (dropout enabled) ----
        model.train()
        total_acc_train = 0
        total_loss_train = 0
        for train_input, train_label in tqdm(train_dataloader, total=len(train_dataloader), desc=f"Training Epoch: {epoch_num + 1}"):
            train_label = train_label.to(device).long()
            # Each sample was tokenized on its own, so batched tensors carry
            # an extra singleton dimension at position 1; drop it from both.
            mask = train_input["attention_mask"].squeeze(1).to(device)
            input_id = train_input["input_ids"].squeeze(1).to(device)

            output = model(input_id, mask)
            batch_loss = criterion(output, train_label)
            total_loss_train += batch_loss.item()

            total_acc_train += (output.argmax(dim=1) == train_label).sum().item()

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        scheduler.step()

        # ---- validation pass (dropout disabled, no gradients) ----
        model.eval()
        total_acc_val = 0
        total_loss_val = 0
        with torch.no_grad():
            for val_input, val_label in tqdm(val_dataloader, total=len(val_dataloader), desc=f"Validating Epoch: {epoch_num + 1}"):
                val_label = val_label.to(device).long()
                mask = val_input["attention_mask"].squeeze(1).to(device)
                input_id = val_input["input_ids"].squeeze(1).to(device)

                output = model(input_id, mask)
                batch_loss = criterion(output, val_label)
                total_loss_val += batch_loss.item()

                total_acc_val += (output.argmax(dim=1) == val_label).sum().item()

        # Each batch loss is already a mean over its batch, so average over
        # the number of batches; accuracy is a count, so average over samples.
        # max(..., 1) guards against an empty split.
        train_batches = max(len(train_dataloader), 1)
        val_batches = max(len(val_dataloader), 1)
        train_samples = max(len(train_data), 1)
        val_samples = max(len(val_data), 1)
        print(f"Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / train_batches: .3f} | Train Accuracy: {total_acc_train / train_samples: .3f} | Val Loss: {total_loss_val / val_batches: .3f} | Val Accuracy: {total_acc_val / val_samples: .3f}")

In [None]:
# Build a classifier with its default arguments and fine-tune it using
# train()'s default hyperparameters.
model = BertClassifier()
train(model, df_train, df_val)

In [None]:
def evaluate(model, test_data):
    """Print accuracy, macro-F1 and micro-F1 of ``model`` on ``test_data``.

    The model is switched to evaluation mode for the duration of the call so
    dropout is disabled and predictions are deterministic; its previous
    train/eval mode is restored afterwards.

    Args:
        model: Classifier called as ``model(input_id, mask)`` returning logits.
        test_data: Table with ``"cleaned_text"`` and ``"label"`` columns.

    Returns:
        None. The three metrics are printed.
    """
    test_dataset = Dataset(test_data)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=16)

    if use_cuda:
        model = model.cuda()

    was_training = model.training
    model.eval()

    all_labels = []
    all_preds = []

    total_acc_test = 0
    with torch.no_grad():
        for test_input, test_label in tqdm(test_dataloader, total=len(test_dataloader), desc="Testing"):
            test_label = test_label.to(device)
            # Each sample was tokenized on its own, so batched tensors carry
            # an extra singleton dimension at position 1; drop it from both.
            mask = test_input["attention_mask"].squeeze(1).to(device)
            input_id = test_input["input_ids"].squeeze(1).to(device)

            output = model(input_id, mask)
            preds = output.argmax(dim=1)
            total_acc_test += (preds == test_label).sum().item()

            all_labels.extend(test_label.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    # Leave the model in the mode the caller had it in.
    model.train(was_training)

    macro_f1 = f1_score(all_labels, all_preds, average="macro")
    micro_f1 = f1_score(all_labels, all_preds, average="micro")

    print(f"Accuracy Score: {total_acc_test / len(test_data): .3f}")
    print(f"Macro F1-score: {macro_f1: .3f}")
    print(f"Micro F1-score: {micro_f1: .3f}")

In [None]:
# Print accuracy and F1 scores for the trained model on the test split.
evaluate(model, df_test)