# Code Similarity with Contrastive Learning

## Dependencies

In [6]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel

## Code Embeddings

In [2]:
# Checkpoint shared by the tokenizer and the encoder used for the raw embeddings below
PRETRAINED_MODEL = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(PRETRAINED_MODEL)
model = AutoModel.from_pretrained(PRETRAINED_MODEL)

In [3]:
def get_code_embedding(code: str):
    """
    Embed source code with the module-level `tokenizer` and `model`.

    The token states of the last hidden layer are averaged over the sequence
    dimension, weighted by the attention mask so that padding positions do not
    contribute. For a single snippet no padding is added, so this equals the
    plain mean over all tokens.

    Args:
        code: The snippet to embed. A list of snippets is accepted as well and
            yields one row per snippet.

    Returns:
        A tensor of shape [batch, hidden] (e.g. [1, 768] for one snippet with
        the base CodeBERT checkpoint), computed without gradient tracking.
    """
    inputs = tokenizer(code, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        hidden = model(**inputs).last_hidden_state       # [batch, tokens, hidden]
        # 1.0 for real tokens, 0.0 for padding; broadcast over the hidden dimension
        mask = inputs['attention_mask'].unsqueeze(-1).to(hidden.dtype)
        token_sum = (hidden * mask).sum(dim=1)
        token_count = mask.sum(dim=1).clamp(min=1.0)     # guard against division by zero
        return token_sum / token_count

In [4]:
# Toy snippets: code_1 and code_2 implement the same function, code_3 is unrelated

code_1 = "def add(a, b): return a + b"
code_2 = "def sum(x, y): return x + y"
code_3 = "print('Hello, World!')"

ppair = (code_1, code_2)  # positive (similar) pair
npair = (code_1, code_3)  # negative (dissimilar) pair

ppair_emb = tuple(get_code_embedding(snippet) for snippet in ppair)
npair_emb = tuple(get_code_embedding(snippet) for snippet in npair)

# Compare the raw embeddings of each pair with two different metrics

p_cosine_sim = F.cosine_similarity(ppair_emb[0], ppair_emb[1])
n_cosine_sim = F.cosine_similarity(npair_emb[0], npair_emb[1])
p_pair_dist = F.pairwise_distance(ppair_emb[0], ppair_emb[1])
n_pair_dist = F.pairwise_distance(npair_emb[0], npair_emb[1])

print('Cosine similarity:')
print('- positive pair:', p_cosine_sim)
print('- negative pair:', n_cosine_sim)

print('Pairwise distance:')
print('- positive pair:', p_pair_dist)
print('- negative pair:', n_pair_dist)

Cosine similarity:
- positive pair: tensor([0.9913])
- negative pair: tensor([0.8534])
Pairwise distance:
- positive pair: tensor([2.3960])
- negative pair: tensor([10.0942])


## The Classic Contrastive Loss Function

In [5]:
def contrastive_loss(x1, x2, label: float, margin: float):
    """
    Classic contrastive loss over pairs of embeddings.

    The label marks whether a pair is similar:
        - 1.0 means positive (similar) pair
        - 0.0 means negative (dissimilar) pair

    With `dist` being the pairwise distance between `x1` and `x2`, each pair
    contributes
        `dist ** 2`                      if the label is positive,
        `max(margin - dist, 0) ** 2`     otherwise,
    and the mean over all pairs is returned.

    `label` may also be a tensor holding one label per pair (this is how the
    training loop calls it).
    """
    distance = F.pairwise_distance(x1, x2)

    # Positive pairs are pulled together: penalise any remaining distance
    attract = label * distance.pow(2)

    # Negative pairs are pushed apart, but only while they are closer than the margin
    shortfall = torch.clamp(margin - distance, min=0.0)
    repel = (1 - label) * shortfall.pow(2)

    per_pair = attract + repel
    return per_pair.mean()

## Starting Model

In [8]:
# Model
# TODO

class CodeSimilarityModel(nn.Module):
    """
    Siamese encoder that maps two code inputs to comparable embeddings.

    Both inputs are tokenized, encoded by the same pre-trained transformer,
    mean pooled over their real (non-padding) tokens and passed through a
    shared two-layer projection head.
    """

    def __init__(self, pretrained_model_name="microsoft/codebert-base", hidden_size=768):
        """
        Args:
            pretrained_model_name: Checkpoint name handed to `AutoTokenizer` and
                `AutoModel`.
            hidden_size: Width of both linear layers. NOTE: the default is the
                size of the pooled output of the transformer, so it has to match
                the chosen checkpoint.
        """
        super().__init__()

        # Load the pre-trained tokenizer and transformer model
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name)
        self.transformer = AutoModel.from_pretrained(pretrained_model_name)

        # Additional layers for fine-tuning the embeddings
        self.fc1     = nn.Linear(hidden_size, hidden_size)
        self.relu    = nn.ReLU()
        self.dropout = nn.Dropout(0.1)
        self.fc2     = nn.Linear(hidden_size, hidden_size)

    def _tokenize(self, code: str):
        """
        Tokenize one snippet or a batch of snippets into PyTorch tensors.

        Inputs are truncated to 512 tokens and padded only up to the longest
        item of the batch; the attention mask in the result marks the padding.
        """
        if not isinstance(code, str):
            # A DataLoader batch may arrive as a tuple of strings; hand the tokenizer a list
            code = list(code)
        return self.tokenizer(code, return_tensors='pt', truncation=True, padding='longest', max_length=512)

    @staticmethod
    def _masked_mean(hidden_states, attention_mask):
        """
        Average `hidden_states` ([batch, tokens, hidden]) over the token axis,
        counting only positions where `attention_mask` ([batch, tokens]) is 1.
        """
        mask = attention_mask.unsqueeze(-1).to(hidden_states.dtype)
        token_sum = (hidden_states * mask).sum(dim=1)
        token_count = mask.sum(dim=1).clamp(min=1.0)  # guard against division by zero
        return token_sum / token_count

    def _embed(self, code):
        """Run one input (snippet or batch of snippets) through encoder, pooling and head."""
        # Keep the token tensors on the same device as the model weights
        inputs = self._tokenize(code).to(self.fc1.weight.device)
        hidden_states = self.transformer(**inputs).last_hidden_state

        # Padding positions must not take part in the average, otherwise short
        # snippets are dominated by the hidden states of padding tokens
        pooled = self._masked_mean(hidden_states, inputs['attention_mask'])

        x = self.fc1(pooled)
        x = self.relu(x)
        x = self.dropout(x)
        return self.fc2(x)

    def forward(self, code1: str, code2: str):
        """
        Embed both sides of a pair with shared weights.

        Args:
            code1: First snippet, or a sequence of snippets for a batch.
            code2: Second snippet, or a sequence of snippets of the same length.

        Returns:
            A tuple `(x1, x2)` of embedding tensors, one row per snippet.
        """
        # TODO: Only passing one tensor through the network MAY lead to better training time ...
        return self._embed(code1), self._embed(code2)

In [7]:
# Training dataset
# TODO

class CodePairDataset(Dataset):
    """
    Dataset of labelled code pairs.

    Each item is `(code1, code2, label)` where the two snippets are returned
    unchanged and the label is converted to a float tensor.
    """

    def __init__(self, code_pairs, labels):
        """
        Args:
            code_pairs: Sequence of `(code1, code2)` tuples.
            labels: Sequence of labels, aligned with `code_pairs` by index.
        """
        self.code_pairs = code_pairs
        self.labels     = labels

    def __getitem__(self, idx):
        code1, code2 = self.code_pairs[idx]
        label        = self.labels[idx]
        return code1, code2, torch.tensor(label, dtype=torch.float)

    def __len__(self):
        return len(self.code_pairs)

    @classmethod
    def from_csv_data(cls, path: str = "training.csv"):
        """
        Build a dataset from a CSV file with the columns `src_x`, `src_y` and `label`.

        Args:
            path: Location of the CSV file (anything `pandas.read_csv` accepts).

        Raises:
            KeyError: If one of the three expected columns is missing.
        """
        df = pd.read_csv(path)
        # Column-wise construction avoids building a Series object per row
        code_pairs = list(zip(df['src_x'], df['src_y']))
        labels = df['label'].tolist()
        return cls(code_pairs, labels)
        

In [9]:
# Training
# TODO

def train(model, dataloader, optimizer, epochs, margin):
    """
    Train `model` with the contrastive loss and print the mean loss per epoch.

    Args:
        model: Module called as `model(code1, code2)` that returns the two
            embedding tensors of a batch.
        dataloader: Iterable yielding `(code1, code2, labels)` batches.
        optimizer: Optimizer over the parameters of `model`.
        epochs: Number of passes over `dataloader`.
        margin: Margin passed on to `contrastive_loss`.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        for code1, code2, labels in dataloader:
            optimizer.zero_grad()

            # Get embeddings for both code snippets
            embeddings1, embeddings2 = model(code1, code2)

            # Labels come from the DataLoader and may live on a different device
            labels = labels.to(embeddings1.device)

            # Compute the loss
            loss = contrastive_loss(embeddings1, embeddings2, labels, margin)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        # Batches are counted while iterating so that an empty loader (or one
        # without a length) does not raise; an empty epoch is reported as nan
        avg_loss = total_loss / num_batches if num_batches else float("nan")
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss}")

In [None]:
# Build the similarity model (it loads its own tokenizer) and the optimizer
model = CodeSimilarityModel(pretrained_model_name="microsoft/codebert-base")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# Read the training pairs from the default CSV path and batch them in random order
dataset = CodePairDataset.from_csv_data()
dataloader = DataLoader(dataset, batch_size=50, shuffle=True)

# Train the model
epochs = 5
margin = 1.0
train(model, dataloader, optimizer, epochs, margin)

In [None]:
# Evaluation dataset

# TODO: 
# If CodeNet is used for evaluation the code pairs can't be overlapping with training data !!!

# placeholder
test_code_pairs = [
    # same behaviour, different names -> label 1
    ("def add(a, b): return a + b", "def sum(x, y): return x + y"),
    # different behaviour -> label 0
    ("def add(a, b): return a + b", "def subtract(a, b): return a - b"),
    # ...
]
# One label per entry of test_code_pairs, in the same order
test_labels = [1, 0]

test_dataset = CodePairDataset(test_code_pairs, test_labels)
# Fixed order so the printed results line up with test_code_pairs
test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=False)

In [None]:
# Evaluation
# TODO

def evaluate(model, dataloader):
    """
    Compute the embedding distance for every pair yielded by `dataloader`.

    The model is switched to evaluation mode and run without gradient
    tracking. Returns `(labels, distances)`: two flat lists with one NumPy
    scalar per pair, in the order the pairs were yielded.
    """
    model.eval()
    collected_labels, collected_distances = [], []

    with torch.no_grad():
        for first_batch, second_batch, batch_labels in dataloader:
            # Embed both sides of every pair in the batch
            emb_a, emb_b = model(first_batch, second_batch)

            # Euclidean distance between the two embeddings of each pair
            batch_distances = F.pairwise_distance(emb_a, emb_b)

            # Flatten the per-batch tensors into the running lists
            collected_distances += list(batch_distances.cpu().numpy())
            collected_labels += list(batch_labels.cpu().numpy())

    return collected_labels, collected_distances

In [None]:
# Run the trained model over the test pairs
test_labels, test_distances = evaluate(model, test_dataloader)

# One line per pair: the ground-truth label next to the embedding distance
for label, distance in zip(test_labels, test_distances):
    print(f"Label: {label}, Distance: {distance}")