<a href="https://colab.research.google.com/github/Shea-Fyffe/transforming-personality-scales/blob/main/vignettes/fine_tuning_transformers_for_text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fine-tuning Transformer Models for Text Classification
This colab is written in **Python** to illustrate the process of *fine-tuning* (see [Lui et al., 2020](https://doi.org/10.1007/978-981-15-5573-2)) state-of-the-art **Transformer** models to classify personality items. In this context, fine-tuning involves training a pretrained model on a relatively small number of items with known trait labels. While this notebook demonstrates how these models can be used for text classification of personality items (i.e., as an automated form of content analysis; [Short et al., 2018](https://doi.org/10.1146/annurev-orgpsych-032117-104622)), the same steps can be taken with other scale inventories or forms of text.

### Libraries

Colab comes with a large number of Python libraries pre-loaded. However, `Transformers` is not initially available in Colab. The `Transformers` library can be installed by using the code below.

More information on the `Transformers` library can be seen [here](https://huggingface.co/transformers/quicktour.html).

In [None]:
#@title Installing Transformers

## Install Transformers and SentencePiece (comment out the commands below if they are already installed)
! pip install transformers
! pip install sentencepiece

In [None]:
# load text classification modules from transformers
from transformers import AutoModelForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer

# data libraries
from torch.utils.data import Dataset
import torch
# util libraries
from scipy.special import softmax
from sklearn.metrics import classification_report

import pandas as pd
import numpy as np
from google.colab import drive # optional for getting data
from typing import Dict, List # for type hinting

import os
import sys
import datetime
import gc
import warnings
import requests
from io import StringIO

### Using a GPU
To speed things up you can use a *GPU* (*optional*).

First, you'll need to enable GPUs for the notebook:

- Navigate to Edit→Notebook Settings
- Select GPU from the Hardware Accelerator drop-down

Next, confirm that you can connect to the GPU with PyTorch:

In [None]:
# A helper function to check for a GPU
# To check whether a GPU is available in Colab, click the `Runtime` menu above, select `Change Runtime Type`, then pick "GPU" in the `Hardware Accelerator` dropdown
def get_gpu():
  """Return the index of the current CUDA device, or -1 if no GPU is available"""
  if torch.cuda.is_available():
    # free cached GPU memory and run garbage collection before training
    torch.cuda.empty_cache()
    gc.collect()
    return torch.cuda.current_device()
  return -1

In [None]:
!nvidia-smi

### Functions and Classes

In [None]:
#@title Data Class
class TextClassificationDataset(torch.utils.data.Dataset):
    """Wrap tokenizer encodings (and optional integer labels) for the Trainer"""
    def __init__(self, encodings, labels=None):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        # one tensor per encoding field (e.g., input_ids, attention_mask)
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # labels are absent at prediction time
        if self.labels is not None:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.encodings["input_ids"])
      

In [None]:
#@title Fine-tuning function
def fine_tune(model, text, labels, train_args, multi_label: bool = False,
              time_stamp_out_dir: bool = True, max_seq_len = 'longest'):
  """Fine-tune a Transformers model for text classification
  
  Args:
    model: a valid string representing the model_type
    text: a list of sentences to use for fine-tuning
    labels: a list of labels
    train_args: a TrainingArguments object
    multi_label: A boolean (True/False). If True (False by default) will perform multi-label classification 
    time_stamp_out_dir: A boolean (True/False). If True (the default) will time-stamp the output directory (optional)
    max_seq_len: 'longest' to pad to the longest item, or an integer length to pad/truncate to (optional)
  """
  if time_stamp_out_dir:
    _, new_out_dir = update_directories(train_args.output_dir)
    train_args.output_dir = new_out_dir

  _, model_name = get_model(model)

  tokenizer = AutoTokenizer.from_pretrained(model_name)

  train_labels_indx, lab_to_id, num_labs = map_labels_to_keys(labels)
  id_to_lab = {i: lab for lab, i in lab_to_id.items()}
  
  if max_seq_len == 'longest':
    train_encodings = tokenizer(text, truncation=True, padding=True)
  else:
    # 'max_length' is the padding strategy that pads every item to max_length
    train_encodings = tokenizer(text, truncation=True, padding='max_length', max_length=max_seq_len)

  train_dataset = TextClassificationDataset(train_encodings, train_labels_indx)
    
  model = AutoModelForSequenceClassification.from_pretrained(
      model_name, num_labels=num_labs, label2id = lab_to_id, id2label = id_to_lab
      )
  
  if multi_label:
    # problem_type is read from the model config
    model.config.problem_type = "multi_label_classification"

  # use the arguments passed to this function rather than the global training_args
  trainer = Trainer(model=model,
      args = train_args,
      train_dataset = train_dataset
    )
 
  trainer.train()
    
  return trainer, tokenizer

In [None]:
#@title Load user-defined utility functions

# Import Data function
def import_data(path: str, text_col, label_col = None, enc = 'latin1'):
  """Import a CSV of sentences
  
  Args:
    path: A csv file path or url pointing at CSV file
    text_col: Name of column in csv containing sentences
    label_col: Name of column containing labels
    enc: File encoding to be used (optional)
  """
  if (path.startswith("http")):
      res = requests.get(path,
                         headers= {'User-Agent': 'Mozilla/5.0',
                                   "X-Requested-With": "XMLHttpRequest"})
      path = StringIO(res.text)
  df = pd.read_csv(path, encoding = enc)
  
  if label_col is None:
    return df[text_col].tolist(), df
  return df[text_col].tolist(), df[label_col].tolist(), df

# Map labels to keys
def map_labels_to_keys(labels: List[str], sort_labels = True):
  """Map text labels to integers
  
  Args:
    labels: a list/vector of text labels
    sort_labels: Sort labels alphabetically before recoding (optional)
  """
  k = list(dict.fromkeys(labels))
  if sort_labels:
    k.sort()
  labels_to_id = {k[i] : int(i) for i in range(0, len(k))}
  labels_out = []
  for j in labels:
    labels_out.append(labels_to_id[j])
  return labels_out, labels_to_id, len(k)

# Update model directories
def update_directories(model_output_dir: str) -> tuple:
    """Time-stamp the output directory; returns (results file path, new directory)"""
    file_time = datetime.datetime.now().strftime("%Y_%m_%d-%I_%M_%S_%p")
    model_output_dir = f'{model_output_dir}-{file_time}/'
    # model_output_dir already ends with a slash
    out_file = f"{model_output_dir}{file_time}_results.csv"
    return out_file, model_output_dir

# Get the model type and pretrained model name for a model key
def get_model(model_type: str) -> List[str]:
    model_dict = {
        'albert': "albert-xlarge-v2",
        'bart': "facebook/bart-large",
        'bert': "bert-base-cased",
        'deberta': ["debertav2", "microsoft/deberta-v3-large"],
        'distilbert': "distilbert-base-cased-distilled-squad",
        'distilroberta': ['roberta', "cross-encoder/stsb-distilroberta-base"],
        'electra': "cross-encoder/ms-marco-electra-base",
        'roberta': "roberta-large",
        'xlnet': "xlnet-large-cased",
        'xlmroberta': "xlm-roberta-large",
    }
    model_name = model_dict.get(model_type, [model_type, model_type])
    if isinstance(model_name, str):
        model_name = [model_type, model_name]
    return model_name

# Format output data function
def format_output_data(raw_outputs, test_case_ids = None, label_values = None, output_probabilities: bool = True,
                       output_predicted_label: bool = True):
  """Format test data to be output to CSV
  
  Args:
    raw_outputs: The raw outputs (logits) returned by Trainer.predict()
    test_case_ids: A list of test case ids (optional)
    label_values: A list of *unique ordered* labels (optional)
    output_probabilities: A boolean (True/False). If True (the default) will convert logit predictions to probabilities
    output_predicted_label: A boolean (True/False). If True (the default) will append a 'predicted' column holding the index of the most likely label  
  """
  
  out_df = pd.DataFrame(raw_outputs)

  if output_probabilities:
      # scipy's softmax returns a NumPy array, so wrap the result back into a DataFrame
      out_df = pd.DataFrame(softmax(out_df.to_numpy(), axis=1), index=out_df.index)
  
  if label_values is not None:
      # name the class columns before any extra columns are added
      out_df.columns = label_values
  
  if output_predicted_label:
      # index of the highest-scoring class in each row
      out_df['predicted'] = np.argmax(out_df.to_numpy(), axis=1)
  
  if test_case_ids is not None:
      out_df.insert(0, 'id', test_case_ids)

  return out_df
  
# compute evaluation metrics
def evaluate_model(actual: List, predicted: List, label_values = None, **kwargs):
  """Calculate evaluation metrics on test labels
  
  Args:
    actual: list of actual labels
    predicted: list of predicted labels
    label_values: A *unique ordered* list of labels (optional)
    kwargs: Additional arguments to pass to sklearn.metrics.classification_report
  """

  # when label_values is omitted, sklearn names each class by its sorted label value
  if label_values is not None:
      kwargs.update({'target_names': label_values})
      
  return classification_report(y_true = actual, y_pred = predicted, **kwargs)
    

### Selecting Model and Hyper-Parameters

---


We define our variables as described in our research manuscript. However, we encourage researchers and practitioners to try out alternative models (by manually overriding `transformer_model`). In addition, we kept hyper-parameter tuning to a minimum because the aim of this research is to demonstrate Transformers in a baseline sense.
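
To make the training schedule implied by these settings more concrete, the sketch below estimates how many optimizer steps a run takes and how many of them are warm-up steps. It assumes a single device, no gradient accumulation, and a hypothetical training set of 500 items (the real size depends on the training file); the epoch, batch size, and warm-up values match the hyper-parameter cell that follows.

```python
import math

# Hypothetical training-set size; in practice use len(train_text)
n_items = 500

# Values matching the hyper-parameter cell in this notebook
num_train_epochs = 10
per_device_train_batch_size = 16
warmup_ratio = 0.10

# Batches per epoch (the final, smaller batch is kept)
steps_per_epoch = math.ceil(n_items / per_device_train_batch_size)
total_steps = steps_per_epoch * num_train_epochs

# Steps during which the learning rate ramps up to its peak value
warmup_steps = math.ceil(total_steps * warmup_ratio)

print(steps_per_epoch, total_steps, warmup_steps)
```

With fewer items the number of steps shrinks proportionally, which is worth keeping in mind when interpreting step-based logging or checkpointing intervals.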

In [None]:
#@title Define model to train
transformer_model = "deberta" #@param ["deberta", "albert", "bert", "bart", "distilbert","distilroberta", "electra", "roberta", "xlnet", "xlmroberta"]

In [None]:
#@title Define training hyper-parameters

# length to pad items to (~each word is 1.15 sequence units)
SEQ_LEN = 32

# first we initialize the TrainingArguments object
# (no evaluation set is passed to the Trainer, so evaluation-dependent options are left at their defaults)
training_args = TrainingArguments(
   num_train_epochs = 10,
   learning_rate = 2e-5,
   warmup_ratio = 0.10,
   weight_decay = 0.01,
   per_device_train_batch_size = 16,
   seed = 42,
   output_dir = f"{transformer_model}/outputs",
)



---


## Fine-tuning A Transformer Model


---
This example demonstrates the fine-tuning process for the purpose of classifying personality items into their respective content domains.


### Importing and formatting Training Data


While there are several ways to import data into Colab ([see here](https://colab.research.google.com/notebooks/io.ipynb)), the most intuitive way is to use the project's code repository url:

```
# Assign the online data repository to a url so it does not have to be repeated later
repository_data_url = "https://anonymous.4open.science/api/repo/transforming-personality-scales/file/data/text-classification/"

# the import_data function will return a list of sentences, a list of labels, and the original dataset
train_text, train_labels, raw_training_data = import_data(repository_data_url + 'train-data.csv', "text", "label")
```


You can also upload a local `.csv` file. You can do this by:
- Visiting the project url above and clicking the `download file` button (top right in project repository)
- Clicking the ***Files*** pane in Colab (the folder icon on the left in Colab)
- Clicking the ***Upload to session storage*** icon (left-most icon in Colab)
- Selecting the local data file you would like to use (e.g., `.csv`,`.tsv`)

In [None]:
# Assign the online data repository to a url so it doesn't have to be repeated later
repository_data_url = 'https://anonymous.4open.science/api/repo/transforming-personality-scales/file/data/text-classification/'

For this example, we import a file named `train-data.csv` (found in our [project repository](https://anonymous.4open.science/r/transforming-personality-scales/data/text-classification/train-data.csv) in the directory `data/text-classification/`).



In [None]:
#@title Importing training dataset
# the import_data function will return a list of sentences, a list of labels, and the original dataset
train_text, train_labels, raw_training_data = import_data(repository_data_url + 'train-data.csv', "text", "label")

To properly import the training data, we must specify the file path, the name of the column containing our items, and the name of the column containing our labels. The `import_data()` function then returns three objects:

- a list (vector) of items
- a list (vector) of labels
- a copy of our training data

The code above assigns these to objects named `train_text`, `train_labels`, and `raw_training_data`, respectively.
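
As a rough illustration of the file layout `import_data()` expects, the sketch below parses a tiny in-memory CSV with a `text` and a `label` column. The two rows are invented for illustration, and the sketch uses the standard-library `csv` module rather than pandas so that it runs without any installation.

```python
import csv
from io import StringIO

# Invented two-row example using the same column names passed to import_data()
csv_text = (
    "text,label\n"
    "I am the life of the party.,extraversion\n"
    "I worry about things.,neuroticism\n"
)

# Read each row as a dictionary keyed by column name
rows = list(csv.DictReader(StringIO(csv_text)))

# Split the rows into a list of items and a list of labels
example_text = [row["text"] for row in rows]
example_labels = [row["label"] for row in rows]

print(example_text)
print(example_labels)
```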

### Training the Model

---

Our `fine_tune()` function only requires the Transformer model we would like to use, a vector of text (i.e., personality items in this example), the trait labels, and the training arguments (which we defined in the **Selecting Model and Hyper-Parameters** section of this tutorial). There are also optional arguments, such as time-stamping the output directory, which is a good idea when training multiple models.
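
Before training, the trait labels are recoded as integers. A minimal sketch of that recoding, mirroring the logic of `map_labels_to_keys()` with invented labels (the names `label_to_id` and `id_to_label` are chosen here for illustration):

```python
# Invented labels for illustration
labels = ["openness", "extraversion", "openness", "agreeableness"]

# Unique labels, sorted alphabetically so the mapping is reproducible
unique_labels = sorted(set(labels))
label_to_id = {label: i for i, label in enumerate(unique_labels)}
id_to_label = {i: label for label, i in label_to_id.items()}

# Recode every label as its integer id
label_ids = [label_to_id[label] for label in labels]

print(label_to_id)
print(label_ids)
```

Because the ids follow alphabetical order, a predicted class index can later be turned back into a trait name with a lookup such as `id_to_label`.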

In [None]:
# tune the model using the labeled personality items
fine_tuned_model, tokenizer = fine_tune(transformer_model, train_text, train_labels, training_args)

### Testing the Model

---

Since we've fine-tuned the model, we can use the `.predict()` method to predict the labels of new text, for example personality items, survey responses, or even performance evaluations.

#### Import the test data
First, we must import the test data (`test-data.csv`), making sure we only specify the `path (url)` and `text_col` in the `import_data()` function.

In [None]:
#@title Importing testing dataset
# the import_data function will return a list of sentences and the original dataset if label is left blank
test_text, raw_test_data = import_data(repository_data_url + 'test-data.csv', "text")

In [None]:
# pre-process the test data before prediction
test_encodings = tokenizer(test_text, truncation=True, padding=True)
test_dataset = TextClassificationDataset(test_encodings)

#### Predict labels of the test items

In [None]:
# predict the test set; the first returned element holds the raw logits
predictions, _, _ = fine_tuned_model.predict(test_dataset)

By default the `format_output_data` function will return multi-class probabilities and the most likely label, which is appended as a column named *'predicted'*. These options can be modified by setting the arguments `output_probabilities` and `output_predicted_label` to `False`. For example:

```
# output predicted label and logit values
out_test_df = format_output_data(predictions, output_probabilities = False)

# output probabilities but no predicted label
out_test_df = format_output_data(predictions, output_predicted_label = False)

```
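
For readers unfamiliar with how logits become probabilities, here is a minimal pure-Python sketch of the softmax and arg-max steps for a single item. The logit values are invented, and `softmax_row` is a hypothetical helper written for this illustration; the notebook itself relies on `scipy.special.softmax` and `numpy.argmax`.

```python
import math

def softmax_row(logits):
    """Convert a list of logits to probabilities that sum to 1."""
    largest = max(logits)
    # subtracting the largest logit avoids overflow without changing the result
    exps = [math.exp(x - largest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Invented logits for one item across three classes
logits = [2.0, 0.5, -1.0]
probs = softmax_row(logits)

# Index of the most likely class
predicted = max(range(len(probs)), key=probs.__getitem__)

print([round(p, 3) for p in probs], predicted)
```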

In [None]:
# we can format the output and save it
out_test_df = format_output_data(predictions)

In [None]:
# save results
out_test_df.to_csv(f"{transformer_model}-test-preds.csv", index=False)

### Evaluating the Model

---

In a case where we are provided the *ground truth* test labels (e.g., the *'label'* column in the `raw_test_data` dataset), we provide the `evaluate_model()` function to calculate model evaluation metrics (see ***Load user-defined utility functions*** code block for function documentation).

**Note:** The *'predicted'* column needs to be present in `out_test_df` (or calculated manually) and passed as the `predicted` argument.
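
The report produced by `classification_report` includes precision, recall, and F1 for each class. As a sketch of what those numbers mean, the example below computes them by hand for invented labels; `per_class_metrics` is a hypothetical helper for illustration, not part of this notebook or of scikit-learn.

```python
# Invented ground-truth and predicted labels for six items
actual = ["agree", "agree", "consc", "consc", "extra", "extra"]
predicted = ["agree", "consc", "consc", "consc", "extra", "agree"]

def per_class_metrics(actual, predicted, label):
    """Precision, recall, and F1 for one label, treated as the positive class."""
    pairs = list(zip(actual, predicted))
    tp = sum(a == label and p == label for a, p in pairs)  # true positives
    fp = sum(a != label and p == label for a, p in pairs)  # false positives
    fn = sum(a == label and p != label for a, p in pairs)  # false negatives
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1

for label in sorted(set(actual)):
    print(label, per_class_metrics(actual, predicted, label))
```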

In [None]:
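# Calculate model evaluation metrics, mapping predicted class indices back to label names first
# (indices follow the alphabetically sorted training labels, as in map_labels_to_keys)
label_names = sorted(set(train_labels))
eval_metrics = evaluate_model(actual = raw_test_data["label"].tolist(), predicted = [label_names[i] for i in out_test_df["predicted"]])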

### Saving the model
Fine-tuned models can also be saved and used for downstream tasks.


In [None]:
# Uncomment the line below to save the fine-tuned model for later use
# fine_tuned_model.save_model(f"{transformer_model}-fine-tuned-big5-personality")