In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertModel, DistilBertModel, RobertaModel
from transformers import BertTokenizer, DistilBertTokenizer, RobertaTokenizer
from transformers import AdamW, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
import random
import numpy as np
import os
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

In [None]:
# --- Experiment configuration -------------------------------------------
# Backbone selection: the encoder class, its tokenizer class and the
# checkpoint identifier that is handed to their `from_pretrained` calls.
model_name = 'bert-base-uncased'
b_model = BertModel
tokenizer_name = BertTokenizer

# Task / input sizing.
num_classes = 5      # output width of the classification head
max_length = 512     # token length each sentence is padded/truncated to

# Optimisation settings.
learning_rate = 2e-5
batch_size = 16
num_epochs = 20

In [None]:
# Seed every random number generator this notebook touches with one value
# so that a fresh kernel run starts from the same random state.
seed_val = 42

torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)
np.random.seed(seed_val)
random.seed(seed_val)

# Configure cuDNN: deterministic mode on, benchmark mode off.
torch.backends.cudnn.deterministic, torch.backends.cudnn.benchmark = True, False

In [None]:
# Root directory of the dataset. The value here looks like a placeholder —
# presumably it must be replaced with a real directory before running.
path = '''Dataset path'''

# Read the three CSV splits that live directly under `path`.
data_train = pd.read_csv(f'{path}/train.csv')
data_validation = pd.read_csv(f'{path}/validate.csv')
data_test = pd.read_csv(f'{path}/test.csv')

In [None]:
class dataset(Dataset):
    """Map-style dataset that tokenizes one sentence per item.

    The ``'Sentence'`` and ``'class'`` columns of ``data`` are cached as
    Python lists; indexing encodes the selected sentence with
    ``tokenizer.encode_plus`` (padded and truncated to ``max_length``) and
    returns it together with its label.
    """

    def __init__(self, data, tokenizer, max_length):
        """Keep the frame, tokenizer and length, and cache both columns as lists."""
        self.texts = data['Sentence'].tolist()
        self.labels = data['class'].tolist()
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of sentences held by the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` tensors for row ``idx``."""
        encoded = self.tokenizer.encode_plus(
            self.texts[idx],
            max_length=self.max_length,
            add_special_tokens=True,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # Flatten each encoded tensor to 1-D before handing it to the loader.
        sample = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        sample['label'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
class classifier(nn.Module):
    """Pretrained encoder followed by a small feed-forward classification head.

    The encoder class is taken from the module-level ``b_model`` and loaded
    with ``from_pretrained(model_name)``. The head is
    ``Linear -> ReLU -> Dropout(0.1) -> Linear`` and produces ``num_classes``
    logits per input sequence.

    Args:
        model_name: Checkpoint identifier passed to ``b_model.from_pretrained``.
        num_classes: Number of output logits.
    """

    def __init__(self, model_name, num_classes):
        # Zero-argument super() avoids spelling the class name, so it cannot
        # drift out of sync with the actual class definition.
        super().__init__()
        self.bert = b_model.from_pretrained(model_name)

        # Size the head from the encoder's own configuration so that other
        # backbones work unchanged; fall back to 768 when the loaded object
        # does not expose `config.hidden_size`.
        hidden_size = getattr(getattr(self.bert, 'config', None), 'hidden_size', 768)

        self.pre_classifier = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return the classification logits for a batch.

        Args:
            input_ids: Token id tensor forwarded to the encoder.
            attention_mask: Attention mask tensor forwarded to the encoder.

        Returns:
            Tensor of logits with ``num_classes`` entries in its last dimension.
        """
        outputs = self.bert(input_ids = input_ids, attention_mask = attention_mask)
        # First element of the encoder output, index 0 along the second axis —
        # presumably the final hidden state of the first ([CLS]) token; confirm
        # for the chosen backbone.
        first_token_state = outputs[0][:, 0]
        hidden = self.pre_classifier(first_token_state)
        hidden = nn.ReLU()(hidden)
        hidden = self.dropout(hidden)
        return self.classifier(hidden)

In [None]:
def train(model, data_loader, optimizer, scheduler, device):
    """Run one optimisation pass over ``data_loader``.

    For each batch the gradients are zeroed, the model is run on the batch's
    ``input_ids`` and ``attention_mask``, cross-entropy against ``label`` is
    back-propagated, the gradient norm is clipped to 1.0, and both the
    optimizer and the scheduler take one step.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of dict batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(avg_loss, predictions, actual_labels)``: the mean of the
        per-batch losses, the predicted class indices and the true labels,
        the latter two as flat Python lists.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()
    batch_losses, predictions, actual_labels = [], [], []

    for batch in data_loader:
        optimizer.zero_grad()

        inputs = {name: batch[name].to(device) for name in ('input_ids', 'attention_mask')}
        targets = batch['label'].to(device)

        logits = model(**inputs)
        loss = criterion(logits, targets)
        batch_losses.append(loss.item())

        # Predicted class = index of the largest logit in each row.
        predictions += torch.max(logits, dim = 1).indices.cpu().tolist()
        actual_labels += targets.cpu().tolist()

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm = 1.0)
        optimizer.step()
        scheduler.step()

    return np.mean(batch_losses), predictions, actual_labels

In [None]:
def evaluate(model, data_loader, device):
    """Score ``model`` on ``data_loader`` without updating any parameters.

    The model is switched to evaluation mode and every batch is processed
    under ``torch.no_grad()``.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of dict batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(avg_loss, predictions, actual_labels)``: the mean of the
        per-batch cross-entropy losses, the predicted class indices and the
        true labels, the latter two as flat Python lists.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()
    batch_losses, predictions, actual_labels = [], [], []

    with torch.no_grad():
        for batch in data_loader:
            inputs = {name: batch[name].to(device) for name in ('input_ids', 'attention_mask')}
            targets = batch['label'].to(device)

            logits = model(**inputs)
            batch_losses.append(criterion(logits, targets).item())

            # Predicted class = index of the largest logit in each row.
            predictions += torch.max(logits, dim = 1).indices.cpu().tolist()
            actual_labels += targets.cpu().tolist()

    return np.mean(batch_losses), predictions, actual_labels

In [None]:
# Tokenizer matching the selected checkpoint.
tokenizer = tokenizer_name.from_pretrained(model_name, do_lower_case=True)

# Training split: reshuffled every epoch so that batches are not tied to the
# row order of train.csv (e.g. rows grouped by class). The order is drawn
# from torch's global RNG.
train_dataset = dataset(data_train, tokenizer, max_length)
train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)

# Validation and test splits keep their original order; shuffling would not
# change the metrics computed over them.
val_dataset = dataset(data_validation, tokenizer, max_length)
val_dataloader = DataLoader(val_dataset, batch_size = batch_size)

test_dataset = dataset(data_test, tokenizer, max_length)
test_dataloader = DataLoader(test_dataset, batch_size = batch_size)

In [None]:
# Use the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the classifier and move its parameters to the selected device.
model = classifier(model_name, num_classes)
model = model.to(device)

In [None]:
# NOTE(review): `AdamW` here is the class imported from `transformers`; it
# appears to be deprecated upstream in favour of `torch.optim.AdamW` —
# confirm that the installed version still exports it.
optimizer = AdamW(model.parameters(), lr = learning_rate)

# The scheduler is stepped once per training batch, so the schedule spans
# (batches per epoch) x (epochs) steps, with no warmup steps.
total_steps = num_epochs * len(train_dataloader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps = 0,
    num_training_steps = total_steps,
)

In [None]:
# Directory for model checkpoints, created under the dataset root.
checkpoint_path = path + '/checkpoint/'

# exist_ok=True makes the call a no-op when the directory is already there,
# so the cell can be re-run safely and there is no check-then-create race.
os.makedirs(checkpoint_path, exist_ok = True)

In [None]:
# One metrics record (a dict) is appended per completed epoch.
training_stats = []

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    # Optimise on the training split and score the predictions gathered
    # while doing so.
    training_loss, training_predictions, training_actual_labels = train(
        model = model,
        data_loader = train_dataloader,
        optimizer = optimizer,
        scheduler = scheduler,
        device = device,
    )
    training_accuracy_score = accuracy_score(y_true = training_actual_labels,
                                             y_pred = training_predictions)

    # Measure the updated model on the validation split.
    val_loss, val_predictions, val_actual_labels = evaluate(
        model = model,
        data_loader = val_dataloader,
        device = device,
    )
    val_accuracy_score = accuracy_score(y_true = val_actual_labels, y_pred = val_predictions)
    val_f1_score = f1_score(y_true = val_actual_labels, y_pred = val_predictions, average = 'macro')

    # Overwrite the saved model after every epoch (whole-module pickle).
    torch.save(model, f'{path}/last-model.pt')

    epoch_record = {
        'epoch': epoch + 1,
        'Training Loss': training_loss,
        'Training Accuracy': training_accuracy_score,
        'Validation Loss': val_loss,
        'Validation Accuracy': val_accuracy_score,
        'Validation F1 Score': val_f1_score,
    }
    training_stats.append(epoch_record)

In [None]:
# Show four decimal places in displayed frames.
pd.set_option('display.precision', 4)

# One row per epoch, indexed by the 1-based epoch number.
df_stats = pd.DataFrame(training_stats).set_index('epoch')
df_stats

In [None]:
# Figure 1: loss per epoch for the training and validation splits.
plt.plot(df_stats['Training Loss'], label = 'Training')
plt.plot(df_stats['Validation Loss'], label = 'Validation')
plt.title('Training & Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc = 'upper left')
plt.show()

# Figure 2: accuracy per epoch for the training and validation splits.
plt.plot(df_stats['Training Accuracy'], label = 'Training')
plt.plot(df_stats['Validation Accuracy'], label = 'Validation')
plt.title('Training & Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc = 'upper left')
plt.show()

In [None]:
# Display names for the classes. Assumes class id i corresponds to
# class_names[i] — confirm against the dataset's label encoding.
class_names = ['Joy', 'Anger', 'Sadness', 'Surprise', 'Anticipation']

# Score the model on the held-out test split.
test_loss, test_predictions, test_actual_labels = evaluate(model, test_dataloader, device)
test_accuracy_score = accuracy_score(test_actual_labels, test_predictions)
test_f1_score = f1_score(test_actual_labels, test_predictions, average='macro')

# Pin the label set explicitly: without `labels`, the report infers the
# classes from the data and raises when fewer than len(class_names) distinct
# classes occur in the test labels and predictions.
report = classification_report(test_actual_labels, test_predictions,
                               labels = list(range(len(class_names))),
                               target_names = class_names,
                               output_dict = True)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy_score:.4f}")
print(f"Test F1 Score: {test_f1_score:.4f}")

# Per-class precision / recall / F1 / support as a table.
report_df = pd.DataFrame(report).transpose()
report_df

In [None]:
# Display names for the classes. Assumes class id i corresponds to
# class_names[i] — confirm against the dataset's label encoding.
class_names = ['Joy', 'Anger', 'Sadness', 'Surprise', 'Anticipation']

# Pin the label set so the matrix always has one row/column per class name,
# even when a class never occurs in the test labels or predictions;
# otherwise the tick labels below would not match the matrix size.
cm = confusion_matrix(test_actual_labels, test_predictions,
                      labels = list(range(len(class_names))))

ax = sns.heatmap(cm, square = True, annot = True, cmap = 'Blues', fmt = 'd', cbar = False)
ax.set_xlabel("Prediction", fontsize = 14, labelpad = 20)
ax.xaxis.set_ticklabels(class_names)
ax.set_ylabel("Actual", fontsize = 14, labelpad = 20)
ax.yaxis.set_ticklabels(class_names)
ax.set_title("Confusion Matrix for Test data", fontsize = 14, pad = 20)
plt.show()