In [1]:
import os
import pandas as pd
import numpy as np

import shutil
import sys
import tqdm.notebook as tq
from collections import defaultdict

import torch
import torch.nn as nn

from sklearn.metrics import f1_score

import time
from tqdm import tqdm
import nltk
from nltk.corpus import wordnet
from googletrans import Translator
from deep_translator import GoogleTranslator
import random

#nltk.download('wordnet')
#nltk.download('omw-1.4')

translator = Translator()

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [2]:
df_data = pd.read_csv('MEISD/MEISD_text.csv')

In [3]:
df_data.head()

Unnamed: 0,TV Series,Utterances,dialog_ids,uttr_ids,seasons,episodes,start_times,end_times,sentiment,emotion,intensity,emotion2,intensity2,emotion3,intensity3
0,GA,look around you,1,0,1,1,00:02:27:589,00:02:28:567,neutral,neutral,,,,,
1,GA,say hello to your competition,1,1,1,1,00:02:28:910,00:02:30:513,neutral,neutral,,,,,
2,GA,eight of you will switch to an easier specialty,1,2,1,1,00:02:31:387,00:02:34:060,neutral,neutral,,,,,
3,GA,five of you will crack under the pressure,1,3,1,1,00:02:34:134,00:02:36:002,neutral,neutral,,,,,
4,GA,two of you will be asked to leave,1,4,1,1,00:02:36:059,00:02:37:723,neutral,neutral,,,,,


In [4]:
emotion_map = {
    'neutral': 0,
    'Neutral': 0,
    'Neutral ': 0,
    'neutral ': 0,
    'acceptance': 1,
    'disgust': 2,
    'Disgust': 2,
    ' disgust': 2,
    'surprise': 3,
    'Surprise': 3,
    'joy': 4,
    'Joy': 4,
    'sadness': 5,
    'Sadness': 5,
    'anger': 6,
    'Anger': 6,
    'ANGER': 6,
    'like': 7,
    'fear': 8,
    'Fear': 8,
    'Fear ': 8,
    'faer': 8,
    'fear ': 8,
    'Fera': 8

}

data_emotion = pd.DataFrame()
data_emotion['Utterances'] = df_data['Utterances']
data_emotion['target1'] = df_data['emotion'].map(emotion_map).fillna(9).astype(int)
data_emotion['target2'] = df_data['emotion2'].map(emotion_map).fillna(9).astype(int)
data_emotion['target3'] = df_data['emotion3'].map(emotion_map).fillna(9).astype(int)

In [5]:
data_emotion.head(5)

Unnamed: 0,Utterances,target1,target2,target3
0,look around you,0,9,9
1,say hello to your competition,0,9,9
2,eight of you will switch to an easier specialty,0,9,9
3,five of you will crack under the pressure,0,9,9
4,two of you will be asked to leave,0,9,9


In [6]:
data_emotion['combined_emotions'] = data_emotion[['target1', 'target2', 'target3']].apply(lambda x: x.dropna().unique().astype(int).tolist(), axis=1)

In [7]:
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.preprocessing import OneHotEncoder

mlb_emotion = MultiLabelBinarizer()
emotion_binarized = mlb_emotion.fit_transform(data_emotion['combined_emotions'])
emotion_df = pd.DataFrame(emotion_binarized)
emotion_df.columns = [f'emotion_{i + 1}' for i in range(emotion_df.shape[1])]


In [9]:
multi_label_binarizer_MEISD = pd.concat([data_emotion['Utterances'], emotion_df], axis=1)
multi_label_binarizer_MEISD

Unnamed: 0,Utterances,emotion_1,emotion_2,emotion_3,emotion_4,emotion_5,emotion_6,emotion_7,emotion_8,emotion_9,emotion_10
0,look around you,1,0,0,0,0,0,0,0,0,1
1,say hello to your competition,1,0,0,0,0,0,0,0,0,1
2,eight of you will switch to an easier specialty,1,0,0,0,0,0,0,0,0,1
3,five of you will crack under the pressure,1,0,0,0,0,0,0,0,0,1
4,two of you will be asked to leave,1,0,0,0,0,0,0,0,0,1
...,...,...,...,...,...,...,...,...,...,...,...
20012,"oh, that's right, you're a woman and you need ...",0,0,1,0,0,0,1,0,0,1
20013,i'll try again,0,0,1,0,0,0,1,0,0,1
20014,"please, pam, reconsider and have a bagel",0,1,1,0,0,0,0,0,0,1
20015,i have an early lunch,0,0,1,0,0,0,1,0,0,1


In [None]:
def to_binary_vector(row, num_classes=9):
    vector = np.zeros(num_classes)
    for i in range(1, 4):  # iteracja po target1, target2, target3
        if row[f'target{i}'] < num_classes:
            vector[row[f'target{i}']] = int(1)
    return vector

#data_emotion['target_vector'] = data_emotion.apply(to_binary_vector, axis=1)
#data_emotion[['Utterances', 'target_vector']].head(5)

In [None]:
target_array = np.array(data_emotion['target_vector'].tolist())
label_counts = target_array.sum(axis=0)
for idx, count in enumerate(label_counts):
    print(f"Label {idx}: {count}")

In [None]:
dataset = data_emotion[['Utterances', 'target_vector']]

In [None]:
dataset

In [10]:
from sklearn.model_selection import train_test_split
# split into train and test
df_train, df_test = train_test_split(multi_label_binarizer_MEISD, random_state=77, test_size=0.30, shuffle=True)
# split test into test and validation datasets
df_test, df_valid = train_test_split(df_train, random_state=88, test_size=0.50, shuffle=True)

In [11]:
print(f"Original train size: {multi_label_binarizer_MEISD.shape}")
print(f"Train: {df_train.shape}, Test: {df_test.shape}, Valid: {df_valid.shape}")

Original train size: (20017, 11)
Train: (14011, 11), Test: (10008, 11), Valid: (10009, 11)


In [12]:
# Hyperparameters
MAX_LEN = 30
TRAIN_BATCH_SIZE = 16
VALID_BATCH_SIZE = 16
TEST_BATCH_SIZE = 16
EPOCHS = 10
LEARNING_RATE = 0.001 #1e-05
THRESHOLD = 0.2 # threshold for the sigmoid

In [13]:
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'

In [14]:
from transformers import BertTokenizer, BertModel
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
# Test the tokenizer
test_text = "We are testing BERT tokenizer."
# generate encodings
encodings = tokenizer.encode_plus(test_text,
                                  add_special_tokens = True, # Add '[CLS]' and '[SEP]'
                                  max_length = 50,
                                  truncation = True,
                                  padding = "max_length",
                                  return_attention_mask = True,
                                  return_tensors = "pt")
# we get a dictionary with three keys (see: https://huggingface.co/transformers/glossary.html) 
encodings

  from .autonotebook import tqdm as notebook_tqdm


{'input_ids': tensor([[  101,  1284,  1132,  5193,   139,  9637,  1942, 22559, 17260,   119,
           102,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0]])}

In [28]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, df, tokenizer, max_len, target_list):
        self.tokenizer = tokenizer
        self.df = df
        self.title = list(df['Utterances'])
        self.targets = self.df[target_list].values
        self.max_len = max_len

    def __len__(self):
        return len(self.title)

    def __getitem__(self, index):
        title = str(self.title[index])
        title = " ".join(title.split())
        inputs = self.tokenizer.encode_plus(
            title,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'token_type_ids': inputs["token_type_ids"].flatten(),
            'targets': torch.FloatTensor(self.targets[index]),
            'title': title
        }

In [18]:
target_list = list(multi_label_binarizer_MEISD.columns)[1:]
target_list

['emotion_1',
 'emotion_2',
 'emotion_3',
 'emotion_4',
 'emotion_5',
 'emotion_6',
 'emotion_7',
 'emotion_8',
 'emotion_9',
 'emotion_10']

In [29]:
train_dataset = CustomDataset(df_train, tokenizer, MAX_LEN, target_list)
valid_dataset = CustomDataset(df_valid, tokenizer, MAX_LEN, target_list)
test_dataset = CustomDataset(df_test, tokenizer, MAX_LEN, target_list)

In [30]:
next(iter(train_dataset))

{'input_ids': tensor([ 101, 1125, 1128, 1455,  170, 2337, 1104, 2277, 2403,  102,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0]),
 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0]),
 'token_type_ids': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         0, 0, 0, 0, 0, 0]),
 'targets': tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 1.]),
 'title': 'had you asked a couple of weeks ago'}

In [31]:
# Data loaders
train_data_loader = torch.utils.data.DataLoader(train_dataset,
                                                batch_size=TRAIN_BATCH_SIZE,
                                                shuffle=True,
                                                num_workers=0
                                                )

val_data_loader = torch.utils.data.DataLoader(valid_dataset,
                                              batch_size=VALID_BATCH_SIZE,
                                              shuffle=False,
                                              num_workers=0
                                              )

test_data_loader = torch.utils.data.DataLoader(test_dataset,
                                               batch_size=TEST_BATCH_SIZE,
                                               shuffle=False,
                                               num_workers=0
                                               )

In [None]:
class BERTClass(torch.nn.Module):
    def __init__(self):
        super(BERTClass, self).__init__()
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', return_dict=True)
        self.dropout = torch.nn.Dropout(0.3)
        self.linear = torch.nn.Linear(768, 10)

    def forward(self, input_ids, attn_mask, token_type_ids):
        output = self.bert_model(
            input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids
        )
        output_dropout = self.dropout(output.pooler_output)
        output = self.linear(output_dropout)
        return output

model = BERTClass()

# # Freezing BERT layers: (tested, weaker convergence)
# for param in model.bert_model.parameters():
#     param.requires_grad = False

model.to(device)

In [ ]:
def loss_fn(outputs, targets):
    return torch.nn.BCEWithLogitsLoss()(outputs, targets)  

In [ ]:
from transformers import AdamW

# define the optimizer
optimizer = AdamW(model.parameters(), lr = LEARNING_RATE)  

In [ ]:
# Training of the model for one epoch
def train_model(training_loader, model, optimizer):

    losses = []
    correct_predictions = 0
    num_samples = 0
    # set model to training mode (activate droput, batch norm)
    model.train()
    # initialize the progress bar
    loop = tq.tqdm(enumerate(training_loader), total=len(training_loader),
                   leave=True, colour='steelblue')
    for batch_idx, data in loop:
        ids = data['input_ids'].to(device, dtype = torch.long)
        mask = data['attention_mask'].to(device, dtype = torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
        targets = data['targets'].to(device, dtype = torch.float)

        # forward
        outputs = model(ids, mask, token_type_ids) # (batch,predict)=(32,8)
        loss = loss_fn(outputs, targets)
        losses.append(loss.item())
        # training accuracy, apply sigmoid, round (apply thresh 0.5)
        outputs = torch.sigmoid(outputs).cpu().detach().numpy().round()
        targets = targets.cpu().detach().numpy()
        correct_predictions += np.sum(outputs==targets)
        num_samples += targets.size   # total number of elements in the 2D array

        # backward
        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        # grad descent step
        optimizer.step()

        # Update progress bar
        #loop.set_description(f"")
        #loop.set_postfix(batch_loss=loss)

    # returning: trained model, model accuracy, mean loss
    return model, float(correct_predictions)/num_samples, np.mean(losses)
    

In [ ]:
def eval_model(validation_loader, model, optimizer):
    losses = []
    correct_predictions = 0
    num_samples = 0
    # set model to eval mode (turn off dropout, fix batch norm)
    model.eval()

    with torch.no_grad():
        for batch_idx, data in enumerate(validation_loader, 0):
            ids = data['input_ids'].to(device, dtype = torch.long)
            mask = data['attention_mask'].to(device, dtype = torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
            targets = data['targets'].to(device, dtype = torch.float)
            outputs = model(ids, mask, token_type_ids)

            loss = loss_fn(outputs, targets)
            losses.append(loss.item())

            # validation accuracy
            # add sigmoid, for the training sigmoid is in BCEWithLogitsLoss
            outputs = torch.sigmoid(outputs).cpu().detach().numpy().round()
            targets = targets.cpu().detach().numpy()
            correct_predictions += np.sum(outputs==targets)
            num_samples += targets.size   # total number of elements in the 2D array

    return float(correct_predictions)/num_samples, np.mean(losses)

In [ ]:
history = defaultdict(list)
best_accuracy = 0

for epoch in range(1, EPOCHS+1):
    print(f'Epoch {epoch}/{EPOCHS}')
    model, train_acc, train_loss = train_model(train_data_loader, model, optimizer)
    val_acc, val_loss = eval_model(val_data_loader, model, optimizer)

    print(f'train_loss={train_loss:.4f}, val_loss={val_loss:.4f} train_acc={train_acc:.4f}, val_acc={val_acc:.4f}')

    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_acc)
    history['val_loss'].append(val_loss)
    # save the best model
    if val_acc > best_accuracy:
        torch.save(model.state_dict(), os.path.join("emotion_best_model_state.bin"))
        best_accuracy = val_acc
     

In [ ]:
# Model Evaluation
# Loading pretrained model (best model)
model = BERTClass()
model.load_state_dict(torch.load(os.path.join("emotion_best_model_state.bin")))
model = model.to(device)

In [ ]:
# Evaluate the model using the test data
test_acc, test_loss = eval_model(test_data_loader, model, optimizer)
    