This notebook evaluates the performance of the XLNet NLP model, using a grid search to find the best-performing combination of hyperparameters.

This notebook was run on Google Colab.

In [None]:
import evaluate
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datasets import load_metric
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from transformers import TFXLNetModel, XLNetTokenizer, XLNetForSequenceClassification, AdamW
from google.colab import userdata
userdata.get('HF_TOKEN')
from keras.preprocessing.sequence import pad_sequences

In [None]:
# Identify and specify the compute device (GPU when available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
n_gpu = torch.cuda.device_count()
# torch.cuda.get_device_name(0) raises when no CUDA device exists, so only query it
# on a GPU runtime. As the last expression of the cell, the value is shown as output.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu (no GPU detected)"

In [None]:
# Attach Google Drive to the Colab filesystem.
# Mounting is only required once; the cell does not need to be rerun afterwards.

from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# Folder on the mounted Drive that holds train.xlsx / val.xlsx / test.xlsx
DATA_DIR = r'/content/drive/My Drive/data/train_val_test'

# XLNet expects its separator and classification tokens at the END of the sequence.
# Its tokenizer spells them "<sep>" and "<cls>"; the BERT-style "[SEP]" / "[CLS]"
# strings are not special tokens for XLNet and would be split into ordinary word pieces.
XLNET_SUFFIX = " <sep> <cls>"


def _load_split(name):
    """Read one split's workbook and return (frame, suffixed texts, integer labels).

    Args:
        name: file stem of the split ('train', 'val' or 'test').

    Returns:
        tuple of the raw DataFrame, a list with the 'text' column values each followed
        by XLNET_SUFFIX, and the 'label_bin' column cast to int.
    """
    frame = pd.read_excel(f'{DATA_DIR}/{name}.xlsx')
    texts = [sentence + XLNET_SUFFIX for sentence in frame['text']]
    labels = frame['label_bin'].astype('int')
    return frame, texts, labels


# Extract patent claims and labels for every split
train, X_train, y_train = _load_split('train')
val, X_val, y_val = _load_split('val')
test, X_test, y_test = _load_split('test')



In [None]:
# Initialize the tokenizer that maps text onto XLNet's vocabulary
# NOTE(review): do_lower_case=True is paired with the *cased* checkpoint - confirm the lowercasing is intended.
tokenizer = XLNetTokenizer.from_pretrained('xlnet-base-cased',do_lower_case = True)

# Every sequence is padded or truncated to this many tokens
MAX_LEN = 256


def _encode_texts(texts):
    """Tokenize texts and return (token lists, padded id matrix, attention masks).

    Args:
        texts: iterable of strings to encode.

    Returns:
        tuple of
        - the per-text token lists produced by the tokenizer,
        - an integer array of vocabulary ids with MAX_LEN columns,
        - a float32 array of the same shape holding 1.0 for real tokens and 0.0 for padding.
    """
    tokenized = [tokenizer.tokenize(sent) for sent in texts]
    # Convert the tokens to their index numbers in the XLNet vocabulary
    ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized]

    pad_id = tokenizer.pad_token_id
    # Pad with the tokenizer's own pad id: id 0 is XLNet's <unk>, so padding with 0 and
    # masking on "id > 0" would also hide genuine unknown tokens from the model.
    # Pad on the left ("pre"): XLNetForSequenceClassification summarises the sequence from
    # its last position, so that position has to hold a real token rather than padding.
    # Over-long sequences are still cut at the end (truncating="post"), as before.
    ids = pad_sequences(ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="pre", value=pad_id)

    # Attention mask: 1.0 for real tokens, 0.0 for padding
    masks = (ids != pad_id).astype("float32")
    return tokenized, ids, masks


train_tokenized_texts, train_input_ids, train_attention_masks = _encode_texts(X_train)
val_tokenized_texts, val_input_ids, val_attention_masks = _encode_texts(X_val)
test_tokenized_texts, test_input_ids, test_attention_masks = _encode_texts(X_test)

# Convert data into torch tensors, the required datatype for the model

train_inputs = torch.tensor(train_input_ids)
val_inputs = torch.tensor(val_input_ids)
test_inputs = torch.tensor(test_input_ids)

# Go through a NumPy array so the conversion does not depend on the pandas index
train_labels = torch.tensor(np.asarray(y_train))
val_labels = torch.tensor(np.asarray(y_val))
test_labels = torch.tensor(np.asarray(y_test))

train_masks = torch.tensor(train_attention_masks)
val_masks = torch.tensor(val_attention_masks)
test_masks = torch.tensor(test_attention_masks)

In [None]:
# Bundle the input ids, attention masks and labels of each split into one TensorDataset
train_data, validation_data, test_data = (
    TensorDataset(split_inputs, split_masks, split_labels)
    for split_inputs, split_masks, split_labels in (
        (train_inputs, train_masks, train_labels),
        (val_inputs, val_masks, val_labels),
        (test_inputs, test_masks, test_labels),
    )
)

# Metric objects from the `evaluate` library
f1score_metric = evaluate.load("f1")
accuracy_metric = evaluate.load("accuracy")


In [None]:
def grid_search(train_data, validation_data, learning_rates, weight_decays,batch_sizes,num_epochs, save_dir=None):
    """Fine-tune XLNet once per hyperparameter combination and score it on the validation set.

    Every (batch size, learning rate, weight decay) combination starts from the pretrained
    "xlnet-base-cased" checkpoint, is trained for num_epochs epochs and is then evaluated.

    Args:
        train_data: dataset whose items unpack into (input_ids, attention_mask, labels).
        validation_data: dataset with the same layout, used only for scoring.
        learning_rates: iterable of learning rates to try.
        weight_decays: iterable of weight-decay values to try.
        batch_sizes: iterable of batch sizes to try.
        num_epochs: number of passes over train_data per combination.
        save_dir: optional directory. When given, each fine-tuned model is written to
            "<save_dir>/<lr>_<wd>_<bs>" via save_pretrained. Default None saves nothing.

    Returns:
        list of dicts, one per combination, with the keys 'lr', 'weight_decay',
        'batch_size', 'validation_f1_score' and 'validation_accuracy'.

    Raises:
        ValueError: if validation_data yields no batches.
    """
    results = []

    for bs in batch_sizes:
        # The dataloaders depend only on the batch size, so build them once per batch size.
        # A DataLoader feeds the data in batches instead of handing the whole set over at once.
        train_dataloader = DataLoader(train_data, batch_size=bs, shuffle=True)
        validation_dataloader = DataLoader(validation_data, batch_size=bs)

        for lr in learning_rates:
            for wd in weight_decays:
                # Pretrained XLNet with a classification head for 2 labels
                model = XLNetForSequenceClassification.from_pretrained("xlnet-base-cased", num_labels=2)
                # Use the notebook-level device so the model and the batches always live in the same place
                model.to(device)

                # Initialize the optimizer with the current set of hyperparameters
                optimizer = AdamW(model.parameters(), lr=lr, weight_decay=wd, no_deprecation_warning=True)

                # ---- Training ----
                model.train()
                outputs = loss = None
                for epoch in range(num_epochs):
                    tr_loss = 0
                    nb_tr_steps = 0

                    for batch in train_dataloader:
                        # Move the batch to the device and unpack it
                        b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
                        # Clear out gradients left over from the previous step
                        optimizer.zero_grad()
                        # Forward pass (passing labels makes the model return the loss)
                        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)
                        loss = outputs.loss
                        # Backward pass, then parameter update
                        loss.backward()
                        optimizer.step()

                        tr_loss += loss.item()
                        nb_tr_steps += 1

                    # max(..., 1) avoids a ZeroDivisionError when the training loader is empty
                    print("Train loss: {}".format(tr_loss/max(nb_tr_steps, 1)))

                # ---- Validation ----
                model.eval()
                batch_preds, batch_labels = [], []
                for batch in validation_dataloader:
                    b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
                    # No gradients are needed for scoring; skipping them saves memory and time
                    with torch.no_grad():
                        output = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
                    batch_preds.append(np.argmax(output.logits.detach().cpu().numpy(), axis=1).flatten())
                    batch_labels.append(b_labels.to('cpu').numpy().flatten())

                if not batch_preds:
                    raise ValueError("validation_data produced no batches; cannot compute validation metrics")

                # Score ALL validation predictions at once. Averaging per-batch F1 / accuracy
                # is not the dataset-level metric: F1 is not a mean of batch F1 scores, and a
                # smaller final batch would be weighted like a full one.
                pred_flat = np.concatenate(batch_preds)
                labels_flat = np.concatenate(batch_labels)

                avg_val_accuracy = accuracy_score(labels_flat, pred_flat)
                # With no positive labels and no positive predictions F1 is undefined;
                # treat that all-true-negative case as a perfect score.
                if np.sum(pred_flat) == 0 and np.sum(labels_flat) == 0:
                    avg_val_f1score = 1
                else:
                    avg_val_f1score = f1_score(labels_flat, pred_flat, average='binary', zero_division=0)

                # Optionally persist the fine-tuned model, named after its hyperparameters
                if save_dir is not None:
                    model.save_pretrained(str(save_dir).rstrip('/') + '/' + str(lr) + '_' + str(wd) + '_' + str(bs))

                results.append({'lr': lr, 'weight_decay': wd,'batch_size':bs, 'validation_f1_score': avg_val_f1score,'validation_accuracy':avg_val_accuracy})
                print(f"Learning Rate: {lr}, Weight Decay: {wd}, Batch Size: {bs}, Validation F1 Score: {avg_val_f1score}, Validation Accuracy: {avg_val_accuracy}")

                # Drop every reference to this configuration's model before the next one is
                # loaded, so its memory can be reclaimed instead of overlapping with the next model.
                del model, optimizer, outputs, loss, output
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

    return results



In [None]:
# Hyperparameter values handed to the grid search.
# Full grid (currently disabled):
#   learning_rates = [1e-5, 2e-5, 5e-5]
#   weight_decays  = [0.01, 0.0]
#   batch_sizes    = [8, 16, 32]
# The active values below cover a single combination.
learning_rates, weight_decays, batch_sizes = [5e-5], [0.0], [32]

num_epochs = 1

# Train and score every combination, then keep the one with the highest validation F1
hyperparams_results = grid_search(
    train_data,
    validation_data,
    learning_rates,
    weight_decays,
    batch_sizes,
    num_epochs,
)
best_setting = max(hyperparams_results, key=lambda setting: setting['validation_f1_score'])
print(f"Best Hyperparameters: {best_setting}")


In [None]:
def evaluate_model(test_data,model_path,bs):
    """Load a saved XLNet classifier and report its F1 score and accuracy on test_data.

    Args:
        test_data: dataset whose items unpack into (input_ids, attention_mask, labels).
        model_path: path or identifier passed to XLNetForSequenceClassification.from_pretrained.
        bs: batch size used while running the model over test_data.

    Returns:
        dict with the keys 'f1_score' and 'accuracy' (the same values that are printed).

    Raises:
        ValueError: if test_data yields no batches.
    """
    # A DataLoader feeds the data in batches instead of handing the whole set over at once
    test_dataloader = DataLoader(test_data, batch_size=bs)

    model = XLNetForSequenceClassification.from_pretrained(model_path, num_labels=2)
    # Use the notebook-level device so the model and the batches always live in the same place
    model.to(device)

    # Evaluate the model
    model.eval()
    batch_preds, batch_labels = [], []

    for batch in test_dataloader:
        # Move the batch to the device and unpack it
        b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
        # No gradients are needed for scoring; skipping them saves memory and time
        with torch.no_grad():
            output = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)

        # Move logits and labels to CPU and keep the predicted class per example
        batch_preds.append(np.argmax(output.logits.detach().cpu().numpy(), axis=1).flatten())
        batch_labels.append(b_labels.to('cpu').numpy().flatten())

    if not batch_preds:
        raise ValueError("test_data produced no batches; cannot compute test metrics")

    # Score ALL test predictions at once. Averaging per-batch F1 / accuracy is not the
    # dataset-level metric: F1 is not a mean of batch F1 scores, and a smaller final
    # batch would be weighted like a full one.
    pred_flat = np.concatenate(batch_preds)
    labels_flat = np.concatenate(batch_labels)

    # Calculate test accuracy
    avg_test_accuracy = accuracy_score(labels_flat, pred_flat)

    # Calculate test F1 score.
    # With no positive labels and no positive predictions F1 is undefined;
    # treat that all-true-negative case as a perfect score.
    if np.sum(pred_flat) == 0 and np.sum(labels_flat) == 0:
        avg_test_f1score = 1
    else:
        avg_test_f1score = f1_score(labels_flat, pred_flat, average='binary', zero_division=0)

    print(f"F1 Score: {avg_test_f1score}, Accuracy: {avg_test_accuracy}")
    return {'f1_score': avg_test_f1score, 'accuracy': avg_test_accuracy}


In [None]:
# Score the saved model from the "2e-05_0.01_32" folder on the test split, batch size 32
evaluate_model(
    test_data,
    '/content/drive/MyDrive/Colab Notebooks/Trained Models/2e-05_0.01_32',
    32,
)