In [1]:
import dgl
from dgl import load_graphs

In [2]:
# Graph check: paths to one cached graph file per node-embedding variant
# (CodeBERT, SBERT, word2vec). They are loaded with `load_graphs` further down
# to inspect which node/edge features are stored on the graph.

codebertg = "/home/wp3/confmodel/VFunDet/sourcescripts/storage/cache/Graph/dataset_svuldet_codebert_pdg+raw/18483"
sbertg = "/home/wp3/confmodel/VFunDet/sourcescripts/storage/cache/Graph/dataset_svuldet_sbert_pdg+raw/18483"
# NOTE(review): this path ends in 18482 while the two above end in 18483 —
# confirm the different sample id is intentional and not a typo.
wordvg = "/home/wp3/confmodel/VFunDet/sourcescripts/storage/cache/Graph/dataset_svuldet_word2vec_pdg+raw/18482"


In [8]:
# `load_graphs` returns a (graph_list, label_dict) pair; `[0][0]` keeps only the
# first graph stored in the CodeBERT cache file.
gcb = load_graphs(codebertg)[0][0]
# gcb.ndata

In [6]:
# gsb = load_graphs(sbertg)[0][0]
# gsb.ndata

In [7]:
# gwv = load_graphs(wordvg)[0][0]
# gwv.ndata

In [4]:
def print_graph_data(g):
    """List every node and edge feature stored on a DGL graph with its shape.

    Args:
        g: Graph-like object exposing ``ndata`` and ``edata`` mappings whose
            values have a ``shape`` attribute.

    Returns:
        None. The listing is written to stdout.
    """
    # (heading, attribute) pairs; the attribute is resolved lazily so node data
    # is fully printed before edge data is touched.
    sections = (("Node Data:", "ndata"), ("\nEdge Data:", "edata"))
    for heading, attr_name in sections:
        print(heading)
        frame = getattr(g, attr_name)
        for feat_name in frame:
            print(f" - {feat_name}: {frame[feat_name].shape}")

In [9]:
# Show which node/edge features (and their shapes) the CodeBERT graph carries.
print_graph_data(gcb)

Node Data:
 - _FVULN: torch.Size([2])
 - _VULN: torch.Size([2])
 - _LINE: torch.Size([2])
 - _RANDFEAT: torch.Size([2, 100])
 - _FUNC_EMB: torch.Size([2, 768])
 - _CODEBERT: torch.Size([2, 768])

Edge Data:
 - _ETYPE: torch.Size([6])


In [13]:
import dgl
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, matthews_corrcoef
import matplotlib.pyplot as plt
import pandas as pd
import os

# Dataset class
class GraphFunctionDataset(Dataset):
    """Dataset that serves one cached DGL graph per metadata row of a given split.

    Args:
        df: Metadata frame. Must contain an ``id`` column and the column named
            by ``split_col``.
        graph_dir: Directory holding one graph file per sample, named by its id.
        split: Value of ``split_col`` selecting the rows to keep
            (e.g. ``'train'``, ``'val'``, ``'test'``).
        split_col: Name of the column holding the split assignment. Defaults to
            ``'label'`` for backward compatibility; pass another name when the
            metadata stores the split elsewhere.

    Raises:
        KeyError: If ``split_col`` is not a column of ``df``. The message lists
            the available columns so the right one can be chosen.
    """

    def __init__(self, df, graph_dir, split='train', split_col='label'):
        # Fail with an actionable message instead of a bare KeyError: 'label'
        # when the metadata does not carry the expected split column.
        if split_col not in df.columns:
            raise KeyError(
                f"split column {split_col!r} not found in metadata; "
                f"available columns: {list(df.columns)}"
            )
        self.df = df[df[split_col] == split]
        self.graph_dir = graph_dir
        self.graph_ids = self.df['id'].tolist()

    def __len__(self):
        """Return the number of samples in the selected split."""
        return len(self.graph_ids)

    def __getitem__(self, idx):
        """Load and return the graph for the ``idx``-th sample of the split."""
        graph_id = self.graph_ids[idx]
        graph_path = os.path.join(self.graph_dir, f"{graph_id}")
        # load_graphs returns (graph_list, label_dict); each file is expected to
        # hold a single graph, so only the first one is used.
        g = dgl.load_graphs(graph_path)[0][0]
        return g

# GAT Model
class GATClassifier(nn.Module):
    """Two stacked GAT layers followed by a small MLP head.

    Args:
        in_feats: Width of the concatenated node feature vector fed to the
            first GAT layer.
        hidden_feats: Output width of each attention head.
        num_heads: Number of attention heads in the first GAT layer (the second
            layer always uses a single head).
        num_classes: Number of output logits per row.
        dropout: Rate used for feature dropout, attention dropout and the
            dropout layer inside the MLP head.
    """

    def __init__(self, in_feats, hidden_feats, num_heads, num_classes, dropout=0.3):
        super().__init__()
        drop_kwargs = dict(feat_drop=dropout, attn_drop=dropout)
        self.gat1 = dgl.nn.GATConv(in_feats, hidden_feats, num_heads, **drop_kwargs)
        # The second layer consumes all heads of the first one, flattened together.
        self.gat2 = dgl.nn.GATConv(hidden_feats * num_heads, hidden_feats, 1, **drop_kwargs)
        half_width = hidden_feats // 2
        head_layers = [
            nn.Linear(hidden_feats, half_width),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(half_width, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, g):
        """Return logits computed from the graph's stored node features.

        The three node features ``_RANDFEAT``, ``_FUNC_EMB`` and ``_CODEBERT``
        are concatenated along dim 1 and used as the GAT input.
        """
        feature_keys = ('_RANDFEAT', '_FUNC_EMB', '_CODEBERT')
        node_feats = torch.cat([g.ndata[key] for key in feature_keys], dim=1)
        multi_head = self.gat1(g, node_feats)
        # Merge the per-head outputs into one vector per row before the activation.
        merged = F.elu(multi_head.flatten(1))
        single_head = self.gat2(g, merged)
        # Drop the singleton head dimension left by the one-head layer.
        return self.classifier(single_head.squeeze(1))

# Training and Evaluation Functions
def evaluate(model, dataloader, device):
    """Compute classification metrics for `model` over all graphs in `dataloader`.

    Labels are read from the ``_VULN`` node feature of each graph, so metrics
    are computed over nodes pooled across every graph the loader yields.

    Args:
        model: Module called as ``model(g)`` that returns logits with at least
            two columns; column 1 is treated as the positive-class score.
        dataloader: Iterable yielding graphs that carry a ``_VULN`` node feature.
        device: Device each graph is moved to before the forward pass.

    Returns:
        dict with keys ``accuracy``, ``precision``, ``recall``, ``f1``,
        ``roc_auc`` and ``mcc``. ``roc_auc`` is ``nan`` when the collected
        labels do not contain both classes, because ROC AUC is undefined then.
    """
    model.eval()
    y_true, y_pred, y_prob = [], [], []
    with torch.no_grad():
        for g in dataloader:
            g = g.to(device)
            logits = model(g)
            # The graph is already on `device`, so its node data is too.
            labels = g.ndata['_VULN'].long()
            probs = F.softmax(logits, dim=1)[:, 1]
            preds = torch.argmax(logits, dim=1)

            y_true.extend(labels.cpu().tolist())
            y_pred.extend(preds.cpu().tolist())
            y_prob.extend(probs.cpu().tolist())

    # ROC AUC needs both classes in y_true; scikit-learn rejects (or warns on)
    # single-class input, which would otherwise abort a whole training run on a
    # small or heavily imbalanced split.
    if len(set(y_true)) > 1:
        roc_auc = roc_auc_score(y_true, y_prob)
    else:
        roc_auc = float('nan')

    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, zero_division=0),
        'recall': recall_score(y_true, y_pred, zero_division=0),
        'f1': f1_score(y_true, y_pred, zero_division=0),
        'roc_auc': roc_auc,
        'mcc': matthews_corrcoef(y_true, y_pred)
    }
    return metrics

def _plot_training_history(history):
    """Draw per-epoch training loss and validation accuracy side by side."""
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


def train_model(df, graph_dir, in_feats=1636, hidden_feats=128, num_heads=4, num_classes=2, epochs=30, lr=1e-4, batch_size=1):
    """Train a GATClassifier on cached graphs, then report test metrics and plot curves.

    Args:
        df: Metadata frame handed unchanged to GraphFunctionDataset to build the
            'train', 'val' and 'test' splits.
        graph_dir: Directory containing the cached graph files.
        in_feats: Input width of the model. The default 1636 equals the
            concatenated node features read by GATClassifier.forward
            (100 + 768 + 768 for the CodeBERT graph inspected in this notebook).
        hidden_feats: Per-head hidden width of the GAT layers.
        num_heads: Number of attention heads in the first GAT layer.
        num_classes: Number of output classes.
        epochs: Number of passes over the training split.
        lr: Adam learning rate.
        batch_size: Number of graphs merged into one batched graph per step.

    Returns:
        (model, history) where ``history`` has the per-epoch lists
        ``train_loss`` and ``val_acc``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_set = GraphFunctionDataset(df, graph_dir, split='train')
    val_set = GraphFunctionDataset(df, graph_dir, split='val')
    test_set = GraphFunctionDataset(df, graph_dir, split='test')

    # torch's default collate function cannot stack DGLGraph objects; dgl.batch
    # merges the list of sampled graphs into a single batched graph whose node
    # features (and therefore node labels) are concatenated.
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=dgl.batch)
    val_loader = DataLoader(val_set, batch_size=batch_size, collate_fn=dgl.batch)
    test_loader = DataLoader(test_set, batch_size=batch_size, collate_fn=dgl.batch)

    model = GATClassifier(in_feats, hidden_feats, num_heads, num_classes).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    history = {'train_loss': [], 'val_acc': []}

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for g in train_loader:
            g = g.to(device)
            logits = model(g)
            # Node labels live on the graph, which is already on `device`.
            labels = g.ndata['_VULN'].long()

            loss = loss_fn(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        val_metrics = evaluate(model, val_loader, device)
        avg_loss = total_loss / len(train_loader)
        history['train_loss'].append(avg_loss)
        history['val_acc'].append(val_metrics['accuracy'])
        print(f"Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f} - Val Acc: {val_metrics['accuracy']:.4f}")

    # Final test evaluation
    test_metrics = evaluate(model, test_loader, device)
    print("\nTest Metrics:")
    for key, value in test_metrics.items():
        print(f"{key.capitalize()}: {value:.4f}")

    # Plot training loss and validation accuracy
    _plot_training_history(history)

    return model, history


In [15]:

# Load the dataset metadata and list its columns before training.
df = pd.read_csv("/home/wp3/confmodel/VFunDet/sourcescripts/storage/cache/dataset/dataset_metadata.csv")
print(df.columns)
graph_dir = "/home/wp3/confmodel/VFunDet/sourcescripts/storage/cache/Graph/dataset_svuldet_codebert_pdg+raw"
# NOTE(review): the column listing printed by this cell contains no 'label'
# column, so the `df['label'] == split` filter in GraphFunctionDataset raises the
# KeyError recorded in this cell's output. The metadata needs a column holding
# the 'train'/'val'/'test' assignment before this call can run — TODO confirm
# which column (if any) is meant to carry the split.
# in_feats=1636 = 100 (_RANDFEAT) + 768 (_FUNC_EMB) + 768 (_CODEBERT), the node
# feature widths shown for the CodeBERT graph earlier in the notebook.
train_model(df, graph_dir, in_feats=1636, hidden_feats=128, num_heads=4, num_classes=2, 
            epochs=30, lr=1e-4, batch_size=1)


Index(['id', 'commit_ID', 'CVE-ID', 'CWE-ID', 'project', 'func_before',
       'func_after', 'diff_lines', 'vul', 'Domain_decsriptions',
       'Description_Mitre', 'P Language', 'Sample Code', 'Link-sample',
       'dataset', 'info', 'diff', 'project.1'],
      dtype='object')


KeyError: 'label'