In [2]:
import random
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW

In [3]:
class TripletDataset(Dataset):
    """Dataset yielding (anchor, positive, negative) embedding triplets.

    Each row of ``data`` provides a ``"question"`` (anchor text) and a
    ``"description"`` (positive text). The negative text is requested from
    ``negative_sampler`` on every access, and all three texts are embedded
    with ``embed_model.encode`` at lookup time (nothing is cached).
    """

    def __init__(self, data, embed_model, negative_sampler):
        self.data, self.embed_model, self.negative_sampler = data, embed_model, negative_sampler

    def __len__(self):
        """Return the number of rows in the underlying data."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the embedded triplet for the row at position ``index``."""
        record = self.data.iloc[index]
        question_text = record["question"]
        description_text = record["description"]

        encode = self.embed_model.encode
        anchor_vec = encode(question_text)
        positive_vec = encode(description_text)
        # The negative is sampled relative to this row's own description.
        negative_text = self.negative_sampler.sample(description_text)

        return anchor_vec, positive_vec, encode(negative_text)


In [4]:
class NegativeSampler:
    """Draws a random description that differs from a given positive one.

    Args:
        data: Mapping-like object (e.g. a pandas DataFrame) whose
            ``"description"`` entry is an iterable of candidate descriptions.
    """

    def __init__(self, data):
        # Kept unchanged so existing code reading ``.description`` still works.
        self.description = data["description"]
        # Sample from a plain list: ``random.choice`` indexes by integer
        # position, which on a pandas Series is a *label* lookup and breaks
        # as soon as the index is not the default 0..n-1 range.
        self._candidates = list(self.description)

    def sample(self, positive):
        """Return a uniformly chosen description that is not equal to ``positive``.

        Raises:
            IndexError: If there are no descriptions to choose from.
            ValueError: If every description equals ``positive``; rejection
                sampling could never terminate in that case.
        """
        if not self._candidates:
            raise IndexError("Cannot sample a negative from an empty description pool")

        # Short-circuits on the first differing candidate, so this is cheap
        # in the usual case and only scans everything when it is about to fail.
        if not any(candidate != positive for candidate in self._candidates):
            raise ValueError("No description differs from the given positive; cannot sample a negative")

        choice = random.choice(self._candidates)
        while choice == positive:
            choice = random.choice(self._candidates)

        return choice

In [5]:
class LinearAdapter(nn.Module):
    """Single square linear layer applied to fixed-size embeddings."""

    def __init__(self, input_dim):
        super().__init__()
        # Output width equals input width, so adapted vectors keep their dimensionality.
        self.linear = nn.Linear(in_features=input_dim, out_features=input_dim)

    def forward(self, x):
        """Return ``x`` passed through the linear layer."""
        return self.linear(x)

In [6]:
from sentence_transformers import SentenceTransformer

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

# Base sentence-embedding model; its outputs are the adapter's inputs.
model_name = "all-MiniLM-L6-v2"
model = SentenceTransformer(model_name)

  from .autonotebook import tqdm as notebook_tqdm


cpu


In [7]:
import pandas as pd

# Training rows; the classes below read the "question" and "description" columns.
dataset = pd.read_csv("training_dataset.csv")

# Negatives are drawn from the training descriptions themselves.
sampler = NegativeSampler(data=dataset)
triplet_dataset = TripletDataset(
    data=dataset,
    embed_model=model,
    negative_sampler=sampler,
)

In [8]:
from tqdm import tqdm

# Training hyperparameters.
margin = 0.7
learning_rate = 0.003
num_epochs = 5
batch_size = 32
seed = 42

# Seed both RNGs used on the training path so a fresh "Run All" reproduces
# the same run: torch drives the adapter's weight initialisation and the
# DataLoader shuffle, while `random` drives NegativeSampler.sample.
random.seed(seed)
torch.manual_seed(seed)

adapter = LinearAdapter(model.get_sentence_embedding_dimension()).to(device)

# Euclidean (p=2) triplet loss on the adapted query versus raw positive/negative embeddings.
triplet_loss = nn.TripletMarginLoss(margin=margin, p=2)
optimizer = AdamW(adapter.parameters(), lr=learning_rate)
dataloader = DataLoader(triplet_dataset, batch_size=batch_size, shuffle=True)

# Training loop
for epoch in range(num_epochs):
    total_loss = 0.0

    with tqdm(dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}", unit="batch") as tepoch:
        for batch in tepoch:
            query_emb, positive_emb, negative_emb = [x.to(device) for x in batch]

            # Forward pass: only the query side goes through the adapter.
            adapted_query_emb = adapter(query_emb)

            # Compute loss
            loss = triplet_loss(adapted_query_emb, positive_emb, negative_emb)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    # Mean per-batch loss for the epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}")

Epoch 1/5: 100%|██████████| 47/47 [00:37<00:00,  1.25batch/s]


Epoch 1/5, Loss: 0.3770


Epoch 2/5: 100%|██████████| 47/47 [00:37<00:00,  1.26batch/s]


Epoch 2/5, Loss: 0.1609


Epoch 3/5: 100%|██████████| 47/47 [00:39<00:00,  1.19batch/s]


Epoch 3/5, Loss: 0.1210


Epoch 4/5: 100%|██████████| 47/47 [00:35<00:00,  1.31batch/s]


Epoch 4/5, Loss: 0.1013


Epoch 5/5: 100%|██████████| 47/47 [00:36<00:00,  1.28batch/s]

Epoch 5/5, Loss: 0.0853





In [9]:
test_dataset = pd.read_csv("test_dataset.csv")

test_sampler = NegativeSampler(test_dataset)

# One randomly drawn non-matching description per test row.
test_negatives = [test_sampler.sample(description) for description in test_dataset["description"]]


def _encode_to_tensor(texts):
    """Embed a list of strings with the base model and return a float32 tensor on `device`."""
    # as_tensor(..., dtype=float32) yields the same dtype the legacy
    # torch.Tensor(...) constructor produced, without relying on that constructor.
    return torch.as_tensor(model.encode(texts), dtype=torch.float32).to(device)


# Hand the encoder plain lists of strings rather than pandas Series.
question_embeddings = _encode_to_tensor(test_dataset["question"].tolist())
description_embeddings = _encode_to_tensor(test_dataset["description"].tolist())
negative_embeddings = _encode_to_tensor(test_negatives)

# Evaluation only: without no_grad the adapted embeddings carry an autograd
# graph (visible as grad_fn on every downstream score).
with torch.no_grad():
    adapted_embeddings = adapter(question_embeddings)

In [10]:
def calculate_cosine_similarity(embeddingsA, embeddingsB):
    """Return the cosine similarity between ``embeddingsA`` and ``embeddingsB``.

    The similarity is taken along dimension 1, which is the default of
    ``torch.nn.functional.cosine_similarity``.
    """
    return torch.nn.functional.cosine_similarity(embeddingsA, embeddingsB, dim=1)

In [11]:
# Mean cosine similarity of the raw question embeddings to their own
# descriptions and to the sampled negative descriptions.
base_positive_score = torch.mean(
    calculate_cosine_similarity(question_embeddings, description_embeddings)
)
base_neg_score = torch.mean(
    calculate_cosine_similarity(question_embeddings, negative_embeddings)
)

(base_positive_score, base_neg_score)

(tensor(0.4520), tensor(0.0939))

In [12]:
# Same two means as for the base model, but using the adapter's output for the questions.
adapted_positive_score = torch.mean(
    calculate_cosine_similarity(adapted_embeddings, description_embeddings)
)
adapted_neg_score = torch.mean(
    calculate_cosine_similarity(adapted_embeddings, negative_embeddings)
)

(adapted_positive_score, adapted_neg_score)

(tensor(0.6043, grad_fn=<MeanBackward0>),
 tensor(0.1007, grad_fn=<MeanBackward0>))

In [None]:
import torch

def evaluate_embeddings(question_embeddings, description_embeddings, dataset, k):
    """Compute recall@k and the mean rank of each query's own description.

    Query ``i`` (row ``i`` of ``question_embeddings``) is scored by cosine
    similarity against every row of ``description_embeddings``; row ``i`` of
    the descriptions is treated as its correct match.

    Args:
        question_embeddings: 2-D tensor with one query embedding per row.
        description_embeddings: 2-D tensor with one description embedding per
            row, in the same order as ``dataset["description"]``.
        dataset: Mapping-like object (e.g. a DataFrame) whose ``"description"``
            entry holds the description texts.
        k: Number of top-ranked descriptions inspected for recall.

    Returns:
        ``(recall_at_k, average_rank)``. A query counts as a hit when any of
        its top-``k`` descriptions has the same text as its own description.
        ``average_rank`` is the mean zero-based position of row ``i`` in the
        ranking for query ``i`` (0 means ranked first); it does not depend on ``k``.
    """
    num_queries = question_embeddings.shape[0]
    # Positional list: the indices produced by torch.sort are row positions,
    # so they must not be used as pandas index *labels* (that fails or picks
    # the wrong row when the DataFrame index is not the default 0..n-1).
    descriptions = list(dataset["description"])
    recall_sum = 0
    ranks = 0

    for i in range(num_queries):
        # Similarity of this single query against all descriptions.
        scores = torch.nn.functional.cosine_similarity(
            question_embeddings[i].unsqueeze(0), description_embeddings, dim=1
        )

        # Description positions ordered from most to least similar.
        _, indices = torch.sort(scores, descending=True)

        ranks += torch.nonzero(indices == i).item()

        # Match on text rather than position so duplicate descriptions count as hits.
        top_k = indices[:k].tolist()
        if any(descriptions[j] == descriptions[i] for j in top_k):
            recall_sum += 1

    recall_at_k = recall_sum / num_queries
    average_rank = ranks / num_queries

    return recall_at_k, average_rank

# Cutoffs at which recall is reported.
k_values = [1, 5, 10, 25]

# Evaluate the unadapted question embeddings once per cutoff.
base_recall_at_ks = []
for k in k_values:
    base_recall_at_ks.append(
        evaluate_embeddings(question_embeddings, description_embeddings, test_dataset, k)
    )

for k, (recall, average_rank) in zip(k_values, base_recall_at_ks):
    print(f"Base Model Recall@{k}: {recall}")
# `average_rank` still holds the value from the last cutoff at this point.
print(f"Base Model Average Rank: {average_rank}")


Base Model Recall@1: 0.3489583333333333
Base Model Recall@5: 0.7291666666666666
Base Model Recall@10: 0.859375
Base Model Recall@25: 0.90625
Base Model Average Rank: 6.03125

Adapted Model Recall@1: 0.390625
Adapted Model Recall@5: 0.7291666666666666
Adapted Model Recall@10: 0.875
Adapted Model Recall@25: 0.96875
Adapted Model Average Rank: 4.291666666666667


In [None]:

# Blank line between the base-model report and this one.
print("")

# Evaluate the adapter's question embeddings at the same cutoffs.
adapted_recall_at_ks = []
for k in k_values:
    adapted_recall_at_ks.append(
        evaluate_embeddings(adapted_embeddings, description_embeddings, test_dataset, k)
    )

for k, (recall, average_rank) in zip(k_values, adapted_recall_at_ks):
    print(f"Adapted Model Recall@{k}: {recall}")
# `average_rank` still holds the value from the last cutoff at this point.
print(f"Adapted Model Average Rank: {average_rank}")

In [18]:
# Compile the trained adapter to TorchScript, then write it to disk.
scripted_adapter = torch.jit.script(adapter)
scripted_adapter.save("./adapter.pt")