In [None]:
# WARNING: recursively deletes every entry matched by `*` in the kernel's current
# working directory. NOTE(review): presumably meant to clear stale outputs from a
# previous run (e.g. old checkpoints) - confirm the working directory before running.
!rm -r *

In [None]:
# Run-wide settings shared by the data, model and training cells.
config = dict(
    lr=1e-4,                    # learning rate handed to the optimizer
    bsz=1,                      # DataLoader batch size, counted in vuln/patch pairs
    epochs=50,                  # number of passes over the training loader
    max_sequence_length=1024,   # tokenizer truncation limit (max_length)
    cwe_list=["CWE-119", "CWE-125", "CWE-787"],  # CWE ids kept when filtering the dataset
)

In [None]:
#!pip install wandb

#import wandb
#wandb.login(key='')
#wandb.init(project='', config=config)

In [None]:
# calculate the basic performance metrics
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

def calculate_perf_metrics(all_labels, all_predictions):
    """Compute binary-classification metrics from true and predicted class ids.

    Args:
        all_labels: sequence of ground-truth class ids, each 0 or 1.
        all_predictions: sequence of predicted class ids (0 or 1), aligned
            element-by-element with ``all_labels``.

    Returns:
        dict with the keys "accuracy", "precision", "recall", "f1-score",
        "FPR" and "FNR". Precision/recall/F1 are 0 when undefined; FPR (resp.
        FNR) is NaN when there are no negative (resp. positive) ground-truth
        samples.
    """
    # Pin the label set so the matrix is always 2x2. Without it, inputs that
    # contain a single class yield a 1x1 matrix and the 4-way unpack below
    # raises ValueError.
    cm = confusion_matrix(all_labels, all_predictions, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    accuracy = accuracy_score(all_labels, all_predictions)
    # zero_division=0 keeps sklearn's default result (0) for undefined scores
    # while suppressing the UndefinedMetricWarning it would otherwise emit.
    precision = precision_score(all_labels, all_predictions, zero_division=0)
    recall = recall_score(all_labels, all_predictions, zero_division=0)
    f1 = f1_score(all_labels, all_predictions, zero_division=0)

    # False positive rate (FPR) and false negative rate (FNR); guard the
    # denominators so an absent class gives NaN without a 0/0 runtime warning.
    negatives = fp + tn
    positives = fn + tp
    fpr = fp / negatives if negatives else float("nan")
    fnr = fn / positives if positives else float("nan")

    return {"accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1-score": f1,
            "FPR": fpr,
            "FNR": fnr
           }

In [None]:
def eval_model(model, tokenizer, test_dataloader, max_steps=None):
    """Evaluate ``model`` on the vuln/patch pairs served by ``test_dataloader``.

    Every batch contributes its 'vuln' texts (class 1) followed by its 'patch'
    texts (class 0). The predicted class of a sequence is the index of the
    smaller of its two output values.

    Args:
        model: callable taking (input_ids, attention_mask) and returning one
            row of two values per sequence.
        tokenizer: tokenizer used to encode the raw function texts.
        test_dataloader: iterable of batches with 'vuln' and 'patch' entries.
        max_steps: optional cap on the number of batches; a falsy value
            (None or 0) means no cap.

    Returns:
        The metrics dict produced by ``calculate_perf_metrics``.
    """
    gold_classes = []
    predicted_classes = []

    model.eval()
    with torch.no_grad():
        for batch_idx, batch in enumerate(test_dataloader):
            # stop once the optional batch budget is used up
            if max_steps and batch_idx >= max_steps:
                break

            vuln_texts, patch_texts = batch['vuln'], batch['patch']
            texts = vuln_texts + patch_texts
            batch_classes = [1] * len(vuln_texts) + [0] * len(patch_texts)

            # encode the whole batch, padded to its longest sequence
            encoded = tokenizer(texts,
                                return_tensors='pt',
                                padding='longest',
                                truncation=True,
                                max_length=config["max_sequence_length"])
            ids = encoded['input_ids'].to(device)
            mask = encoded['attention_mask'].to(device)

            # run the sequences through the model one at a time and stack
            # the per-sequence outputs on the CPU
            per_sequence = [
                model(seq_ids.unsqueeze(0), seq_mask.unsqueeze(0)).cpu()
                for seq_ids, seq_mask in zip(ids, mask)
            ]
            logits = torch.cat(per_sequence)

            gold_classes += batch_classes
            predicted_classes += torch.min(logits, dim=1).indices.tolist()

    return calculate_perf_metrics(gold_classes, predicted_classes)

# Defining the Dataset

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

import pandas as pd

class DiverseVPPDataset(Dataset):
    """Dataset over a CSV whose rows hold a 'vuln' and a 'patch' entry."""

    def __init__(self, path):
        """Read the CSV at ``path`` and discard its first column."""
        frame = pd.read_csv(path)
        leading_column = frame.columns[0]
        self.df = frame.drop(columns=leading_column)

    def __len__(self):
        """Number of rows in the backing frame."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return the 'vuln' and 'patch' values of the row at position ``idx``."""
        row = self.df.iloc[idx]
        return {field: row.loc[field] for field in ("vuln", "patch")}

    def get_df(self):
        """Expose the backing DataFrame."""
        return self.df

    def set_df(self, df):
        """Swap the backing DataFrame for ``df``."""
        self.df = df

In [None]:
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.dataset import random_split

dset = DiverseVPPDataset('/kaggle/input/diversevulpatchpairs/DiverseVulPatchPairs.csv')

# keep=False removes every row whose patch_hash occurs more than once
# (no representative of a duplicated hash is kept)
df = dset.get_df().drop_duplicates(subset=['patch_hash'], keep=False)

# Keep rows whose `cwe` text mentions one of the configured CWE ids.
# Non-string cells (e.g. NaN for a missing value) are excluded instead of
# raising TypeError on the `in` test.
df = df[df['cwe'].map(
    lambda e: isinstance(e, str) and any(cwe in e for cwe in config['cwe_list'])
)]

dset.set_df(df)

# Seed the split so the train/test partition is identical across kernel
# restarts; otherwise a reloaded checkpoint could be evaluated on pairs it
# was trained on. An optional config["seed"] overrides the default.
split_generator = torch.Generator().manual_seed(config.get("seed", 42))
train_dset, test_dset = random_split(dset, [0.85, 0.15], generator=split_generator)

# each pair holds two functions (vulnerable + patched), hence the *2
print("Training set size: {} pairs, {} functions total.".format(len(train_dset), len(train_dset)*2))
print("Testing set size:  {} pairs, {} functions total.".format(len(test_dset), len(test_dset)*2))
print("Full set size:     {} pairs, {} functions total.".format(len(dset), len(dset)*2))

train_dataloader = DataLoader(dataset=train_dset,
                              batch_size=config["bsz"],
                              shuffle=True,
                              drop_last=False,
                              num_workers=2)

# evaluation order is kept fixed (no shuffling)
test_dataloader = DataLoader(dataset=test_dset,
                              batch_size=config["bsz"],
                              shuffle=False,
                              drop_last=False,
                              num_workers=2)

# Defining the Model

In [None]:
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer

# Hub identifier of the pretrained checkpoint used for both tokenizer and model
checkpoint = "Salesforce/codet5p-110m-embedding"

# NOTE(review): trust_remote_code=True lets the checkpoint repository run its own
# Python code at load time - confirm the repository/revision is trusted.
hub_options = {"trust_remote_code": True}

codet5p_tokenizer = AutoTokenizer.from_pretrained(checkpoint, **hub_options)
codet5p_model = AutoModel.from_pretrained(checkpoint, **hub_options)

In [None]:
import torch.nn.functional as F

class VulnCodeT5(nn.Module):
    """Encoder taken from a CodeT5+ model, followed by a two-output classifier head."""

    def __init__(self, codet5p):
        """Build the classifier around ``codet5p.encoder``.

        Args:
            codet5p: loaded CodeT5+ model object exposing an ``encoder`` attribute.
        """
        super().__init__()

        # CodeT5+ applies its own embedding and normalization on top of the
        # encoder; only the encoder is reused so that part can be customized.
        self.encoder = codet5p.encoder

        # NOTE(review): the last two Linear layers have no activation between
        # them - confirm this is intended.
        head_layers = [
            nn.Dropout(p=0.1),
            nn.Linear(768, 3072),
            nn.Tanh(),
            nn.Dropout(p=0.1),
            nn.Linear(3072, 3072),
            nn.Linear(3072, 2),
        ]
        self.cls = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask):
        """Classify each sequence from the encoder's hidden state at position 0.

        Returns:
            Tensor with two values per input sequence.
        """
        hidden_states = self.encoder(input_ids, attention_mask).last_hidden_state
        first_position = hidden_states[:, 0, :]
        return self.cls(first_position)

# Defining the Training Loop

In [None]:
# Train on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Wrap the pretrained CodeT5+ model and move all of its weights to `device`
model = VulnCodeT5(codet5p_model).to(device)

# Adam over every parameter (encoder and classifier head alike)
optimizer = torch.optim.Adam(params=model.parameters(), lr=config["lr"])

# Sigmoid + binary cross-entropy, applied independently to each output column
criterion = nn.BCEWithLogitsLoss()

# Optional pre-training baseline on the first 250 test batches:
#print(eval_model(model, codet5p_tokenizer, test_dataloader, max_steps=250))

In [None]:
# Fine-tune for config["epochs"] epochs. Each epoch writes a temporary
# checkpoint, computes training metrics from the predictions made during the
# epoch, and evaluates on the first 250 test batches.
for epoch in range(config["epochs"]):
    model.train()

    all_labels_train = []
    all_predictions_train = []

    for samples in train_dataloader:
        # every pair yields two samples: the vulnerable function (class 1)
        # and its patched counterpart (class 0)
        inputs = samples['vuln'] + samples['patch']
        class_ids = [1] * len(samples['vuln']) + [0] * len(samples['patch'])

        # tokenize and pad all to same length
        tokenizer_output = codet5p_tokenizer(inputs,
                                             return_tensors='pt',
                                             padding='longest',
                                             truncation=True,
                                             max_length=config["max_sequence_length"])

        input_ids = tokenizer_output['input_ids'].to(device)
        attention_mask = tokenizer_output['attention_mask'].to(device)

        # Two-column targets: column 0 is 1 for vulnerable, column 1 is 1 for
        # patched. Built directly on the training device so the logits do not
        # have to be copied to the CPU for the loss.
        labels = torch.tensor([[c, 1 - c] for c in class_ids],
                              dtype=torch.float,
                              device=device)

        # forward pass
        output = model(input_ids, attention_mask)

        # calc loss
        loss = criterion(output, labels)

        # backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #wandb.log({'epoch': epoch, 'loss': loss.item()})

        # Predicted class = index of the smaller output (same convention as
        # eval_model): column 1 being lowest means "vulnerable" (class 1).
        all_labels_train += class_ids
        all_predictions_train += torch.argmin(output.detach(), dim=1).tolist()

    # overwrite the rolling checkpoint after every epoch
    torch.save(model.state_dict(), './model.tmp')

    train_metrics = calculate_perf_metrics(all_labels_train, all_predictions_train)
    test_metrics = eval_model(model, codet5p_tokenizer, test_dataloader, max_steps=250)

    # suffix the metric names so train and test values can be logged side by side
    train_metrics_wandb = {k + '_train': v for k, v in train_metrics.items()}
    test_metrics_wandb = {k + '_test': v for k, v in test_metrics.items()}

    #wandb.log(train_metrics_wandb)
    #wandb.log(test_metrics_wandb)

# final weights after the last epoch
torch.save(model.state_dict(), './model.pt')

# Model Evaluation

In [None]:
# Final evaluation over the whole test loader (no max_steps cap this time)
metrics = eval_model(model=model,
                     tokenizer=codet5p_tokenizer,
                     test_dataloader=test_dataloader)
#wandb.log({'final_metrics': metrics})
#wandb.finish()
print(metrics)