# Data

Dataset obtained from https://www.kaggle.com/datasets/jarvis11/text-dataset-for-text-emotion-detection/metadata 

In [None]:
import pandas as pd
from google.colab import files

def read_from_google_drive(url: str) -> pd.DataFrame:
  """Load a ']'-separated, header-less CSV from a Google Drive share link.

  Args:
    url: A Google Drive link to the file. Links of the form
      ``.../file/d/<id>/view?...``, ``.../file/d/<id>`` and ``...?id=<id>``
      are recognised.

  Returns:
    The file contents as a DataFrame with default integer column names.
  """
  import re  # local import: only this helper needs it

  # Look for the file id explicitly instead of relying on its position in the
  # path, so links without a trailing "/view" segment also work.
  match = re.search(r"/d/([\w-]+)", url) or re.search(r"[?&]id=([\w-]+)", url)
  # Fall back to the previous positional rule for any other link shape.
  file_id = match.group(1) if match else url.split("/")[-2]

  download_url = "https://drive.google.com/uc?id=" + file_id
  return pd.read_csv(download_url, sep=']', header=None)

# Share link of the dataset file on Google Drive.
URL = "https://drive.google.com/file/d/1kIklUQY8S-wik2B58onlU75jX5zWELLy/view?usp=sharing"

# The file is read without a header row, so the columns are named here.
COLUMN_NAMES = ['label', 'text']

df = read_from_google_drive(URL)
df.columns = COLUMN_NAMES

# Preprocessing

1 - Joy

2 - Fear

3 - Anger

4 - Sadness

5 - Disgust

6 - Ashamed

7 - Guilt

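Translate the array labels into integer class indices so they're easier to work with. The indices assigned below are 0-based and follow the order in which each label first appears in the data, so index `i` matches emotion `i + 1` in the list above only if the labels first appear in that order.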

In [None]:
# Distinct raw labels, in the order they first occur in the frame.
unique_labels = df.label.unique()

# Raw label -> 0-based integer id (position in `unique_labels`).
label_dict = {raw_label: class_id for class_id, raw_label in enumerate(unique_labels)}

# Overwrite the raw labels with their integer ids.
df['label'] = df.label.replace(label_dict)

Divide the dataset into train (85%) and validation (15%) sets, stratified by label

In [None]:
from sklearn.model_selection import train_test_split

# Split row *indices* (not the texts) so the split can be written back onto
# `df` as a column. `stratify` keeps the label proportions equal in both parts.
# `random_state` pins the split: the global RNG seeds are only set further
# down in the notebook, so without it every run would produce a different
# train/validation partition.
x_train, x_val, y_train, y_val = train_test_split(
    df.index.values,
    df.label.values,
    test_size=0.15,
    stratify=df.label.values,
    random_state=17,
)

In [None]:
# Tag every row with the split it was assigned to.
for split_name, split_index in (('train', x_train), ('val', x_val)):
    df.loc[split_index, 'data_type'] = split_name

In [None]:
# Sanity check of the split: number of rows per (label, data_type) pair.
split_counts = df.groupby(['label', 'data_type']).count()
split_counts

Unnamed: 0_level_0,Unnamed: 1_level_0,text
label,data_type,Unnamed: 2_level_1
0,train,921
0,val,163
1,train,916
1,val,162
2,train,918
2,val,162
3,train,917
3,val,162
4,train,899
4,val,158


# Model Preparation 

In [None]:
!pip install transformers

from transformers import BertTokenizer
import torch
from transformers import BertForSequenceClassification

# Model and tokenizer are loaded from the same pretrained checkpoint.
checkpoint_name = 'bert-base-uncased'

# Sequence classifier with a 7-way output head.
model = BertForSequenceClassification.from_pretrained(checkpoint_name, num_labels=7)
tokenizer = BertTokenizer.from_pretrained(checkpoint_name)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

In [None]:
# Every text is padded or truncated to exactly this many tokens.
# NOTE(review): anything beyond 17 tokens is cut off - confirm this is long
# enough for the texts in the dataset.
pad_length = 17


def _encode_texts(texts):
    """Tokenize a list of strings into fixed-length PyTorch tensors.

    Args:
        texts: list of raw input strings.

    Returns:
        The tokenizer output holding ``input_ids`` and ``attention_mask``
        tensors, each row padded/truncated to ``pad_length`` tokens.
    """
    return tokenizer(
        texts,
        max_length=pad_length,
        # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`.
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )


tokens_train = _encode_texts(df[df.data_type == 'train'].text.tolist())
tokens_val = _encode_texts(df[df.data_type == 'val'].text.tolist())



In [None]:
# `df.label` already holds integer ids at this point. Enumerate them in sorted
# order over the whole frame so the class id fed to the model is deterministic:
# taking the order of first appearance in the training rows would make the ids
# depend on the random train/validation split.
temp_list = sorted(df.label.unique())

# label value -> class id used for training, and the reverse lookup
label_dict = {possible_label: index for index, possible_label in enumerate(temp_list)}
label_dict_inv = {index: possible_label for possible_label, index in label_dict.items()}

# class-id vectors for the train and val rows, in frame order
int_labels_train = [label_dict[label] for label in df[df.data_type == 'train'].label]
int_labels_val = [label_dict[label] for label in df[df.data_type == 'val'].label]

In [None]:
# The tokenizer was asked for `return_tensors='pt'`, so these entries are
# already tensors. Wrapping a tensor in `torch.tensor(...)` triggers a
# copy-construct UserWarning; `.clone().detach()` makes the same independent
# copy without it.
train_seq = tokens_train['input_ids'].clone().detach()
train_mask = tokens_train['attention_mask'].clone().detach()
# The label vectors are plain Python lists, so they still need converting.
train_labels = torch.tensor(int_labels_train)

val_seq = tokens_val['input_ids'].clone().detach()
val_mask = tokens_val['attention_mask'].clone().detach()
val_labels = torch.tensor(int_labels_val)

  """Entry point for launching an IPython kernel.
  
  """
  


In [None]:
from torch.utils.data import TensorDataset

# Bundle (input ids, attention mask, label) per example for each split.
dataset_train, dataset_val = (
    TensorDataset(*split_tensors)
    for split_tensors in (
        (train_seq, train_mask, train_labels),
        (val_seq, val_mask, val_labels),
    )
)

In [None]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

batch_size = 32

# Training batches are drawn in random order.
dataloader_train = DataLoader(
    dataset_train,
    sampler=RandomSampler(dataset_train),
    batch_size=batch_size
)
# Validation batches are read in dataset order: shuffling brings nothing for
# evaluation, and a fixed order keeps predictions aligned with the rows.
dataloader_val = DataLoader(
    dataset_val,
    sampler=SequentialSampler(dataset_val),
    batch_size=batch_size
)

In [None]:
from transformers import AdamW, get_linear_schedule_with_warmup

epochs = 10
# One scheduler step is taken per training batch, so the schedule spans
# (batches per epoch) * (number of epochs) steps.
total_training_steps = len(dataloader_train) * epochs

# NOTE(review): `transformers.AdamW` is deprecated upstream in favour of
# `torch.optim.AdamW` - consider migrating together with the import.
optimizer = AdamW(model.parameters(), lr=1e-5, eps=1e-8)

# Linear decay of the learning rate with no warm-up phase.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_training_steps,
)



In [None]:
def f1_score_func(preds, labels):
    """Weighted F1 score of the arg-max predictions against the true labels.

    Args:
        preds: 2-D array of per-class scores, one row per example.
        labels: array of true class ids.

    Returns:
        The value of ``sklearn.metrics.f1_score`` with ``average='weighted'``.
    """
    # Imported here because no cell of the notebook imports `f1_score`;
    # without it this function fails with a NameError when called.
    from sklearn.metrics import f1_score

    preds_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return f1_score(labels_flat, preds_flat, average='weighted')

def accuracy_per_class(preds, labels):
    """Print, for each class present in ``labels``, how many were predicted right.

    Args:
        preds: 2-D array of per-class scores, one row per example.
        labels: array of true class ids.

    For every distinct class id in ``labels`` two lines are printed: the key
    of the global ``label_dict`` that maps to that id, then
    ``Accuracy: <correct>/<total>`` for the examples of that class.
    """
    id_to_name = {class_id: name for name, class_id in label_dict.items()}

    predicted_ids = np.argmax(preds, axis=1).flatten()
    true_ids = labels.flatten()

    for class_id in np.unique(true_ids):
        in_class = true_ids == class_id
        n_total = int(in_class.sum())
        n_correct = int((predicted_ids[in_class] == class_id).sum())
        print(f'{id_to_name[class_id]}')
        print(f'Accuracy: {n_correct}/{n_total}')

In [None]:
import random
import numpy as np

seed_val = 17

# Seed Python's, NumPy's and PyTorch's (CPU and CUDA) random generators.
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(seed_val)

# Training and evaluation run on the CPU.
device = torch.device('cpu')
model.to(device)
print(device)

cpu


In [None]:
def evaluate(dataloader_val):
    """Run the global ``model`` over ``dataloader_val`` without gradient tracking.

    The model is switched to eval mode and is not switched back.

    Args:
        dataloader_val: sized iterable of batches; each batch is indexed as
            ``(input_ids, attention_mask, labels)`` and moved to the global
            ``device`` before the forward pass.

    Returns:
        A tuple ``(loss_val_avg, predictions, true_vals)``: the loss averaged
        over batches, the logits of all batches concatenated along axis 0,
        and the labels of all batches concatenated along axis 0 (both as
        NumPy arrays).
    """
    model.eval()

    running_loss = 0
    logit_chunks = []
    label_chunks = []

    for batch in dataloader_val:
        moved = [tensor.to(device) for tensor in batch]

        with torch.no_grad():
            outputs = model(
                input_ids=moved[0],
                attention_mask=moved[1],
                labels=moved[2],
            )

        # Because labels are passed in, the output carries the loss first
        # and the logits second.
        running_loss += outputs[0].item()
        logit_chunks.append(outputs[1].detach().cpu().numpy())
        label_chunks.append(moved[2].cpu().numpy())

    return (
        running_loss / len(dataloader_val),
        np.concatenate(logit_chunks, axis=0),
        np.concatenate(label_chunks, axis=0),
    )


# Training

In [None]:
from tqdm.notebook import tqdm

In [None]:
# Fine-tune the model for `epochs` passes over the training loader, saving the
# weights and reporting the mean training loss after every epoch.
for epoch in tqdm(range(1, epochs+1)):
    model.train()
    loss_train_total = 0
    progress_bar = tqdm(
        dataloader_train,
        desc='Epoch{:1d}'.format(epoch),
        leave=False,
        disable=False
    )
    for batch in progress_bar:
        model.zero_grad()
        batch = tuple(b.to(device) for b in batch)
        inputs = {
            'input_ids' : batch[0],
            'attention_mask' : batch[1],
            'labels' : batch[2]
        }
        # With labels supplied, the first output is the loss.
        outputs = model(**inputs)
        loss = outputs[0]
        loss_train_total += loss.item()
        loss.backward()

        # Cap the gradient norm at 1 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1)

        optimizer.step()
        scheduler.step()
        # Show the batch loss as-is. `len(batch)` is the number of tensors in
        # the batch tuple (3), not the number of examples, so dividing by it
        # only distorted the displayed value and made it inconsistent with
        # the per-epoch average below.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

    # The same file is overwritten after every epoch.
    torch.save(model.state_dict(), 'model.model')
    # f-string so the epoch number is interpolated instead of printing the
    # literal text "{epoch}".
    tqdm.write(f'\nEpoch {epoch}')
    loss_train_avg = loss_train_total/len(dataloader_train)
    tqdm.write(f'Training loss: {loss_train_avg}')


  0%|          | 0/10 [00:00<?, ?it/s]

Epoch1:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 1.7689894204163672


Epoch2:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 1.3264975362087614


Epoch3:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 1.1034532108498578


Epoch4:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.9424859872415438


Epoch5:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.8283681003891643


Epoch6:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.7340579419279817


Epoch7:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.6474150781655431


Epoch8:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.5842068750954154


Epoch9:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.5475596282350358


Epoch10:   0%|          | 0/199 [00:00<?, ?it/s]


Epoch {epoch}
Training loss: 0.5133044017319703


In [None]:
# 1-based emotion number -> emotion name.
label_to_word = dict(enumerate(
    ['Joy', 'Fear', 'Anger', 'Sadness', 'Disgust', 'Ashamed', 'Guilt'],
    start=1,
))

Example usage

In [None]:
def sa(intxt):
  """Print the emotion the model predicts for one input text.

  Args:
    intxt: the text to classify.

  Side effects:
    Switches the global ``model`` to eval mode and prints the predicted
    emotion name looked up in ``label_to_word``.
  """
  # The training loop leaves the model in train mode; switch to eval mode so
  # training-only behaviour does not affect the prediction.
  model.eval()

  # Truncate over-long texts to the tokenizer's limit instead of failing.
  encoded = tokenizer(intxt, return_tensors='pt', truncation=True)
  # Put the inputs on whatever device the model currently lives on.
  encoded = {name: tensor.to(model.device) for name, tensor in encoded.items()}

  # No gradients are needed for inference.
  with torch.no_grad():
    logits = model(**encoded).logits

  # Arg-max over the class axis of the single input row.
  predicted_class_id = logits.argmax(dim=-1)[0].item()
  # Model class ids are 0-based, `label_to_word` keys are 1-based.
  print(label_to_word[predicted_class_id+1])