# Let's build a cross encoder - Step by step guide

We'll use it for the STS task. We'll use the pretrained BERT model for transfer learning on this new semantic sim task.

In [1]:
import logging, os, sys, json, torch
import torch.nn as nn
import pandas as pd
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from torch.nn import CrossEntropyLoss
import pytorch_lightning as pl
from transformers import AutoTokenizer, AutoModel, AutoConfig, Trainer, TrainingArguments
from pytorch_lightning.callbacks import EarlyStopping
import numpy as np

# we'll define or model name here
transformer_model_name = "dumitrescustefan/bert-base-romanian-cased-v1"

## Data loading

In [2]:
# before writting any code we're going to need our tokenizer:
tokenizer = AutoTokenizer.from_pretrained(transformer_model_name, strip_accents=False)

In [3]:
class MyDataset(Dataset):
    def __init__(self, tokenizer, file, test=False):
        self.tokenizer = tokenizer  # we'll need this in the __getitem__ function
        self.instances = []
        df = pd.read_csv(file)
        for index, row in df.iterrows():
            if test:
                self.instances.append({
                    "title": row['title'],
                    "content": row['content']
                })
            else:
                self.instances.append({
                    "title": row['title'],
                    "content": row['content'],
                    "class": row['class']
                })
        

    def __len__(self):
        return len(self.instances)  # return how many instances we have. It's a list after all

    def __getitem__(self, index):
        return self.instances[index]

Let's test it's working. Load a dataset and print the first example.

In [4]:
# create the MyDataset object with the test_data
test_dataset = MyDataset(tokenizer, "train.csv")
instance = test_dataset[0]  # this calls our __getitem__(0) method

# now let's print what it contains:
for key in instance:
  print(f"{key}: {instance[key]}")

title: PSD în alertă
content: Prăbușirea PSD de la altitudinea sigură a celor 70% în incertitudinea legată de păstrarea guvernării a stîrnit groază în partid. Vulnerabil, instabil și pus în fața propriei prostii, PSD duce o luptă vizibilă cu depresia. Baronii fornăie ca o turmă cuprinsă de neliniște, pe care un singur zgomot brusc o poate pune pe fugă. Socotelile sînt clare: dacă pierde președinția, USD pierde tot, iar coșmarul revenirii lui Băsescu la putere pe căi democratice devine tot mai palpabil. Consultanții Sultănoiu, Teodorescu, Palada și Dîncu caută febrili o candidatură pentru Cotroceni, fiindcă Ponta n-ar vrea să lase guvernul. Pe de altă parte, sondajele estimează o victorie a Pisicului azi, dar nu și în noiembrie. Oprescu și Isărescu sînt plimbați prin fața eșantioanelor naționale, dar nici unul nu prezintă garanții destule partidului. Partidul dorește să-și vadă liderul pe tron, iar liderul înțelege că, dacă nu-și asumă responsabilitatea candidaturii, autoritatea i se fî

Now, we need to collate the instances in a batch.

In [5]:
class MyCollator(object):
    def __init__(self, tokenizer, max_seq_len):
        self.max_seq_len = max_seq_len  # this will be our model's maximum sequence length
        self.tokenizer = tokenizer   # we still need our tokenizer to know that the pad token's id is

    def __call__(self, input_batch):
        titles = []
        contents = []
        labels = []  # Initialize labels as an empty list

        # Check if 'class' is in instance before appending
        for instance in input_batch:
            titles.append(instance['title'])
            contents.append(instance['content'])
            if 'class' in instance:  # Only add label if 'class' exists
                labels.append(instance['class'])

        tokenized_batch = self.tokenizer(
            list(map(lambda x: f"[CLS]{x[0]}[SEP]{x[1]}[SEP]", zip(titles, contents))),
            padding=True,
            max_length=self.max_seq_len,
            truncation=True,
            return_tensors="pt"
        )

        # Convert labels to a tensor only if labels are not empty
        if labels:
            labels = torch.tensor(labels, dtype=torch.long)  # Changed dtype to torch.long for classification
        else:
            labels = None

        return {
            "tokenized_batch": tokenized_batch,
            "labels": labels
        }

In [6]:
# let's test our collator
test_dataset = MyDataset(tokenizer, "train.csv")
my_collator = MyCollator(tokenizer=tokenizer, max_seq_len=512)

# crete a dataloader and get first batch of 3
test_dataloader = DataLoader(test_dataset, batch_size=3, collate_fn=my_collator)

iterable_data = iter(test_dataloader)
first_batch = next(iterable_data) # this is the output_batch from above
for key in first_batch:
  print(f"{key} is a {first_batch[key]}")

tokenized_batch is a {'input_ids': tensor([[   2,    2, 2761,  ...,    0,    0,    0],
        [   2,    2,  538,  ...,   18,    3,    3],
        [   2,    2, 4140,  ...,    0,    0,    0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 0, 0, 0]])}
labels is a tensor([1, 1, 1])


## Model preparation

We're finally here :)

As we're using Pytorch Lightning to do the behind-the-scenes training, we do need to define a few functions: 

* ``__init__``, ``forward``
* ``training_step``
* ``validation_step``
* ``configure_optimizers``

As this is a single block of code, comments will be inline:


In [7]:
class TransformerModel(pl.LightningModule):
    def __init__(self, model_name, num_classes, lr=2e-05, model_max_length=512):
        super().__init__()
        # Load model, tokenizer, and configure the new output layer for classification
        self.model = AutoModel.from_pretrained(model_name)
        self.output_layer = nn.Linear(768, num_classes)  # Assuming 768 is the hidden size
        self.loss_fct = CrossEntropyLoss()  # Changed to CrossEntropyLoss for classification
        self.lr = lr
        self.model_max_length = model_max_length
      
    def forward(self, tokenized_batch):
        # we're just wrapping the code on the AutoModelForTokenClassification
        # it needs the input_ids, attention_mask and labels

        output = self.model(
            input_ids=tokenized_batch['input_ids'],
            attention_mask=tokenized_batch['attention_mask'],
            return_dict=True
        )
        pooler_output = output['pooler_output']  # [batch_size, 768]
        prediction = self.output_layer(pooler_output)  # [batch_size, 1]

        return prediction.flatten()
        

    def training_step(self, batch, batch_idx):
        tokenized_batch = batch['tokenized_batch']
        labels = batch['labels']

        prediction = self.forward(tokenized_batch)  # [batch_size, 1]
        
        loss = self.loss_fct(prediction, labels)

        self.log("train_loss", loss.detach().cpu().item(), on_step=True, on_epoch=True, prog_bar=True,)
        return {"loss": loss}


    def validation_step(self, batch, batch_idx):
        tokenized_batch = batch['tokenized_batch']
        labels = batch['labels']

        prediction = self.forward(tokenized_batch)  # [batch_size, seq_len, 768]
       
        loss = self.loss_fct(prediction, labels)

        self.log("val_loss", loss.detach().cpu().item(), on_step=True, on_epoch=True, prog_bar=True,)
        return {"loss": loss}

    def configure_optimizers(self):
        return torch.optim.AdamW([p for p in self.parameters() if p.requires_grad], lr=self.lr, eps=1e-08)

## Training phase

At this point we're ready to start training. When the code is ready, switch your colab to GPU, and run every cell up to this point, to have the training run on the GPU. Notice that Pytorch Lightning abstracts all the hassle of training on different devices. 

So, what do we need?

We need the model itself (the ``TransformerModel`` object), and the trainer object which receives a few parameters detailed below. The trainer will move the data on GPU automatically, call ``train_step`` and ``train_epoch_end``, then do the same for validation, and then do backprop (internally calls Pytorch's ``.backward()``, ``optimizer_step`` and ``zero_grad`` to update the model weights. It also handles all the gritty stuff like early stopping, logging, model saving, distributed training (if you have more than 1 GPU), etc.


In [None]:
model = TransformerModel(
    model_name=transformer_model_name,
    lr=2e-5,
    model_max_length=512,
    num_classes=2  # we have 2 classes
)

trainer = pl.Trainer(
    # devices=-1,  # uncomment this when training on gpus
    accelerator="gpu",  # uncomment this when training on gpus
    max_epochs=2,  # set this to -1 when training fully 
    #limit_train_batches=10,  # comment this out when training fully
    #limit_val_batches=5,  # comment this out when training fully
    gradient_clip_val=1.0,
    enable_checkpointing=False  # this disables saving the model each epoch
)

# instantiate dataloaders
# a batch_size of 8 should work fine on 16GB GPUs
train_dataloader = DataLoader(MyDataset(tokenizer, "train.csv"), batch_size=64, collate_fn=my_collator, shuffle=True, pin_memory=True, num_workers=20)
validation_dataloader = DataLoader(MyDataset(tokenizer, "test.csv", test=True), batch_size=64, collate_fn=my_collator, shuffle=False, pin_memory=True, num_workers=20)

# call this to start training
trainer.fit(model, train_dataloader, validation_dataloader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
C:\Users\Alex\Lib\site-packages\pytorch_lightning\trainer\connectors\logger_connector\logger_connector.py:75: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `pytorch_lightning` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default
You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#tor

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

C:\Users\Alex\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:436: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.


## Let's use our model

### Solution (hidden)

In [None]:
def predict (model, sent1, sent2):
    concatenated_sentences = f"[CLS]{sent1.strip()}[SEP]{sent2.strip()}[SEP]"

    tokenized_batch = model.tokenizer([concatenated_sentences], padding=True, max_length = 512, truncation=True, return_tensors="pt")
    
    predictions = model.forward(tokenized_batch)  # returns a [batch_size, ]
    
    return predictions[0].item()*5.  # select the first item and multiply by 5

## Evaluation

In [None]:
# let's test our code
model.eval()

