In [None]:
# Dependencies
!pip install torch_geometric
!pip install rdkit



In [None]:
# Imports
import os
import random as rd

import pandas as pd
import torch
import torch.nn.functional as F
from google.colab import drive
from rdkit import Chem
from rdkit.Chem import rdmolops
from torch_geometric.data import Data
# Deprecated location of DataLoader; kept, but shadowed by the
# torch_geometric.loader import on the next line.
from torch_geometric.data import DataLoader
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool


In [None]:
# Mount Google Drive at /content/drive so files stored there can be opened
# through regular filesystem paths.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Load the data
This cell reads from my previously mounted private Google Drive: it loads 1000 benign function call graphs and 1000 malicious ones from the artemis family, labels them accordingly (0 = benign, 1 = malicious), and collects them in a single list.

In [None]:
def read_edgelist_to_graph(filepath, label):
    """Build a PyG ``Data`` graph from a whitespace-separated edge-list file.

    Every line that is neither blank nor a ``#`` comment must contain exactly
    two integers, ``source target``. Each node receives a constant
    one-dimensional feature of 1.0, and the number of nodes is taken to be the
    largest node id that appears in an edge, plus one.

    Args:
        filepath: Path of the edge-list file to read.
        label: Graph-level class label, stored in ``y``.

    Returns:
        A ``Data`` object carrying ``x``, ``edge_index`` and ``y``, or ``None``
        when the file cannot be read, is malformed, or contains no edges. A
        message is printed in each of those cases.
    """
    try:
        edges = []
        with open(filepath, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip blank lines (e.g. a trailing newline) and comments;
                # a blank line would otherwise break the two-value unpacking
                # below and cause the whole file to be discarded.
                if not line or line.startswith('#'):
                    continue
                source, target = map(int, line.split())
                edges.append([source, target])

        if not edges:
            # Without edges there is no maximum node id to size ``x`` from.
            print(f"Error reading file {filepath}: no edges found")
            return None

        # Transpose to shape (2, num_edges): row 0 = sources, row 1 = targets.
        edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
        num_nodes = edge_index.max().item() + 1
        x = torch.ones(num_nodes, 1, dtype=torch.float)
        return Data(x=x, edge_index=edge_index, y=torch.tensor([label]))
    except Exception as e:
        # Deliberately best-effort: a bad file is reported and skipped so the
        # remaining files can still be loaded by the caller.
        print(f"Error reading file {filepath}: {e}")
        return None

benign_dir = '/content/drive/My Drive/malnet/benign/'
artemis_dir = '/content/drive/My Drive/malnet/artemis/'


def load_graphs_from_dir(directory, label):
    """Read every ``.edgelist`` file in ``directory`` as a graph with ``label``.

    Args:
        directory: Directory whose ``.edgelist`` files are loaded.
        label: Class label passed to ``read_edgelist_to_graph`` for each file.

    Returns:
        List of the graphs that were read successfully; files for which
        ``read_edgelist_to_graph`` returned ``None`` are left out.
    """
    graphs = []
    # os.listdir returns entries in arbitrary order; sorting keeps the load
    # order stable from run to run.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".edgelist"):
            continue
        filepath = os.path.join(directory, filename)
        graph_data = read_edgelist_to_graph(filepath, label)
        # Explicit None check: a failed read is signalled by None, and the
        # truthiness of a graph object is not a reliable substitute for that.
        if graph_data is not None:
            graphs.append(graph_data)
    return graphs


# label 0 is non malicious, label 1 is malicious
data_list = load_graphs_from_dir(benign_dir, 0) + load_graphs_from_dir(artemis_dir, 1)

print(f"Loaded {len(data_list)} graphs.")

Loaded 2000 graphs.


## Define a data loader

In [None]:
# Seed for the train/val/test split so that, for the same data_list order,
# the same graphs land in the same split on every run.
SPLIT_SEED = 42

# Define the split ratios
train_ratio = 0.8
val_ratio = 0.1
test_ratio = 0.1
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "split ratios must sum to 1"

# Shuffle a copy with a dedicated seeded RNG. data_list itself is left
# untouched, so re-running this cell yields the same split instead of
# reshuffling already-shuffled data.
shuffled_data = list(data_list)
rd.Random(SPLIT_SEED).shuffle(shuffled_data)

# Calculate the split sizes; the test split takes whatever remains after
# rounding so that no graph is dropped.
train_size = int(len(shuffled_data) * train_ratio)
val_size = int(len(shuffled_data) * val_ratio)
test_size = len(shuffled_data) - train_size - val_size

# Split the data
train_data = shuffled_data[:train_size]
val_data = shuffled_data[train_size:train_size + val_size]
test_data = shuffled_data[train_size + val_size:]
assert len(train_data) + len(val_data) + len(test_data) == len(data_list)

# Create data loaders (only the training loader reshuffles between epochs)
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

print(f"Train size: {len(train_data)}")
print(f"Validation size: {len(val_data)}")
print(f"Test size: {len(test_data)}")

Train size: 1600
Validation size: 200
Test size: 200


## Define a GNN model

In [None]:
class GCN(torch.nn.Module):
    """Three-layer GCN that classifies whole graphs into two classes.

    Node features go through three ``GCNConv`` layers (ReLU after the first
    two), are mean-pooled per graph, and the pooled vector is mapped to two
    output values by a linear layer.

    Args:
        hidden_channels: Output width of each ``GCNConv`` layer.
    """

    def __init__(self, hidden_channels):
        super().__init__()
        # Seeds torch's global RNG so the layers below are initialised the
        # same way every time the model is constructed.
        torch.manual_seed(12345)
        self.conv1 = GCNConv(1, hidden_channels)  # one input feature per node
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = torch.nn.Linear(hidden_channels, 2)  # two output classes

    def forward(self, x, edge_index, batch):
        """Compute per-graph class scores for a batch of graphs.

        Args:
            x: Node feature matrix; ``conv1`` expects one feature per node.
            edge_index: Graph connectivity passed to every ``GCNConv`` layer.
            batch: Vector handed to ``global_mean_pool`` that assigns each
                node to its graph in the batch.

        Returns:
            Output of the final linear layer: two raw scores per graph
            (no softmax is applied here).
        """
        # 1. Node embeddings
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)

        # 2. Readout: average node embeddings per graph. global_mean_pool is
        # provided by torch_geometric.nn; torch.nn.functional has no such
        # function.
        x = global_mean_pool(x, batch)

        # 3. Classifier
        x = self.lin(x)
        return x

## Train the Model

## Validation