In [1]:
import os
import json
import torch
from torch_geometric.data import Data
from sklearn.model_selection import train_test_split

In [2]:
def load_ast_files(folder_path):
    ast_files = []
    for filename in os.listdir(folder_path):
        if filename.endswith(".json"):
            file_path = os.path.join(folder_path, filename)
            try:
                with open(file_path) as f:
                    print(f"Loading file: {file_path}")
                    ast_data = json.load(f)
                    ast_files.append(ast_data)
            except Exception as e:
                print(f"Error loading file: {file_path}")
                os.remove(file_path)
    return ast_files

# Load ASTs from the two folders
vulnerable_asts = load_ast_files('./AST/Re-entrancy-modified')
non_vulnerable_asts = load_ast_files('./AST/Verified-portion1')

Loading file: ./AST/Re-entrancy-modified\0x039963c07e62eb7af39eeeb871cb2de9cbc65d78_ast.json
Loading file: ./AST/Re-entrancy-modified\0x0459ebad0ba09901fda1441ee72e6cb664257f61_ast.json
Loading file: ./AST/Re-entrancy-modified\0x05f49e3e0a27efa05d60c19cd8f0ecc951d3717e_ast.json
Loading file: ./AST/Re-entrancy-modified\0x0da76de0916ef2da3c58a97e4d09d501c56a9f15_ast.json
Loading file: ./AST/Re-entrancy-modified\0x23a91059fdc9579a9fbd0edc5f2ea0bfdb70deb4_ast.json
Loading file: ./AST/Re-entrancy-modified\0x2a98d8fc14b31b346ff6c56dc2a252c434f628f2_ast.json
Loading file: ./AST/Re-entrancy-modified\0x3023868433f6086cd8ce0c4083fe2e11b37ce0b7_ast.json
Loading file: ./AST/Re-entrancy-modified\0x4c67b3db1d4474c0ebb2db8bec4e345526d9e2fd_ast.json
Loading file: ./AST/Re-entrancy-modified\0x4e73b32ed6c35f570686b89848e5f39f20ecc106_ast.json
Loading file: ./AST/Re-entrancy-modified\0x561eac93c92360949ab1f1403323e6db345cbf31_ast.json
Loading file: ./AST/Re-entrancy-modified\0x7a8721a9d64c74da899424c1b52

In [3]:
def extract_nodes_edges(ast):
    nodes = []
    edges = []

    def traverse(node, parent_index=None):
        node_index = len(nodes)
        nodes.append(node)

        if parent_index is not None:
            edges.append((parent_index, node_index))

        for key, value in node.items():
            if isinstance(value, dict):
                traverse(value, node_index)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        traverse(item, node_index)
    
    traverse(ast)
    return nodes, edges

def create_node_features(nodes):
    features = []
    for node in nodes:
        node_type = node.get('type', 'Unknown')
        feature_vector = one_hot_encode_node_type(node_type)
        features.append(feature_vector)
    return torch.tensor(features, dtype=torch.float)

def one_hot_encode_node_type(node_type):
    types = ['PragmaDirective', 'ContractDefinition', 'FunctionDefinition', 'VariableDeclaration', 'BinaryOperation', 'Unknown']
    vector = [0] * len(types)
    if node_type in types:
        vector[types.index(node_type)] = 1
    return vector

def create_edge_index(edges):
    return torch.tensor(edges, dtype=torch.long).t().contiguous()

def process_asts(asts):
    data_list = []
    for ast in asts:
        nodes, edges = extract_nodes_edges(ast)
        node_features = create_node_features(nodes)
        edge_index = create_edge_index(edges)
        graph_data = Data(x=node_features, edge_index=edge_index)
        data_list.append(graph_data)
    return data_list

# Process ASTs from both folders
vulnerable_graphs = process_asts(vulnerable_asts)
non_vulnerable_graphs = process_asts(non_vulnerable_asts)


In [4]:
# Assign labels
vulnerable_labels = [1] * len(vulnerable_graphs)
non_vulnerable_labels = [0] * len(non_vulnerable_graphs)

# Combine the graphs and labels
graphs = vulnerable_graphs + non_vulnerable_graphs
labels = vulnerable_labels + non_vulnerable_labels


In [5]:
# Split the data into training and testing sets
train_graphs, test_graphs, train_labels, test_labels = train_test_split(graphs, labels, test_size=0.2, random_state=42)

# Convert labels to tensors
train_labels = torch.tensor(train_labels, dtype=torch.long)
test_labels = torch.tensor(test_labels, dtype=torch.long)

# Example: Print the number of training and testing samples
print(f'Training samples: {len(train_graphs)}')
print(f'Testing samples: {len(test_graphs)}')


Training samples: 49433
Testing samples: 12359


# Training

In [9]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool

class GNN(torch.nn.Module):
    def __init__(self, num_node_features, hidden_dim, num_classes):
        super(GNN, self).__init__()
        self.conv1 = GCNConv(num_node_features, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc = torch.nn.Linear(hidden_dim, num_classes)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # GCN layers
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)

        # Global mean pooling (to get a fixed size output per graph)
        x = global_mean_pool(x, batch)

        # Fully connected layer
        x = self.fc(x)
        return F.log_softmax(x, dim=1)


In [10]:
from torch_geometric.loader import DataLoader

# Combine the train graphs and labels into a dataset
train_dataset = list(zip(train_graphs, train_labels))
test_dataset = list(zip(test_graphs, test_labels))

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [11]:
import torch.optim as optim

# Set up model, loss, and optimizer
num_node_features = train_graphs[0].x.size(1)
hidden_dim = 64
num_classes = 2

model = GNN(num_node_features, hidden_dim, num_classes)
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()
    for data, label in train_loader:
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out, label)
        loss.backward()
        optimizer.step()

def test(loader):
    model.eval()
    correct = 0
    for data, label in loader:
        out = model(data)
        pred = out.argmax(dim=1)
        correct += (pred == label).sum().item()
    return correct / len(loader.dataset)

# Training loop
epochs = 20
for epoch in range(epochs):
    train()
    train_acc = test(train_loader)
    test_acc = test(test_loader)
    print(f'Epoch: {epoch+1}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')


KeyboardInterrupt: 

# Save Model

In [9]:
# Define the path where you want to save the model
model_save_path = "gnn_model.pth"

# Save the state dictionary
torch.save(model.state_dict(), model_save_path)

print(f"Model saved to {model_save_path}")


Model saved to gnn_model.pth


# Evaluate Model

In [12]:
import torch

# Initialize a new model instance
loaded_model = GNN(num_node_features, hidden_dim, num_classes)

# Load the saved state dictionary
model_save_path = "gnn_model.pth"  # Ensure this path matches where your model is saved
loaded_model.load_state_dict(torch.load(model_save_path))

# Set the model to evaluation mode
loaded_model.eval()

# Evaluate the loaded model on the test dataset
def test_loaded_model(loader, model):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    for data, label in loader:
        out = model(data)
        pred = out.argmax(dim=1)
        correct += (pred == label).sum().item()
    return correct / len(loader.dataset)

# Test the model
test_accuracy = test_loaded_model(test_loader, loaded_model)

print(f'Test Accuracy of the loaded model: {test_accuracy:.4f}')


Test Accuracy of the loaded model: 0.8534


In [13]:
from sklearn.metrics import classification_report

def evaluate_model(loader, model):
    model.eval()
    all_preds = []
    all_labels = []
    for data, label in loader:
        out = model(data)
        preds = out.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(label.cpu().numpy())

    return classification_report(all_labels, all_preds, target_names=['Non-vulnerable', 'Vulnerable'])

# Print classification report
print(evaluate_model(test_loader, loaded_model))


                precision    recall  f1-score   support

Non-vulnerable       0.86      0.95      0.91      9230
    Vulnerable       0.81      0.56      0.66      3129

      accuracy                           0.85     12359
     macro avg       0.83      0.75      0.78     12359
  weighted avg       0.85      0.85      0.84     12359

