In [None]:
# Install the third-party packages used by this notebook into the current environment.
# NOTE(review): none of these installs pin a version, and `bert` is not imported
# anywhere in the visible notebook — confirm it is actually needed.
!pip install transformers
!pip install sentencepiece
!pip install bert
# -U upgrades accelerate if an older version is already installed.
!pip install accelerate -U
!pip install evaluate
!pip install datasets



In [None]:
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, TFDistilBertForMultipleChoice, AdamW
import tensorflow as tf
import torch

In [None]:
# Pin TensorFlow to 2.14.
# NOTE(review): this cell sits after the cell that runs `import tensorflow as tf`;
# presumably the kernel has to be restarted for the pinned version to be the one
# actually in use — confirm, or move this install above the imports.
!pip install tensorflow==2.14

In [None]:
# Load the tokenizer and a multiple-choice model from the "distilbert-base-uncased" checkpoint.
# `tokenizer` is reused for preprocessing, collation and the Hub callback below.
# `model` is rebound in a later cell by TFAutoModelForMultipleChoice.from_pretrained
# with the same checkpoint name.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
model = TFDistilBertForMultipleChoice.from_pretrained("distilbert-base-uncased")

In [None]:
from huggingface_hub import notebook_login

# Interactive Hugging Face Hub login widget.
# Presumably required so the PushToHubCallback created further down can upload — confirm.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
# Training pairs; the preprocessing cell below reads the 'sent_more' and 'sent_less' columns.
training_data = pd.read_csv('train_data.csv')

In [None]:
# Evaluation pairs; the inference loop reads 'sent_more', 'sent_less', 'bias_type'
# and 'stereo_antistereo' from each row.
testing_data = pd.read_csv('test_data.csv')

In [None]:
import random

In [None]:
# Build the two-choice training examples.
#
# For every row of `training_data`, two (prompt, candidate) text pairs are produced,
# one for 'sent_more' and one for 'sent_less'. The order of the two candidates is
# randomised per row, and `label` records the position (0 or 1) at which the
# 'sent_more' candidate ended up.
#
# NOTE(review): `random` is not seeded anywhere in the visible notebook, so the
# candidate order (and therefore the labels) differs from run to run.
sentence_type = ['sent_more', 'sent_less']
question = "The most stereotypical statement is "  # loop-invariant prompt

first, second, label = [], [], []
for index, item in training_data.iterrows():
  # Candidates in canonical order: [sent_more, sent_less].
  candidates = [f"{question} {item[col]}" for col in sentence_type]

  # A draw of 0 swaps the pair, which moves 'sent_more' to position 1.
  swapped = random.randint(0, 1) == 0
  if swapped:
    candidates.reverse()
  label.append(1 if swapped else 0)

  # Extend the flat lists in place; repeated `list + list` / `sum(lists, [])`
  # re-copies everything accumulated so far on every row.
  first.extend([question] * len(candidates))
  second.extend(candidates)

# Tokenize all pairs at once, then regroup the flat per-pair encodings into
# chunks of two so that each example holds both of its choices.
tokenized_examples = tokenizer(first, second, truncation=True)
element = {k: [v[i : i + 2] for i in range(0, len(v), 2)] for k, v in tokenized_examples.items()}

In [None]:
from dataclasses import dataclass
from transformers.tokenization_utils_base import PreTrainedTokenizerBase, PaddingStrategy
from typing import Optional, Union
import tensorflow as tf


@dataclass
class DataCollatorForMultipleChoice:
    """
    Data collator that dynamically pads multiple-choice inputs and returns TensorFlow tensors.

    Each incoming feature is a mapping whose non-label values are lists with one
    entry per choice (e.g. ``input_ids`` is a list of ``num_choices`` token-id
    lists) plus a ``"label"`` or ``"labels"`` entry. The choices of all features
    are flattened, padded together by the tokenizer, and reshaped back to
    ``(batch_size, num_choices, -1)``.
    """

    # Tokenizer whose `pad` method is used to pad the flattened choices.
    tokenizer: PreTrainedTokenizerBase
    # Forwarded unchanged to `tokenizer.pad(padding=...)`.
    padding: Union[bool, str, PaddingStrategy] = True
    # Forwarded unchanged to `tokenizer.pad(max_length=...)`.
    max_length: Optional[int] = None
    # Forwarded unchanged to `tokenizer.pad(pad_to_multiple_of=...)`.
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, features):
        """
        Collate a list of multiple-choice features into one padded batch.

        Args:
            features: non-empty list of mappings as described in the class docstring.
                The mappings are only read; they are not modified.

        Returns:
            dict mapping each non-label key to a tensor reshaped to
            ``(batch_size, num_choices, -1)``, plus ``"labels"`` as an int64 tensor.

        Raises:
            KeyError: if a feature has neither the detected label key nor ``input_ids``.
        """
        label_name = "label" if "label" in features[0].keys() else "labels"
        # Read (rather than pop) the labels so the caller's feature dicts stay
        # intact and the collator can be called again on the same features.
        labels = [feature[label_name] for feature in features]
        batch_size = len(features)
        num_choices = len(features[0]["input_ids"])

        # One flat entry per (feature, choice), in feature-major order, with the
        # label key left out so only per-choice fields reach the tokenizer.
        flattened_features = [
            {k: v[i] for k, v in feature.items() if k != label_name}
            for feature in features
            for i in range(num_choices)
        ]

        batch = self.tokenizer.pad(
            flattened_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="tf",
        )

        # Restore the (batch, choice) structure that was flattened for padding.
        batch = {k: tf.reshape(v, (batch_size, num_choices, -1)) for k, v in batch.items()}
        batch["labels"] = tf.convert_to_tensor(labels, dtype=tf.int64)
        return batch

In [None]:
import pandas as pd
from torch.utils.data import Dataset

# Attach the tokenized choices and labels to a NEW frame.
# `df = training_data` would only alias the raw frame, so adding columns to `df`
# would silently modify `training_data` as well; `.assign` returns a new frame
# and leaves the raw data untouched, which also makes this cell safe to re-run.
df = training_data.assign(
    input_ids=element['input_ids'],
    attention_mask=element['attention_mask'],
    labels=label,
)

In [None]:
import datasets
from datasets import Dataset, DatasetDict

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for validation; the fixed random_state keeps the split the same across runs.
df_train, df_valid = train_test_split(df, test_size=0.2, random_state=42)

In [None]:
# Convert each pandas split into a `datasets.Dataset`.
# `Dataset` here is the class from `datasets`: the earlier
# `from torch.utils.data import Dataset` is shadowed by the later `from datasets import Dataset`.
td_train = Dataset.from_pandas(df_train)
td_valid = Dataset.from_pandas(df_valid)

In [None]:
# Bundle both splits under the keys the training cells look up.
ds = DatasetDict({"train": td_train, "validation": td_valid})

In [None]:
import evaluate

# Accuracy metric object from `evaluate`; `compute_metrics` below calls its `.compute`.
accuracy = evaluate.load("accuracy")

In [None]:
import numpy as np

def compute_metrics(eval_pred):
    """Return the accuracy of the highest-scoring choice against the reference labels.

    `eval_pred` is unpacked as a `(predictions, labels)` pair; the predicted
    class is the arg-max of `predictions` along axis 1.
    """
    scores, references = eval_pred
    chosen = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=chosen, references=references)

In [None]:
from transformers import create_optimizer

# Training hyper-parameters and the optimizer / learning-rate schedule built from them.
batch_size = 16
num_train_epochs = 5

# Whole batches per epoch (floor division) times the number of epochs.
total_train_steps = num_train_epochs * (len(ds["train"]) // batch_size)

optimizer, schedule = create_optimizer(
    init_lr=1e-4,
    num_warmup_steps=0,
    num_train_steps=total_train_steps,
)

In [None]:
from transformers import TFAutoModelForMultipleChoice

# Load the checkpoint to fine-tune. This rebinds `model`, replacing the
# TFDistilBertForMultipleChoice instance loaded from the same checkpoint earlier.
model = TFAutoModelForMultipleChoice.from_pretrained("distilbert-base-uncased")

In [None]:
# Collator that pads each batch's choices and returns TF tensors.
data_collator = DataCollatorForMultipleChoice(tokenizer=tokenizer)

# Training batches are shuffled; validation batches keep their order.
tf_train_set = model.prepare_tf_dataset(
    ds["train"], batch_size=batch_size, collate_fn=data_collator, shuffle=True
)
tf_validation_set = model.prepare_tf_dataset(
    ds["validation"], batch_size=batch_size, collate_fn=data_collator, shuffle=False
)

In [None]:
# Compile with only the optimizer. No `loss` is passed on purpose; presumably the
# model then falls back to its own built-in task loss (see the Transformers Keras docs) — confirm.
model.compile(optimizer=optimizer)  # No loss argument!

In [None]:
from transformers.keras_callbacks import KerasMetricCallback

# Keras callback that runs `compute_metrics` on the validation set during `fit`.
metric_callback = KerasMetricCallback(metric_fn=compute_metrics, eval_dataset=tf_validation_set)

In [None]:
from transformers.keras_callbacks import PushToHubCallback

# Keras callback that uploads the model and tokenizer to the Hugging Face Hub.
# `output_dir` is the local working directory; the same path is loaded back with
# `from_pretrained` after training.
push_to_hub_callback = PushToHubCallback(
    output_dir="my_distilbert_ft_model_2",
    tokenizer=tokenizer,
)

In [None]:
# Callbacks handed to `model.fit`: validation metrics first, then the Hub upload.
callbacks = [metric_callback, push_to_hub_callback]

In [None]:
# Train for the same number of epochs the learning-rate schedule was built for.
# `total_train_steps` is derived from `num_train_epochs`, so a different hard-coded
# epoch count here would stop training before the schedule has finished decaying.
model.fit(x=tf_train_set, validation_data=tf_validation_set, epochs=num_train_epochs, callbacks=callbacks)

In [None]:
from transformers import AutoTokenizer, TFAutoModelForMultipleChoice

In [None]:
# Reload the tokenizer and model from the local "my_distilbert_ft_model_2" directory,
# i.e. the `output_dir` given to PushToHubCallback (presumably populated during training — confirm).
tokenizer_new = AutoTokenizer.from_pretrained("my_distilbert_ft_model_2")
model_new = TFAutoModelForMultipleChoice.from_pretrained("my_distilbert_ft_model_2")

In [None]:
# Empty results table; one row per evaluated test pair is added by the inference loop.
data_store = pd.DataFrame(
    columns=[
        'sent_more',
        'sent_less',
        'stereotype',
        'stereo_antistereo',
        'predicted_answer',
        'label',
        'score',
    ]
)

In [None]:
# Score every test pair with the reloaded fine-tuned model.
#
# For each row the two sentences are presented in random order. `test_label` is 1
# when the pair was presented swapped (sent_less first) and 0 otherwise. `score`
# is 1 when the model picked the 'sent_more' sentence.
#
# NOTE(review): the prompt used here differs from the one used to build the
# training examples ("The most stereotypical statement is "), and training
# prefixed the prompt to each candidate while this loop does not — confirm
# that the mismatch is intentional.
question = "Choose the most appropriate option: "  # loop-invariant prompt
test_label = []
rows = []  # collected as plain dicts; the DataFrame is built once after the loop
for index, item in testing_data.iterrows():
  bias_type = item['bias_type']

  a = random.randint(0,1)
  if a == 1:
    options = [item['sent_less'], item['sent_more']]
  else:
    options = [item['sent_more'], item['sent_less']]
  test_label.append(a)

  # Encode both (question, option) pairs as TensorFlow tensors — the model is a TF model.
  encoded_input = tokenizer_new([[question, options[0]],
                          [question, options[1]]], return_tensors="tf", padding=True)
  # Add a leading batch axis: (num_choices, seq) -> (1, num_choices, seq).
  inputs = {k: tf.expand_dims(v, 0) for k, v in encoded_input.items()}
  # Use the model reloaded together with `tokenizer_new`, not the in-memory training object.
  outputs = model_new(inputs)  # batch size is 1
  logits = outputs.logits

  predicted_class = int(tf.argmax(logits, axis=1).numpy()[0])
  predicted_answer = options[predicted_class]

  score = 1 if predicted_answer == item['sent_more'] else 0

  rows.append({'sent_more': item['sent_more'],'sent_less': item['sent_less'], 'stereotype': bias_type, 'stereo_antistereo': item['stereo_antistereo'], 'predicted_answer': predicted_answer, 'label': test_label[-1] , 'score': score})

# `DataFrame.append` was removed in pandas 2.0 and re-copied the frame on every row.
# Build the table once, reusing the column layout of the `data_store` frame created above.
data_store = pd.DataFrame(rows, columns=data_store.columns)

In [None]:
# Write the per-pair results to CSV; index=False leaves the DataFrame index out of the file.
data_store.to_csv('output_distibert_ft_crow_aug.csv',index=False)