In [None]:
import sys
sys.path.append('../')
from dataset.cpg_dataset import InMemoryCPGDataset
import torch
from torch_geometric.nn import DataParallel
from torch_geometric.loader import DataListLoader
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, roc_auc_score
from vulgnn import VulGAT, VulDeeperGCN, resume
import pandas as pd
from tqdm import tqdm

In [None]:
# Load the processed graph datasets from one shared root directory.
# The val=True / test=True flags presumably select the validation and test
# splits (the default being train) -- confirm against InMemoryCPGDataset.
cpg_root = '../dataset/cpg_dataset/'
train_dataset = InMemoryCPGDataset(root=cpg_root)
val_dataset = InMemoryCPGDataset(root=cpg_root, val=True)
test_dataset = InMemoryCPGDataset(root=cpg_root, test=True)

In [None]:
# Load the raw (pickled) dataframes in the order train, test, val.
# NOTE: pd.read_pickle unpickles the file contents, so only point it at
# trusted files.
train_df, test_df, val_df = (
    pd.read_pickle(raw_path)
    for raw_path in (
        '../dataset/cpg_dataset/raw/big_vul_ir_cpg_train.zip',
        '../dataset/cpg_dataset/raw/big_vul_ir_cpg_test.zip',
        '../dataset/cpg_dataset/raw/big_vul_ir_cpg_val.zip',
    )
)

In [None]:
# Load the saved model and move it to the GPU when one is available.
# resume() is a helper imported from vulgnn; presumably it rebuilds the model
# from the named checkpoint file -- confirm against its definition.
model_filename = 'vuldeepergcn_32.pt'
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
model = DataParallel(resume(model_filename))
# Kept as the last expression so the cell displays the wrapped model.
model.to(device)

In [None]:
# Function for making predictions
@torch.no_grad()
def pred(model: torch.nn.Module, dataset: DataListLoader):
    """Run `model` over every batch of `dataset` and collect predicted labels.

    The model is switched to eval mode and called once per batch; the
    predicted class of each sample is the argmax over dim=1 of the output.

    Args:
        model: The network to evaluate. It is called directly on each batch
            yielded by `dataset`.
        dataset: A loader yielding batches the model accepts.

    Returns:
        A flat list of Python ints, one predicted class index per sample,
        in loader order. Works for any batch size.
    """
    model.eval()
    y_pred = []
    for data in tqdm(dataset):
        out = model(data)
        # tolist() yields plain Python ints for every sample in the batch, so
        # the result stays flat regardless of batch size (the previous
        # int(ndarray) conversion only worked for single-sample batches).
        y_pred.extend(out.argmax(dim=1).tolist())
    return y_pred

In [None]:
# Run inference split by split (train, then test, then val), wrapping each
# dataset in its own DataListLoader right before it is consumed.
train_loader = DataListLoader(train_dataset)
y_train_pred = pred(model, train_loader)

test_loader = DataListLoader(test_dataset)
y_test_pred = pred(model, test_loader)

val_loader = DataListLoader(val_dataset)
y_val_pred = pred(model, val_loader)

In [None]:
# Attach each split's predictions to its raw dataframe as a 'pred' column.
# Assumes the raw rows line up one-to-one, in order, with the processed
# graphs the predictions came from -- TODO confirm.
for split_df, split_pred in (
    (train_df, y_train_pred),
    (test_df, y_test_pred),
    (val_df, y_val_pred),
):
    split_df['pred'] = split_pred

# Stack the splits and keep the first row seen for each original id.
combined_df = pd.concat([train_df, test_df, val_df], ignore_index=True)
result_df = combined_df.drop_duplicates(subset='old_id')

In [None]:
def calculate_metrics(y_pred, y_true):
    """Compute classification metrics for one set of predictions.

    Args:
        y_pred: Predicted labels.
        y_true: Ground-truth labels, aligned with `y_pred`.

    Returns:
        A tuple `(f1, accuracy, precision, recall, roc)`. `roc` is None when
        `roc_auc_score` rejects the input with a ValueError (for example when
        `y_true` contains only one class, so ROC-AUC is undefined).
    """
    f1 = f1_score(y_true, y_pred)
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    try:
        roc = roc_auc_score(y_true, y_pred)
    except ValueError:
        # ROC-AUC is undefined for this input; report it as missing rather
        # than failing the whole metrics table. Any other exception is a real
        # error and is allowed to propagate.
        roc = None
    return f1, accuracy, precision, recall, roc

# Score every split plus the de-duplicated union, then lay the results out as
# a table with one row per split and one column per metric
# (F1-score, accuracy, precision, recall, ROC-AUC).
train_scores = calculate_metrics(train_df['pred'].tolist(), train_df['vul'].tolist())
val_scores = calculate_metrics(val_df['pred'].tolist(), val_df['vul'].tolist())
test_scores = calculate_metrics(test_df['pred'].tolist(), test_df['vul'].tolist())
all_scores = calculate_metrics(result_df['pred'].tolist(), result_df['vul'].tolist())

# Each tuple is ordered (f1, accuracy, precision, recall, roc).
train_f1, train_accuracy, train_precision, train_recall, train_roc = train_scores
val_f1, val_accuracy, val_precision, val_recall, val_roc = val_scores
test_f1, test_accuracy, test_precision, test_recall, test_roc = test_scores
all_f1, all_accuracy, all_precision, all_recall, all_roc = all_scores

metrics = {
    'F1-Score': [train_f1, val_f1, test_f1, all_f1],
    'Accuracy': [train_accuracy, val_accuracy, test_accuracy, all_accuracy],
    'Precision': [train_precision, val_precision, test_precision, all_precision],
    'Recall': [train_recall, val_recall, test_recall, all_recall],
    'ROC-AUC': [train_roc, val_roc, test_roc, all_roc],
}
pd.DataFrame(metrics, index=['Train', 'Val', 'Test', 'All'])

In [None]:
# Load the full preprocessed Big-Vul dataframe.
# NOTE: pd.read_pickle unpickles the file contents, so only point it at
# trusted files.
big_vul_path = '../dataset/big_vul_preprocessed.zip'
big_vul_df = pd.read_pickle(big_vul_path)

In [None]:
# Inner-join the full dataset with the predictions on the 'old_id' column
# (the original id in Big-Vul). merge() matches the 'old_id' columns of both
# frames; DataFrame.join(on=...) would instead match against the row index of
# result_df and pair up unrelated rows.
pred_df = big_vul_df[['old_id', 'cwe_id', 'vulnerability_classification']].merge(
    result_df[['old_id', 'vul', 'pred']],
    on='old_id',
    how='inner',
)
# Find the top correctly predicted vulnerability types: rows labelled
# vulnerable (vul == 1) whose prediction matches the label. The conditions are
# combined element-wise with `&`; Python's `and` raises on a Series.
correct_vul_mask = (pred_df['vul'] == pred_df['pred']) & (pred_df['vul'] == 1)
pred_df.loc[correct_vul_mask, ['cwe_id', 'vulnerability_classification']].value_counts().head(10)