# [Sentence-BERT](https://arxiv.org/pdf/1908.10084.pdf)

[Reference Code](https://www.pinecone.io/learn/series/nlp/train-sentence-transformers-softmax/)

In [2]:
import os
import math
import re
from   random import *
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Set GPU device
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

os.environ['http_proxy']  = 'http://192.41.170.23:3128'
os.environ['https_proxy'] = 'http://192.41.170.23:3128'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

## 1. Data

### Train, Test, Validation

In [3]:
import datasets
snli = datasets.load_dataset('snli')
mnli = datasets.load_dataset('glue', 'mnli')
mnli['train'].features, snli['train'].features

({'premise': Value(dtype='string', id=None),
  'hypothesis': Value(dtype='string', id=None),
  'label': ClassLabel(names=['entailment', 'neutral', 'contradiction'], id=None),
  'idx': Value(dtype='int32', id=None)},
 {'premise': Value(dtype='string', id=None),
  'hypothesis': Value(dtype='string', id=None),
  'label': ClassLabel(names=['entailment', 'neutral', 'contradiction'], id=None)})

In [3]:
pip install datasets


Collecting datasets
  Downloading datasets-3.3.2-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.3.2-py3-none-any.whl (485 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m485.4/485.4 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py311-none-any.whl (143 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading xx

In [4]:
# List of datasets to remove 'idx' column from
mnli.column_names.keys()

dict_keys(['train', 'validation_matched', 'validation_mismatched', 'test_matched', 'test_mismatched'])

In [5]:
# Remove 'idx' column from each dataset
for column_names in mnli.column_names.keys():
    mnli[column_names] = mnli[column_names].remove_columns('idx')

In [6]:
mnli.column_names.keys()

dict_keys(['train', 'validation_matched', 'validation_mismatched', 'test_matched', 'test_mismatched'])

In [7]:
import numpy as np
np.unique(mnli['train']['label']), np.unique(snli['train']['label'])
#snli also have -1

(array([0, 1, 2]), array([-1,  0,  1,  2]))

In [8]:
# there are -1 values in the label feature, these are where no class could be decided so we remove
snli = snli.filter(
    lambda x: 0 if x['label'] == -1 else 1
)

In [9]:
import numpy as np
np.unique(mnli['train']['label']), np.unique(snli['train']['label'])
#snli also have -1

(array([0, 1, 2]), array([0, 1, 2]))

In [10]:
# Assuming you have your two DatasetDict objects named snli and mnli
from datasets import DatasetDict
# Merge the two DatasetDict objects
raw_dataset = DatasetDict({
    'train': datasets.concatenate_datasets([snli['train'], mnli['train']]).shuffle(seed=55).select(list(range(1000))),
    'test': datasets.concatenate_datasets([snli['test'], mnli['test_mismatched']]).shuffle(seed=55).select(list(range(100))),
    'validation': datasets.concatenate_datasets([snli['validation'], mnli['validation_mismatched']]).shuffle(seed=55).select(list(range(1000)))
})
#remove .select(list(range(1000))) in order to use full dataset
# Now, merged_dataset_dict contains the combined datasets from snli and mnli
raw_dataset

DatasetDict({
    train: Dataset({
        features: ['premise', 'hypothesis', 'label'],
        num_rows: 1000
    })
    test: Dataset({
        features: ['premise', 'hypothesis', 'label'],
        num_rows: 100
    })
    validation: Dataset({
        features: ['premise', 'hypothesis', 'label'],
        num_rows: 1000
    })
})

## 2. Preprocessing

In [11]:
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [12]:
def preprocess_function(examples):
    max_seq_length = 128
    padding = 'max_length'
    # Tokenize the premise
    premise_result = tokenizer(
        examples['premise'], padding=padding, max_length=max_seq_length, truncation=True)
    #num_rows, max_seq_length
    # Tokenize the hypothesis
    hypothesis_result = tokenizer(
        examples['hypothesis'], padding=padding, max_length=max_seq_length, truncation=True)
    #num_rows, max_seq_length
    # Extract labels
    labels = examples["label"]
    #num_rows
    return {
        "premise_input_ids": premise_result["input_ids"],
        "premise_attention_mask": premise_result["attention_mask"],
        "hypothesis_input_ids": hypothesis_result["input_ids"],
        "hypothesis_attention_mask": hypothesis_result["attention_mask"],
        "labels" : labels
    }

tokenized_datasets = raw_dataset.map(
    preprocess_function,
    batched=True,
)

tokenized_datasets = tokenized_datasets.remove_columns(['premise','hypothesis','label'])
tokenized_datasets.set_format("torch")

In [13]:
tokenized_datasets

DatasetDict({
    train: Dataset({
        features: ['premise_input_ids', 'premise_attention_mask', 'hypothesis_input_ids', 'hypothesis_attention_mask', 'labels'],
        num_rows: 1000
    })
    test: Dataset({
        features: ['premise_input_ids', 'premise_attention_mask', 'hypothesis_input_ids', 'hypothesis_attention_mask', 'labels'],
        num_rows: 100
    })
    validation: Dataset({
        features: ['premise_input_ids', 'premise_attention_mask', 'hypothesis_input_ids', 'hypothesis_attention_mask', 'labels'],
        num_rows: 1000
    })
})

## 3. Data loader

In [14]:
from torch.utils.data import DataLoader

# initialize the dataloader
batch_size = 6
train_dataloader = DataLoader(
    tokenized_datasets['train'],
    batch_size=batch_size,
    shuffle=True
)
eval_dataloader = DataLoader(
    tokenized_datasets['validation'],
    batch_size=batch_size
)
test_dataloader = DataLoader(
    tokenized_datasets['test'],
    batch_size=batch_size
)

In [15]:
for batch in train_dataloader:
    print(batch['premise_input_ids'].shape)
    print(batch['premise_attention_mask'].shape)
    print(batch['hypothesis_input_ids'].shape)
    print(batch['hypothesis_attention_mask'].shape)
    print(batch['labels'].shape)
    break

torch.Size([6, 128])
torch.Size([6, 128])
torch.Size([6, 128])
torch.Size([6, 128])
torch.Size([6])


## 4. Model

In [16]:
import torch
from model import BERT  # Import your custom BERT class

# ✅ Ensure device is set before creating the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ✅ Define hyperparameters (ensure they match your training)
n_layers = 12
n_heads = 12
d_model = 768
d_ff = 768 * 4
d_k = 64
n_segments = 2
vocab_size = 60305  # Match tokenizer vocab size
max_len = 256

# ✅ Initialize the model with `device`
model = BERT(
    n_layers=n_layers,
    n_heads=n_heads,
    d_model=d_model,
    d_ff=d_ff,
    d_k=d_k,
    n_segments=n_segments,
    vocab_size=vocab_size,
    max_len=max_len,
    device=device  # ✅ Ensure device is passed correctly
).to(device)

# # ✅ Load pretrained weights
import os

model_path = "bert_model.pth"

# Check if the file exists
if not os.path.exists(model_path):
    print(f"❌ Model file {model_path} does not exist!")
else:
    print(f"✅ Model file {model_path} found.")

# model.load_state_dict(torch.load("./model/bert_model.pth", map_location=device))

# # ✅ Move model to correct device
model.to(device)

# # ✅ Set model to evaluation mode
# model.eval()

# print("✅ Model successfully loaded on", device)


✅ Model file bert_model.pth found.


BERT(
  (embedding): Embedding(
    (tok_embed): Embedding(60305, 768)
    (pos_embed): Embedding(256, 768)
    (seg_embed): Embedding(2, 768)
    (norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (layers): ModuleList(
    (0-11): 12 x EncoderLayer(
      (enc_self_attn): MultiHeadAttention(
        (W_Q): Linear(in_features=768, out_features=768, bias=True)
        (W_K): Linear(in_features=768, out_features=768, bias=True)
        (W_V): Linear(in_features=768, out_features=768, bias=True)
      )
      (pos_ffn): PoswiseFeedForwardNet(
        (fc1): Linear(in_features=768, out_features=3072, bias=True)
        (fc2): Linear(in_features=3072, out_features=768, bias=True)
      )
    )
  )
  (fc): Linear(in_features=768, out_features=768, bias=True)
  (activ): Tanh()
  (linear): Linear(in_features=768, out_features=768, bias=True)
  (norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (classifier): Linear(in_features=768, out_features=2, bias=True)
  (dec

In [17]:
torch.cuda.empty_cache()

In [18]:
model.eval()


BERT(
  (embedding): Embedding(
    (tok_embed): Embedding(60305, 768)
    (pos_embed): Embedding(256, 768)
    (seg_embed): Embedding(2, 768)
    (norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (layers): ModuleList(
    (0-11): 12 x EncoderLayer(
      (enc_self_attn): MultiHeadAttention(
        (W_Q): Linear(in_features=768, out_features=768, bias=True)
        (W_K): Linear(in_features=768, out_features=768, bias=True)
        (W_V): Linear(in_features=768, out_features=768, bias=True)
      )
      (pos_ffn): PoswiseFeedForwardNet(
        (fc1): Linear(in_features=768, out_features=3072, bias=True)
        (fc2): Linear(in_features=3072, out_features=768, bias=True)
      )
    )
  )
  (fc): Linear(in_features=768, out_features=768, bias=True)
  (activ): Tanh()
  (linear): Linear(in_features=768, out_features=768, bias=True)
  (norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  (classifier): Linear(in_features=768, out_features=2, bias=True)
  (dec

### Pooling
SBERT adds a pooling operation to the output of BERT / RoBERTa to derive a fixed sized sentence embedding

In [19]:
def mean_pool(token_embeds, attention_mask):
    # ✅ Ensure attention_mask has correct shape [batch_size, seq_len, 1]
    in_mask = attention_mask.unsqueeze(-1).float()  # Shape: [batch_size, seq_len, 1]

    # ✅ Compute mean-pooling (excluding padding tokens)
    pool = torch.sum(token_embeds * in_mask, dim=1) / torch.clamp(
        in_mask.sum(dim=1), min=1e-9
    )  # Shape: [batch_size, hidden_dim]

    return pool


## 5. Loss Function

## Classification Objective Function
We concatenate the sentence embeddings $u$ and $v$ with the element-wise difference  $\lvert u - v \rvert $ and multiply the result with the trainable weight  $ W_t ∈  \mathbb{R}^{3n \times k}  $:

$ o = \text{softmax}\left(W^T \cdot \left(u, v, \lvert u - v \rvert\right)\right) $

where $n$ is the dimension of the sentence embeddings and k the number of labels. We optimize cross-entropy loss. This structure is depicted in Figure 1.

## Regression Objective Function.
The cosine similarity between the two sentence embeddings $u$ and $v$ is computed (Figure 2). We use means quared-error loss as the objective function.

(Manhatten / Euclidean distance, semantically  similar sentences can be found.)

<img src="./figures/sbert-architecture.png" >

In [20]:
def configurations(u,v):
    # build the |u-v| tensor
    uv = torch.sub(u, v)   # batch_size,hidden_dim
    uv_abs = torch.abs(uv) # batch_size,hidden_dim

    # concatenate u, v, |u-v|
    x = torch.cat([u, v, uv_abs], dim=-1) # batch_size, 3*hidden_dim
    return x

def cosine_similarity(u, v):
    dot_product = np.dot(u, v)
    norm_u = np.linalg.norm(u)
    norm_v = np.linalg.norm(v)
    similarity = dot_product / (norm_u * norm_v)
    return similarity

<img src="./figures/sbert-ablation.png" width="350" height="300">

In [21]:
classifier_head = torch.nn.Linear(768*3, 3).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
optimizer_classifier = torch.optim.Adam(classifier_head.parameters(), lr=2e-5)

criterion = nn.CrossEntropyLoss()

In [22]:
from transformers import get_linear_schedule_with_warmup

# and setup a warmup for the first ~10% steps
total_steps = int(len(raw_dataset) / batch_size)
warmup_steps = int(0.1 * total_steps)
scheduler = get_linear_schedule_with_warmup(
		optimizer, num_warmup_steps=warmup_steps,
  	num_training_steps=total_steps - warmup_steps
)

# then during the training loop we update the scheduler per step
scheduler.step()

scheduler_classifier = get_linear_schedule_with_warmup(
		optimizer_classifier, num_warmup_steps=warmup_steps,
  	num_training_steps=total_steps - warmup_steps
)

# then during the training loop we update the scheduler per step
scheduler_classifier.step()



## 6. Training

In [23]:
from tqdm.auto import tqdm

num_epoch = 2
# 1 epoch should be enough, increase if wanted
for epoch in range(num_epoch):
    model.train()
    classifier_head.train()

    # initialize the dataloader loop with tqdm (tqdm == progress bar)
    for step, batch in enumerate(tqdm(train_dataloader, leave=True)):
        # zero all gradients on each new step
        optimizer.zero_grad()
        optimizer_classifier.zero_grad()

        # prepare batches and move all to the active device
        inputs_ids_a = batch['premise_input_ids'].to(device)
        inputs_ids_b = batch['hypothesis_input_ids'].to(device)
        attention_a = batch['premise_attention_mask'].to(device)  # ✅ Restore attention mask
        attention_b = batch['hypothesis_attention_mask'].to(device)  # ✅ Restore attention mask
        label = batch['labels'].to(device)

        # ✅ Create segment IDs (all zeros since only one sentence per input)
        segment_ids_a = torch.zeros_like(inputs_ids_a).to(device)
        segment_ids_b = torch.zeros_like(inputs_ids_b).to(device)

        # ✅ Fix `masked_pos` - Create a tensor instead of passing `None`
        masked_pos_a = torch.zeros((inputs_ids_a.size(0), 1), dtype=torch.long).to(device)
        masked_pos_b = torch.zeros((inputs_ids_b.size(0), 1), dtype=torch.long).to(device)

        # extract token embeddings from BERT at last_hidden_state
        u, _ = model(inputs_ids_a, segment_ids_a, masked_pos_a)
        v, _ = model(inputs_ids_b, segment_ids_b, masked_pos_b)

        u_last_hidden_state = u  # batch_size, seq_len, hidden_dim
        v_last_hidden_state = v  # batch_size, seq_len, hidden_dim

        # ✅ Fix: Restore `attention_mask` for mean pooling
        u_mean_pool = mean_pool(u_last_hidden_state, attention_a)  # batch_size, hidden_dim
        v_mean_pool = mean_pool(v_last_hidden_state, attention_b)  # batch_size, hidden_dim

        # build the |u-v| tensor
        uv = torch.sub(u_mean_pool, v_mean_pool)   # batch_size, hidden_dim
        uv_abs = torch.abs(uv)  # batch_size, hidden_dim

        # concatenate u, v, |u-v|
        x = torch.cat([u_mean_pool, v_mean_pool, uv_abs], dim=-1)  # batch_size, 3*hidden_dim

        # process concatenated tensor through classifier_head
        expected_dim = 768 * 3  # 2304
        x = x[:, :expected_dim]  # Trim extra dimensions if needed

        x = classifier_head(x)  # batch_size, classifier

        # calculate the 'softmax-loss' between predicted and true label
        loss = criterion(x, label)

        # using loss, calculate gradients and then optimize
        loss.backward()
        optimizer.step()
        optimizer_classifier.step()

        scheduler.step()  # update learning rate scheduler
        scheduler_classifier.step()

    print(f'Epoch: {epoch + 1} | loss = {loss.item():.6f}')


  0%|          | 0/167 [00:00<?, ?it/s]

Epoch: 1 | loss = 38.160870


  0%|          | 0/167 [00:00<?, ?it/s]

Epoch: 2 | loss = 17.578230


In [24]:
import torch

# Define the path to save the model
model_save_path = "sbert_nli_model_updated.pth"

# Save model and classifier head
torch.save({
    'sbert_model_state_dict': model.state_dict(),
    'classifier_head_state_dict': classifier_head.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'optimizer_classifier_state_dict': optimizer_classifier.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'scheduler_classifier_state_dict': scheduler_classifier.state_dict()
}, model_save_path)

print(f"Model saved to {model_save_path}")


Model saved to sbert_nli_model_updated.pth


In [25]:
print(f"Shape of u_mean_pool: {u_mean_pool.shape}")
print(f"Shape of v_mean_pool: {v_mean_pool.shape}")


Shape of u_mean_pool: torch.Size([4, 60305])
Shape of v_mean_pool: torch.Size([4, 60305])


In [26]:
import numpy as np

def cosine_similarity_batch(u, v):
    """
    Computes cosine similarity for a batch of vectors.

    u: numpy array of shape (batch_size, hidden_dim)
    v: numpy array of shape (batch_size, hidden_dim)

    Returns:
    similarity: numpy array of shape (batch_size,)
    """
    dot_product = np.sum(u * v, axis=1)  # Element-wise multiplication and summation along feature dimension
    norm_u = np.linalg.norm(u, axis=1, keepdims=True)  # Compute norm for each row
    norm_v = np.linalg.norm(v, axis=1, keepdims=True)

    similarity = dot_product / (norm_u * norm_v + 1e-8)  # Add small epsilon to avoid division by zero
    return similarity.flatten()


In [27]:
import torch.nn.functional as F

model.eval()
classifier_head.eval()
total_similarity = 0
total_loss = 0
correct_predictions = 0
num_samples = 0

with torch.no_grad():
    for step, batch in enumerate(eval_dataloader):
        # Move inputs to the active device
        inputs_ids_a = batch['premise_input_ids'].to(device)
        inputs_ids_b = batch['hypothesis_input_ids'].to(device)
        attention_a = batch['premise_attention_mask'].to(device)
        attention_b = batch['hypothesis_attention_mask'].to(device)
        label = batch['labels'].to(device)

        # Extract token embeddings
        u, _ = model(inputs_ids_a, torch.zeros_like(inputs_ids_a).to(device), torch.zeros((inputs_ids_a.size(0), 1), dtype=torch.long).to(device))
        v, _ = model(inputs_ids_b, torch.zeros_like(inputs_ids_b).to(device), torch.zeros((inputs_ids_b.size(0), 1), dtype=torch.long).to(device))

        # Mean pool the token embeddings
        u_mean_pool = mean_pool(u, attention_a).to(device)  # Keep on GPU
        v_mean_pool = mean_pool(v, attention_b).to(device)  # Keep on GPU

        # Compute cosine similarity on GPU
        similarity_scores = F.cosine_similarity(u_mean_pool, v_mean_pool, dim=1)  # Fast GPU computation
        batch_similarity = similarity_scores.mean().item()
        total_similarity += batch_similarity

# Compute final metrics
average_similarity = total_similarity / len(eval_dataloader)
print(f"Average Cosine Similarity: {average_similarity:.4f}")


Average Cosine Similarity: 0.4244


## 7. Inference

In [28]:
import torch
import torch.nn.functional as F

def calculate_similarity(model, classifier_head, tokenizer, sentence_a, sentence_b, device):
    """
    Computes similarity between two sentences using a trained model.

    Args:
        model (torch.nn.Module): Pre-trained language model.
        classifier_head (torch.nn.Module): Classification head.
        tokenizer (Tokenizer): Tokenizer for the model.
        sentence_a (str): First sentence.
        sentence_b (str): Second sentence.
        device (str): "cuda" or "cpu".

    Returns:
        float: Cosine similarity score.
        int (optional): Predicted class label if classifier_head is provided.
    """
    # Tokenize and convert sentences to input tensors
    inputs_a = tokenizer(sentence_a, return_tensors='pt', truncation=True, padding=True).to(device)
    inputs_b = tokenizer(sentence_b, return_tensors='pt', truncation=True, padding=True).to(device)

    inputs_ids_a = inputs_a['input_ids']
    attention_a = inputs_a['attention_mask']
    inputs_ids_b = inputs_b['input_ids']
    attention_b = inputs_b['attention_mask']

    # Segment IDs and masked positions (if applicable)
    segment_ids_a = torch.zeros_like(inputs_ids_a).to(device)
    segment_ids_b = torch.zeros_like(inputs_ids_b).to(device)
    masked_pos_a = torch.zeros((inputs_ids_a.size(0), 1), dtype=torch.long).to(device)
    masked_pos_b = torch.zeros((inputs_ids_b.size(0), 1), dtype=torch.long).to(device)

    # Disable gradient computation for faster inference
    with torch.no_grad():
        # Extract token embeddings from BERT
        u, _ = model(inputs_ids_a, segment_ids_a, masked_pos_a)
        v, _ = model(inputs_ids_b, segment_ids_b, masked_pos_b)

        # Mean pool the token embeddings
        u_mean_pool = mean_pool(u, attention_a).to(device)  # batch_size, hidden_dim
        v_mean_pool = mean_pool(v, attention_b).to(device)  # batch_size, hidden_dim

        # Compute cosine similarity directly on GPU
        similarity_score = F.cosine_similarity(u_mean_pool, v_mean_pool, dim=1).item()

        # If a classifier head is used, compute classification logits
        prediction = None
        if classifier_head is not None:
            uv_abs = torch.abs(torch.sub(u_mean_pool, v_mean_pool))  # batch_size, hidden_dim

            # Concatenate u, v, |u-v|
            x = torch.cat([u_mean_pool, v_mean_pool, uv_abs], dim=-1)  # batch_size, 3*hidden_dim
            expected_dim = 768 * 3  # Ensure correct shape before classifier
            x = x[:, :expected_dim]  # Ensure shape matches classifier input

            # Pass through classifier head
            x = classifier_head(x)  # batch_size, num_classes
            prediction = torch.argmax(x, dim=-1).item()  # Get predicted class

    return similarity_score, prediction

# Example usage:
sentence_a = 'Your contribution helped make it possible for us to provide our students with a quality education.'
sentence_b = "Your contributions were of no help with our students' education."

similarity, prediction = calculate_similarity(model, classifier_head, tokenizer, sentence_a, sentence_b, device)

print(f"Cosine Similarity: {similarity:.4f}")
if prediction is not None:
    print(f"Predicted Label: {prediction}")


Cosine Similarity: 0.4543
Predicted Label: 2


In [43]:
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import pandas as pd
# import ace_tools  # For displaying the table

import torch
import torch.nn.functional as F

def calculate_similarity(model, classifier_head, tokenizer, sentence_a, sentence_b, device):
    """
    Computes similarity between two sentences using a trained model.

    Args:
        model (torch.nn.Module): Pre-trained language model.
        classifier_head (torch.nn.Module): Classification head.
        tokenizer (Tokenizer): Tokenizer for the model.
        sentence_a (str): First sentence.
        sentence_b (str): Second sentence.
        device (str): "cuda" or "cpu".

    Returns:
        float: Cosine similarity score.
        int (optional): Predicted class label if classifier_head is provided.
    """
    # Tokenize and convert sentences to input tensors
    inputs_a = tokenizer(sentence_a, return_tensors='pt', truncation=True, padding=True).to(device)
    inputs_b = tokenizer(sentence_b, return_tensors='pt', truncation=True, padding=True).to(device)

    inputs_ids_a = inputs_a['input_ids']
    attention_a = inputs_a['attention_mask']
    inputs_ids_b = inputs_b['input_ids']
    attention_b = inputs_b['attention_mask']

    # Segment IDs and masked positions (if applicable)
    segment_ids_a = torch.zeros_like(inputs_ids_a).to(device)
    segment_ids_b = torch.zeros_like(inputs_ids_b).to(device)
    masked_pos_a = torch.zeros((inputs_ids_a.size(0), 1), dtype=torch.long).to(device)
    masked_pos_b = torch.zeros((inputs_ids_b.size(0), 1), dtype=torch.long).to(device)

    # Disable gradient computation for faster inference
    with torch.no_grad():
        # Extract token embeddings from BERT
        u, _ = model(inputs_ids_a, segment_ids_a, masked_pos_a)
        v, _ = model(inputs_ids_b, segment_ids_b, masked_pos_b)

        # Mean pool the token embeddings
        u_mean_pool = mean_pool(u, attention_a).to(device)  # batch_size, hidden_dim
        v_mean_pool = mean_pool(v, attention_b).to(device)  # batch_size, hidden_dim

        # Compute cosine similarity directly on GPU
        similarity_score = F.cosine_similarity(u_mean_pool, v_mean_pool, dim=1).item()

        # If a classifier head is used, compute classification logits
        prediction = None
        if classifier_head is not None:
            uv_abs = torch.abs(torch.sub(u_mean_pool, v_mean_pool))  # batch_size, hidden_dim

            # Concatenate u, v, |u-v|
            x = torch.cat([u_mean_pool, v_mean_pool, uv_abs], dim=-1)  # batch_size, 3*hidden_dim
            expected_dim = 768 * 3  # Ensure correct shape before classifier
            x = x[:, :expected_dim]  # Ensure shape matches classifier input

            # Pass through classifier head
            x = classifier_head(x)  # batch_size, num_classes
            prediction = torch.argmax(x, dim=-1).item()  # Get predicted class

    return similarity_score, prediction

# import ace_tools

def evaluate_on_test_data(model, classifier_head, tokenizer, test_data, device="cuda"):
    """
    Evaluates the model using existing test data.
    Args:
        model (torch.nn.Module): Trained language model.
        classifier_head (torch.nn.Module): Classification head.
        tokenizer (Tokenizer): Tokenizer for the model.
        test_data (list of tuples): List of (sentence_a, sentence_b, label).
        device (str): "cuda" or "cpu".
    """
    predictions = []
    similarities = []
    labels = []

    for sent_a, sent_b, label in test_data:
        similarity, pred = calculate_similarity(model, classifier_head, tokenizer, sent_a, sent_b, device)
        similarities.append(similarity)
        predictions.append(pred)
        labels.append(label)

    # Compute performance metrics (fix undefined precision warning)
    accuracy = accuracy_score(labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average='weighted', zero_division=1)

    # Create and display results table
    results_df = pd.DataFrame({
        "Model Type": ["Our Model"],
        "Dataset": ["Test Data"],
        "Accuracy": [accuracy],
        "Precision": [precision],
        "Recall": [recall],
        "F1-Score": [f1]
    })

    return results_df



In [44]:
# Extract test data from the raw dataset
test_dataset = raw_dataset["test"]

# Convert test data into a list of tuples: (premise, hypothesis, label)
test_data = list(zip(test_dataset["premise"], test_dataset["hypothesis"], test_dataset["label"]))

# Now, use this `test_data` for evaluation
evaluate_on_test_data(model, classifier_head, tokenizer, test_data, device="cuda")


Unnamed: 0,Model Type,Dataset,Accuracy,Precision,Recall,F1-Score
0,Our Model,Test Data,0.15,0.598571,0.15,0.058643
