<a href="https://colab.research.google.com/github/Retmmmy/finding-vulnerabilities/blob/main/norm_ib_project_colab_single_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IB Project


In [None]:

# Remove any preinstalled copies of the Hugging Face packages first; `|| true`
# lets the cell continue even when the uninstall command exits with an error.
!pip -q uninstall -y transformers tokenizers accelerate datasets evaluate safetensors || true
# Install (or upgrade to) the newest releases. `tokenizers` is not listed here;
# presumably it is pulled back in as a dependency -- TODO confirm.
!pip -q install -U "transformers" "datasets" "accelerate" "evaluate" "safetensors"


## 1) Импорты и настройки


In [None]:
import os, random, json, math
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score

# Seed Python, NumPy and PyTorch (CPU and every CUDA device) with one value.
SEED = 42
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print("DEVICE:", DEVICE)


## 2) Загрузка датасета и единый split

Используем `DynaOuchebara/BigVul` (поле кода: `func_before`, метка: `vul`).


In [None]:
from datasets import load_dataset

# Upper bounds on the number of samples kept per split.
MAX_TRAIN = 20000
MAX_VAL   = 2000
MAX_TEST  = 2000

ds = load_dataset("DynaOuchebara/BigVul")
train_ds = ds["train"]

# Keep only samples whose code field is a non-blank string; labels are cast to int.
clean = [
    (code, int(flag))
    for code, flag in zip(train_ds["func_before"], train_ds["vul"])
    if isinstance(code, str) and code.strip() != ""
]
texts = [code for code, _ in clean]
labels = [flag for _, flag in clean]

# Stratified split: hold out 20 % of the data, then halve the hold-out into
# validation and test parts. Both calls reuse SEED so the split is repeatable.
X_train, X_temp, y_train, y_temp = train_test_split(
    texts, labels, test_size=0.2, random_state=SEED, stratify=labels
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=SEED, stratify=y_temp
)

def maybe_trunc(X, y, max_n):
    """Return ``X`` and ``y`` cut down to their first ``max_n`` items.

    When ``max_n`` is ``None`` both inputs are handed back untouched.
    """
    if max_n is not None:
        return X[:max_n], y[:max_n]
    return X, y

# Cap every split at its configured maximum size.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = (
    maybe_trunc(X_train, y_train, MAX_TRAIN),
    maybe_trunc(X_val, y_val, MAX_VAL),
    maybe_trunc(X_test, y_test, MAX_TEST),
)

print("Sizes:", *map(len, (X_train, X_val, X_test)))
print("Pos rate test:", np.mean(y_test))


## 3) Утилиты метрик


In [None]:
def compute_metrics_from_logits(logits: np.ndarray, y_true: np.ndarray) -> Dict[str, float]:
    """Turn raw model outputs into binary-classification metrics.

    ``logits`` of shape (N, 2) go through a softmax and the second column is
    taken as the positive-class probability; any other shape is passed
    element-wise through a sigmoid. Predictions use a 0.5 threshold.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``roc_auc`` as Python floats. ``roc_auc`` is NaN when
        ``roc_auc_score`` raises.
    """
    is_two_column = logits.ndim == 2 and logits.shape[1] == 2
    if is_two_column:
        probs = torch.softmax(torch.tensor(logits), dim=1).numpy()[:, 1]
    else:
        probs = 1/(1+np.exp(-logits))
    y_pred = (probs >= 0.5).astype(int)

    try:
        auc = roc_auc_score(y_true, probs)
    except Exception:
        auc = float("nan")

    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="binary", zero_division=0
    )
    values = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": prec,
        "recall": rec,
        "f1": f1,
        "roc_auc": auc,
    }
    return {metric: float(value) for metric, value in values.items()}


## 4) Baselines на PyTorch

### 4.1 Простая byte-level токенизация
Baseline-модели реализованы на PyTorch; код токенизируется как последовательность байт.


In [None]:
@dataclass
class ByteTokenizerConfig:
    """Settings for :class:`ByteTokenizer`."""

    # Length of every encoded sequence, counted in byte tokens.
    max_len: int = 512


class ByteTokenizer:
    """Map text to a fixed-length list of byte ids.

    Id 0 is used for padding; each UTF-8 byte ``b`` is stored as ``b + 1``.
    """

    def __init__(self, cfg: ByteTokenizerConfig):
        self.cfg = cfg

    def encode(self, text: str) -> List[int]:
        """Encode ``text``: truncate to ``cfg.max_len`` bytes, then zero-pad to that length."""
        limit = self.cfg.max_len
        raw = text.encode("utf-8", errors="ignore")
        ids = [byte + 1 for byte in raw[:limit]]
        # No-op when the sequence is already full (the repeat count is then 0).
        ids.extend([0] * (limit - len(ids)))
        return ids


byte_tok = ByteTokenizer(ByteTokenizerConfig(max_len=512))


### 4.2 Dataset/Dataloader


In [None]:
class TextClsDataset(Dataset):
    """Dataset pairing raw source strings with integer labels.

    Tokenisation happens on access, inside ``__getitem__``.
    """

    def __init__(self, X: List[str], y: List[int], tokenizer: ByteTokenizer):
        self.X, self.y = X, y
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx: int):
        """Return ``{"input_ids": LongTensor, "labels": LongTensor}`` for sample ``idx``."""
        encoded = self.tokenizer.encode(self.X[idx])
        return {
            "input_ids": torch.tensor(encoded, dtype=torch.long),
            "labels": torch.tensor(self.y[idx], dtype=torch.long),
        }

def make_loader(X, y, bs=32, shuffle=False, num_workers=2):
    """Wrap texts and labels in a ``DataLoader`` over byte-tokenised samples.

    Args:
        X: Source-code strings.
        y: Integer labels aligned with ``X``.
        bs: Batch size.
        shuffle: Whether the loader shuffles the samples.
        num_workers: Number of loader worker processes.

    Returns:
        A ``DataLoader`` yielding dicts with ``input_ids`` and ``labels``.
    """
    dataset = TextClsDataset(X, y, byte_tok)
    # Pinned host memory only speeds up host-to-GPU copies; asking for it on a
    # CPU-only runtime just produces a warning, so enable it only with CUDA.
    return DataLoader(
        dataset,
        batch_size=bs,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )

# Only the training loader shuffles; evaluation loaders use a larger batch.
train_loader = make_loader(X=X_train, y=y_train, bs=32, shuffle=True)
val_loader = make_loader(X=X_val, y=y_val, bs=64, shuffle=False)
test_loader = make_loader(X=X_test, y=y_test, bs=64, shuffle=False)


### 4.3 Модели: MLP / CNN / GRU / LSTM


In [None]:
class BaseByteModel(nn.Module):
    """Shared base of the byte-level baselines: owns the embedding table.

    Args:
        vocab_size: Number of embedding rows (257 = 256 byte values + padding id 0).
        emb_dim: Embedding width; subclasses size their first layer from it.
        num_classes: Number of output logits produced by subclasses.
    """

    def __init__(self, vocab_size=257, emb_dim=128, num_classes=2):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.num_classes = num_classes


class MLPByte(BaseByteModel):
    """Average the byte embeddings over the sequence, then apply a two-layer MLP."""

    def __init__(self, **kw):
        super().__init__(**kw)
        self.pool = nn.AdaptiveAvgPool1d(1)
        # Layer sizes follow the configured embedding width / class count
        # instead of assuming the defaults (128 and 2).
        self.net = nn.Sequential(
            nn.Linear(self.emb.embedding_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, self.num_classes),
        )

    def forward(self, input_ids):
        """Map ``input_ids`` (batch, seq_len) to logits (batch, num_classes)."""
        x = self.emb(input_ids)
        x = x.transpose(1, 2)  # channels-first layout expected by the 1-D pooling
        x = self.pool(x).squeeze(-1)
        return self.net(x)


class CNNByte(BaseByteModel):
    """Two 1-D convolutions with global max pooling over the byte sequence."""

    def __init__(self, **kw):
        super().__init__(**kw)
        self.conv = nn.Sequential(
            nn.Conv1d(self.emb.embedding_dim, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(128, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),
        )
        self.fc = nn.Linear(128, self.num_classes)

    def forward(self, input_ids):
        """Map ``input_ids`` (batch, seq_len) to logits (batch, num_classes)."""
        x = self.emb(input_ids).transpose(1, 2)
        x = self.conv(x).squeeze(-1)
        return self.fc(x)


class GRUByte(BaseByteModel):
    """Bidirectional GRU; classifies from the two final hidden states.

    NOTE(review): sequences are not packed, so when inputs are zero-padded at
    the end the forward direction's final state is taken after the padding steps.
    """

    def __init__(self, **kw):
        super().__init__(**kw)
        self.rnn = nn.GRU(
            input_size=self.emb.embedding_dim,
            hidden_size=128,
            batch_first=True,
            bidirectional=True,
        )
        # 256 = 2 directions x 128 hidden units.
        self.fc = nn.Linear(256, self.num_classes)

    def forward(self, input_ids):
        """Map ``input_ids`` (batch, seq_len) to logits (batch, num_classes)."""
        x = self.emb(input_ids)
        _, h = self.rnn(x)
        h = torch.cat([h[0], h[1]], dim=1)  # forward and backward final states
        return self.fc(h)


class LSTMByte(BaseByteModel):
    """Bidirectional LSTM; classifies from the two final hidden states.

    NOTE(review): sequences are not packed, so when inputs are zero-padded at
    the end the forward direction's final state is taken after the padding steps.
    """

    def __init__(self, **kw):
        super().__init__(**kw)
        self.rnn = nn.LSTM(
            input_size=self.emb.embedding_dim,
            hidden_size=128,
            batch_first=True,
            bidirectional=True,
        )
        # 256 = 2 directions x 128 hidden units.
        self.fc = nn.Linear(256, self.num_classes)

    def forward(self, input_ids):
        """Map ``input_ids`` (batch, seq_len) to logits (batch, num_classes)."""
        x = self.emb(input_ids)
        _, (h, _c) = self.rnn(x)
        h = torch.cat([h[0], h[1]], dim=1)  # forward and backward final states
        return self.fc(h)


### 4.4 Train/Eval loop для baselines


In [None]:
def _collect_logits(model: nn.Module, loader) -> Tuple[np.ndarray, np.ndarray]:
    """Run ``model`` over ``loader`` without gradients; return stacked logits and labels."""
    model.eval()
    logit_chunks, label_chunks = [], []
    with torch.no_grad():
        for batch in loader:
            ids = batch["input_ids"].to(DEVICE)
            label_chunks.append(batch["labels"].cpu().numpy())
            logit_chunks.append(model(ids).cpu().numpy())
    return np.concatenate(logit_chunks, axis=0), np.concatenate(label_chunks, axis=0)


def train_one_model(model: nn.Module, name: str, epochs: int = 2, lr: float = 2e-4):
    """Train a baseline on ``train_loader`` and report its test metrics.

    After every epoch the model is scored on ``val_loader``; the weights of the
    epoch with the highest validation F1 are restored before the final
    evaluation on ``test_loader``.

    Args:
        model: Network mapping ``input_ids`` to class logits.
        name: Label used in the per-epoch progress line.
        epochs: Number of passes over the training loader.
        lr: AdamW learning rate.

    Returns:
        Metric dict produced by ``compute_metrics_from_logits`` on the test set.
    """
    model = model.to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    best_val_f1 = -1.0
    best_state = None

    for ep in range(1, epochs+1):
        model.train()
        total = 0.0
        for batch in train_loader:
            ids = batch["input_ids"].to(DEVICE)
            y = batch["labels"].to(DEVICE)
            opt.zero_grad(set_to_none=True)
            logits = model(ids)
            loss = loss_fn(logits, y)
            loss.backward()
            opt.step()
            total += float(loss.item())

        val_logits, val_y = _collect_logits(model, val_loader)
        m = compute_metrics_from_logits(val_logits, val_y)

        print(f"[{name}] epoch {ep} train_loss={total/len(train_loader):.4f} val_f1={m['f1']:.4f}")

        if m["f1"] > best_val_f1:
            best_val_f1 = m["f1"]
            # Snapshot on the CPU so the copy does not occupy accelerator memory.
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}

    if best_state is not None:
        model.load_state_dict(best_state)

    test_logits, test_y = _collect_logits(model, test_loader)
    return compute_metrics_from_logits(test_logits, test_y)

# Train and test each byte-level baseline in turn, keyed by a short display name.
baseline_results = {}
for _name, _model_cls in (("MLP", MLPByte), ("CNN", CNNByte), ("GRU", GRUByte), ("LSTM", LSTMByte)):
    baseline_results[_name] = train_one_model(_model_cls(), _name, epochs=2)

baseline_results


## 5) CodeBERT (Transformers)


In [None]:
import inspect
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
)

# Hugging Face checkpoint name used when loading the pretrained tokenizer.
MODEL_NAME = "microsoft/codebert-base"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def tokenize_batch(examples):
    """Tokenize the ``text`` column of a batch, truncating to at most 512 tokens."""
    batch_texts = examples["text"]
    return tokenizer(batch_texts, truncation=True, max_length=512)


from datasets import Dataset as HFDataset

# Build one tokenised Hugging Face dataset per split (train, validation, test).
hf_train, hf_val, hf_test = (
    HFDataset.from_dict({"text": split_X, "label": split_y}).map(tokenize_batch, batched=True)
    for split_X, split_y in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Padding collator built around the same tokenizer.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Pretrained encoder with a two-label sequence-classification head.
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)


from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
import numpy as np

def hf_compute_metrics(eval_pred):
    """Metric callback for ``Trainer``; scores CodeBERT like the PyTorch baselines.

    Args:
        eval_pred: Pair ``(logits, labels)``.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and ``roc_auc``.
    """
    logits, labels = eval_pred
    # Delegate to the shared helper so both model families go through one code
    # path (softmax, 0.5 threshold, NaN fallback for ROC-AUC) and cannot drift.
    return compute_metrics_from_logits(np.asarray(logits), np.asarray(labels))


out_dir = "artifacts/codebert_model"

# The evaluation-schedule argument has two spellings; use whichever one the
# installed TrainingArguments signature exposes.
ta_params = set(inspect.signature(TrainingArguments.__init__).parameters)
if "evaluation_strategy" in ta_params:
    eval_key = "evaluation_strategy"
else:
    eval_key = "eval_strategy"

ta_kwargs = {
    "output_dir": out_dir,
    "learning_rate": 2e-5,
    "per_device_train_batch_size": 8,
    "per_device_eval_batch_size": 16,
    "num_train_epochs": 2,
    "weight_decay": 0.01,
    # Keep the checkpoint with the best validation F1.
    "load_best_model_at_end": True,
    "metric_for_best_model": "f1",
    "greater_is_better": True,
    "logging_steps": 50,
    "report_to": "none",
    "fp16": torch.cuda.is_available(),
    # Evaluate and save on the same schedule (once per epoch).
    eval_key: "epoch",
    "save_strategy": "epoch",
}

args = TrainingArguments(**ta_kwargs)


# Newer transformers releases take the tokenizer through `processing_class`
# and deprecate the `tokenizer` keyword. Pick whichever name the installed
# Trainer accepts, mirroring the eval-strategy handling for TrainingArguments.
trainer_params = inspect.signature(Trainer.__init__).parameters
tokenizer_key = "processing_class" if "processing_class" in trainer_params else "tokenizer"

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=hf_train,
    eval_dataset=hf_val,
    data_collator=data_collator,
    compute_metrics=hf_compute_metrics,
    **{tokenizer_key: tokenizer},
)

trainer.train()
# Final held-out evaluation; metric names come back prefixed with `eval_`.
codebert_test = trainer.evaluate(hf_test)
codebert_test


## 6) Сводная таблица метрик и сохранение (CSV/JSON)


In [None]:
os.makedirs("artifacts", exist_ok=True)

# One row per model. Baseline metrics are copied as they are; for CodeBERT only
# the five `eval_*` metrics are kept, converted to plain floats.
wanted_eval_keys = ["eval_accuracy","eval_precision","eval_recall","eval_f1","eval_roc_auc"]
rows = [{"model": model_name, **metrics} for model_name, metrics in baseline_results.items()]
codebert_row = {"model": "CodeBERT"}
codebert_row.update(
    (key, float(value)) for key, value in codebert_test.items() if key in wanted_eval_keys
)
rows.append(codebert_row)

# Drop the `eval_` prefix from the CodeBERT row so every row has the same columns.
fixed_rows = []
for row in rows:
    renamed = dict(row)
    if renamed["model"] == "CodeBERT":
        for metric in ("accuracy", "precision", "recall", "f1", "roc_auc"):
            renamed[metric] = renamed.pop("eval_" + metric)
    fixed_rows.append(renamed)

df = pd.DataFrame(fixed_rows)[["model","accuracy","precision","recall","f1","roc_auc"]]
df.to_csv("artifacts/metrics_comparison.csv", index=False)
with open("artifacts/metrics_comparison.json", "w", encoding="utf-8") as f:
    json.dump(fixed_rows, f, ensure_ascii=False, indent=2)

df


## 7) Gradio-инференс для CodeBERT


In [None]:
# Install Gradio quietly; the demo cells that follow import it.
!pip -q install gradio


In [None]:
import gradio as gr
import torch


In [None]:
def predict(text: str):
    """Classify one code snippet with the CodeBERT classifier.

    Args:
        text: Source code to score.

    Returns:
        Dict with the rounded class probabilities (``non_vuln``, ``vuln``) and a
        ``decision`` string, ``"VULNERABLE"`` or ``"SAFE"``.
    """
    # Keep the weights on the same device the inputs are moved to; without this
    # the forward pass fails whenever the model lives elsewhere.
    model.to(DEVICE).eval()

    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True
    ).to(DEVICE)

    with torch.no_grad():
        logits = model(**inputs).logits
        probs = torch.softmax(logits, dim=1)[0]

    p_safe, p_vuln = float(probs[0]), float(probs[1])
    return {
        "non_vuln": round(p_safe, 4),
        "vuln": round(p_vuln, 4),
        # Same `>= 0.5` cut-off as the evaluation metrics, so the demo and the
        # reported scores agree on borderline cases.
        "decision": "VULNERABLE" if p_vuln >= 0.5 else "SAFE"
    }


# Minimal demo: a code text box in, the JSON returned by `predict` out.
code_box = gr.Textbox(lines=12, label="C/C++ code")
demo = gr.Interface(fn=predict, inputs=code_box, outputs=gr.JSON(), title="CodeBERT Vuln Classifier")

demo.launch()


In [None]:
try:
    import gradio as gr
except Exception:
    !pip -q install gradio
    import gradio as gr

import random
import numpy as np
import torch


# Positions of the vulnerable (label 1) and safe (label 0) test samples,
# each list shuffled in place.
_test_labels = [int(label) for label in y_test]
vuln_idx = [pos for pos, label in enumerate(_test_labels) if label == 1]
safe_idx = [pos for pos, label in enumerate(_test_labels) if label == 0]
random.shuffle(vuln_idx)
random.shuffle(safe_idx)

def _predict_one(text: str):
    """Score one snippet; return both class probabilities and the arg-max label.

    The result has keys ``"non-vuln"`` and ``"vuln"`` (floats) plus
    ``"__pred_label__"`` (index of the larger probability).
    """
    model.to(DEVICE).eval()
    encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(DEVICE)
    with torch.no_grad():
        raw_logits = model(**encoded).logits
        class_probs = torch.softmax(raw_logits, dim=1).detach().cpu().numpy()[0]
    return {
        "non-vuln": float(class_probs[0]),
        "vuln": float(class_probs[1]),
        "__pred_label__": int(np.argmax(class_probs)),
    }

def _format_truth(i: int):
    """Build the Markdown line showing the true label and index of test sample ``i``."""
    true_label = y_test[i]
    y = int(true_label)
    return f"**Ground truth:** `{y}` ({'vuln' if y==1 else 'non-vuln'})  \n**Index:** `{i}`"

def next_example(kind: str, ptr_v: int, ptr_s: int):
    """Return the next test sample of the requested class, cycling through its pool.

    Args:
        kind: ``"vuln"`` selects vulnerable samples; any other value selects safe ones.
        ptr_v: Running counter into ``vuln_idx``.
        ptr_s: Running counter into ``safe_idx``.

    Returns:
        ``(text, truth_markdown, ptr_v, ptr_s)`` with the counter of the chosen
        class advanced by one. When the chosen pool is empty the text is empty,
        the markdown says so, and both counters are returned unchanged.
    """
    use_vuln = kind == "vuln"
    pool = vuln_idx if use_vuln else safe_idx
    if not pool:
        # `ptr % len(pool)` would raise ZeroDivisionError on an empty pool.
        return "", "**Ground truth:** (no examples of this class in the test set)", ptr_v, ptr_s

    if use_vuln:
        i = pool[ptr_v % len(pool)]
        ptr_v += 1
    else:
        i = pool[ptr_s % len(pool)]
        ptr_s += 1

    return X_test[i], _format_truth(i), ptr_v, ptr_s

def random_example():
    """Pick a random test sample; return its text and its ground-truth Markdown."""
    pick = random.randrange(len(X_test))
    sample_text = X_test[pick]
    return sample_text, _format_truth(pick)

def predict_current(text: str):
    """Score ``text`` and return ``(probabilities, prediction_markdown)`` for the UI."""
    result = _predict_one(text)
    pred = result["__pred_label__"]
    # Only the two probability entries go to the label widget.
    probabilities = {class_name: result[class_name] for class_name in ("non-vuln", "vuln")}
    return probabilities, f"**Prediction:** `{pred}` ({'vuln' if pred==1 else 'non-vuln'})"

# Interactive demo: browse real test samples by class (or at random) and run
# the classifier on whatever is currently in the text box.
with gr.Blocks() as demo:
    # The title must end with a blank line; otherwise the adjacent string
    # literals join and the first description sentence renders inside the H1.
    gr.Markdown("# CodeBERT Vuln Classifier\n\n"
                "Кнопки **Next vuln / Next non-vuln** подставляют реальные примеры из `X_test`.\n"
                "Нажми **Predict** чтобы увидеть вероятности и предсказание.\n")

    # Per-session counters into the vulnerable / safe index pools.
    ptr_v = gr.State(0)
    ptr_s = gr.State(0)

    with gr.Row():
        btn_v = gr.Button("Next vuln (y=1)")
        btn_s = gr.Button("Next non-vuln (y=0)")
        btn_r = gr.Button("Random example")

    text = gr.Textbox(lines=14, label="C/C++ code (from test set or your own)")
    truth = gr.Markdown("**Ground truth:** (press Next/Random)")
    with gr.Row():
        btn_p = gr.Button("Predict")
        out_label = gr.Label(label="Probabilities")
    pred_md = gr.Markdown("**Prediction:** (press Predict)")

    # Both "Next" buttons share one handler; the lambda fixes the class to browse.
    btn_v.click(fn=lambda pv, ps: next_example("vuln", pv, ps),
                inputs=[ptr_v, ptr_s],
                outputs=[text, truth, ptr_v, ptr_s])

    btn_s.click(fn=lambda pv, ps: next_example("safe", pv, ps),
                inputs=[ptr_v, ptr_s],
                outputs=[text, truth, ptr_v, ptr_s])

    btn_r.click(fn=random_example, inputs=[], outputs=[text, truth])

    btn_p.click(fn=predict_current, inputs=[text], outputs=[out_label, pred_md])

demo.launch(debug=False)


In [None]:
import numpy as np

# Recompute recall on the vulnerable class directly from the test predictions.
pred_out = trainer.predict(hf_test)
logits = pred_out.predictions
labels = pred_out.label_ids

# Softmax with the row maximum subtracted first: exponentiating raw logits can
# overflow to inf and turn the ratio into NaN, while the shifted form is
# mathematically identical and always finite.
shifted = logits - logits.max(axis=1, keepdims=True)
exp_shifted = np.exp(shifted)
probs_vuln = exp_shifted[:, 1] / exp_shifted.sum(axis=1)
preds = (probs_vuln >= 0.5).astype(int)

tp = int(((preds == 1) & (labels == 1)).sum())
fn = int(((preds == 0) & (labels == 1)).sum())

# NaN when the test set contains no vulnerable samples.
recall_vuln = tp / (tp + fn) if (tp + fn) > 0 else float("nan")

print("TP:", tp, "FN:", fn)
print(f"Recall(vuln) = {recall_vuln:.4f}  ({recall_vuln*100:.2f}%)")
