# MBTI Classification

## 0. Imports

In [2]:
import pandas as pd
import numpy as np
import torch

from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.utils.data import Dataset, DataLoader, random_split

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from sklearn.feature_extraction.text import TfidfVectorizer
from torchtext.vocab import GloVe,vocab
# from transformers import (set_seed,
#                           TrainingArguments,
#                           Trainer,
#                           GPT2Config,
#                           GPT2Tokenizer,
#                           AdamW,
#                           get_linear_schedule_with_warmup,
#                           GPT2ForSequenceClassification,
#                           GPT2PreTrainedModel,
#                           GPT2Model)

from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score, f1_score, precision_score, recall_score
from typing import Optional, Tuple, Union
# from transformers.modeling_outputs import SequenceClassifierOutputWithPast

In [3]:
## Playground

print(torch.cuda.is_available())

!ls "../input/mbti-personality-types-500-dataset/MBTI 500.csv"

True
'../input/mbti-personality-types-500-dataset/MBTI 500.csv'


In [6]:
# CONSTANTS

MAX_SEQ_LEN = 500
N_LABELS = 16
N_LABELS_BINARY = 1

## 1. Helper methods

In [4]:
def convert_personality_type_to_binary(mbti_type):
    mapper = {
        'I':0,
        'E':1,
        'N':0,
        'S':1,
        'T':0,
        'F':1,
        'J':0,
        'P':1,
    }

    return [mapper[i] for i in mbti_type]

def convert_personality_type_to_int(mbti_type):
    types = [
                'INTJ', 'INTP', 'ISFJ', 'ISFP',
                'ISTJ', 'ISTP', 'ENFJ', 'ENFP',
                'ENTJ', 'ENTP','ESFJ', 'ESFP',
                'ESTJ', 'ESTP', 'INFJ', 'INFP'
            ]
    ints = [i for i in range(len(types))]
    mapper = dict(zip(types, ints))

    return mapper[mbti_type]

def convert_binary_to_personality_type(binary_mbti_type):
    mbti_arrays = [['I', 'E'], ['N', 'S'], ['T', 'F'], ['J', 'P']]
    mbti_string = ''
    for idx, mbti_type in enumerate(binary_mbti_type):
        mbti_string += mbti_arrays[idx][int(mbti_type)]
    return mbti_string

In [5]:
class MBTIDataset(Dataset):
    def __init__(self, data_path, vectorizing_method = None, binary_outputs = False, max_seq_len=500):
        """
        Vectorizing methods:
        None - returns raw text
        basic - basic builtin pytorch vectorizer
        TfIdf - tf-idf vectorizer
        GloVe - Global Vectors pretrained embedding
        """
        self.df = pd.read_csv(data_path)
        self.vectorizing_method = vectorizing_method
        self.max_seq_len = max_seq_len
        self.split_dataframe(self.max_seq_len)

        if vectorizing_method:
            if vectorizing_method.lower == 'basic':
                self.tokenizer = get_tokenizer('basic_english')
                self.vocab = build_vocab_from_iterator(self.yield_tokens_from_dataframe(), specials=['<unk>'])
                self.vocab.set_default_index(self.vocab["<unk>"])

            if vectorizing_method.lower == 'tfidf' or vectorizing_method.lower == 'tf-idf':
                self.tokenizer = TfidfVectorizer(stop_words= 'english')
                self.vocab = self.tokenizer.fit_transform(self.df['posts']) # Sparse matrix representation - could use different field names
            if vectorizing_method.lower == 'glove':
                unk_index = 0
                self.global_vectors = GloVe(name='6B', dim=50)
                self.vocab=vocab(self.global_vectors.stoi)
                self.vocab.insert_token("<unk>",unk_index)
                self.vocab.set_default_index(unk_index)

                self.pretrained_embeddings = self.global_vectors.vectors
                self.pretrained_embeddings = torch.cat((torch.zeros(1,self.pretrained_embeddings.shape[1]),self.pretrained_embeddings))


        self.binary_outputs = binary_outputs
        if binary_outputs:
            self.df['type'] = self.df['type'].apply(convert_personality_type_to_binary)
        else:
            self.df['type'] = self.df['type'].apply(convert_personality_type_to_int)

    def yield_tokens_from_dataframe(self):
        for post in self.df['posts']:
            yield self.tokenizer(post)

    def split_dataframe(self, new_seq_len):
        new_df = pd.DataFrame(columns=self.df.columns)
        new_posts = []
        new_types = []
        for idx, row in self.df.iterrows():
            split_posts = row['posts'].split(' ')
            i = 0
            while i < len(split_posts):
                new_posts.append((' ').join(split_posts[i:i+new_seq_len]))
                new_types.append(row['type'])
                i += new_seq_len

        new_df['posts'] = new_posts
        new_df['type'] = new_types
        self.df = new_df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        if idx >= len(self): raise IndexError

        if not self.vectorizing_method :
            return self.df['posts'][idx], self.df['type'][idx]  #Return raw text

        input_text = self.vocab(self.df['posts'][idx].split(' '))
        if len(input_text) < self.max_seq_len:
            input_text.extend([0] * (self.max_seq_len-len(input_text)))
        label = self.df['type'][idx]

        return input_text[0:self.max_seq_len], label

In [None]:
class Gpt2ClassificationCollator(object):

    def __init__(self, use_tokenizer, max_sequence_len=None):

        # Tokenizer to be used inside the class.
        self.use_tokenizer = use_tokenizer
        # Check max sequence length.
        self.max_sequence_len = use_tokenizer.model_max_length if max_sequence_len is None else max_sequence_len

        return

    def __call__(self, sequences):

        # Get all texts from sequences list.
        texts = [sequence[0] for sequence in sequences]
        # Get all labels from sequences list.
        labels = [sequence[1] for sequence in sequences]
        # Call tokenizer on all texts to convert into tensors of numbers with
        # appropriate padding.
        inputs = self.use_tokenizer(text=texts, return_tensors="pt", padding=True, truncation=True,  max_length=self.max_sequence_len)
        # Update the inputs with the associated encoded labels as tensor.
        inputs.update({'labels':torch.tensor(labels)})

        return inputs

## Neural Network

In [None]:
# Load data
max_seq_len = 500

# Need tokenizer
# tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model_name_or_path='gpt2')
# tokenizer.padding_side = "left"
# tokenizer.pad_token = tokenizer.eos_token
# gpt2_classificaiton_collator = Gpt2ClassificationCollator(use_tokenizer=tokenizer, max_sequence_len=max_seq_len)


ds = MBTIDataset('../input/mbti-personality-types-500-dataset/MBTI 500.csv', vectorizing_method = "tfidf", binary_outputs=True, max_seq_len = max_seq_len)
train_set_size = int(len(ds)*0.7)
val_set_size = int(len(ds)*0.2)
test_set_size = len(ds) - train_set_size - val_set_size
train_ds, val_ds, test_ds = random_split(ds, [train_set_size, val_set_size, test_set_size])

train_dataloader = DataLoader(train_ds, batch_size=8, shuffle=True, collate_fn=gpt2_classificaiton_collator)
val_dataloader = DataLoader(val_ds, batch_size=8, shuffle=True, collate_fn=gpt2_classificaiton_collator)
test_dataloader = DataLoader(test_ds, batch_size=8, shuffle=True, collate_fn=gpt2_classificaiton_collator)

In [None]:
class LSTM_model(nn.Module):
    
    def __init__(self, hidden_dimension=128):
        super(LSTM_model, self).__init__()
        
        self.dimension = hidden_dimension
        
        self.embedding = nn.Embedding(len(text_field.vocab), 300) # Embedding dim 300
        self.lstm_1 = nn.LSTM(input_size=MAX_SEQ_LEN,
                            hidden_size=hidden_dimension,
                            num_layers=1,
                            batch_first=True,
                            bidirectional=True)
        self.lstm_2 = nn.LSTM(input_size=MAX_SEQ_LEN,
                            hidden_size=hidden_dimension,
                            num_layers=1,
                            batch_first=True,
                            bidirectional=True)
        
#         self.max_pooling = nn.MaxPool1d()
        self.dense1 = nn.Linear(hidden_dimension * 2, out_features=hidden_dimension/2) # total worlds / 2
        self.dropout = nn.Dropout(0.5)
        self.dense2 = nn.Linear(hidden_dimension/2, N_LABELS)  # total worlds
        
    def forward(self, x):
        
        out = self.embedding(x)
        
        # Packing output
        # packed_input = pack_padded_sequence(out, text_len, batch_first=True, enforce_sorted=False)
        # packed_output, _ = self.lstm(packed_input)
        # out, _ = pad_packed_sequence(packed_output, batch_first=True)
        
#         out_forward = output[range(len(output)), text_len - 1, :self.dimension]
#         out_reverse = output[:, 0, self.dimension:]
#         out_reduced = torch.cat((out_forward, out_reverse), 1)
#         text_fea = self.drop(out_reduced)

#         text_fea = self.fc(text_fea)
#         text_fea = torch.squeeze(text_fea, 1)


#         h = torch.zeros((self.LSTM_layers, x.size(0), self.hidden_dim))
#         c = torch.zeros((self.LSTM_layers, x.size(0), self.hidden_dim))

#         torch.nn.init.xavier_normal_(h)
#         torch.nn.init.xavier_normal_(c)
#         out, (_, _) = self.lstm(out, (h,c))
        
        out, (_, _) = self.lstm(out)
        
        out = self.drouput(out)
        out = self.dense1(out)
        out = torch.tanh(out[:,-1,:])
        out = self.dropout(out)
        out = self.dense2(out)
        out = torch.softmax(out)
    
        return out 


# model_config = GPT2Config.from_pretrained(pretrained_model_name_or_path='gpt2', num_labels=n_labels)

# tokenizer.padding_side = "left"
# tokenizer.pad_token = tokenizer.eos_token

# model = GPT2ForBinaryMBTIClassification.from_pretrained(pretrained_model_name_or_path='gpt2', config=model_config)
# model.resize_token_embeddings(len(tokenizer))
# model.config.pad_token_id = model.config.eos_token_id

In [None]:
model = LSTM_model()

In [None]:
# Use later
# class GlobalMaxPooling1D(nn.Module):

# def __init__(self, data_format='channels_last'):
#     super(GlobalMaxPooling1D, self).__init__()
#     self.data_format = data_format
#     self.step_axis = 1 if self.data_format == 'channels_last' else 2

# def forward(self, input):
#     return torch.max(input, axis=self.step_axis).values

## Tranining

In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cuda')
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(),
                  lr = 2e-5, # default is 5e-5, our notebook had 2e-5
                  eps = 1e-8 # default is 1e-8.
                  )

# Total number of training steps is number of batches * number of epochs.
# `train_dataloader` contains batched data so `len(train_dataloader)` gives
# us the number of batches.
epochs = 2
total_steps = len(train_dataloader) * epochs

# Create the learning rate scheduler.
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0, num_training_steps = total_steps)

In [None]:
for epoch in range(epochs):
    model.train()
    
    predictions_labels = []
    true_labels = []
    # Total loss for this epoch.
    total_loss = 0
    # For each batch of training data...
    i = 0
    i_to_zero_count = 0
    for batch in tqdm(train_dataloader, total=len(train_dataloader), position=0, leave=True):

        true_labels += batch['labels'].numpy().flatten().tolist()
        batch = {k:v.type(torch.long).to(device) for k,v in batch.items()}
        model.zero_grad()
        outputs = model(**batch)
        loss, logits = outputs[:2]
        total_loss += loss.item()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        logits = logits.detach().cpu().numpy()
        predictions_labels += logits.argmax(axis=-1).flatten().tolist()
        
        if i % 500 == 0:
            i=0
            i_to_zero_count += 1
            print(total_loss / (i_to_zero_count * 500 * 8))
                  
        i += 1

    avg_epoch_loss = total_loss / len(train_dataloader)
    print(avg_epoch_loss)
    
    predictions_labels = []
    true_labels = []
    total_loss = 0

    model.eval()

    # Evaluate data for one epoch
    for batch in tqdm(val_dataloader, total=len(val_dataloader), position=0, leave=True):

        true_labels += batch['labels'].numpy().flatten().tolist()
        batch = {k:v.type(torch.long).to(device) for k,v in batch.items()}
        with torch.no_grad():        
            outputs = model(**batch)
            loss, logits = outputs[:2]
            logits = logits.detach().cpu().numpy()
            total_loss += loss.item()
            predict_content = logits.argmax(axis=-1).flatten().tolist()
            predictions_labels += predict_content

    avg_epoch_loss = total_loss / len(val_dataloader)
    print(avg_epoch_loss)
    
    

In [None]:
torch.save(model.state_dict(), './manual_model_epoch1_binary.pt')

In [None]:
predictions_labels = []
true_labels = []
total_loss = 0

model.eval()

# Evaluate data for one epoch
for batch in tqdm(test_dataloader, total=len(test_dataloader)):

    true_labels += batch['labels'].numpy().flatten().tolist()
    batch = {k:v.type(torch.long).to(device) for k,v in batch.items()}
    with torch.no_grad():        
        outputs = model(**batch)
        loss, logits = outputs[:2]
        logits = logits.detach().cpu().numpy()
        total_loss += loss.item()
        predict_content = logits.argmax(axis=-1).flatten().tolist()
        predictions_labels += predict_content

avg_epoch_loss = total_loss / len(test_dataloader)
print(avg_epoch_loss)

In [None]:
mbti_types = [
                'INTJ', 'INTP', 'ISFJ', 'ISFP',
                'ISTJ', 'ISTP', 'ENFJ', 'ENFP',
                'ENTJ', 'ENTP','ESFJ', 'ESFP',
                'ESTJ', 'ESTP', 'INFJ', 'INFP'
            ]

fig, ax = plt.subplots(figsize=(20, 20))
cm = confusion_matrix(true_labels, predictions_labels, labels=[i for i in range(16)], normalize='true')
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=mbti_types)
disp.plot(ax=ax)

In [None]:
print('Accuracy:', accuracy_score(true_labels, predictions_labels))
print('F1 score:', f1_score(true_labels, predictions_labels, average='weighted'))
print('Precision:', precision_score(true_labels, predictions_labels, average='weighted'))
print('Recall:', recall_score(true_labels, predictions_labels, average='weighted'))