In [None]:
!pip install transformers
!pip install datasets
!pip install huggingface_hub
!pip install ipywidgets==7.7.1
!pip install seqeval
!pip install accelerate -U
!pip install setfit==1.0.2

In [3]:
from datasets import load_dataset
from huggingface_hub import notebook_login
import json
import numpy as np
import random
import torch
from sentence_transformers.losses import CosineSimilarityLoss
from setfit import SetFitModel, SetFitTrainer
from huggingface_hub import notebook_login
import random
from datasets import Dataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Select the compute device: use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using GPU." if device.type == "cuda" else "No GPU available, using CPU.")

Using GPU.


In [None]:
# Log in to the Hugging Face Hub from within the notebook
notebook_login()

In [None]:
# Hyperparameters
EPOCHS = 1 # Number of epochs to use for contrastive learning
ITERATIONS = 10 # Number of text pairs to generate for contrastive learning
NUM_NON_SEXIST_SAMPLES = 45 # Number of non-sexist sentences to sample
CURRENT_FOLD = 0 # Index of the fold currently used as test set (the other folds form the train set)

In [None]:
# ------------------------------------------------------- Load dataset from local folder ------------------------------------------------

def get_dataset(path, test_fold=None, num_folds=5):
  """
      Reads the folds of a dataset stored as local JSON files and returns the train and test set.

      Fold i is read from the file path + str(i) + ".json". The content of its "Data" entry is added
      to the test set when i is the test fold and to the train set otherwise.

      NOTE: the Hugging Face loader defined in the next cell has the same name, so it replaces this
      function when both cells are run.

      @:param path: The path prefix of the fold files (the fold index and ".json" are appended to it).
      @:param test_fold: The index of the fold used as test set. If None, the global CURRENT_FOLD is used.
      @:param num_folds: The number of fold files to read (default 5).

      @:return train_set, test_set: The train and test set in the dataset.

  """
  # Resolve the default at call time so that later changes to CURRENT_FOLD are picked up
  if test_fold is None:
    test_fold = CURRENT_FOLD

  train = []
  test = []

  for i in range(num_folds):
    with open(path + str(i) + ".json", mode="r", encoding="utf-8") as data:
      dataset = json.load(data)

    # Report the size of each fold as it is loaded
    print(len(dataset["Data"]))
    if i == test_fold:
      test.extend(dataset["Data"])
    else:
      train.extend(dataset["Data"])
  return train, test

In [None]:
# ------------------------------------------------------- Load dataset from Hugging Face ------------------------------------------------
def get_dataset(name, test_fold=None, num_folds=5):
  """
      Reads the folds of a dataset from the Hugging Face Hub and returns the train and test set.

      Fold i is loaded as the configuration "fold" + str(i) of the dataset. The first row of the "Data"
      column of its "train" split is added to the test set when i is the test fold and to the train set
      otherwise.

      NOTE: this definition has the same name as the local-folder loader defined in the previous cell
      and replaces it when both cells are run.

      @:param name: The name of the dataset on the Hugging Face Hub.
      @:param test_fold: The index of the fold used as test set. If None, the global CURRENT_FOLD is used.
      @:param num_folds: The number of folds (configurations) to load (default 5).

      @:return train_set, test_set: The train and test set in the dataset.

  """
  # Resolve the default at call time so that later changes to CURRENT_FOLD are picked up
  if test_fold is None:
    test_fold = CURRENT_FOLD

  train = []
  test = []
  for i in range(num_folds):
    dataset = load_dataset(name, "fold" + str(i))
    fold_data = dataset["train"]["Data"][0]
    if i == test_fold:
      test.extend(fold_data)
    else:
      train.extend(fold_data)

  return train, test


In [None]:
def modify_labels(labels_file):

    """
    Build the BIO-format label mappings from a labels file.

    The labels file is a JSON object whose values are the label names. The labels are numbered
    by their position in the file starting from 1 (the keys of the file are ignored): the n-th label
    gets id n for its "B-" tag and id n + (number of labels) for its "I-" tag. Id 0 is the "O" tag.

    @:param labels_file: the path to the labels file

    @:return: a tuple of two dictionaries, in this order:
        - mapping of ids to BIO-format labels (e.g. 1 -> "B-<label>")
        - mapping of BIO-format labels to ids (e.g. "B-<label>" -> 1)

    """
    with open(labels_file, mode="r", encoding="utf-8") as lab_file:
        labels = json.load(lab_file)

    n_labels = len(labels)
    id_to_tag = {}
    tag_to_id = {}

    for begin_id, label in enumerate(labels.values(), start=1):
        inside_id = begin_id + n_labels
        # Register the "begin" tag first and the "inside" tag second in both directions
        for tag_id, tag in ((begin_id, "B-" + label), (inside_id, "I-" + label)):
            id_to_tag[tag_id] = tag
            tag_to_id[tag] = tag_id

    # The "outside" tag is added last and always has id 0
    id_to_tag[0] = "O"
    tag_to_id["O"] = 0

    return id_to_tag, tag_to_id

def get_original_labels(labels_file):
  """
    Read the labels file and invert it, so that each original label (without BIO-format)
    points to the key it has in the file.

    @:param labels_file: the path to the labels file

    @:return label2id: mapping of original labels to ids (the ids are the keys of the JSON file)

    """
  with open(labels_file, "r", encoding="utf-8") as lab_file:
    id2label = json.load(lab_file)

  # Swap keys and values of the mapping stored in the file
  label2id = {}
  for label_id, label in id2label.items():
    label2id[label] = label_id
  return label2id

In [None]:
# ---------------------------------------------------------- Train SetFit - BINARY CLASSIFICATION ----------------------------------------------------------------

def get_samples(data, max):

  """
  Get samples from the dataset.

  This function receives a dataset and a maximum number of samples to be extracted.
  Sentences with label 0 are the non-sexist ones: NUM_NON_SEXIST_SAMPLES of them are sampled
  (or all of them if fewer are available).
  The other sentences are grouped by the label of their entities: for each entity label, "max"
  sentences are sampled (or all of them if fewer than "max" are available).
  A sentence is listed once per entity it contains, so it can appear more than once in the result.

  @:param data: The input sentences of the dataset (dicts with the keys "label" and "entities").
  @:param max: An integer representing the maximum number of samples to be extracted for each entity label.
  @:return: The list of samples extracted from the dataset.
  """
  examples_sexist = {} # Each key represents a label and the value is the list of all examples with that label in the dataset
  others = [] # Contains non-sexist sentences
  for datum in data:
    if datum["label"] == 0:
      others.append(datum)
    else:
      for label in datum["entities"]:
        examples_sexist.setdefault(label["label"], []).append(datum)

  # Initialize the final training set by sampling from the non-sexist sentences.
  # The sample size is capped at the population size: random.sample raises a ValueError otherwise.
  final = random.sample(others, min(NUM_NON_SEXIST_SAMPLES, len(others)))

  # Then sample "max" sentences for each entity label and add them to the final training set
  for v in examples_sexist.values():
    if len(v) < max:
      final.extend(v)
    else:
      final.extend(random.sample(v, max))
  return final



def train_SetFit(train, test):
  """
  Train the global SetFit `model` on the given train set and evaluate it on the given test set.

  The trainer is configured with the hyperparameters ITERATIONS and EPOCHS defined at the top of the notebook.
  The result of the evaluation is printed.

  @:param train: The train set (Dataset object).
  @:param test: The test set (Dataset object) used for the evaluation.
  @:return: The trainer used to train the model.
  """

  # Create trainer
  trainer = SetFitTrainer(
      model=model,
      train_dataset=train,
      eval_dataset=test,
      loss_class=CosineSimilarityLoss,
      batch_size=16,
      num_iterations=ITERATIONS, # Number of text pairs to generate for contrastive learning
      num_epochs=EPOCHS # Number of epochs to use for contrastive learning
  )

  trainer.train()

  # Show the evaluation result instead of silently discarding it
  metrics = trainer.evaluate()
  print(metrics)
  return trainer


# Name of the pretrained checkpoint passed to SetFitModel.from_pretrained below
MODEL_NAME = "sentence-transformers/distiluse-base-multilingual-cased-v1"

'''
Models tried during hyperparameter tuning:
1) sentence-transformers/distiluse-base-multilingual-cased-v1
2) sentence-transformers/distiluse-base-multilingual-cased-v2
3) sentence-transformers/paraphrase-multilingual-mpnet-base-v2
4) sentence-transformers/paraphrase-mpnet-base-v2
'''

# Load SetFit model from Hub.
# `model` is a notebook-level global: train_SetFit and test_setfit_binary read it directly.
model = SetFitModel.from_pretrained(MODEL_NAME)


def main():
  """
  Run the training of the binary SetFit classifier:
  load the folds, sample the training sentences, convert the data to Dataset objects and train.
  """

  def to_dataset(records):
    # Turn a list of dicts into a Dataset object (compatible with SetFit), one column per key of the first record
    columns = {}
    for key in records[0]:
      columns[key] = [record[key] for record in records]
    return Dataset.from_dict(columns)

  # Load the folds from Hugging Face
  # (to load them from a local folder instead: get_dataset("dataset/pipeline/data_sent_coref_fold"))
  train, test = get_dataset("fede-m/setfit_dataset_coreference_folds")

  # Draw 30 sentences per label from the train set
  train_samples = get_samples(train, 30)

  train_dataset = to_dataset(train_samples)
  test_dataset = to_dataset(test)

  train_SetFit(train_dataset, test_dataset)

  # Examples to try on the trained model:
  #pred = model("[Francesca Visconti] Il ministro Visconti ha deliberato a favore della legge sulle adozioni.")
  #print(pred)
  #pred = model("[Francesco Visconti] Il ministro Visconti ha deliberato a favore della legge sulle adozioni.")
  #print(pred)


# Run the binary training defined in main() above
main()

# Push model to HuggingFace
# model.push_to_hub("fede-m/setfit_fold_"+str(CURRENT_FOLD))

In [None]:

# ---------------------------------------------------------- Test SetFit - BINARY CLASSIFICATION ----------------------------------------------------------------
def test_setfit_binary(verbose=False):
  """
  Test function for binary classification with SetFit on the test set.

  Every sentence of the test fold is classified with the global `model`. The sentences predicted
  with label 1 are collected and returned; each false negative (predicted 0, true label 1) is printed.

  @:param verbose: If True, print the metrics computed on the test set (default False).
  @:return test set to send to RoBERTa as a result of the filtering step of the pipeline

  """
  # Get dataset from Hugging Face
  _, test = get_dataset("fede-m/setfit_dataset_coreference_folds")
  # Get dataset from local folder
  # train, test = get_dataset("dataset/pipeline/data_sent_coref_fold")

  roberta_test = []
  test_pred_labels = []
  test_true_labels = []

  # Sentences classified with label 1 will be passed on to the next step of the pipeline
  for test_sent in test:
    # Store plain integers so that the metric functions receive ordinary integer arrays
    pred = int(model(test_sent["text"]))
    true_label = int(test_sent["label"])

    if pred == 1:
      roberta_test.append(test_sent)
    test_pred_labels.append(pred)
    test_true_labels.append(true_label)

    # Check false negatives (sentences we were not able to classify as sexist).
    # Each false negative is reported exactly once.
    if pred == 0 and true_label == 1:
      print("This sentence was wrongly classified as negative")
      print(test_sent["text"])

  test_pred_labels = np.array(test_pred_labels)
  test_true_labels = np.array(test_true_labels)

  # Calculate metrics
  accuracy = accuracy_score(test_true_labels, test_pred_labels)
  precision = precision_score(test_true_labels, test_pred_labels)
  recall = recall_score(test_true_labels, test_pred_labels)
  f1 = f1_score(test_true_labels, test_pred_labels)
  confusion_mat = confusion_matrix(test_true_labels, test_pred_labels)

  if verbose:
    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1:", f1)
    print("Test sentences:", len(test))
    print("Sentences passed to RoBERTa:", len(roberta_test))
    print(confusion_mat)

  return roberta_test

# Test using SetFit binary as filtering step.
# The returned sentences will be used as test set for the RoBERTa token classifier in the next step of the pipeline
roberta_test = test_setfit_binary()


In [None]:
# ---------------------------------------------------------- Train SetFit - MULTICLASS CLASSIFICATION ----------------------------------------------------------------

def get_samples_multiclass(data, max):
  """
  Get samples from the dataset.

  This function receives a dataset and a maximum number of samples to be extracted.
  Sentences with label 0 are the non-sexist ones: NUM_NON_SEXIST_SAMPLES of them are sampled
  (or all of them if fewer are available).
  The other sentences are grouped by their label: for each label, "max" sentences are sampled
  (or all of them if fewer than "max" are available).

  @:param data: The input sentences of the dataset (dicts with the key "label").
  @:param max: An integer representing the maximum number of samples to be extracted for labels other than 0.
  @:return: The list of samples extracted from the dataset.
  """
  examples_sexist = {} # Each key represents a label and the value is the list of all examples with that label
  others = [] # Contains the sentences with label 0
  for datum in data:
    if datum["label"] == 0:
      others.append(datum)
    else:
      examples_sexist.setdefault(datum["label"], []).append(datum)

  # The sample size is capped at the population size: random.sample raises a ValueError otherwise
  final = random.sample(others, min(NUM_NON_SEXIST_SAMPLES, len(others)))

  for v in examples_sexist.values():
    if len(v) < max:
      final.extend(v)
    else:
      final.extend(random.sample(v, max))
  return final


def change_labels(dataset, label2id):
  """
  Change the labels of the dataset to fit the multi-classification task.
  Sentences with label 0 are kept as they are. Every other sentence takes as label the id of the
  first entity it contains (looked up in label2id and converted to int).

  The sentences are modified in place: the returned list contains the same objects as the input.

  @:param dataset: The input sentences of the dataset.
  @:param label2id: The mapping from the original labels to the id.
  @:return: A new list with the sentences of the dataset, relabelled for the multi-classification task.
  """
  relabelled = []
  for datum in dataset:
    if datum["label"] != 0:
      # Only the first entity of the sentence determines the new label
      first_entity = datum["entities"][0]
      datum["label"] = int(label2id[first_entity["label"]])
    relabelled.append(datum)

  return relabelled


def train_SetFit_multiclass(train, test):
  """
  Train the global SetFit `model` on the given train set and evaluate it on the given test set.

  The trainer is configured with the hyperparameters ITERATIONS and EPOCHS defined at the top of the notebook.
  The result of the evaluation is printed.

  @:param train: The train set (Dataset object).
  @:param test: The test set (Dataset object) used for the evaluation.
  @:return: The trainer used to train the model.
  """

  # Create trainer
  trainer = SetFitTrainer(
      model=model,
      train_dataset=train,
      eval_dataset=test,
      loss_class=CosineSimilarityLoss,
      batch_size=16,
      num_iterations=ITERATIONS, # Number of text pairs to generate for contrastive learning
      num_epochs=EPOCHS # Number of epochs to use for contrastive learning
  )

  trainer.train()

  # Show the evaluation result instead of silently discarding it
  metrics = trainer.evaluate()
  print(metrics)
  return trainer


# Name of the pretrained checkpoint passed to SetFitModel.from_pretrained below
MODEL_NAME = "sentence-transformers/distiluse-base-multilingual-cased-v1"

'''
Models to try:
1) sentence-transformers/distiluse-base-multilingual-cased-v1
2) sentence-transformers/distiluse-base-multilingual-cased-v2
3) sentence-transformers/paraphrase-multilingual-mpnet-base-v2
4) sentence-transformers/paraphrase-mpnet-base-v2
'''


# Mapping of the original labels to the ids they have in the labels file
label2id = get_original_labels("labels/id2labels.json")
# Sorted list of the label ids, converted to integers
labels = [int(v) for v in label2id.values()]
labels.sort()

# Load SetFit model from Hub. In this case we need to specify the labels ids to perform multiclass classification.
# NOTE: this rebinds the global `model`, replacing the binary model loaded in an earlier cell.
model = SetFitModel.from_pretrained(MODEL_NAME, labels=labels)


def main():
  """
  Run the training of the multiclass SetFit classifier:
  load the folds, change the labels to fit the multi-classification task, sample the training
  sentences, convert the data to Dataset objects and train.
  """

  train, test = get_dataset("fede-m/setfit_dataset_coreference_folds")

  # Change the train and test labels to fit the multi-classification task
  new_train = change_labels(train, label2id)
  new_test = change_labels(test, label2id)

  # Samples 30 sentences per label from the train set
  train_samples = get_samples_multiclass(new_train,30)

  # Convert to Dataset object to be compatible with SetFit.
  # The test set is built from the relabelled data explicitly, instead of relying on
  # change_labels having modified the sentences of `test` in place.
  train_dataset = Dataset.from_dict({k: [dic[k] for dic in train_samples] for k in train_samples[0]})
  test_dataset = Dataset.from_dict({k: [dic[k] for dic in new_test] for k in new_test[0]})

  train_SetFit_multiclass(train_dataset, test_dataset)

  # Test example
  #pred = model("[Francesca Visconti] Il ministro Visconti ha deliberato a favore della legge sulle adozioni.")
  #print(pred)
  #pred = model("[Francesco Visconti] Il ministro Visconti ha deliberato a favore della legge sulle adozioni.")
  #print(pred)


# Run the multiclass training (the main() of this cell replaces the binary one defined in an earlier cell)
main()

In [None]:
# ---------------------------------------------------------- Test SetFit - MULTICLASS CLASSIFICATION ----------------------------------------------------------------

def test_setfit_multiclass(verbose=False):
    """
    Test function for multi-class classification with SetFit on the test set.

    Every sentence of the test fold is classified with the global `model`. All the sentences with a
    prediction different from 0 are collected, and the predictions are converted back to a binary
    task (0 / 1) to compute the metrics against the labels of the test set.

    @:param verbose: If True, print the metrics computed on the test set (default False).
    @:return roberta_train, roberta_test: roberta_train is always an empty list; roberta_test is the
             test set to send to RoBERTa as a result of the filtering step of the pipeline

    """

    _, test = get_dataset("fede-m/setfit_dataset_coreference_folds")
    # Get dataset from local folder
    # train, test = get_dataset("dataset/pipeline/data_sent_coref_fold")

    # Filter the test set using SetFit for inference
    roberta_train = []  # Never filled: kept so that the function still returns a pair
    roberta_test = []
    test_pred_labels = []
    test_true_labels = []
    for test_sent in test:
        pred = model(test_sent["text"])
        # All sentences with a prediction different from 0 (meaning, any marker of sexism was found)
        # will be passed on to the next step of the pipeline
        if pred != 0:
            roberta_test.append(test_sent)
            test_pred_labels.append(1)  # Reconvert task to binary classification
        else:
            test_pred_labels.append(0)

        # Store plain integers so that the metric functions receive ordinary integer arrays
        test_true_labels.append(int(test_sent["label"]))

    test_pred_labels = np.array(test_pred_labels)
    test_true_labels = np.array(test_true_labels)

    # Calculate metrics
    accuracy = accuracy_score(test_true_labels, test_pred_labels)
    precision = precision_score(test_true_labels, test_pred_labels)
    recall = recall_score(test_true_labels, test_pred_labels)
    f1 = f1_score(test_true_labels, test_pred_labels)
    confusion_mat = confusion_matrix(test_true_labels, test_pred_labels)

    if verbose:
        print("Accuracy:", accuracy)
        print("Precision:", precision)
        print("Recall:", recall)
        print("F1:", f1)
        print("Test sentences:", len(test))
        print("Sentences passed to RoBERTa:", len(roberta_test))
        print(confusion_mat)

    return roberta_train, roberta_test

# Test using SetFit multiclass as filtering step.
# roberta_train is always an empty list; roberta_test contains the sentences with a prediction different from 0
roberta_train, roberta_test = test_setfit_multiclass()
