In [None]:
!pip install evaluate

In [50]:
import os
import urllib.request
import zipfile
import numpy as np
import torch
from torch.utils.data import Dataset as TorchDataset
from datasets import load_dataset, Dataset, ClassLabel
from transformers import (
    RobertaTokenizerFast,
    RobertaForSequenceClassification,
    RobertaForTokenClassification,
    Trainer,
    TrainingArguments,
)
import evaluate
from tqdm import tqdm
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
import pandas as pd
from google.colab import files
import shutil

In [51]:
# --- Reproducibility -------------------------------------------------------
seed = 42

# --- Data ------------------------------------------------------------------
# Fraction of the de-duplicated dataset to keep (values strictly between 0 and 1
# trigger stratified subsampling; 1 keeps everything). Example: 0.1 => 10%.
dataset_percent = 1

# Split fractions; they are asserted to sum to 1 in the next cell.
train_percent = 0.75
eval_percent  = 0.10
test_percent  = 0.15

# --- Models / tokenizer ----------------------------------------------------
epochs = 3

tokenizer_name        = "neulab/codebert-cpp"
fn_level_model_name   = "neulab/codebert-cpp"
line_level_model_name = "neulab/codebert-cpp"

# When True the tokenizer's own `model_max_length` is used; otherwise
# `tokenizer_max_length` below is used as the padding/truncation length.
use_tokenizer_max_length = True
tokenizer_max_length     = 2048

# Whether trained models should be downloaded from Colab.
# NOTE: a later cell defines a function with this same name.
download_model = True

# --- Trainer arguments -----------------------------------------------------
# Settings common to both trainers; each trainer adds its own output directory.
_shared_trainer_kwargs = dict(
    learning_rate=2e-5,
    eval_strategy="epoch",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=epochs,
    save_strategy="epoch",
    logging_dir="./logs",
    load_best_model_at_end=True,
    metric_for_best_model="mcc",
    greater_is_better=True,
    fp16=torch.cuda.is_available(),
    report_to="none",
)

# The function-level trainer additionally accumulates gradients over 4 steps.
fn_level_trainer_args = TrainingArguments(
    output_dir="./fn-level",
    gradient_accumulation_steps=4,
    **_shared_trainer_kwargs,
)

line_level_trainer_args = TrainingArguments(
    output_dir="./line-level",
    **_shared_trainer_kwargs,
)

In [52]:
# The split fractions are floats, so compare their sum to 1 with a tolerance:
# an exact `== 1` can fail for valid configurations because of rounding.
assert np.isclose(train_percent + eval_percent + test_percent, 1.0), \
    "train_percent + eval_percent + test_percent must sum to 1"

# Seed NumPy and PyTorch (CPU and all CUDA devices) for reproducibility.
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

In [53]:
bugvul_zip_url = "https://raw.githubusercontent.com/Meerschwein/Automating-SE/refs/heads/main/Big-Vul-dataset.zip"
data_path = "Big-Vul-dataset/data.json"

# Local names of the downloaded archive and of the folder it is unpacked into.
_archive_name = "Big-Vul-dataset.zip"
_extract_dir = "Big-Vul-dataset"

# Download the archive only when it is not already present.
_archive_present = os.path.exists(_archive_name)
if not _archive_present:
    urllib.request.urlretrieve(bugvul_zip_url, _archive_name)

# Unpack only when the target folder does not exist yet.
_already_extracted = os.path.exists(_extract_dir)
if not _already_extracted:
    with zipfile.ZipFile(_archive_name, "r") as zip_ref:
        zip_ref.extractall(_extract_dir)

In [54]:
# Read the records; the "vul" column is loaded as int8.
df = pd.read_json(data_path, dtype={"vul": "int8"})

# Clean-up, one step per line: drop the id column, rename the target to "labels",
# remove rows missing code or label, de-duplicate on the code text, renumber rows.
df = df.drop(["bigvul_id"], axis=1)
df = df.rename(columns={"vul": "labels"})
df = df.dropna(subset=["code", "labels"])
df = df.drop_duplicates("code")
df = df.reset_index(drop=True)

# Optional stratified subsample for faster experiments.
if 0 < dataset_percent < 1:
    df, _ = train_test_split(
        df,
        test_size=1 - dataset_percent,
        stratify=df["labels"],
        random_state=seed,
    )

# First carve out the training set, then divide the remainder into eval and test.
train_df, eval_test_df = train_test_split(
    df,
    train_size=train_percent,
    stratify=df["labels"],
    random_state=seed,
)
_test_share_of_remainder = test_percent / (test_percent + eval_percent)
eval_df, test_df = train_test_split(
    eval_test_df,
    test_size=_test_share_of_remainder,
    stratify=eval_test_df["labels"],
    random_state=seed,
)

# Convert each split to a `datasets.Dataset` without keeping the pandas index.
raw_train_ds, raw_eval_ds, raw_test_ds = (
    Dataset.from_pandas(frame, preserve_index=False)
    for frame in (train_df, eval_df, test_df)
)

# Report each split's share of the (possibly subsampled) frame and its size.
print(f"Training Dataset   {((len(raw_train_ds)/len(df))*100):.2f}% {len(raw_train_ds)}")
print(f"Validation Dataset {((len(raw_eval_ds)/len(df))*100):.2f}% {len(raw_eval_ds)}")
print(f"Test Dataset       {((len(raw_test_ds)/len(df))*100):.2f}% {len(raw_test_ds)}")

Training Dataset   75.00% 139706
Validation Dataset 10.00% 18627
Test Dataset       15.00% 27942


In [None]:
# Load the tokenizer named in the config cell; it is shared by both tasks below.
tokenizer = RobertaTokenizerFast.from_pretrained(tokenizer_name)
# Function-level model: a sequence classifier with two output labels.
fn_level_model = RobertaForSequenceClassification.from_pretrained(fn_level_model_name, num_labels=2)

In [None]:
def tokenize(batch):
    """Tokenize the "code" entries of a batch to a fixed length.

    Args:
        batch: mapping with a "code" entry holding the source strings.

    Returns:
        Whatever the global `tokenizer` returns when called with
        `padding="max_length"`, `truncation=True` and the configured length.
    """
    # Pick the length from the tokenizer itself or from the notebook config.
    if use_tokenizer_max_length:
        length_limit = tokenizer.model_max_length
    else:
        length_limit = tokenizer_max_length
    return tokenizer(
        batch["code"],
        padding="max_length",
        truncation=True,
        max_length=length_limit,
    )

# Tokenize every split in batches and drop the raw "code" column afterwards.
fn_level_train_ds, fn_level_eval_ds, fn_level_test_ds = (
    raw_split.map(tokenize, batched=True, remove_columns=["code"])
    for raw_split in (raw_train_ds, raw_eval_ds, raw_test_ds)
)

In [57]:
# Metric objects loaded once via `evaluate` and reused by the metric functions below.
accuracy_metric = evaluate.load("accuracy")
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load("recall")
f1_metric = evaluate.load("f1")
mcc_metric = evaluate.load("matthews_correlation")
auc_metric = evaluate.load("roc_auc")

# Module-level switch: when True, the metric functions also attach a textual
# sklearn classification report. `test_model` turns it on around its evaluation.
metrics_include_report = False

def fn_level_compute_metrics(eval_pred):
    """Compute function-level classification metrics for the Trainer.

    Args:
        eval_pred: pair `(logits, labels)`.

    Returns:
        Dict with "accuracy", "precision", "recall", "f1", "mcc" and "auc";
        a "report" string is added when `metrics_include_report` is true.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    # Softmax probability of class 1 (vulnerable), used as the ROC-AUC score.
    class_probs = torch.nn.functional.softmax(torch.tensor(logits), dim=1)
    probs = class_probs[:, 1].numpy()

    # All hard-label metrics take the same two arguments.
    hard_label_args = {"predictions": predictions, "references": labels}

    metrics = {
        "accuracy": accuracy_metric.compute(**hard_label_args)["accuracy"],
        "precision": precision_metric.compute(**hard_label_args)["precision"],
        "recall": recall_metric.compute(**hard_label_args)["recall"],
        "f1": f1_metric.compute(**hard_label_args)["f1"],
        "mcc": mcc_metric.compute(**hard_label_args)["matthews_correlation"],
        "auc": auc_metric.compute(prediction_scores=probs, references=labels)["roc_auc"],
    }

    if metrics_include_report:
        metrics["report"] = classification_report(
            labels, predictions, target_names=["Non-vulnerable", "Vulnerable"]
        )

    return metrics

def test_model(trainer, test_dataset):
    global metrics_include_report
    metrics_include_report = True
    evaluation_results = trainer.evaluate(test_dataset)
    evaluation_df = pd.DataFrame([evaluation_results])
    evaluation_df.columns = evaluation_df.columns.str.replace('eval_', '')
    evaluation_df = evaluation_df.drop(["samples_per_second", "steps_per_second", "epoch", "runtime", "report", "loss"], axis=1)
    display(evaluation_df)
    print(evaluation_results["eval_report"])
    metrics_include_report = False

In [58]:
# Trainer for the function-level classifier: trains on the tokenized training
# split, evaluates on the validation split with `fn_level_compute_metrics`.
fn_level_trainer = Trainer(
    args=fn_level_trainer_args,
    model=fn_level_model,
    train_dataset=fn_level_train_ds,
    eval_dataset=fn_level_eval_ds,
    processing_class=tokenizer,
    compute_metrics=fn_level_compute_metrics,
)

# Train, then save the resulting model to the "fn-level-model" directory.
fn_level_trainer.train()
fn_level_trainer.save_model("fn-level-model")

Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1,Mcc,Auc
1,0.0597,0.051294,0.987169,0.958393,0.76082,0.848254,0.847766,0.969998
2,0.0487,0.048854,0.987491,0.961373,0.765376,0.852251,0.851805,0.969701
3,0.0387,0.048408,0.988243,0.957004,0.785877,0.863039,0.861501,0.97561


In [59]:
# Evaluate the function-level model on the test split and print the report.
test_model(fn_level_trainer, fn_level_test_ds)

Unnamed: 0,accuracy,precision,recall,f1,mcc,auc
0,0.987904,0.940647,0.793627,0.860905,0.858004,0.976235


                precision    recall  f1-score   support

Non-vulnerable       0.99      1.00      0.99     26624
    Vulnerable       0.94      0.79      0.86      1318

      accuracy                           0.99     27942
     macro avg       0.97      0.90      0.93     27942
  weighted avg       0.99      0.99      0.99     27942



In [65]:
# The config cell binds the name `download_model` to a boolean flag, and the
# `def` below rebinds that same name to a function. After that, every
# `if download_model:` check is truthy and the flag would be ignored.
# Capture the flag first -- only while the name is not yet a function, so that
# re-running this cell keeps the previously captured value.
if not callable(globals().get("download_model")):
    download_model_enabled = bool(globals().get("download_model", True))

def download_model(dir):
    """Zip the directory `dir` and trigger a Colab browser download of the archive.

    Does nothing when the boolean `download_model` config flag (captured above
    as `download_model_enabled`) was False.

    Args:
        dir: path of the directory to archive; the archive is written next to
            it as `<dir>.zip`.
    """
    if not download_model_enabled:
        return
    files.download(shutil.make_archive(dir, 'zip', dir))

In [None]:
# NOTE: by this point `download_model` names the function defined in the
# previous cell (it has replaced the boolean from the config cell), so this
# condition is always truthy.
if download_model:
    download_model("./fn-level-model")

In [None]:
def add_token_labels(example):
    """Tokenize one example and attach a per-token vulnerability label.

    Every real token receives label 1 if it starts on a source line listed in
    `example["flaw_line_no"]` (1-based line numbers) and 0 otherwise. Special
    tokens, including padding, receive -100.

    Args:
        example: mapping with "code" (the source string) and "flaw_line_no"
            (iterable of 1-based line numbers; empty for benign code).

    Returns:
        The tokenizer output with "offset_mapping" and "special_tokens_mask"
        removed and a "labels" list (one int per token) added.
    """
    code = example["code"]
    vuln_lines = set(example["flaw_line_no"] or [])  # empty if benign

    max_length = tokenizer.model_max_length if use_tokenizer_max_length else tokenizer_max_length
    enc = tokenizer(
        code,
        return_offsets_mapping=True,
        return_special_tokens_mask=True,
        truncation=True,
        max_length=max_length,
        padding="max_length",
    )

    # Character offset at which each token starts.
    starts = np.asarray([start for start, _ in enc.pop("offset_mapping")], dtype=np.int64)
    # Special tokens (start/end markers and padding) all report offset (0, 0),
    # so they cannot be told apart from a real first token by offset alone;
    # rely on the tokenizer's mask instead.
    is_special = np.asarray(enc.pop("special_tokens_mask"), dtype=bool)

    # Character offsets at which each source line begins (line 1 begins at 0).
    line_start = [0] + [i + 1 for i, c in enumerate(code) if c == "\n"]
    # Number of line starts <= token start == 1-based line number of the token.
    line_no = np.searchsorted(line_start, starts, side="right")

    is_vulnerable = np.isin(line_no, list(vuln_lines))
    labels = np.where(is_special, -100, is_vulnerable.astype(np.int64))

    enc["labels"] = labels.tolist()
    return enc

# Build the token-labelled datasets; all original columns (taken from the
# training frame) are removed so only the tokenizer outputs and labels remain.
_raw_columns = list(train_df.columns)
line_level_train_ds, line_level_eval_ds, line_level_test_ds = (
    raw_split.map(add_token_labels, remove_columns=_raw_columns)
    for raw_split in (raw_train_ds, raw_eval_ds, raw_test_ds)
)

In [None]:
metrics_include_report = False
def line_level_metrics(eval_pred):
    """Compute token-level classification metrics for the Trainer.

    Positions whose label is -100 are excluded before any metric is computed.

    Args:
        eval_pred: pair `(logits, labels)`; the logits' last axis indexes the classes.

    Returns:
        Dict with "accuracy", "precision", "recall", "f1", "mcc" and "auc";
        a "report" string is added when `metrics_include_report` is true.
    """
    logits, token_labels = eval_pred

    # Flatten everything to one entry per token position.
    flat_labels = token_labels.flatten()
    flat_predictions = logits.argmax(-1).flatten()
    flat_logits = logits.reshape(-1, logits.shape[-1])

    # Keep only positions that carry a real label.
    keep = flat_labels != -100
    predictions = flat_predictions[keep]
    labels = flat_labels[keep]
    # Softmax probability of class 1 (vulnerable), used as the ROC-AUC score.
    probs = torch.nn.functional.softmax(torch.tensor(flat_logits[keep]), dim=1)[:, 1].numpy()

    hard_label_args = {"predictions": predictions, "references": labels}

    metrics = {
        "accuracy": accuracy_metric.compute(**hard_label_args)["accuracy"],
        "precision": precision_metric.compute(**hard_label_args)["precision"],
        "recall": recall_metric.compute(**hard_label_args)["recall"],
        "f1": f1_metric.compute(**hard_label_args)["f1"],
        "mcc": mcc_metric.compute(**hard_label_args)["matthews_correlation"],
        "auc": auc_metric.compute(prediction_scores=probs, references=labels)["roc_auc"],
    }

    if metrics_include_report:
        metrics["report"] = classification_report(
            labels, predictions, target_names=["Non-vulnerable", "Vulnerable"]
        )

    return metrics

In [None]:
# Line-level model: a token classifier with two output labels per token.
line_level_model = RobertaForTokenClassification.from_pretrained(line_level_model_name, num_labels=2)

In [None]:
# Trainer for the line-level (token classification) model: trains on the
# token-labelled training split, evaluates on the validation split with
# `line_level_metrics`.
line_level_trainer = Trainer(
    args=line_level_trainer_args,
    model=line_level_model,
    train_dataset=line_level_train_ds,
    eval_dataset=line_level_eval_ds,
    processing_class=tokenizer,
    compute_metrics=line_level_metrics,
)

# Train, then save the resulting model to the "line-level-model" directory.
line_level_trainer.train()
line_level_trainer.save_model("line-level-model")

In [None]:
# Evaluate the line-level model on the test split and print the report.
test_model(line_level_trainer, line_level_test_ds)

In [None]:
# NOTE: `download_model` names the function defined in an earlier cell (it has
# replaced the boolean from the config cell), so this condition is always truthy.
if download_model:
    download_model("line-level-model")

In [None]:
# Upload previously saved model archives and unpack each ".zip" into a folder
# named after the archive (e.g. "fn-level-model.zip" -> "fn-level-model/").
uploaded = files.upload()
for filename in uploaded.keys():
    if filename.endswith(".zip"):
        # Strip only the trailing extension (not every ".zip" inside the name).
        folder_name = filename[: -len(".zip")]
        os.makedirs(folder_name, exist_ok=True)
        # Extract with the standard library instead of shelling out: no shell
        # interpolation of the uploaded file name, and no interactive overwrite
        # prompt when the cell is run again.
        with zipfile.ZipFile(filename, "r") as zip_ref:
            zip_ref.extractall(folder_name)

In [None]:
# Reload both fine-tuned models from their saved directories.
trained_fn_level_model = RobertaForSequenceClassification.from_pretrained("./fn-level-model")
trained_line_level_model = RobertaForTokenClassification.from_pretrained("./line-level-model")
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the models to the device and switch them to eval mode; the result is
# assigned to `_` so the cell does not echo the model repr.
_ = trained_fn_level_model.to(device).eval()
_ = trained_line_level_model.to(device).eval()

In [None]:
def get_vuln_lines(example):
    """Predict whether a function is vulnerable and, if so, on which lines.

    The function-level model decides first; the line-level model is only run
    when the vulnerable-class probability exceeds 0.5. A line is reported when
    at least one real (non-special) token starting on it is predicted as class 1.

    Args:
        example: mapping with a "code" entry holding the source string.

    Returns:
        `{"vulnerable": bool, "lines": sorted list of 1-based line numbers}`;
        the list is empty when the function is predicted non-vulnerable.
    """
    code = example["code"]

    # Tokenize once, with the same length setting used for training, and reuse
    # the encoding for both models.
    max_length = tokenizer.model_max_length if use_tokenizer_max_length else tokenizer_max_length
    enc = tokenizer(code, return_offsets_mapping=True, return_special_tokens_mask=True,
                    return_tensors="pt", truncation=True, padding="max_length",
                    max_length=max_length)
    offset_mapping = enc.pop("offset_mapping")[0]            # [seq_len, 2]
    special_tokens_mask = enc.pop("special_tokens_mask")[0]  # [seq_len]
    model_inputs = {k: v.to(device) for k, v in enc.items()}

    # Function-level classification
    with torch.no_grad():
        fn_logits = trained_fn_level_model(**model_inputs).logits
    fn_probs = torch.softmax(fn_logits, dim=1)
    is_vulnerable = fn_probs[0][1].item() > 0.5  # Class 1 = vulnerable

    if not is_vulnerable:
        return {"vulnerable": False, "lines": []}

    # Line-level classification
    with torch.no_grad():
        line_logits = trained_line_level_model(**model_inputs).logits
    line_preds = torch.argmax(line_logits, dim=-1)[0].cpu()  # shape: [seq_len]

    # Special tokens (start/end markers and padding) carry the offset (0, 0);
    # exclude them so a stray prediction on padding cannot flag a source line.
    flagged = (line_preds == 1) & (special_tokens_mask == 0)
    flagged_starts = offset_mapping[flagged, 0].tolist()

    # Character offsets at which each source line begins (line 1 begins at 0).
    line_start_positions = [0] + [i + 1 for i, c in enumerate(code) if c == "\n"]
    # Number of line starts <= token start == 1-based line number of the token.
    line_numbers = np.searchsorted(line_start_positions, flagged_starts, side="right")

    return {"vulnerable": True, "lines": sorted({int(n) for n in line_numbers})}

def display_vulnerability_result(example, predicted_lines):
    """Print the code with per-line markers for actual and predicted flaws.

    A header line lists the actual and predicted line numbers. Each source
    line then follows, prefixed by its right-aligned 1-based number, a "v" if
    it is listed in `example["flaw_line_no"]` and a "p" if it was predicted.

    Args:
        example: mapping with "code" and, optionally, "flaw_line_no".
        predicted_lines: iterable of predicted 1-based line numbers.
    """
    source_lines = example["code"].split("\n")
    truth = set(example.get("flaw_line_no", []))
    guessed = set(predicted_lines)

    # Pad line numbers to the width of the largest one.
    number_width = len(str(len(source_lines)))

    print(f"lines{sorted(truth)} pred{sorted(guessed)}")
    for number, text in enumerate(source_lines, start=1):
        truth_mark = "v" if number in truth else " "
        guess_mark = "p" if number in guessed else " "
        print(f"{number:>{number_width}} {truth_mark}{guess_mark}|{text}")

# Pick a few short vulnerable functions to inspect by eye.
# NOTE(review): these are drawn from the full `df`, which also contains the
# training rows -- confirm whether sampling from `test_df` is intended instead.
small_vuln_examples = df[
    (df["labels"] == 1) &
    (df["code"].apply(lambda c: len(c.splitlines()) <= 10))  # at most 10 lines
]
# Never request more rows than exist (e.g. with a small `dataset_percent`),
# otherwise `sample` raises.
n_examples = min(5, len(small_vuln_examples))
examples_to_test = small_vuln_examples.sample(n=n_examples, random_state=seed).to_dict(orient="records")

# For each example: print the true vs. predicted function-level verdict,
# followed by the annotated source listing.
for ex in examples_to_test:
    result = get_vuln_lines(ex)
    print(f"vuln {ex['labels']==1} pred {result['vulnerable']}")
    display_vulnerability_result(ex, result["lines"])
    print()

Keep Colab running: paste the following snippet into the browser's JavaScript console.
```js
// Logs a heartbeat message and clicks the "#connect" element of Colab's
// connect button; intended to keep the Colab session running.
function ClickConnect() {
    console.log("Working");
    // The "#connect" element is inside the shadow root of <colab-connect-button>.
    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click();
}
// Repeat the click every 60000 ms; the interval id is kept in `clicker`.
var clicker = setInterval(ClickConnect, 60000);
```