In [None]:
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

!pip install transformers
import torch
from torch import nn
from transformers import BertModel
from transformers import BertTokenizer
from torch.optim import Adam

import random
# Seed all three RNG sources used in this notebook (Python's `random`, NumPy's
# legacy global generator, and PyTorch) with the same value so that sampling
# and shuffling start from a fixed state on each fresh run.
random.seed(520)
np.random.seed(520)
torch.manual_seed(520)

from sklearn.metrics import classification_report, confusion_matrix

In [2]:
class TweetDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing pre-tokenized tweets with integer class ids.

    The constructor reads two module-level names that must exist before an
    instance is created: ``labels`` (class name -> integer id) and
    ``tokenizer`` (called once per tweet). All tokenization happens eagerly
    in ``__init__``; item access afterwards is a plain list lookup.
    """

    def __init__(self, df):
        """Encode every row of ``df``.

        Args:
            df: object indexable by 'class' (values must be keys of the
                global ``labels`` mapping) and 'TweetContent' (the text
                passed to the global ``tokenizer``).
        """
        # Labels are resolved first, so an unknown class name fails before
        # any (comparatively slow) tokenization is done.
        self.labels = [labels[class_name] for class_name in df['class']]

        encode_kwargs = dict(padding='max_length', max_length=512,
                             truncation=True, return_tensors="pt")
        encoded = []
        for tweet in df['TweetContent']:
            encoded.append(tokenizer(tweet, **encode_kwargs))
        self.texts = encoded

    def classes(self):
        """Return the list of integer labels, in row order."""
        return self.labels

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) at ``idx`` wrapped in a NumPy array."""
        picked = self.labels[idx]
        return np.array(picked)

    def get_batch_texts(self, idx):
        """Return the tokenizer output stored at ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return ``(tokenized_text, label_array)`` for position ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)


class BertClassifier(nn.Module):
    """BERT encoder followed by a single linear classification layer.

    Args:
        dropout: accepted for backward compatibility. NOTE: it is currently
            unused -- no dropout layer is applied to the pooled output.
        model_name: checkpoint name or path handed to
            ``BertModel.from_pretrained``. Defaults to 'bert-base-uncased'.
        num_classes: size of the output layer. When ``None`` (the default)
            it falls back to ``len(labels)``, where ``labels`` is the
            module-level class-name -> id mapping, which must then exist.
    """

    def __init__(self, dropout=0.5, model_name='bert-base-uncased',
                 num_classes=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        if num_classes is None:
            num_classes = len(labels)
        # Take the input width from the loaded checkpoint instead of
        # hard-coding 768, so other BERT sizes work unchanged.
        self.linear = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_id, mask):
        """Return unnormalized class scores for a batch.

        Args:
            input_id: token ids, forwarded to BERT as ``input_ids``.
            mask: forwarded to BERT as ``attention_mask``.

        Returns:
            The output of the linear layer applied to BERT's pooled output
            (no softmax is applied here).
        """
        # With return_dict=False the encoder returns a tuple whose second
        # element is the pooled output.
        encoder_outputs = self.bert(
            input_ids=input_id, attention_mask=mask, return_dict=False)
        pooled_output = encoder_outputs[1]

        return self.linear(pooled_output)


def _run_batch(model, criterion, batch_input, batch_label, device):
    """Forward one batch; return ``(loss_tensor, n_correct)``."""
    batch_label = batch_label.to(device).long()
    # `input_ids` carries a singleton axis at dim 1 that must be dropped;
    # `attention_mask` comes from the same tokenizer call, so it is squeezed
    # the same way to keep both inputs the same rank.
    mask = batch_input['attention_mask'].squeeze(1).to(device)
    input_id = batch_input['input_ids'].squeeze(1).to(device)

    output = model(input_id, mask)
    loss = criterion(output, batch_label)
    n_correct = (output.argmax(dim=1) == batch_label).sum().item()
    return loss, n_correct


def train(model, train_data, val_data, learning_rate, epochs, batch_size=8):
    """Fine-tune ``model`` and print train/validation metrics after each epoch.

    Args:
        model: module called as ``model(input_id, mask)`` returning class scores.
        train_data: frame wrapped in ``TweetDataset`` for training (shuffled).
        val_data: frame wrapped in ``TweetDataset`` for validation.
        learning_rate: learning rate passed to ``Adam``.
        epochs: number of passes over ``train_data``.
        batch_size: mini-batch size for both loaders (default 8, the value
            that was previously hard-coded).

    Notes:
        * The model is switched to training mode for the optimisation loop
          and to eval mode for validation. ``from_pretrained`` returns the
          encoder in eval mode, so without the explicit ``model.train()``
          its dropout layers stay disabled during fine-tuning. After each
          epoch's validation pass the model is left in eval mode.
        * Reported losses are per-example means: each batch's mean loss is
          weighted by its batch size before dividing by the dataset size.
          Previously the sum of batch means was divided by the number of
          examples, understating the loss by roughly ``batch_size``.
    """
    train_dataset, val_dataset = TweetDataset(train_data), TweetDataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(
        val_dataset, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)

    # Guard the metric denominators so an empty split prints zeros instead of
    # raising ZeroDivisionError.
    n_train = max(len(train_dataset), 1)
    n_val = max(len(val_dataset), 1)

    for epoch_num in range(epochs):
        total_acc_train = 0
        total_loss_train = 0.0

        model.train()
        for train_input, train_label in tqdm(train_dataloader):
            batch_loss, acc = _run_batch(
                model, criterion, train_input, train_label, device)
            # CrossEntropyLoss averages over the batch; re-weight by batch
            # size so the epoch total is a true sum over examples.
            total_loss_train += batch_loss.item() * train_label.size(0)
            total_acc_train += acc

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        total_acc_val = 0
        total_loss_val = 0.0

        model.eval()
        with torch.no_grad():
            for val_input, val_label in val_dataloader:
                batch_loss, acc = _run_batch(
                    model, criterion, val_input, val_label, device)
                total_loss_val += batch_loss.item() * val_label.size(0)
                total_acc_val += acc

        print(
            f'Epochs: {epoch_num+1} | Train Loss: {total_loss_train/n_train: .3f} \
      | Train Accuracy: {total_acc_train/n_train: .3f} \
      | Val Loss: {total_loss_val/n_val: .3f} \
      | Val Accuracy: {total_acc_val/n_val: .3f}')

In [3]:
# Function to clean text
def clean_text(text):
    """Remove @mentions and URLs, collapse whitespace, and lowercase.

    Args:
        text: the raw tweet string.

    Returns:
        The cleaned, lower-cased string with single spaces between tokens
        and no leading or trailing whitespace.
    """
    # Raw string literal: the pattern contains regex escapes that are not
    # valid Python string escapes. The mention pattern includes `_` so that
    # handles such as "@john_doe" are removed whole instead of leaving
    # "_doe" behind.
    without_noise = re.sub(r"(@[A-Za-z0-9_]+)|(\w+:\/\/\S+)", " ", text)
    # split()/join already trims both ends, so no extra strip() is needed.
    return ' '.join(without_noise.split()).lower()


# Function to get predictions
def get_predictions(model, test_data):
    """Run ``model`` over ``test_data`` and collect its outputs.

    Args:
        model: module called as ``model(input_ids, mask)``; it is moved to
            the GPU when one is available and switched to eval mode (and
            left in eval mode on return).
        test_data: frame wrapped in ``TweetDataset``.

    Returns:
        ``(predictions, prediction_probs)`` as CPU tensors.
        ``predictions`` holds the argmax class index per example.
        ``prediction_probs`` holds the model's raw per-class outputs, one
        row per example. NOTE: despite the name, no softmax is applied here,
        so these are whatever scores ``model`` returns. For empty
        ``test_data`` both tensors are empty.
    """
    test_dataset = TweetDataset(test_data)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=8)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # Inference must not run with dropout (or other train-only layers) active.
    model.eval()

    batch_outputs = []
    with torch.no_grad():
        # The label half of each batch is not needed for prediction.
        for test_input, _ in tqdm(test_dataloader):
            # Drop the singleton axis at dim 1 from both tensors so they
            # reach the model with the same rank.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_ids = test_input['input_ids'].squeeze(1).to(device)

            # Move each batch to the CPU right away so results do not pile
            # up in device memory.
            batch_outputs.append(model(input_ids, mask).cpu())

    if not batch_outputs:
        # torch.cat/stack reject an empty list; return empty results instead.
        return torch.empty(0, dtype=torch.long), torch.empty(0)

    prediction_probs = torch.cat(batch_outputs)
    predictions = prediction_probs.argmax(dim=1)
    return predictions, prediction_probs


# Function to plot confusion matrix
def plot_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated blue heatmap.

    Args:
        confusion_matrix: 2-D table of integer counts (cells are formatted
            with "d"). It is passed straight to ``sns.heatmap``.

    The y axis is labelled as the true sentiment and the x axis as the
    predicted sentiment. Nothing is returned.
    """
    ax = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")

    # Re-apply the existing tick labels with a fixed rotation per axis:
    # horizontal on y, tilted 30 degrees on x, both right-aligned.
    for axis, angle in ((ax.yaxis, 0), (ax.xaxis, 30)):
        axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')

    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment')

In [None]:
BERT_MODEL_PATH = 'bert-base-uncased'
EPOCHS = 3
LR = 1e-6
SEED = 520                    # random_state for sampling and shuffling
MAX_SAMPLES_PER_CLASS = 1500  # upper bound on rows drawn per emotion

# Class name -> integer id. Defined before the sampling step so the same
# mapping drives sampling, training and evaluation.
labels = {'sadness': 0, 'joy': 1, 'anger': 2, 'fear': 3, 'neutral': 4}
class_names = list(labels)
class_ids = list(labels.values())

# https://github.com/dair-ai/emotion_dataset
# NOTE(security): unpickling can execute arbitrary code -- only load pickle
# files from a source you trust.
working = pd.read_pickle(
    '----------').rename({'text': 'TweetContent', 'emotions': 'class'}, axis='columns')

# Sample and clean data: draw the same number of rows from every emotion,
# bounded by the smallest class in the file and by MAX_SAMPLES_PER_CLASS.
min_sample = min(
    int(working.groupby('class')['TweetContent'].count().min()),
    MAX_SAMPLES_PER_CLASS)
# DataFrame.append was removed in pandas 2.0; collect the per-class samples
# and concatenate them once instead of growing a frame inside the loop.
new_working = pd.concat(
    [working[working['class'] == emotion].sample(min_sample, random_state=SEED)
     for emotion in class_names])
new_working['TweetContent'] = new_working['TweetContent'].apply(clean_text)
working = new_working.reset_index(drop=True)

# Split data into train (80%), validation (10%), and test (10%) sets.
# Positional slices replace np.split, which relies on a deprecated code path
# when given a DataFrame.
shuffled = working.sample(frac=1, random_state=SEED)
train_end = int(.8 * len(shuffled))
val_end = int(.9 * len(shuffled))
df_train = shuffled.iloc[:train_end]
df_val = shuffled.iloc[train_end:val_end]
df_test = shuffled.iloc[val_end:]
assert len(df_train) + len(df_val) + len(df_test) == len(working)

# Tokenizer (read as a global by TweetDataset)
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_PATH)

# Initialize and train the model
model = BertClassifier()
train(model, df_train, df_val, LR, EPOCHS)

# Save the trained model
# NOTE: this stores only the BERT encoder (`model.bert`); the linear
# classification layer (`model.linear`) is not written out by this call.
model.bert.save_pretrained('---------')

# Evaluate the model on the test set
predictions = get_predictions(model, df_test)
y_preds = predictions[0].tolist()
y_test = df_test['class'].map(labels).tolist()

# Display classification report and confusion matrix.
# Passing `labels=` pins the class order and keeps the matrix 5x5 even when a
# class is missing from both the test split and the predictions.
print(classification_report(
    y_test, y_preds, labels=class_ids, target_names=class_names))
cm = confusion_matrix(y_test, y_preds, labels=class_ids)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
plot_confusion_matrix(df_cm)