- **Input Embeddings**: Token + Positional embeddings for each token
- **Contextual Embeddings**: Embedding after each layer

In [1]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

In [2]:
import torch
import numpy as np
import pandas as pd

from transformers import AutoTokenizer


Contrastive loss is a type of loss function used in machine learning, particularly in tasks involving similarity learning, such as in Siamese networks. It works by comparing pairs of samples and adjusting their representations based on their similarity:

- **For similar pairs**: The loss function minimizes the distance between their embeddings.
- **For dissimilar pairs**: The loss function maximizes the distance between their embeddings, often with a margin to prevent overlap [¹](https://pyimagesearch.com/2021/01/18/contrastive-loss-for-siamese-networks-with-keras-and-tensorflow/) [²](https://arxiv.org/abs/2012.09740).

This approach helps the model learn to distinguish between similar and dissimilar samples effectively [³](https://codelabsacademy.com/blog/understanding-contrastive-loss-and-reconstruction-loss-in-machine-learning) [⁴](https://www.baeldung.com/cs/contrastive-learning).

Is there a specific application or example of contrastive loss you're curious about?

Source: Conversation with Copilot, 2024/9/2
(1) Contrastive Loss for Siamese Networks with Keras and TensorFlow. https://pyimagesearch.com/2021/01/18/contrastive-loss-for-siamese-networks-with-keras-and-tensorflow/.
(2) [2012.09740] Understanding the Behaviour of Contrastive Loss - arXiv.org. https://arxiv.org/abs/2012.09740.
(3) Understanding Contrastive Loss and Reconstruction Loss in Machine .... https://codelabsacademy.com/blog/understanding-contrastive-loss-and-reconstruction-loss-in-machine-learning.
(4) An Introduction to Contrastive Learning | Baeldung on Computer Science. https://www.baeldung.com/cs/contrastive-learning.

Contrastive loss is a type of loss function used in machine learning, particularly in tasks involving similarity learning, such as in Siamese networks. It works by comparing pairs of samples and adjusting their representations based on their similarity:

- **For similar pairs**: The loss function minimizes the distance between their embeddings.
- **For dissimilar pairs**: The loss function maximizes the distance between their embeddings, often with a margin to prevent overlap [¹](https://pyimagesearch.com/2021/01/18/contrastive-loss-for-siamese-networks-with-keras-and-tensorflow/) [²](https://arxiv.org/abs/2012.09740).

This approach helps the model learn to distinguish between similar and dissimilar samples effectively [³](https://codelabsacademy.com/blog/understanding-contrastive-loss-and-reconstruction-loss-in-machine-learning) [⁴](https://www.baeldung.com/cs/contrastive-learning).

Is there a specific application or example of contrastive loss you're curious about?

Source: Conversation with Copilot, 2024/9/2
(1) Contrastive Loss for Siamese Networks with Keras and TensorFlow. https://pyimagesearch.com/2021/01/18/contrastive-loss-for-siamese-networks-with-keras-and-tensorflow/.
(2) [2012.09740] Understanding the Behaviour of Contrastive Loss - arXiv.org. https://arxiv.org/abs/2012.09740.
(3) Understanding Contrastive Loss and Reconstruction Loss in Machine .... https://codelabsacademy.com/blog/understanding-contrastive-loss-and-reconstruction-loss-in-machine-learning.
(4) An Introduction to Contrastive Learning | Baeldung on Computer Science. https://www.baeldung.com/cs/contrastive-learning.

In [3]:
# Toy 4x4 score matrix used as the input to the contrastive-loss demo cells
# that follow; kept both as a DataFrame (df) and as a float32 tensor (data).
df = pd.DataFrame(
    data=[
        (4.3, 1.2, 0.05, 1.07),
        (0.18, 3.2, 0.09, 0.05),
        (0.85, 0.27, 2.2, 1.03),
        (0.23, 0.57, 0.12, 5.1),
    ]
)

data = torch.tensor(df.to_numpy(), dtype=torch.float32)

In [4]:
def contrastive_loss(data):
    """Cross-entropy loss that treats the diagonal entry of each row as the target.

    Args:
        data: 2-D tensor of scores; row ``i`` is scored against class index
            ``i``, so the tensor needs at least as many columns as rows.

    Returns:
        Scalar tensor holding the mean cross-entropy over the rows.
    """
    # Build the targets on the same device as the scores so the function also
    # works when `data` does not live on the CPU.
    target = torch.arange(data.size(0), device=data.device)
    return torch.nn.functional.cross_entropy(data, target)

In [5]:
# Row-wise softmax: normalise each row of scores into probabilities
torch.softmax(data, dim=1)

tensor([[0.9100, 0.0410, 0.0130, 0.0360],
        [0.0429, 0.8801, 0.0393, 0.0377],
        [0.1512, 0.0846, 0.5832, 0.1810],
        [0.0075, 0.0105, 0.0067, 0.9753]])

In [6]:
# Sanity check: every row of the softmax output should add up to 1
torch.softmax(data, dim=1).sum(dim=1)

tensor([1.0000, 1.0000, 1.0000, 1.0000])

In [7]:
# Demonstrate how the loss reacts when the diagonal scores are raised and the
# off-diagonal scores are lowered step by step.
N = data.size(0)
non_diag_mask = ~torch.eye(N, N, dtype=bool)

for inx in range(3):
    # Work on a loop-local copy built from df: rebinding the notebook-level
    # `data` here would silently change what the earlier softmax cells show
    # when they are re-run.
    scores = torch.tensor(df.values, dtype=torch.float32)
    scores[range(N), range(N)] += inx*0.5   # boost the diagonal entries
    scores[non_diag_mask] -= inx*0.02       # shrink every off-diagonal entry
    print(scores)
    print(f"Loss = {contrastive_loss(scores)}")

tensor([[4.3000, 1.2000, 0.0500, 1.0700],
        [0.1800, 3.2000, 0.0900, 0.0500],
        [0.8500, 0.2700, 2.2000, 1.0300],
        [0.2300, 0.5700, 0.1200, 5.1000]])
Loss = 0.19657586514949799
tensor([[4.8000, 1.1800, 0.0300, 1.0500],
        [0.1600, 3.7000, 0.0700, 0.0300],
        [0.8300, 0.2500, 2.7000, 1.0100],
        [0.2100, 0.5500, 0.1000, 5.6000]])
Loss = 0.12602083384990692
tensor([[5.3000, 1.1600, 0.0100, 1.0300],
        [0.1400, 4.2000, 0.0500, 0.0100],
        [0.8100, 0.2300, 3.2000, 0.9900],
        [0.1900, 0.5300, 0.0800, 6.1000]])
Loss = 0.07888662070035934


In [9]:
class Encoder(torch.nn.Module):
    """Transformer text encoder producing one fixed-size vector per sequence.

    Token ids are embedded, passed through a stack of transformer encoder
    layers followed by a LayerNorm, and the hidden state at sequence position
    0 is linearly projected to ``output_embed_dim``. No positional encoding is
    added in this module.

    Args:
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Width of the token embeddings and of the transformer.
        output_embed_dim: Width of the returned sequence embedding.
        nhead: Attention heads per layer (``embed_dim`` must be divisible by it).
        num_layers: Number of stacked transformer encoder layers.
    """

    def __init__(self, vocab_size, embed_dim, output_embed_dim, nhead=8, num_layers=3):
        super().__init__()
        self.embedding_layer = torch.nn.Embedding(vocab_size, embed_dim)
        self.encoder = torch.nn.TransformerEncoder(
            torch.nn.TransformerEncoderLayer(
                embed_dim,
                nhead=nhead,
                batch_first=True
            ),
            num_layers=num_layers,
            norm=torch.nn.LayerNorm(embed_dim),
            enable_nested_tensor=False
        )
        self.projection = torch.nn.Linear(embed_dim, output_embed_dim)

    def forward(self, tokenizer_output):
        """Encode a tokenized batch.

        Args:
            tokenizer_output: Mapping with ``'input_ids'`` and
                ``'attention_mask'`` tensors of identical (batch, seq) shape.
                Assumes the Hugging Face convention for the mask
                (non-zero = real token, 0 = padding) -- TODO confirm if a
                different tokenizer is used.

        Returns:
            Tensor of shape (batch, output_embed_dim).
        """
        x = self.embedding_layer(tokenizer_output['input_ids'])
        # src_key_padding_mask marks the positions to IGNORE with True, which
        # is the opposite of the attention mask, so invert it (this also
        # yields the bool dtype the encoder expects).
        padding_mask = tokenizer_output['attention_mask'] == 0
        x = self.encoder(x, src_key_padding_mask=padding_mask)
        # Use the first position as the sequence summary (presumably the
        # tokenizer's [CLS] token -- verify against the tokenizer in use).
        cls_embed = x[:, 0, :]
        return self.projection(cls_embed)
        

### Training Loop

In [10]:
def train_loop(dataset):
    """Train a question encoder and an answer encoder for one pass over ``dataset``.

    Each batch of (question, answer) pairs is tokenized and embedded by two
    separate ``Encoder`` instances. The question/answer dot-product matrix is
    then pushed towards a dominant diagonal with a cross-entropy loss, so each
    question should score highest against its own answer.

    Args:
        dataset: Dataset whose items unpack into ``(question, answer)``; both
            are passed to the tokenizer as text.

    Returns:
        Tuple ``(question_encoder, answer_encoder)`` after one pass of updates.
    """
    embed_size = 512
    output_embed_size = 128
    max_seq_len = 64
    batch_size = 32

    # Define the question/answer encoders
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    question_encoder = Encoder(tokenizer.vocab_size, embed_size, output_embed_size)
    answer_encoder = Encoder(tokenizer.vocab_size, embed_size, output_embed_size)

    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    # A single optimizer updates both encoders jointly
    optimizer = torch.optim.Adam(
        list(question_encoder.parameters()) + list(answer_encoder.parameters()),
        lr=1e-5
    )
    loss_fn = torch.nn.CrossEntropyLoss()

    for question, answer in dataloader:
        # Tokenize the question/answer pairs (up to batch_size of each)
        question_tok = tokenizer(question, padding=True, truncation=True, max_length=max_seq_len, return_tensors='pt')
        answer_tok = tokenizer(answer, padding=True, truncation=True, max_length=max_seq_len, return_tensors='pt')

        # Compute the embeddings: batch x output_embed_size each
        question_embed = question_encoder(question_tok)
        answer_embed = answer_encoder(answer_tok)

        # Compute similarity scores: a batch x batch matrix where
        # row[N] reflects similarity between question[N] and all answers
        similarity_scores = question_embed @ answer_embed.T

        # We want to maximize the values in the diagonal and minimize the rest,
        # i.e. the correct "class" for row N is column N.
        target = torch.arange(
            similarity_scores.shape[0],
            dtype=torch.long,
            device=similarity_scores.device,
        )
        loss = loss_fn(similarity_scores, target)

        # Backpropagate and update both encoders
        optimizer.zero_grad()  # reset optimizer so gradients are all-zero
        loss.backward()
        optimizer.step()

    return question_encoder, answer_encoder

In [11]:
def train(dataset, num_epochs=10):
    """Train a question encoder and an answer encoder with an in-batch contrastive loss.

    For every batch of (question, answer) pairs, both sides are tokenized and
    embedded by their own ``Encoder``. A cross-entropy loss over the
    question/answer dot-product matrix then rewards each question for scoring
    highest against its own answer (the diagonal). Tensor shapes are printed
    once for the very first batch, and the mean loss is printed after every
    epoch.

    Args:
        dataset: Dataset whose items unpack into ``(question, answer)``; both
            are passed to the tokenizer as text.
        num_epochs: Number of full passes over ``dataset``.

    Returns:
        Tuple ``(question_encoder, answer_encoder)`` holding the trained models.
    """
    embed_size = 512
    output_embed_size = 128
    max_seq_len = 64
    batch_size = 32

    # Define the question/answer encoders
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    question_encoder = Encoder(tokenizer.vocab_size, embed_size, output_embed_size)
    answer_encoder = Encoder(tokenizer.vocab_size, embed_size, output_embed_size)

    # Define the dataloader, optimizer and loss function
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    optimizer = torch.optim.Adam(
        list(question_encoder.parameters()) + list(answer_encoder.parameters()),
        lr=1e-5
    )
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        running_loss = []
        for inx, (question, answer) in enumerate(dataloader):
            # Shapes are only reported once, for the very first batch
            first_batch = inx == 0 and epoch == 0

            # Tokenize the question/answer pairs (up to batch_size of each)
            question_tok = tokenizer(question, padding=True, truncation=True, max_length=max_seq_len, return_tensors='pt')
            answer_tok = tokenizer(answer, padding=True, truncation=True, max_length=max_seq_len, return_tensors='pt')
            if first_batch:
                print(question_tok['input_ids'].shape, answer_tok['input_ids'].shape)

            # Compute the embeddings: batch x output_embed_size each
            question_embed = question_encoder(question_tok)
            answer_embed = answer_encoder(answer_tok)
            if first_batch:
                print(question_embed.shape, answer_embed.shape)

            # Compute similarity scores: a batch x batch matrix where
            # row[N] reflects similarity between question[N] and all answers
            similarity_scores = question_embed @ answer_embed.T
            if first_batch:
                print(similarity_scores.shape)

            # We want to maximize the values in the diagonal and minimize the
            # rest, i.e. the correct "class" for row N is column N.
            target = torch.arange(
                similarity_scores.shape[0],
                dtype=torch.long,
                device=similarity_scores.device,
            )
            loss = loss_fn(similarity_scores, target)
            running_loss.append(loss.item())

            # Backpropagate and update both encoders
            optimizer.zero_grad()  # reset optimizer so gradients are all-zero
            loss.backward()
            optimizer.step()

        # Report once per epoch, after the last batch. Deriving the last batch
        # index from len(dataset) // batch_size + 1 is off by one whenever the
        # dataset size is an exact multiple of batch_size.
        if running_loss:
            print(f"Epoch {epoch}, loss =", np.mean(running_loss))

    return question_encoder, answer_encoder

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Question/answer pairs loaded from a tab-separated file.

    The file must have a header row containing at least the columns
    ``question`` and ``answer``.

    Args:
        datapath: Path of the TSV file to read.
        nrows: Maximum number of data rows to load (``None`` loads them all).
    """

    def __init__(self, datapath, nrows=300):
        self.data = pd.read_csv(datapath, sep='\t', nrows=nrows)

    def __len__(self):
        """Return the number of loaded rows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(question, answer)`` pair stored at position ``idx``."""
        row = self.data.iloc[idx]
        return row['question'], row['answer']
    
# Load the question/answer file and preview its first five rows
dataset = MyDataset(datapath='../shared_data/nq_sample.tsv')
dataset.data.head(n=5)

In [None]:
# Train both encoders: qe embeds questions, ae embeds answers
qe, ae = train(dataset=dataset, num_epochs=10)

In [None]:
question = "What is the tallest mountain in the world?"
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
question_tok = tokenizer(question, padding=True, truncation=True, max_length=64, return_tensors='pt')

# Inference only: the encoder comes back from training still in training mode,
# so switch it to eval mode (disables dropout, making the embedding
# deterministic) and skip gradient tracking.
qe.eval()
with torch.no_grad():
    question_embed = qe(question_tok)[0]  # [0]: the single sequence in the batch

print(question_tok)
print(question_embed[:5])

In [None]:
answers = [
    "What is the tallest mountain in the world?",
    "The tallest mountain in the world is Mount Everest.",
    "Who is donald duck?",
]

answer_tok = []
answer_emb = []

# Inference only: the encoder comes back from training still in training mode,
# so switch it to eval mode (disables dropout, making the embeddings
# deterministic) and skip gradient tracking.
ae.eval()
with torch.no_grad():
    for answer in answers:
        tok = tokenizer(answer, padding=True, truncation=True, max_length=64, return_tensors='pt')
        answer_tok.append(tok['input_ids'])
        emb = ae(tok)[0]  # [0]: the single sequence in the batch
        answer_emb.append(emb)

print(answer_tok)
print(answer_emb[0][:5])
print(answer_emb[1][:5])
print(answer_emb[2][:5])

In [None]:
# Dot-product score of the question against every candidate answer (one value
# per answer). The question vector was stored as `question_embed` above;
# `question_emb` is never defined.
question_embed @ torch.stack(answer_emb).T