In [1]:
import json
import pickle
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModel
from datasets import Dataset
from datasets import load_dataset

# Accumulators for the embeddings of every chunk, kept in matching order:
# buggy_list[i] and fixed_list[i] belong to the same pair.
buggy_list = []
fixed_list = []

base_dir = "/mimer/NOBACKUP/groups/naiss2025-5-243/buggy_fixed_embeddings" # Location of buggy+fixed pairs
NUM_CHUNKS = 23  # chunk files are numbered 0000 .. NUM_CHUNKS-1

for chunk_num in range(NUM_CHUNKS):
    file_path = f"{base_dir}/buggy_fixed_embeddings_chunk_{chunk_num:04d}.pkl"
    # SECURITY: pickle.load can execute arbitrary code stored in the file.
    # Only load chunk files that come from a trusted source.
    with open(file_path, 'rb') as f:
        data = pickle.load(f)

    buggy_chunk = data['buggy_embeddings'].tolist()
    fixed_chunk = data['fixed_embeddings'].tolist()
    # The two lists are paired purely by position, so a chunk with unequal
    # counts would silently shift every later pair out of alignment.
    if len(buggy_chunk) != len(fixed_chunk):
        raise ValueError(
            f"{file_path}: {len(buggy_chunk)} buggy embeddings but "
            f"{len(fixed_chunk)} fixed embeddings"
        )
    buggy_list.extend(buggy_chunk)
    fixed_list.extend(fixed_chunk)

In [4]:
import random

# zip() stops at the shorter input, so a length mismatch would silently drop
# data; fail loudly instead.
if len(buggy_list) != len(fixed_list):
    raise ValueError(
        f"buggy_list has {len(buggy_list)} entries but fixed_list has {len(fixed_list)}"
    )

# Shuffle with a dedicated, seeded generator so the train/val/test split is the
# same on every run and the global `random` state is left untouched.
SPLIT_SEED = 42
pairs = list(zip(buggy_list, fixed_list))
random.Random(SPLIT_SEED).shuffle(pairs)

# 80% train, 10% validation, the remainder is the test set.
train_size = int(0.8 * len(pairs))
val_size = int(0.1 * len(pairs))

train_pairs = pairs[:train_size]
val_pairs = pairs[train_size:train_size+val_size]
test_pairs = pairs[train_size+val_size:]

# zip(*[]) cannot be unpacked into two names; give a readable error instead.
if not (train_pairs and val_pairs and test_pairs):
    raise ValueError(
        f"Need enough pairs for non-empty train/val/test splits, got {len(pairs)}"
    )

# Un-zip each split back into separate buggy / fixed sequences.
train_buggy, train_fixed = zip(*train_pairs)
val_buggy, val_fixed = zip(*val_pairs)
test_buggy, test_fixed = zip(*test_pairs)

train_buggy = np.array(train_buggy)
train_fixed = np.array(train_fixed)
val_buggy = np.array(val_buggy)
val_fixed = np.array(val_fixed)
test_buggy = np.array(test_buggy)
test_fixed = np.array(test_fixed)

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Dataset
class VectorPairDataset(Dataset):
    """Map-style dataset of positionally aligned (buggy, fixed) vector pairs.

    Both inputs are copied into float32 tensors when the dataset is built.

    Args:
        buggy: Array-like of input vectors (anything ``torch.tensor`` accepts).
        fixed: Array-like of target vectors, one per entry of ``buggy``.

    Raises:
        ValueError: If ``buggy`` and ``fixed`` do not have the same length.
    """
    def __init__(self, buggy, fixed):
        self.buggy = torch.tensor(buggy, dtype=torch.float32)
        self.fixed = torch.tensor(fixed, dtype=torch.float32)
        # __len__ only looks at `buggy`, so a mismatch would otherwise either
        # ignore trailing targets or raise IndexError in the middle of an epoch.
        if len(self.buggy) != len(self.fixed):
            raise ValueError(
                f"buggy has {len(self.buggy)} rows but fixed has {len(self.fixed)} rows"
            )
    def __len__(self):
        """Return the number of pairs."""
        return len(self.buggy)
    def __getitem__(self, i):
        """Return the ``(buggy, fixed)`` tensors stored at index ``i``."""
        return self.buggy[i], self.fixed[i]

# Model
class MLP_Model(nn.Module):
    """Multi-layer perceptron: a Linear+ReLU pair per hidden size, then a final Linear.

    Args:
        input_size: Number of features in each input vector.
        output_size: Number of features in each output vector.
        hidden_sizes: Iterable with the width of each hidden layer, in order.
            An empty iterable yields a single Linear layer.
    """
    # The default is a tuple rather than a list so that it cannot be mutated
    # and shared between instances.
    def __init__(self, input_size=1024, output_size=1024, hidden_sizes=(4096, 2048, 1024)):
        super().__init__()
        layers, in_f = [], input_size
        for h in hidden_sizes:
            layers += [nn.Linear(in_f, h), nn.ReLU()]
            in_f = h  # the next layer consumes this layer's output width
        layers.append(nn.Linear(in_f, output_size))
        self.model = nn.Sequential(*layers)
    def forward(self, x):
        """Apply the layer stack to ``x`` and return the result."""
        return self.model(x)

# Cosine similarity loss + MSE
class MSECosineLoss(nn.Module):
    """Weighted combination of mean-squared error and cosine distance.

    ``forward(x, y)`` returns
    ``alpha * MSE(x, y) + beta * (1 - mean cosine similarity along dim 1)``.
    """

    def __init__(self, alpha=0.5, beta=0.5):
        super().__init__()
        # Weights of the two terms.
        self.alpha, self.beta = alpha, beta
        # Building blocks of the two terms.
        self.mse = nn.MSELoss()
        self.cos = nn.CosineSimilarity(dim=1)

    def forward(self, x, y):
        mean_similarity = self.cos(x, y).mean()
        weighted_mse = self.alpha * self.mse(x, y)
        weighted_cosine = self.beta * (1 - mean_similarity)
        return weighted_mse + weighted_cosine
        
# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed torch's global RNG before anything random is created, so that weight
# initialisation and the training loader's shuffle order are repeatable.
TORCH_SEED = 42
torch.manual_seed(TORCH_SEED)

# Tunable hyperparameters.
BATCH_SIZE = 512
LEARNING_RATE = 1e-3

# Loaders (only the training data is shuffled)
train_loader = DataLoader(VectorPairDataset(train_buggy, train_fixed), batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(VectorPairDataset(val_buggy, val_fixed), batch_size=BATCH_SIZE)
test_loader = DataLoader(VectorPairDataset(test_buggy, test_fixed), batch_size=BATCH_SIZE)

# Model sized from the data: input width = buggy vectors, output width = fixed vectors
model = MLP_Model(input_size=train_buggy.shape[1], output_size=train_fixed.shape[1]).to(device)

# Loss + Optimizer; cos_fn is used on its own to report cosine similarity
loss_fn = MSECosineLoss(alpha=0.5, beta=0.5)
cos_fn = nn.CosineSimilarity(dim=1)
opt = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Shared evaluation pass used for both the validation and the test set
def _sum_metrics(model, loader, loss_fn, cos_fn, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Puts the model in eval mode and returns ``(loss_sum, sim_sum)``: the
    per-batch mean loss and mean cosine similarity, each weighted by the batch
    size and summed over all batches. Divide by the dataset size to get
    per-sample averages.
    """
    model.eval()
    loss_sum, sim_sum = 0, 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            preds = model(inputs)
            loss_sum += loss_fn(preds, targets).item() * inputs.size(0)
            sim_sum += cos_fn(preds, targets).mean().item() * inputs.size(0)
    return loss_sum, sim_sum

# Training loop
NUM_EPOCHS = 20
for epoch in range(NUM_EPOCHS):
    model.train()
    tloss, tsim = 0, 0
    for b, f in train_loader:
        b, f = b.to(device), f.to(device)
        opt.zero_grad()
        out = model(b)
        loss = loss_fn(out, f)
        loss.backward()
        opt.step()
        # Weight batch means by batch size so the last (smaller) batch is
        # averaged correctly.
        tloss += loss.item() * b.size(0)
        # Reporting only: detach so no autograd graph is built for the metric.
        tsim += cos_fn(out.detach(), f).mean().item() * b.size(0)
    tloss /= len(train_loader.dataset)
    tsim /= len(train_loader.dataset)

    vloss, vsim = _sum_metrics(model, val_loader, loss_fn, cos_fn, device)
    vloss /= len(val_loader.dataset)
    vsim /= len(val_loader.dataset)

    print(f"{epoch+1}: train_loss={tloss:.6f}, val_loss={vloss:.6f}, "
          f"train_sim={tsim:.6f}, val_sim={vsim:.6f}")

# Final test (test_loss / test_sim hold sums; they are normalised when printed)
test_loss, test_sim = _sum_metrics(model, test_loader, loss_fn, cos_fn, device)
print(f"Test: loss={test_loss/len(test_loader.dataset):.6f}, "
      f"sim={test_sim/len(test_loader.dataset):.6f}")


1: train_loss=0.015660, val_loss=0.008174, train_sim=0.983648, val_sim=0.991775
2: train_loss=0.006955, val_loss=0.005954, train_sim=0.993013, val_sim=0.993935
3: train_loss=0.005502, val_loss=0.005095, train_sim=0.994503, val_sim=0.994987
4: train_loss=0.004734, val_loss=0.004361, train_sim=0.995276, val_sim=0.995566
5: train_loss=0.004270, val_loss=0.004503, train_sim=0.995750, val_sim=0.995486
6: train_loss=0.003923, val_loss=0.003966, train_sim=0.996084, val_sim=0.996099
7: train_loss=0.003649, val_loss=0.003462, train_sim=0.996361, val_sim=0.996490
8: train_loss=0.003455, val_loss=0.003353, train_sim=0.996551, val_sim=0.996648
9: train_loss=0.003290, val_loss=0.003104, train_sim=0.996712, val_sim=0.996850
10: train_loss=0.003251, val_loss=0.003107, train_sim=0.996776, val_sim=0.996853
11: train_loss=0.003044, val_loss=0.002983, train_sim=0.996949, val_sim=0.996974
12: train_loss=0.002966, val_loss=0.003161, train_sim=0.997034, val_sim=0.996811
13: train_loss=0.002920, val_loss=0.0