In [None]:
# Mount Google Drive under /content/drive so that the dataset path used
# later in the notebook (/content/drive/MyDrive/...) resolves.
from google.colab import drive
drive.mount('/content/drive')

## Imports and set-up

In [None]:
!pip install transformers
!pip install sentencepiece


In [None]:
# Run nvidia-smi in a shell to show which GPU (if any) is attached to the runtime.
!nvidia-smi

In [None]:
import sys
import os
import time
import re
import random
from typing import Dict, List, Optional, Union
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import tensorflow as tf
from transformers import BertTokenizer, BertConfig, TFBertForSequenceClassification
from transformers import DistilBertTokenizer, TFDistilBertForSequenceClassification
from transformers import RobertaTokenizer, TFRobertaForSequenceClassification
from transformers import ElectraTokenizer, TFElectraForSequenceClassification
from transformers import XLNetTokenizer, TFXLNetForSequenceClassification
from transformers import LongformerTokenizer, TFLongformerForSequenceClassification

In [None]:
# Fix every source of randomness used in this notebook to seed 0.
# NOTE(review): TF_DETERMINISTIC_OPS is set to '0' here; '1' is the value that
# requests deterministic op implementations -- confirm that '0' is intentional.
os.environ.update({
    'PYTHONHASHSEED': str(0),
    'TF_DETERMINISTIC_OPS': '0',
})
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

In [None]:
# Display the physical devices TensorFlow can see (the cell's output is this list).
tf.config.experimental.list_physical_devices()

## Preprocessing

In [None]:
# Location of the annotated sentence spreadsheet on the mounted Drive.
PATH = "/content/drive/MyDrive/Fairness/data/MBIC.xlsx"

# Read the sheet and normalise the label column name to `Label_bias`, which is
# the name the helper functions below look up.
df = pd.read_excel(PATH).rename(
    columns={'sentence': 'sentence', 'label_bias': 'Label_bias'}
)
df.head()

In [None]:
# Shared 5-fold stratified splitter; the fixed random_state keeps the fold
# assignment identical across runs and across the models compared below.
skfold = StratifiedKFold(
    n_splits=5,
    shuffle=True,
    random_state=42,
)

In [None]:
def pd_to_tf(df):
    """Convert a pandas DataFrame into a TensorFlow dataset of (sentence, label) pairs.

    Args:
        df: DataFrame with a ``sentence`` column and a ``Label_bias`` column.
            The frame is only read; it is not modified.

    Returns:
        A ``tf.data.Dataset`` built from the two columns' ``.values`` arrays,
        yielding ``(sentence, label)`` elements.

    Raises:
        KeyError: if either required column is missing.
    """
    # Index the columns instead of pop()-ing them so the caller's frame keeps
    # both columns and can be reused after this call.
    labels = df['Label_bias']
    sentences = df['sentence']
    return tf.data.Dataset.from_tensor_slices((sentences.values, labels.values))

def plot_graphs(history, metric):
  """Plot a training metric and its validation counterpart against the epoch index.

  `history` is expected to expose a `.history` mapping that holds both
  `metric` and `'val_' + metric` (as the object returned by `model.fit` does
  in this notebook).
  """
  val_metric = 'val_'+metric
  curves = history.history

  plt.plot(curves[metric])
  plt.plot(curves[val_metric], '')
  plt.xlabel("Epochs")
  plt.ylabel(metric)
  plt.legend([metric, val_metric])
  plt.show()

def tokenize(df, max_length=512):
    """Tokenize a DataFrame's sentences and wrap them, with labels, in a TensorFlow dataset.

    Uses the notebook-global ``tokenizer`` (set in the per-model cells before
    training), so that global must be assigned before this is called.

    Args:
        df: DataFrame with a ``sentence`` column and a ``Label_bias`` column.
            The frame is only read; it is not modified.
        max_length: Length every sequence is padded / truncated to.
            Defaults to 512, the value previously hard-coded here.

    Returns:
        A ``tf.data.Dataset`` of ``(encodings_dict, label)`` elements, where
        ``encodings_dict`` holds whatever the tokenizer returned
        (converted with ``dict(...)``).

    Raises:
        KeyError: if either required column is missing.
    """
    # Index the columns instead of pop()-ing them so the caller's frame keeps
    # both columns and can be reused after this call.
    labels = df['Label_bias']
    sentences = df['sentence']

    encodings = tokenizer(
        sentences.tolist(),
        add_special_tokens=True,  # add [CLS], [SEP]
        truncation=True,  # cut off sequences longer than max_length
        padding='max_length',
        max_length=max_length,
        return_attention_mask=True,  # add attention mask to not focus on pad tokens
    )

    return tf.data.Dataset.from_tensor_slices((dict(encodings), labels.tolist()))

## Attention-based models


In [None]:
def run_model_5fold(df_train, model_name, freeze_encoder=True, pretrained=False, plot=False):
  """Fine-tune and evaluate a transformer classifier with stratified cross-validation.

  For every fold produced by the notebook-global `skfold` splitter a fresh
  model is loaded, trained with Adam (lr 5e-5) for at most 10 epochs with
  early stopping on `val_loss`, and scored on the held-out fold.

  Relies on notebook globals: `skfold`, `tokenize`, `tokenizer` (must match
  `model_name`), `plot_graphs`, and -- only when `pretrained` is true --
  `trained_model_layer`.

  Args:
    df_train: DataFrame with `sentence` and `Label_bias` columns.
    model_name: One of 'bert', 'distilbert', 'roberta', 'electra', 'xlnet'.
    freeze_encoder: If true, the layer at index 0 of the model is frozen so
      fine-tuning does not destroy the transferred weights. Only set to
      false when enough data is provided.
    pretrained: If true, the weights in `trained_model_layer` are loaded into
      the layer at index 0 after compiling.
    plot: If true, plot the train / validation loss curve of each fold.

  Returns:
    Seven lists with one entry per fold, in this order: validation loss,
    accuracy, precision, recall, F1, micro F1, weighted F1.

  Raises:
    ValueError: if `model_name` is not one of the supported names.
  """
  # Loaders are wrapped in lambdas so that a brand-new model is instantiated
  # for every fold and nothing is downloaded for models that are not requested.
  model_loaders = {
      'bert': lambda: TFBertForSequenceClassification.from_pretrained("bert-base-uncased"),
      'distilbert': lambda: TFDistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased'),
      'roberta': lambda: TFRobertaForSequenceClassification.from_pretrained('roberta-base'),
      'electra': lambda: TFElectraForSequenceClassification.from_pretrained('google/electra-small-discriminator'),
      'xlnet': lambda: TFXLNetForSequenceClassification.from_pretrained('xlnet-base-cased'),
  }
  # Fail fast with a clear message instead of an UnboundLocalError that would
  # otherwise only surface after the first fold has been tokenized.
  if model_name not in model_loaders:
    raise ValueError(
        "Unknown model_name {!r}; expected one of {}".format(model_name, sorted(model_loaders)))

  Y = df_train['Label_bias']
  X = df_train['sentence']

  BUFFER_SIZE = 10000
  BATCH_SIZE = 32

  val_loss = []
  val_acc = []
  val_prec = []
  val_rec = []
  val_f1 = []
  val_f1_micro = []
  val_f1_wmacro = []

  for k, (train_index, val_index) in enumerate(skfold.split(X, Y), start=1):
    print('### Start fold {}'.format(k))

    # Tokenize each split separately with the global tokenizer.
    train_dataset = tokenize(df_train.iloc[train_index])
    val_dataset = tokenize(df_train.iloc[val_index])

    # Only the training data is shuffled; validation order must stay fixed so
    # predictions line up with the labels collected further down.
    train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE)
    val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE)

    model = model_loaders[model_name]()

    if freeze_encoder:
      # Freeze through the public Keras attribute (before compile) rather than
      # by writing to the variables' private `_trainable` field.
      model.get_layer(index=0).trainable = False

    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)
    loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer=optimizer, loss=loss_function)

    if pretrained:
      # NOTE(review): `trained_model_layer` is not defined in this function; it
      # must exist as a notebook global before calling with pretrained=True.
      model.get_layer(index=0).set_weights(trained_model_layer)

    # Stop as soon as val_loss fails to improve once and roll back to the best epoch.
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=1, restore_best_weights=True)

    history = model.fit(train_dataset, epochs=10, validation_data=val_dataset, callbacks=[callback])

    if plot:
      plot_graphs(history, 'loss')

    loss = model.evaluate(val_dataset)

    if model_name == 'xlnet':
      # XLNet is scored sentence by sentence on freshly tokenized (unpadded) text.
      yhats = []
      for row in df_train.iloc[val_index]['sentence']:
        encoded = tokenizer(row, return_tensors="tf")
        row_logits = model(encoded).logits.numpy()[0]
        yhats.append(int(np.argmax(row_logits)))
    else:
      predictions = model.predict(val_dataset)
      # First element of the prediction output holds the logits; the
      # predicted class is the index of the largest logit in each row.
      yhats = np.argmax(predictions[0], axis=-1).tolist()

    # Ground-truth labels in the same (unshuffled) order as the predictions.
    y = [label.numpy() for _, label in val_dataset.unbatch()]

    val_loss.append(loss)
    val_acc.append(accuracy_score(y, yhats))
    val_prec.append(precision_score(y, yhats))
    val_rec.append(recall_score(y, yhats))
    val_f1.append(f1_score(y, yhats))
    val_f1_micro.append(f1_score(y, yhats, average='micro'))
    val_f1_wmacro.append(f1_score(y, yhats, average='weighted'))

    # Release the graph / model state of this fold before loading the next model.
    tf.keras.backend.clear_session()

  return val_loss, val_acc, val_prec, val_rec, val_f1, val_f1_micro, val_f1_wmacro

### BERT

In [None]:
# The tokenizer is a notebook global that run_model_5fold / tokenize pick up,
# so it has to match the model being trained.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# NOTE(review): `df_sg` is not defined in any of the cells shown above this
# one -- confirm it is created before this cell runs on a fresh kernel.
(val_loss, val_acc, val_prec, val_rec,
 val_f1, val_f1_micro, val_f1_wmacro) = run_model_5fold(
    df_sg,
    model_name='bert',
    freeze_encoder=False,
    pretrained=False,
)

# Average every per-fold metric list into one cross-validated score.
loss_cv, acc_cv, prec_cv, rec_cv, f1_cv, f1_micro_cv, f1_wmacro_cv = (
    np.mean(fold_scores)
    for fold_scores in (val_loss, val_acc, val_prec, val_rec,
                        val_f1, val_f1_micro, val_f1_wmacro)
)

print('\n'.join([
    '5-Fold CV Loss: {}'.format(loss_cv),
    '5-Fold CV Accuracy: {}'.format(acc_cv),
    '5-Fold CV Precision: {}'.format(prec_cv),
    '5-Fold CV Recall: {}'.format(rec_cv),
    '5-Fold CV F1 Score: {}'.format(f1_cv),
    '5-Fold CV Micro F1 Score: {}'.format(f1_micro_cv),
    '5-Fold CV Weighted Macro F1 Score: {}'.format(f1_wmacro_cv),
]))

### DistilBERT

In [None]:
# Swap the global tokenizer to the DistilBERT vocabulary before training.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Cross-validate DistilBERT with the encoder left trainable.
(val_loss, val_acc, val_prec, val_rec,
 val_f1, val_f1_micro, val_f1_wmacro) = run_model_5fold(
    df_sg,
    model_name='distilbert',
    freeze_encoder=False,
    pretrained=False,
)


In [None]:
# Average every per-fold metric list into one cross-validated score.
loss_cv, acc_cv, prec_cv, rec_cv, f1_cv, f1_micro_cv, f1_wmacro_cv = (
    np.mean(fold_scores)
    for fold_scores in (val_loss, val_acc, val_prec, val_rec,
                        val_f1, val_f1_micro, val_f1_wmacro)
)

print('\n'.join([
    'Results for DistilBERT on SG',
    '5-Fold CV Loss: {}'.format(loss_cv),
    '5-Fold CV Accuracy: {}'.format(acc_cv),
    '5-Fold CV Precision: {}'.format(prec_cv),
    '5-Fold CV Recall: {}'.format(rec_cv),
    '5-Fold CV F1 Score: {}'.format(f1_cv),
    '5-Fold CV Micro F1 Score: {}'.format(f1_micro_cv),
    '5-Fold CV Weighted Macro F1 Score: {}'.format(f1_wmacro_cv),
]))

### RoBERTa

In [None]:
# Swap the global tokenizer to the RoBERTa vocabulary before training.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Cross-validate RoBERTa with the encoder left trainable.
(val_loss, val_acc, val_prec, val_rec,
 val_f1, val_f1_micro, val_f1_wmacro) = run_model_5fold(
    df_sg,
    model_name='roberta',
    freeze_encoder=False,
    pretrained=False,
)

In [None]:
# Average every per-fold metric list into one cross-validated score.
loss_cv, acc_cv, prec_cv, rec_cv, f1_cv, f1_micro_cv, f1_wmacro_cv = (
    np.mean(fold_scores)
    for fold_scores in (val_loss, val_acc, val_prec, val_rec,
                        val_f1, val_f1_micro, val_f1_wmacro)
)

print('\n'.join([
    'Results for RoBERTa on SG1',
    '5-Fold CV Loss: {}'.format(loss_cv),
    '5-Fold CV Accuracy: {}'.format(acc_cv),
    '5-Fold CV Precision: {}'.format(prec_cv),
    '5-Fold CV Recall: {}'.format(rec_cv),
    '5-Fold CV F1 Score: {}'.format(f1_cv),
    '5-Fold CV Micro F1 Score: {}'.format(f1_micro_cv),
    '5-Fold CV Weighted Macro F1 Score: {}'.format(f1_wmacro_cv),
]))

### ELECTRA

In [None]:
# Swap the global tokenizer to the ELECTRA vocabulary before training.
tokenizer = ElectraTokenizer.from_pretrained('google/electra-small-discriminator')

# Cross-validate ELECTRA with the encoder left trainable.
(val_loss, val_acc, val_prec, val_rec,
 val_f1, val_f1_micro, val_f1_wmacro) = run_model_5fold(
    df_sg,
    model_name='electra',
    freeze_encoder=False,
    pretrained=False,
)

In [None]:
# Average every per-fold metric list into one cross-validated score.
loss_cv, acc_cv, prec_cv, rec_cv, f1_cv, f1_micro_cv, f1_wmacro_cv = (
    np.mean(fold_scores)
    for fold_scores in (val_loss, val_acc, val_prec, val_rec,
                        val_f1, val_f1_micro, val_f1_wmacro)
)

print('\n'.join([
    'Results for ELECTRA on SG1',
    '5-Fold CV Loss: {}'.format(loss_cv),
    '5-Fold CV Accuracy: {}'.format(acc_cv),
    '5-Fold CV Precision: {}'.format(prec_cv),
    '5-Fold CV Recall: {}'.format(rec_cv),
    '5-Fold CV F1 Score: {}'.format(f1_cv),
    '5-Fold CV Micro F1 Score: {}'.format(f1_micro_cv),
    '5-Fold CV Weighted Macro F1 Score: {}'.format(f1_wmacro_cv),
]))

### XLNet

In [None]:
# Swap the global tokenizer to the XLNet vocabulary before training.
tokenizer = XLNetTokenizer.from_pretrained('xlnet-base-cased')

# Cross-validate XLNet with the encoder left trainable.
(val_loss, val_acc, val_prec, val_rec,
 val_f1, val_f1_micro, val_f1_wmacro) = run_model_5fold(
    df_sg,
    model_name='xlnet',
    freeze_encoder=False,
    pretrained=False,
)