<a href="https://colab.research.google.com/github/Zihooo/Text-selection-codes-pub/blob/main/New_Prediction_Model_(DeBERTa).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer Models for Personality Score Prediction
This colab is written in **Python** to illustrate the process of *fine-tuning* state-of-the-art **Transformer** models to predict personality scores. In this code sample, we use **DeBERTa-base** (`microsoft/deberta-v3-base`) as an example transformer and **Extraversion** as a sample personality trait. We've made notes in the code about the changes you'd need to make to use other transformers or predict other personality traits.

In [None]:
# Mount Google Drive at /content/drive to get access to the data files used below.
# force_remount=True is passed so that re-running this cell requests a fresh mount.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
## install required pacakges
! pip install transformers==4.28.0
! pip install sentencepiece
! pip install datasets
! pip install scipy

In [None]:
# import packages
from transformers import AutoConfig, AutoTokenizer, TrainingArguments, Trainer, EarlyStoppingCallback, IntervalStrategy

import torch
from torch.utils.data import Dataset

import scipy
from scipy.stats import pearsonr
from scipy.special import softmax
import statistics
from sklearn.metrics import precision_recall_fscore_support
import pandas as pd
import numpy as np
from warnings import warn
import os
import sys
import gc


### Using a GPU
To speed things up you can use a *GPU* (*optional*).

First, you'll need to enable GPUs for the notebook:

- Navigate to Edit→Notebook Settings
- select GPU from the Hardware Accelerator drop-down

Next, confirm that you can connect to the GPU with PyTorch:

In [None]:
# Helper that reports which CUDA device (if any) PyTorch can use
def get_gpu():
  """Return the index of the current CUDA device, or -1 when CUDA is unavailable.

  When a GPU is available, the CUDA cache is emptied and a garbage-collection
  pass is run before the device index is returned.
  """
  if not torch.cuda.is_available():
    return -1
  torch.cuda.empty_cache()
  gc.collect()
  return torch.cuda.current_device()

In [None]:
# Show the index of the CUDA device in use (-1 means no GPU is available)
get_gpu()

0

In [None]:
# Print the GPU model, driver/CUDA versions and current memory usage
!nvidia-smi

Sat Feb  3 17:21:44 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   59C    P8              11W /  70W |      3MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

### Functions and Classes

In [None]:
#@title Load user-defined utility functions
# Import Data function
def import_data(path, text_col, label_col, index_col = None, index_val = None, enc = 'latin1'):
  """Import a CSV of sentences, optionally keeping only one subset of its rows.

  Args:
    path: A csv file path
    text_col: Name of column in csv containing sentences
    label_col: Name of column containing labels, or None if no labels are needed
    index_col: Name of the column used to filter rows (optional; required
      whenever index_val is given)
    index_val: If not None, only rows whose index_col equals this value are kept
    enc: File encoding to be used (optional)

  Returns:
    (texts, df) when label_col is None, otherwise (texts, labels, df), where
    texts and labels are Python lists and df is the (possibly filtered) DataFrame.

  Raises:
    ValueError: If index_val is given without index_col.
  """
  # Fail early with a clear message instead of the opaque `KeyError: None`
  # that df[None] would otherwise produce.
  if index_val is not None and index_col is None:
    raise ValueError("index_col must be provided when index_val is given")

  # keep_default_na=False: the default NaN markers are not parsed as NaN,
  # so empty text cells are read as empty strings.
  df = pd.read_csv(path, encoding = enc, keep_default_na=False)
  if index_val is not None:
    df = df[df[index_col] == index_val]

  texts = df[text_col].tolist()
  if label_col is None:
    return texts, df
  return texts, df[label_col].tolist(), df


# Get model for simple transformers
def get_model(model_type):
    """Translate a short model alias into a (model_type, model_name) pair.

    Args:
        model_type: One of the predefined aliases listed in the table below,
            or any other string (e.g. a full model name).

    Returns:
        A tuple (model_type, model_name). For the aliases "distilroberta",
        "electra-base" and "deberta" the returned model_type is remapped to
        "roberta", "electra" and "debertav2" respectively. For an alias that is
        not predefined, a warning is issued and the input string is returned as
        both model_type and model_name.
    """
    # alias -> (model_type to report, model_name)
    known_models = {
        "specter": ("specter", "allenai/specter"),
        "bert": ("bert", "bert-base-cased"),
        "roberta": ("roberta", "roberta-large"),
        "distilbert": ("distilbert", "distilbert-base-cased-distilled-squad"),
        "distilroberta": ("roberta", "cross-encoder/stsb-distilroberta-base"),
        "electra-base": ("electra", "cross-encoder/ms-marco-electra-base"),
        "xlnet": ("xlnet", "xlnet-large-cased"),
        "bart": ("bart", "facebook/bart-large"),
        "deberta": ("debertav2", "microsoft/deberta-v3-large"),
        "albert": ("albert", "albert-xlarge-v2"),
        "xlmroberta": ("xlmroberta", "xlm-roberta-large"),
    }
    if model_type in known_models:
        return known_models[model_type]

    # The file imports `warn` directly (`from warnings import warn`); the name
    # `warnings` itself is not bound, so `warnings.warn(...)` raised NameError here.
    warn("model_type not a pre-defined, setting model_type to model_name")
    return model_type, model_type



In [None]:
# eval metrics
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.feature_selection import r_regression
from scipy.stats import pearsonr

def compute_metrics_for_regression(eval_pred):
    """Compute the evaluation metrics for a single-output regression model.

    Args:
        eval_pred: An object that unpacks into (predictions, labels), holding
            one prediction and one label per example. Predictions may be shaped
            (n,) or (n, 1); both are flattened before the metrics are computed.

    Returns:
        A dict with two Python floats: "mse" (mean squared error) and "r"
        (Pearson correlation between labels and predictions).
    """
    logits, labels = eval_pred
    # Flatten both sides to 1-D so the element-wise difference can never
    # broadcast an (n, 1) array against an (n,) array into an (n, n) matrix.
    predictions = np.asarray(logits, dtype=np.float64).reshape(-1)
    targets = np.asarray(labels, dtype=np.float64).reshape(-1)

    mse = float(np.mean((targets - predictions) ** 2))
    rscore = float(pearsonr(targets, predictions)[0])
    return {"mse": mse, "r": rscore}

In [None]:
#@title Data Class
class TextClassificationDataset(torch.utils.data.Dataset):
    """Torch dataset that pairs tokenizer encodings with one label per example."""

    def __init__(self, encodings, labels):
        # encodings: mapping of field name -> per-example values, indexed by position
        # labels: sequence with one label per example
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return every encoding field of example `idx` plus its label, as tensors."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample


### Defining Variables


---


We define our variables for the purposes described in our research manuscript. However, we encourage researchers and practitioners to try out alternative models. In addition, we wanted to minimize hyper-parameter tuning during training, as the aim of this research is to highlight Transformers in a baseline sense.

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding
from torch.utils.data import DataLoader

# Settings shared by the tokenizer and the training loop below.
BASE_MODEL = 'microsoft/deberta-v3-base' # pretrained checkpoint to fine-tune; replace with "allenai/longformer-base-4096" for Longformer
LEARNING_RATE = 1e-5  # learning rate passed to TrainingArguments
MAX_LENGTH = 512      # tokenizer truncation/padding length; can be increased to 4096 when using Longformer (longer sequences mean a heavier computational load)
BATCH_SIZE = 12       # per-device train/eval batch size, chosen based on the available computational resources (GPU memory)
EPOCHS = 10           # number of training epochs; may be increased if the evaluation metric has not yet shown diminishing returns



---


## Fine-tuning A Transformer Model


---
This example demonstrates the fine-tuning process for the purpose of score prediction from text data.


### Importing and formatting Training Data


---


Since we have already mounted Google Drive in this notebook, we can import the data directly from it.

In [None]:
# Read the whole dataset (selected text): texts come from the "texte" column and
# labels from the "escore" column; all_raw_data is the full DataFrame.
all_text, all_labels, all_raw_data = import_data("/content/drive/MyDrive/Text Selection Paper Codes/data/all_text_latent_extract_10.csv", "texte", "escore")


# Load the tokenizer that belongs to BASE_MODEL
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)

In [None]:
from sklearn.model_selection import RandomizedSearchCV, KFold, train_test_split

# 5-fold cross-validation: in every fold a fresh model is fine-tuned on the
# training portion (with 25% of it held out as an evaluation set) and is then
# used to predict the scores of the held-out test portion.
all_pred_scores = []


def _build_dataset(texts, labels):
    """Tokenize `texts` (truncated/padded to MAX_LENGTH) and pair them with `labels`."""
    encodings = tokenizer(texts, truncation=True, max_length=MAX_LENGTH, padding='max_length')
    return TextClassificationDataset(encodings, labels)


num_folds = 5
kf_outer = KFold(n_splits=num_folds, shuffle=True, random_state=42)
for fold_index, (train_index, test_index) in enumerate(kf_outer.split(all_raw_data)):
    X_train, X_test = [all_text[i] for i in train_index], [all_text[i] for i in test_index]
    y_train, y_test = [all_labels[i] for i in train_index], [all_labels[i] for i in test_index]

    # Create the evaluation set. Texts and labels are split in ONE call so
    # their alignment is guaranteed by construction rather than by two
    # separate calls happening to share the same random_state.
    train_text, eval_text, train_labels, eval_labels = train_test_split(
        X_train, y_train, test_size=0.25, random_state=42)

    # Tokenize the three partitions
    train_dataset = _build_dataset(train_text, train_labels)
    eval_dataset = _build_dataset(eval_text, eval_labels)
    test_dataset = _build_dataset(X_test, y_test)

    # Load a fresh copy of the pretrained model for this fold. num_labels=1
    # gives a single-output (regression) head. The cached weights are reused;
    # re-downloading them on every fold is unnecessary.
    MODEL = AutoModelForSequenceClassification.from_pretrained(BASE_MODEL, num_labels=1)

    # Training arguments
    # NOTE(review): checkpoints are written every 440 steps while evaluation and
    # early stopping operate every 22 steps. If training stops before step 440
    # there may be no saved checkpoint for load_best_model_at_end to restore —
    # confirm against the training log.
    training_args = TrainingArguments(
        output_dir="/content/drive/MyDrive/Text Selection Paper Codes/checkpoints/deberta",  # directory to save the model
        learning_rate=LEARNING_RATE,
        seed=100,                          # the training seed is fixed here, but some randomness in model initialization remains
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        num_train_epochs=EPOCHS,
        evaluation_strategy=IntervalStrategy.STEPS,
        eval_steps=22,
        save_steps=440,
        save_strategy="steps",
        logging_steps=22,
        metric_for_best_model="r",         # this metric can also be "mse"; then set greater_is_better to False.
        greater_is_better=True,            # whichever is used, inspect the training log when selecting a model.
        load_best_model_at_end=True,       # at the end of training, reload the checkpoint that scored best on metric_for_best_model
        weight_decay=0.01)

    # Initialize the trainer; training stops early after 3 evaluations without improvement
    trainer = Trainer(
        model=MODEL,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics_for_regression,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)])

    # Training
    trainer.train()

    # Evaluation on the held-out evaluation set
    print(trainer.evaluate())

    # Prediction on the test fold
    pred_set = trainer.predict(test_dataset)
    # With num_labels=1 the predictions are expected to come back as an
    # (n_samples, 1) array. Flatten to one score per test row, because the
    # correlations and the DataFrame column below need 1-D input.
    # (reshape is a no-op if the array is already 1-D.)
    xss = np.asarray(pred_set.predictions).reshape(-1)
    true_scores = np.asarray(pred_set.label_ids).reshape(-1)

    # Convergent validity: predicted scores vs. the true labels
    correlation = pearsonr(xss, true_scores)

    # Criterion validity: predicted scores vs. the criterion columns of the test rows
    task = all_raw_data.task.iloc[test_index]
    people = all_raw_data.people.iloc[test_index]
    char = all_raw_data.char.iloc[test_index]
    ethic = all_raw_data.ethic.iloc[test_index]

    criterion_task = np.corrcoef(xss, task)[0, 1]
    criterion_people = np.corrcoef(xss, people)[0, 1]
    criterion_char = np.corrcoef(xss, char)[0, 1]
    criterion_ethic = np.corrcoef(xss, ethic)[0, 1]

    # Collect this fold's predictions, keyed by fold and by row position in the full dataset
    fold_results = pd.DataFrame({'fold_index': fold_index, 'test_index': test_index, 'predicted_scores': xss})
    all_pred_scores.append(fold_results)

    # Report the correlations for this fold
    print(f"Pearson Correlation on Test Set: {correlation}")
    print(f"Criterion Correlation on task: {criterion_task}")
    print(f"Criterion Correlation on people: {criterion_people}")
    print(f"Criterion Correlation on char: {criterion_char}")
    print(f"Criterion Correlation on ethic: {criterion_ethic}")
    print("-" * 30)

    torch.cuda.empty_cache()

In [None]:
# Combine the per-fold prediction tables into one DataFrame and write it to Google Drive.
# NOTE(review): the output file is named "Oscore2.csv" while the labels are read from
# the "escore" column earlier in this notebook — confirm the file name matches the trait.
save_pred_scores = pd.concat(all_pred_scores, ignore_index=True)
save_pred_scores.to_csv('/content/drive/MyDrive/Text Selection Paper Codes/final saved outputs/selection/Oscore2.csv')