# Customizing codebase embeddings with a projection matrix

## Goal

The goal of this exercise is to learn a projection matrix that tailors embeddings to a codebase retrieval use case, and then to measure the resulting change in retrieval quality.

The notebook is mostly filled out, but has a series of small gaps that you will need to fill in (everywhere you see a "TODO" comment):
- Define the similarity functions (both basic and with projection matrix)
- Define a suitable loss function
- Construct examples for training from the pre-existing dataset
- Complete the training loop code
- Finish the retrieval function logic
- Evaluate the improvement in retrieval quality

## Background

A basic retrieval-augmented generation (RAG) system typically uses embeddings to represent the set of documents to be searched. The user input is converted to an embedding as well, and the system uses the dot product of the two embeddings to score how relevant each document in the database is to the input.

Many embedding models are "symmetric", which means that they treat user input text and documents (e.g. code snippets) in the same way. It might be preferable to calculate the embedding differently ("asymmetrically") for the user input because it is a fundamentally different type of text.

One way of doing this is to use the same embedding model, and then apply a matrix multiplication to the embedding of the user input. What we'll try to do here is find such a matrix that can improve retrieval quality.

## Environment

We recommend using a virtual environment to install the necessary packages.

```bash
python3.11 -m venv env
source env/bin/activate
```

### Install packages

```bash
pip install -r requirements.txt
```

In [5]:
!pip uninstall numpy openai sentence-transformers torch python-dotenv -y
!pip install numpy==1.26.4 openai==1.20.0 sentence-transformers==3.3.1 torch==2.5.1 python-dotenv==1.0.1
# numpy is pinned to 1.26.4: the numpy 2.1.3 originally in requirements.txt caused dependency issues




## Setup

Here we generate a sample embedding with `sentence_transformers`.

In [1]:
from openai import OpenAI
import os
from dotenv import load_dotenv
import numpy as np
import torch
from sentence_transformers import SentenceTransformer


# Load a pre-trained model (this will be slow the first time)
model = SentenceTransformer("all-MiniLM-L12-v2")

def embed(text):
    embedding = model.encode([text])[0]
    return torch.tensor(embedding, dtype=torch.float32)

embedding = embed("Hello world")
dim = len(embedding)

print(f"Embedding dimension: {dim}")
print(f"Embedding: [{embedding[0]}, {embedding[1]}, ..., {embedding[-1]}]")



Embedding dimension: 384
Embedding: [-0.07597317546606064, -0.005261995829641819, ..., 0.034954674541950226]


## Similarity

First, we'll define similarity. It can be calculated as the dot product of two embeddings. For example, writing $e(\cdot)$ for the embedding model, the similarity between a user input $x_i$ and a code snippet $x_c$ is

$$h(x_i, x_c) = e(x_i) \cdot e(x_c)$$

Fill out the function below:

In [5]:
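# Define the similarity function using torch
def similarity(x_i, x_c):
    # Embed both texts and take the dot product of the two embeddings
    embedding_i = embed(x_i)
    embedding_c = embed(x_c)
    # torch.dot keeps the computation in torch; .item() returns a Python float
    return torch.dot(embedding_i, embedding_c).item()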

# Calculate the similarity between two strings
x_i = "Where in the codebase do we do auth?"
x_c_1 = "```python\n# Authentication\ndef authenticate(username, password):\n    # Code to authenticate the user\n```"
x_c_2 = "function sum(a, b) {\n    return a + b;\n}"

similarity1 = similarity(x_i, x_c_1)
similarity2 = similarity(x_i, x_c_2)
print(f"Similarity 1: {similarity1}")
print(f"Similarity 2: {similarity2}")

Similarity 1: 0.45098456740379333
Similarity 2: 0.03275972977280617


## Similarity with projection matrix

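Next, we'll calculate similarity using the projection matrix $\theta$ (called `P` in the code), which is applied to the embedding of the user input before taking the dot product with the snippet embedding:

$$h_\theta(x_i, x_c) = e(x_c)^\top \, \theta \, e(x_i)$$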

Fill in the function below:

In [6]:
def similarity_with_projection(x_i, x_c, P):
    embedding_i = embed(x_i)
    embedding_c = embed(x_c)
    # Project the user-input embedding only; the snippet embedding is unchanged
    projected_i = torch.matmul(P, embedding_i)
    # .item() returns a plain Python float, which detaches the result from autograd
    return torch.dot(embedding_c, projected_i).item()

# Generate a dim by dim random matrix
P_random = torch.randn(dim, dim, dtype=torch.float32)
print(P_random)

# Calculate the similarity with the random projection matrix
similarity_with_projection1 = similarity_with_projection(x_i, x_c_1, P_random)
similarity_with_projection2 = similarity_with_projection(x_i, x_c_2, P_random)
print(f"Similarity with projection 1: {similarity_with_projection1}")
print(f"Similarity with projection 2: {similarity_with_projection2}")

tensor([[ 0.5744,  1.5972, -0.0319,  ..., -1.4614,  0.2722,  0.7955],
        [-0.2195, -1.9729,  2.0041,  ...,  0.5185,  1.1267,  0.1744],
        [-1.5279, -1.4316, -0.3869,  ..., -0.8362,  0.0891, -1.0363],
        ...,
        [ 2.7770, -0.9434, -2.0261,  ..., -1.5686, -1.2728, -1.1773],
        [ 0.4899,  0.1790,  1.1235,  ..., -0.4884, -1.8705,  0.6968],
        [ 2.4122, -0.1082,  2.6366,  ..., -0.2530,  1.2358,  1.1347]])
Similarity with projection 1: 1.2573026418685913
Similarity with projection 2: 0.9207971096038818
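
A quick sanity check for any implementation of $h_\theta$: when the projection matrix is the identity, the projected similarity should equal the plain dot product. The sketch below checks this on random stand-in vectors instead of real embeddings, so it runs without downloading the model. The names `e_i` and `e_c` are illustrative and not part of the notebook.

```python
import torch

torch.manual_seed(0)
dim = 384  # matches the embedding dimension printed earlier

# Random stand-ins for e(x_i) and e(x_c); real embeddings would come from `embed`.
e_i = torch.randn(dim)
e_c = torch.randn(dim)

plain = torch.dot(e_i, e_c)
projected = torch.dot(e_c, torch.eye(dim) @ e_i)  # theta = identity

print(torch.allclose(plain, projected))
```

This also suggests one possible design choice for training: initializing `P` at the identity, so that optimization starts from the unmodified similarity instead of a random one.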


## Load dataset

To train and test a matrix that is more helpful than the random one above, we will use a pre-existing dataset of (question, relevant code snippets) pairs. The pairs were generated by a language model.

In [8]:
# Load the dataset from XML file (dataset.xml)

import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import List

@dataclass
class Example:
    user_input: str
    snippets: List[str]

class DatasetParser:
    def __init__(self, xml_file: str):
        self.tree = ET.parse(xml_file)
        self.root = self.tree.getroot()

    def parse(self) -> List[Example]:
        examples = []

        for example in self.root.findall('example'):
            user_input = example.find('user_input').text
            snippets_list = []

            for snippet in example.find('snippets').findall('snippet'):
                # Extract code and filename from the snippet text
                snippet_text = snippet.text.strip()

                # Parse the filename from the code block header
                first_line = snippet_text.split('\n')[0]
                filename = first_line.split(' ')[1] if len(first_line.split(' ')) > 1 else None

                # Remove the code block markers and get just the code
                code_lines = snippet_text.split('\n')[1:-1]
                code = '\n'.join(code_lines)

                snippets_list.append(code)

            examples.append(Example(
                user_input=user_input,
                snippets=snippets_list
            ))

        return examples


parser = DatasetParser('dataset.xml')
dataset = parser.parse()
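
The file `dataset.xml` is not shown in this notebook, so the layout below is inferred from the parser's `find`/`findall` calls: each `<example>` holds one `<user_input>` and a `<snippets>` list of fenced `<snippet>` blocks. The sketch runs the same fence-stripping logic on a made-up in-memory document. The root tag name, the question, and the file name are all invented for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Optional, Tuple

FENCE = "`" * 3  # built programmatically to avoid nesting code fences here

# Hypothetical sample in the layout the parser appears to expect.
SAMPLE_XML = f"""<dataset>
  <example>
    <user_input>Where do we hash passwords?</user_input>
    <snippets>
      <snippet>
{FENCE}python auth/hashing.py
def hash_password(password):
    return slow_hash(password)
{FENCE}
      </snippet>
    </snippets>
  </example>
</dataset>"""


def strip_fence(snippet_text: str) -> Tuple[Optional[str], str]:
    """Split a fenced snippet into (filename, code), mirroring the parser above."""
    lines = snippet_text.strip().split("\n")
    header = lines[0].split(" ")
    filename = header[1] if len(header) > 1 else None
    code = "\n".join(lines[1:-1])  # drop the opening and closing fence lines
    return filename, code


root = ET.fromstring(SAMPLE_XML)
snippet = root.find("example/snippets/snippet")
filename, code = strip_fence(snippet.text)
print(filename)
print(code)
```

Note that the parser above extracts the file name but does not store it. Keeping it alongside the code might be useful context for retrieval.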


## Construct examples

Convert the dataset into a set of examples that can be used to train the projection matrix. These should include both examples of input/snippet pairs where the snippet is relevant, and pairs where the snippet is not relevant.

In [13]:
# Generate positive and negative pairs from the dataset; these are used to train the matrix.
# Each pair is a tuple: (user input, code snippet, 1 if the snippet is relevant to the input else 0)
import xml.etree.ElementTree as ET

# This re-parses the XML directly, so snippets keep their code-fence markers
# (unlike `dataset`, where DatasetParser strips them)
tree = ET.parse('dataset.xml')
root = tree.getroot()
examples_xml = root.findall('example')

example_pairs = []

for example in examples_xml:
    user_input = example.find('user_input').text.strip()
    # Positive pair: only the first snippet of each example is used
    relevant_snippet = example.find('snippets/snippet').text.strip()
    example_pairs.append((user_input, relevant_snippet, 1))

    # Negative pairs: the first snippet of every other example
    for other_example in examples_xml:
        if other_example is not example:
            negative_snippet = other_example.find('snippets/snippet').text.strip()
            example_pairs.append((user_input, negative_snippet, 0))

print(f"Total pairs generated: {len(example_pairs)}")
print("Example pair:", example_pairs[0])

Total pairs generated: 400
Example pair: ('How do we handle password reset flows?', '```python auth/password_reset.py\ndef initiate_password_reset(email):\n    token = generate_reset_token()\n    send_reset_email(email, token)\n    store_reset_token(email, token, expiry=24*hours)\n    return True\n\ndef validate_reset_token(token, new_password):\n    if is_token_valid(token):\n        user = get_user_by_token(token)\n        update_password(user, new_password)\n        invalidate_token(token)\n        return True\n    return False\n```', 1)


In [14]:
# Here we split the example pairs into training and validation sets
np.random.shuffle(example_pairs)
split_index = int(0.8 * len(example_pairs))
train_pairs = example_pairs[:split_index]
val_pairs = example_pairs[split_index:]

print(f"Number of training pairs: {len(train_pairs)}")
print(f"Number of validation pairs: {len(val_pairs)}")

Number of training pairs: 320
Number of validation pairs: 80
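
The pair counts above appear to imply 20 questions, each contributing 20 pairs (1 positive and 19 negatives). A random split over pairs therefore places most questions in both the training and validation sets, which can make the validation loss look better than it would on unseen questions. One possible alternative, sketched here on toy data, is to hold out whole questions. The function name `split_by_question` and its parameters are hypothetical.

```python
import random
from typing import List, Tuple

Pair = Tuple[str, str, int]  # (user input, code snippet, label)


def split_by_question(pairs: List[Pair], val_fraction: float = 0.2, seed: int = 0):
    """Hold out whole questions so no user input appears in both sets."""
    questions = sorted({user_input for user_input, _, _ in pairs})
    rng = random.Random(seed)  # seeded for a reproducible split
    rng.shuffle(questions)
    n_val = max(1, int(val_fraction * len(questions)))
    val_questions = set(questions[:n_val])
    train = [p for p in pairs if p[0] not in val_questions]
    val = [p for p in pairs if p[0] in val_questions]
    return train, val


# Toy pairs standing in for `example_pairs`: 5 questions x 5 snippets.
toy_pairs = [(f"q{i}", f"s{j}", int(i == j)) for i in range(5) for j in range(5)]
toy_train, toy_val = split_by_question(toy_pairs)
print(len(toy_train), len(toy_val))
```

Snippets still appear on both sides of this split, which seems reasonable for codebase retrieval, where the set of snippets is fixed and only the questions are new.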


## Define a loss function

With a model to calculate similarity, and a dataset of positive and negative examples, we're almost ready to train. The last thing we need is a loss function. Design a loss function that is suitable for this use case.

In [15]:
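def loss_func(predictions, targets, margin=0.5):
    # Contrastive loss. The `margin` default of 0.5 is an assumed value; tune it for your data.
    # Positive pairs (target 1) are pulled toward a similarity of 1
    positive_loss = targets * (1 - predictions) ** 2
    # Negative pairs (target 0) are penalized only when similarity exceeds the margin
    negative_loss = (1 - targets) * torch.clamp(predictions - margin, min=0) ** 2
    return torch.mean(positive_loss + negative_loss)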
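
To see how a contrastive loss of this shape behaves, the sketch below evaluates it on two hand-picked sets of predictions. The function name `contrastive_loss` and the margin of 0.5 are assumptions made for illustration, and the prediction values are made up.

```python
import torch


def contrastive_loss(predictions, targets, margin=0.5):
    """Pull positives toward 1; penalize negatives only above `margin`."""
    positive_loss = targets * (1 - predictions) ** 2
    negative_loss = (1 - targets) * torch.clamp(predictions - margin, min=0) ** 2
    return torch.mean(positive_loss + negative_loss)


targets = torch.tensor([1.0, 0.0, 0.0])
good = torch.tensor([1.0, 0.1, 0.5])  # positive at 1, negatives at or below the margin
bad = torch.tensor([0.0, 1.0, 0.5])   # positive at 0, one negative well above the margin

good_loss = contrastive_loss(good, targets).item()
bad_loss = contrastive_loss(bad, targets).item()
print(good_loss, bad_loss)
```

The first case incurs no loss. In the second, the missed positive contributes $1$ and the over-margin negative contributes $0.25$, so the mean over three pairs is $1.25 / 3$. Because a dot product is unbounded, a target of exactly 1 for positives is itself a design choice that may need revisiting.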

## Train the projection matrix

The entire training loop has been set up, except for a couple of lines to calculate the prediction given an example pair and to get $y$, which will then be used together to calculate the loss.

In [None]:
import torch.optim as optim

# Initialize the projection matrix P
P = torch.randn(
    dim, dim, requires_grad=True
)

# Set hyperparameters
lr = 0.1
num_epochs = 25
optimizer = optim.Adam([P], lr=lr)
epochs, types, losses, accuracies, matrices = [], [], [], [], []

for epoch in range(num_epochs):
    # Reset gradients
    optimizer.zero_grad()

    # Iterate through training pairs
    for pair in train_pairs:
        # TODO: Get `prediction` and `y` to pass to `loss_func`
        prediction = 0 # <-- TODO
        y = 0 # <-- TODO

        loss = loss_func(prediction, y)
        loss.backward()

    # Update weights using Adam optimizer
    optimizer.step()

    # Calculate validation loss (no gradients are needed here)
    val_loss = 0
    with torch.no_grad():
        for pair in val_pairs:
            # TODO: Get `prediction` and `y` to pass to `loss_func`
            prediction = 0 # <-- TODO
            y = 0 # <-- TODO

            val_loss += loss_func(prediction, y)

    print(f"Epoch {epoch + 1}/{num_epochs}: validation loss: {val_loss.item() / len(val_pairs)}")
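
One way the loop might be completed is sketched below under several assumptions, none of which come from the notebook: embeddings are precomputed once (random stand-ins here, so the sketch runs without the model), all pairs are scored in a single batched matrix product, `P` starts at the identity, and the learning rate, margin, and toy sizes are arbitrary. The key point is that the prediction must stay a tensor so gradients reach `P`.

```python
import torch

torch.manual_seed(0)
dim, n = 16, 8  # toy sizes; the notebook's real dimension is 384

# Random stand-ins for precomputed embeddings. In the notebook these would be
# embed(user_input) and embed(snippet), computed once before training.
snippet_emb = torch.nn.functional.normalize(torch.randn(n, dim), dim=1)
query_emb = torch.nn.functional.normalize(torch.randn(n, dim), dim=1)
targets = torch.eye(n)  # query i is relevant to snippet i only


def loss_func(predictions, targets, margin=0.5):
    # Same form as the notebook's contrastive loss, with an assumed margin
    positive = targets * (1 - predictions) ** 2
    negative = (1 - targets) * torch.clamp(predictions - margin, min=0) ** 2
    return torch.mean(positive + negative)


# Starting from the identity means training begins at the plain dot product.
P = torch.eye(dim, requires_grad=True)
optimizer = torch.optim.Adam([P], lr=0.01)

history = []
for epoch in range(200):
    optimizer.zero_grad()
    # predictions[i, j] = e(snippet_j)^T P e(query_i), kept as a tensor so
    # gradients can flow back to P (calling .item() here would cut the graph).
    predictions = query_emb @ P.T @ snippet_emb.T
    loss = loss_func(predictions, targets)
    loss.backward()
    optimizer.step()
    history.append(loss.item())

print(f"loss: {history[0]:.4f} -> {history[-1]:.4f}")
```

In the per-pair loop above, the equivalent would be something like `prediction = torch.dot(embed(x_c), P @ embed(x_i))` and `y = torch.tensor(float(label))` for each `(x_i, x_c, label)` pair. Reusing `similarity_with_projection` directly would not train, because it returns a Python float via `.item()`.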

## Retrieval strategy

We now have a potentially improved embedding model, but need to use it for retrieval. Finish the retrieval function, which will take a user input and return relevant code snippets from the full list. Note: a vector database is not necessary.

In [None]:
all_snippets = []

for example in dataset:
    for snippet in example.snippets:
        all_snippets.append(snippet)

# Use similarity search with the embeddings model to retrieve relevant snippets
def retrieve_relevant_snippets(user_input: str):
    # TODO
    raise NotImplementedError
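
A minimal sketch of top-k retrieval follows, assuming a scoring function is passed in as a callable. The names `retrieve_top_k` and `word_overlap`, the default `k`, and the toy snippets are all hypothetical. Word overlap stands in for the learned similarity so that the example runs without the embedding model.

```python
import heapq
from typing import Callable, List, Sequence


def retrieve_top_k(
    user_input: str,
    snippets: Sequence[str],
    score_fn: Callable[[str, str], float],
    k: int = 3,
) -> List[str]:
    """Return the k snippets with the highest score for this input, best first."""
    return heapq.nlargest(k, snippets, key=lambda snippet: score_fn(user_input, snippet))


# Toy scoring function (word overlap) standing in for the learned similarity.
def word_overlap(user_input: str, snippet: str) -> float:
    return float(len(set(user_input.lower().split()) & set(snippet.lower().split())))


toy_snippets = [
    "def authenticate user password",
    "def sum a b",
    "def reset password token",
]
top = retrieve_top_k("reset my password", toy_snippets, word_overlap, k=2)
print(top)
```

In the notebook, `score_fn` could be something like `lambda x_i, x_c: similarity_with_projection(x_i, x_c, P)` applied to `all_snippets`. Since that re-embeds every snippet on each query, embedding the snippets once up front and reusing the vectors would likely be much faster.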

## Evaluate the new retrieval strategy

If the validation loss was lower by the last epoch, then we know that we improved the similarity function (at least on the validation set), but we still need a way of evaluating the retrieval strategy as a whole.

Your last task is to design an evaluation metric suitable for codebase retrieval, which we can run over the examples in the above dataset. The result of the evaluation should be a single number that attempts to represent the quality of the retrieval strategy.

In [None]:
def evaluate_retrieval_strategy(retrieval_strategy):
    # TODO
    raise NotImplementedError

result = evaluate_retrieval_strategy(retrieve_relevant_snippets)
print(result)
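
One candidate metric is mean reciprocal rank (MRR): for each question, take 1 divided by the rank of the first relevant snippet returned, then average over questions. Recall@k would be another reasonable choice. The sketch below is an illustration only. Unlike the stub above, it takes the evaluation examples as an explicit argument so that it is self-contained, and the fixed rankings are made up.

```python
from typing import Callable, List, Tuple

# Each evaluation example: (user input, list of snippets known to be relevant).
EvalExample = Tuple[str, List[str]]


def mean_reciprocal_rank(
    retrieval_strategy: Callable[[str], List[str]],
    examples: List[EvalExample],
) -> float:
    """Average of 1/rank of the first relevant snippet (0 if none is returned)."""
    total = 0.0
    for user_input, relevant in examples:
        ranked = retrieval_strategy(user_input)
        for rank, snippet in enumerate(ranked, start=1):
            if snippet in relevant:
                total += 1.0 / rank
                break
    return total / len(examples)


# Toy strategy with fixed rankings, standing in for `retrieve_relevant_snippets`.
fixed_rankings = {
    "q1": ["a", "b", "c"],  # relevant snippet "a" is ranked 1st
    "q2": ["b", "c", "a"],  # relevant snippet "c" is ranked 2nd
    "q3": ["a", "b"],       # relevant snippet "z" is never returned
}
toy_examples = [("q1", ["a"]), ("q2", ["c"]), ("q3", ["z"])]
score = mean_reciprocal_rank(lambda q: fixed_rankings[q], toy_examples)
print(score)
```

To measure improvement, the same metric would be computed for a baseline strategy (plain dot product) and for the projected one. Evaluating only on questions that were held out from training gives a fairer comparison than evaluating on the full dataset.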