# Replicate previous paper

[This GitHub repository](https://github.com/yashsmehta/personality-prediction#predicting-personality-on-unseen-text) contains the code for the paper [Bottom-Up and Top-Down: Predicting Personality with Psycholinguistic and Language Model Features](https://ieeexplore.ieee.org/document/9338428), in which the authors propose a novel deep learning-based model that integrates traditional psycholinguistic features with language model embeddings to predict personality on the Essays dataset (Big Five) and the Kaggle dataset (MBTI).

We use this paper's results as the baseline for model performance on personality detection.

In [2]:
"""
Run this cell only the first time
"""
# Clone the GitHub repo into my Google Drive working folder
#!git clone 'https://github.com/yashsmehta/personality-prediction'

Cloning into 'personality-prediction'...
remote: Enumerating objects: 944, done.[K
remote: Counting objects: 100% (118/118), done.[K
remote: Compressing objects: 100% (51/51), done.[K
remote: Total 944 (delta 71), reused 108 (delta 67), pack-reused 826[K
Receiving objects: 100% (944/944), 53.49 MiB | 35.54 MiB/s, done.
Resolving deltas: 100% (564/564), done.


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import warnings

In [2]:
# Change into the cloned repository so that the relative paths used by the
# cells below (data/, pkl_data/, finetune_models/, explogs/) resolve against it.
%cd personality-prediction

/Users/yuta/Documents/GitHub/LLM-personality-evaluation/personality_detection_model/previous_paper/personality-prediction


## Split data into train/test dataset

In [1]:
SEED = 42  # passed as random_state so the split is the same on every run

# The working directory is already the repository root (see the `%cd` cell
# above), so the source CSV is read from `data/kaggle/` -- the same folder the
# two split files are written to below. The previous path carried an extra
# `personality-prediction/` prefix that does not exist relative to the repo root.
df = pd.read_csv('data/kaggle/kaggle.csv')

# Hold out 10% of the rows as the test set.
train_df, test_df = train_test_split(df, test_size=0.1, random_state=SEED)
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_df.to_csv('data/kaggle/kaggle_train.csv', index=False)
test_df.to_csv('data/kaggle/kaggle_test.csv', index=False)

## Extract features from text data using BERT model

In [None]:
# NOTE: this filter applies to the notebook kernel only; the `!python` command
# below runs in its own process, so warnings printed by that script are not
# silenced by it.
warnings.filterwarnings('ignore', category=FutureWarning)

# Run the repository's LM_extractor.py for dataset type 'kaggle' with
# 'bert-base' embeddings, token length 512 and batch size 32, writing to pkl_data.
# NOTE(review): `-dataset_type 'kaggle'` does not name the train split created
# above. Confirm the script reads kaggle_train.csv rather than the full
# kaggle.csv; otherwise the held-out test rows are also used for fine-tuning.
!python LM_extractor.py -dataset_type 'kaggle' -token_length 512 -batch_size 32 -embed 'bert-base' -op_dir 'pkl_data'

## Fine-tune detection model (MLP)

In [None]:
# Run the repository's MLP fine-tuning script with -dataset "kaggle" and
# -save_model "yes". The prediction code further down loads per-trait files
# named MLP_LM_<trait>.h5 from pkl_data/finetune_mlp_lm/; presumably this
# script is what writes them -- verify.
!python finetune_models/MLP_LM.py -dataset "kaggle" -save_model "yes"

## Predict personality on test dataset

In [None]:
# Modified version of unseen_predictor.py

import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

from pathlib import Path
from transformers import BertTokenizer, BertModel

import torch
import numpy as np

import re
import sys
import joblib

import tensorflow as tf

#parent_dir = os.path.dirname(os.getcwd())
#sys.path.insert(0, parent_dir)
#sys.path.insert(0, os.getcwd())

# This line is needed only when running the code on Colab
sys.path.append('/content/drive/MyDrive/Capstone/personality-prediction')

#import utils.gen_utils as utils
import utils.dataset_processors as dataset_processors

# Pick the compute device once; the functions below move tensors and models to it.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if DEVICE.type == "cuda":
    # Report the active GPU and pin it as the current CUDA device.
    active_gpu = torch.cuda.current_device()
    print("GPU found (", torch.cuda.get_device_name(active_gpu), ")")
    torch.cuda.set_device(active_gpu)
    print("num device avail: ", torch.cuda.device_count())
else:
    print("Running on cpu")

def softmax(x, axis=None):
    """Return the softmax of ``x``, computed in a numerically stable way.

    Args:
        x: Array-like of raw scores.
        axis: Axis along which to normalise. The default ``None`` normalises
            over every element of ``x`` at once, which is what this function
            has always done, so existing callers are unaffected.

    Returns:
        ``numpy.ndarray`` with the same shape as ``x`` whose entries sum to 1
        along ``axis`` (over the whole array when ``axis`` is ``None``).
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalise; np.max would raise on an empty array.
        return np.exp(x)

    # Softmax is invariant to adding a constant to every score. Subtracting
    # the maximum keeps np.exp from overflowing to inf (which would turn the
    # result into nan) when the scores are large.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exp_x = np.exp(shifted)
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)


def get_bert_model(embed):
    """Load the pre-trained tokenizer and encoder selected by ``embed``.

    Args:
        embed: One of ``"bert-base"``, ``"bert-large"``, ``"albert-base"`` or
            ``"albert-large"``.

    Returns:
        ``(tokenizer, model)`` loaded with ``from_pretrained`` from the
        checkpoint that corresponds to ``embed``.

    Exits:
        Prints a message and calls ``sys.exit(0)`` when ``embed`` is not one
        of the supported names.
    """
    checkpoints = {
        "bert-base": "bert-base-uncased",
        "bert-large": "bert-large-uncased",
        "albert-base": "albert-base-v2",
        "albert-large": "albert-large-v2",
    }

    checkpoint = checkpoints.get(embed)
    if checkpoint is None:
        print(f"Unknown pre-trained model: {embed}! Aborting...")
        sys.exit(0)

    if embed.startswith("albert"):
        # ALBERT checkpoints ship a SentencePiece vocabulary and ALBERT
        # weights, so they have to be loaded with the ALBERT classes; the
        # BERT classes do not match these checkpoints.
        from transformers import AlbertModel, AlbertTokenizer

        tokenizer_cls, model_cls = AlbertTokenizer, AlbertModel
    else:
        tokenizer_cls, model_cls = BertTokenizer, BertModel

    tokenizer = tokenizer_cls.from_pretrained(checkpoint)
    model = model_cls.from_pretrained(checkpoint)

    return tokenizer, model


def load_finetune_model(op_dir, finetune_model, dataset):
    """Load one fine-tuned classifier per personality trait.

    Args:
        op_dir: Path prefix of the output folder. It is concatenated directly
            with ``"finetune_<model>"``, so it normally ends with a path
            separator (e.g. ``"pkl_data/"``).
        finetune_model: Model family name. Case-insensitively, it must contain
            ``MLP_LM`` (loaded from ``.h5`` files with Keras) or ``SVM_LM``
            (loaded from ``.pkl`` files with joblib).
        dataset: ``"kaggle"`` selects the trait labels E/N/F/J; any other
            value selects EXT/NEU/AGR/CON/OPN.

    Returns:
        dict mapping each trait label to its loaded model.

    Exits:
        Prints a message and calls ``sys.exit(0)`` when the model directory
        is missing, the model family is unknown, or a per-trait model file
        does not exist.
    """
    if dataset == "kaggle":
        trait_labels = ["E", "N", "F", "J"]
    else:
        trait_labels = ["EXT", "NEU", "AGR", "CON", "OPN"]

    path_model = op_dir + "finetune_" + str(finetune_model).lower()

    if not Path(path_model).is_dir():
        print(f"The directory with the selected model was not found: {path_model}")
        sys.exit(0)

    # Resolve the model family once, before the loop. The previous in-loop
    # error branch formatted `model_name`, which is not yet assigned on the
    # first iteration, so an unknown family raised UnboundLocalError instead
    # of printing this message.
    family = str(finetune_model).upper()
    if "MLP_LM" in family:
        prefix, extension, is_keras = "MLP_LM", "h5", True
    elif "SVM_LM" in family:
        prefix, extension, is_keras = "SVM_LM", "pkl", False
    else:
        print(f"Unknown finetune model: {finetune_model}! Aborting...")
        sys.exit(0)

    models = {}
    for trait in trait_labels:
        model_name = f"{path_model}/{prefix}_{trait}.{extension}"
        if not Path(model_name).is_file():
            print(
                f"Model not found: {model_name}. Either the model was not trained or the model name is incorrect! Aborting..."
            )
            sys.exit(0)

        if is_keras:
            models[trait] = tf.keras.models.load_model(model_name)
        else:
            models[trait] = joblib.load(model_name)

    return models


def extract_bert_features(text, tokenizer, model, token_length, overlap=256):
    """Embed ``text`` as the mean first-position vector over sliding windows.

    The text is tokenised, cut into windows of at most ``token_length`` tokens
    that overlap by ``overlap`` tokens, and each window is run through
    ``model``. The hidden state at sequence position 0 of each window is
    collected and the vectors are averaged.

    Args:
        text: Input string.
        tokenizer: Tokenizer providing ``tokenize``,
            ``convert_tokens_to_string`` and ``__call__``.
        model: Encoder whose output exposes ``last_hidden_state``.
        token_length: Maximum number of tokens per window; must be positive.
        overlap: Number of tokens shared by consecutive windows. It must be
            smaller than ``token_length`` whenever the text needs more than
            one window.

    Returns:
        ``numpy.ndarray`` with a single row: the average of the per-window
        vectors.

    Raises:
        ValueError: If ``token_length`` is not positive, or if the windows
            could not advance through the text (``overlap >= token_length``
            with a text longer than one window). Previously that case looped
            forever.
    """
    if token_length <= 0:
        raise ValueError(f"token_length must be positive, got {token_length}")

    tokens = tokenizer.tokenize(text)
    n_tokens = len(tokens)

    # An empty text still gets one (empty) window so the caller receives an
    # embedding instead of an IndexError on an empty result list.
    segments = [[]] if n_tokens == 0 else []
    start = 0
    while start < n_tokens:
        end = min(start + token_length, n_tokens)
        segments.append(tokens[start:end])
        if end == n_tokens:
            break
        next_start = end - overlap
        if next_start <= start:
            raise ValueError(
                f"overlap ({overlap}) must be smaller than token_length "
                f"({token_length}) for texts longer than one window"
            )
        start = next_start

    embeddings_list = []
    with torch.no_grad():
        for segment in segments:
            # Turn the tokens back into text with the tokenizer itself.
            # Joining word pieces with spaces leaves sub-word markers such as
            # "##" in the string, which are then re-tokenised as extra tokens
            # and push real content past the truncation limit.
            segment_text = tokenizer.convert_tokens_to_string(segment)
            inputs = tokenizer(
                segment_text, return_tensors="pt", padding=True, truncation=True
            )
            inputs = inputs.to(DEVICE)
            outputs = model(**inputs)
            # Keep only the vector at sequence position 0 of each window.
            embeddings_list.append(outputs.last_hidden_state[:, 0, :].cpu().numpy())

    # Averaging a single window returns that window's vector unchanged, so one
    # code path covers both the single- and multi-window cases.
    stacked = np.concatenate(embeddings_list, axis=0)
    return np.mean(stacked, axis=0, keepdims=True)


# Loaded models are kept between calls, keyed by their configuration.
# `predict` is applied once per row, and reloading the encoder and every
# per-trait classifier from disk for each row dominated the run time.
_BERT_CACHE = {}
_FINETUNE_CACHE = {}


def _cached_bert(embed):
    """Return the (tokenizer, encoder) pair for ``embed``, loading it only once."""
    if embed not in _BERT_CACHE:
        tokenizer, encoder = get_bert_model(embed)
        encoder.to(DEVICE)
        _BERT_CACHE[embed] = (tokenizer, encoder)
    return _BERT_CACHE[embed]


def _cached_finetune_models(op_dir, finetune_model, dataset):
    """Return the per-trait models for this configuration, loading them only once."""
    key = (op_dir, str(finetune_model), dataset)
    if key not in _FINETUNE_CACHE:
        _FINETUNE_CACHE[key] = load_finetune_model(op_dir, finetune_model, dataset)
    return _FINETUNE_CACHE[key]


def predict(
        new_text,
        dataset: str='essays',
        token_length: int=512,
        batch_size: int=32,
        embed: str='bert-base',
        op_dir: str='pkl_data/',
        mode: str='512_head',
        embed_mode: str='cls',
        finetune_model: str='mlp_lm'
    ):
    """Predict per-trait personality scores for one piece of text.

    The text is preprocessed, embedded with the encoder selected by ``embed``,
    and scored by each fine-tuned per-trait model.

    Args:
        new_text: Raw input text.
        dataset: Selects the trait set (``'kaggle'`` for E/N/F/J, otherwise
            the five Big-Five labels).
        token_length: Window size passed to ``extract_bert_features``.
        batch_size: Accepted for interface compatibility; not used here.
        embed: Encoder name understood by ``get_bert_model``.
        op_dir: Path prefix of the folder holding the fine-tuned models.
        mode: Accepted for interface compatibility; not used here.
        embed_mode: Accepted for interface compatibility; not used here.
        finetune_model: Model family name understood by ``load_finetune_model``.

    Returns:
        dict mapping trait label to the softmax score at class index 1 (the
        "yes" class). A trait whose model fails to predict is reported on
        stdout and left out of the dict.

    Notes:
        Encoders and fine-tuned models are cached in module-level dicts after
        the first load. If model files are retrained on disk, clear
        ``_BERT_CACHE`` / ``_FINETUNE_CACHE`` (or re-run this cell) to pick
        them up.

        NOTE(review): ``verbose=0`` is passed to every model's ``predict``.
        Models loaded through the SVM_LM branch presumably do not accept that
        argument -- verify before using ``finetune_model='svm_lm'``.
    """
    new_text_pre = dataset_processors.preprocess_text(new_text)

    tokenizer, bert_model = _cached_bert(embed)
    new_embeddings = extract_bert_features(new_text_pre, tokenizer, bert_model, token_length)

    trait_models = _cached_finetune_models(op_dir, finetune_model, dataset)

    predictions = {}
    for trait, trait_model in trait_models.items():
        try:
            scores = trait_model.predict(new_embeddings, verbose=0)
            # Keep the softmax score of class index 1 (the probability of "yes").
            predictions[trait] = softmax(scores)[0][1]
        except Exception as e:
            # Catch Exception rather than BaseException so Ctrl-C
            # (KeyboardInterrupt) still stops a long batch run.
            print(f"Failed to make prediction: {e}")

    return predictions


In [None]:
tqdm.pandas()

def labeling(pred_dict):
    """Turn per-trait scores into a four-letter type string.

    ``pred_dict`` is read at the keys 'E', 'N', 'F' and 'J', in that order.
    A score of 0.5 or more yields the key's own letter; a lower score yields
    the opposite letter (I, S, T, P respectively).
    """
    letter_pairs = (("E", "I"), ("N", "S"), ("F", "T"), ("J", "P"))
    return "".join(
        at_or_above if pred_dict[at_or_above] >= 0.5 else below
        for at_or_above, below in letter_pairs
    )

import os

# Create the output folder up front so the prediction run below cannot be
# lost at save time because the directory is missing.
os.makedirs('explogs', exist_ok=True)

# Load the held-out split from data/kaggle/.
df_test = pd.read_csv('data/kaggle/kaggle_test.csv')

# Score each text (with a progress bar), then collapse every score dict into
# a four-letter label.
preds = df_test['text'].progress_apply(lambda x: predict(x, dataset='kaggle'))
pred_labels = preds.apply(labeling)
pred_labels.name = "pred_label"

pred_labels.to_csv('explogs/test_prediction.csv', index=False)

## Evaluate the result

In [None]:
from collections import defaultdict

N_DIMS = 4  # the loop below compares the first four characters of each label

ref_labels = pd.read_csv('data/kaggle/kaggle_test.csv')['type']
pred_labels = pd.read_csv('explogs/test_prediction.csv')['pred_label']

# zip() would silently drop unmatched rows while the accuracy is still divided
# by the number of reference rows, so refuse mismatched or empty inputs.
if len(ref_labels) != len(pred_labels):
    raise ValueError(
        f"Row count mismatch: {len(ref_labels)} reference labels vs "
        f"{len(pred_labels)} predicted labels"
    )
total = len(ref_labels)
if total == 0:
    raise ValueError("No test rows to evaluate")

# Seed every dimension with 0 so a dimension with no correct prediction is
# still reported (as 0.0) and the keys always appear in the order 0..3.
cor_dict = defaultdict(int, dict.fromkeys(range(N_DIMS), 0))
multi_cor_list = []  # 1 if all dimensions are correct for a row, else 0

for ref, pred in zip(ref_labels, pred_labels):
    hits = [ref[i] == pred[i] for i in range(N_DIMS)]
    for i, hit in enumerate(hits):
        if hit:
            cor_dict[i] += 1
    multi_cor_list.append(int(all(hits)))

acc_dict = {idx: round(cor / total, 4) for idx, cor in cor_dict.items()}
print(f"Accuracy for each dimension: {acc_dict}")

multi_acc = round(sum(multi_cor_list) / total, 4)
print(f"Accuracy for 16 class classification: {multi_acc}")