In [3]:
%pip install torch
%pip install torch_geometric
%pip install transformers


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting torch_geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch_geometric
Successfully installed torch_geometric-2.6.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated 

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GatedGraphConv
from transformers import AutoModel, AutoTokenizer
import numpy as np

# Check GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [5]:
class HybridVulnDetector(nn.Module):
    def __init__(self, ggnn_hidden_dim=128, num_edge_types=12, codebert_model="microsoft/codebert-base"):
        super().__init__()
        
        # Graph Component (GGNN)
        self.ggnn = GatedGraphConv(ggnn_hidden_dim, num_edge_types)
        
        # Sequence Component (CodeBERT)
        self.codebert = AutoModel.from_pretrained(codebert_model)
        self.tokenizer = AutoTokenizer.from_pretrained(codebert_model)
        self.codebert_dim = self.codebert.config.hidden_size
        
        # Classifier
        self.fc = nn.Sequential(
            nn.Linear(ggnn_hidden_dim + self.codebert_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, graph_data, code_texts):
        # Process graph with GGNN
        x_graph = self.ggnn(graph_data.x, graph_data.edge_index, graph_data.edge_attr)
        
        # Process text with CodeBERT
        inputs = self.tokenizer(code_texts, return_tensors="pt", padding=True, truncation=True).to(device)
        codebert_output = self.codebert(**inputs).last_hidden_state[:, 0, :]  # CLS token
        
        # Concatenate features
        x_combined = torch.cat([x_graph, codebert_output], dim=1)
        
        # Classify
        return self.fc(x_combined)

In [6]:
def create_graph_data(node_features, edge_index, edge_types, labels):
    """Convert CPG data to PyG format."""
    edge_attr = torch.tensor(edge_types, dtype=torch.long)
    return Data(
        x=torch.tensor(node_features, dtype=torch.float32),
        edge_index=torch.tensor(edge_index, dtype=torch.long),
        edge_attr=edge_attr,
        y=torch.tensor(labels, dtype=torch.float32)
    )

# Example usage
node_features = np.random.rand(100, 32)  # 32-dim node embeddings (e.g., Word2Vec)
edge_index = [[0, 1], [1, 2], [2, 0]]   # Example edges
edge_types = [1, 2, 3]                   # Edge types (e.g., 1=AST, 2=CFG, 3=PDG)
labels = [1]                             # 1=Vulnerable, 0=Clean

graph_data = create_graph_data(node_features, edge_index, edge_types, labels).to(device)
code_texts = ["void foo() { char buf[10]; strcpy(buf, input); }"]  # Raw code

In [8]:
def train(model, dataloader, epochs=10):
    model.train()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.BCELoss()
    
    for epoch in range(epochs):
        total_loss = 0
        for batch in dataloader:
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(batch.graph_data, batch.code_texts)
            loss = criterion(outputs.squeeze(), batch.graph_data.y)
            
            # Backward pass (with mixed precision)
            with torch.cuda.amp.autocast():
                loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(dataloader):.4f}")

In [9]:
from torch_geometric.data import Batch

class VulnDataset(torch.utils.data.Dataset):
    def __init__(self, graph_list, text_list, labels):
        self.graph_data = graph_list
        self.code_texts = text_list
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return {
            "graph_data": self.graph_data[idx],
            "code_texts": self.code_texts[idx],
            "labels": self.labels[idx]
        }

def collate_fn(batch):
    return Batch.from_data_list([item["graph_data"] for item in batch]), \
           [item["code_texts"] for item in batch], \
           torch.tensor([item["labels"] for item in batch])

# Example dataset
dataset = VulnDataset([graph_data], code_texts, labels)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

In [10]:
# Initialize
model = HybridVulnDetector().to(device)

# Train
train(model, dataloader, epochs=10)

# Save model
torch.save(model.state_dict(), "hybrid_vuln_detector.pt")

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

AttributeError: 'tuple' object has no attribute 'graph_data'