## Import Data, Define, Train and Test the Model

In [None]:
from torch_geometric.data import Dataset, DataLoader
import torch
import os
from collections import defaultdict

class MalwareGraphDataset(Dataset):
    """
    Dataset of labeled graphs stored as ``.pt`` files, one folder per class.

    Every ``.pt`` file found directly inside ``root_dir/<folder_name>`` is
    indexed together with the numeric label that ``label_dict`` assigns to
    that folder. A file is only read from disk when its item is requested.
    """

    def __init__(self, root_dir, label_dict, transform=None, pre_transform=None):
        """
        Index all labeled graph files below ``root_dir``.

        Parameters:
        - root_dir: str, root directory containing subdirectories for each label.
        - label_dict: dict, mapping of subdirectory names to numeric labels.
        - transform: optional, forwarded unchanged to the base ``Dataset``.
        - pre_transform: optional, forwarded unchanged to the base ``Dataset``.

        Folders named in ``label_dict`` that do not exist under ``root_dir``
        are skipped without an error.
        """
        super().__init__(root=root_dir, transform=transform, pre_transform=pre_transform)
        self.root_dir = root_dir
        self.label_dict = label_dict
        self.graph_files = []  # list of (file_path, label) tuples

        # Collect all .pt files with assigned labels
        for folder_name, label in label_dict.items():
            folder_path = os.path.join(root_dir, folder_name)
            if not os.path.isdir(folder_path):
                continue
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # index -> file mapping stable between runs, so anything derived
            # from sample positions (subsets, seeded splits) is reproducible.
            for file_name in sorted(os.listdir(folder_path)):
                if file_name.endswith(".pt"):
                    self.graph_files.append((os.path.join(folder_path, file_name), label))

    def len(self):
        """Return the number of indexed graph files."""
        return len(self.graph_files)

    def get(self, idx):
        """Load the graph stored at position ``idx`` and attach its label as ``y``."""
        file_path, label = self.graph_files[idx]
        # NOTE(review): torch.load unpickles the file, so only trusted graph
        # files should be placed under root_dir. Recent PyTorch releases also
        # changed the default of `weights_only`; if loading fails there,
        # confirm whether an explicit `weights_only=False` is required.
        data = torch.load(file_path)
        data.y = torch.tensor([label], dtype=torch.long)  # Set the label
        return data

    def count_samples_per_class(self):
        """
        Print the number of indexed samples per class.

        Counts are taken from the file index, so no graph file is loaded.
        """
        class_counts = defaultdict(int)
        for _, label in self.graph_files:
            class_counts[label] += 1

        # Invert the name -> label mapping once instead of searching it for
        # every class; the first name wins if two folders share a label.
        label_names = {}
        for name, value in self.label_dict.items():
            label_names.setdefault(value, name)

        # Print the count for each class
        print("Number of data points per class:")
        for label, count in class_counts.items():
            print(f"{label_names[label]} (Label {label}): {count} samples")

# Map each class folder name to its numeric label; the position of a name in
# the list below is its label (Benign -> 0, ..., Smsware -> 4).
label_dict = {
    folder_name: label
    for label, folder_name in enumerate(
        ["Benign", "Adware", "Banking", "Riskware", "Smsware"]
    )
}

# Build the dataset from the root directory that holds one folder per class
root_directory = "/home/belief/Desktop/MalwareDetection/Dynamic 1/Graphs"
dataset = MalwareGraphDataset(root_dir=root_directory, label_dict=label_dict)

# Report how many graph files were found for each class
dataset.count_samples_per_class()


In [None]:
from torch_geometric.data import DataLoader
from sklearn.model_selection import train_test_split
import torch
from collections import defaultdict

# Get indices of each class. The labels are read from the dataset's file
# index (file_path, label), so no graph file has to be loaded from disk here.
sample_labels = [label for _, label in dataset.graph_files]
label_to_indices = {label: [] for label in label_dict.values()}

for idx, label in enumerate(sample_labels):
    label_to_indices[label].append(idx)

# Determine the minimum class size to balance all classes
min_class_size = min(len(indices) for indices in label_to_indices.values())
if min_class_size == 0:
    # Without this check the split below fails with a less helpful message.
    empty_labels = [label for label, indices in label_to_indices.items() if not indices]
    raise ValueError(f"Cannot balance the dataset: no samples found for labels {empty_labels}")

# Balance the dataset by keeping the first min_class_size samples of each class
balanced_indices = []
for indices in label_to_indices.values():
    balanced_indices.extend(indices[:min_class_size])

# Split balanced indices into 80% for training and 20% for testing,
# stratified so both parts keep the same class proportions
balanced_labels = [sample_labels[i] for i in balanced_indices]
train_indices, test_indices = train_test_split(
    balanced_indices, test_size=0.2, stratify=balanced_labels, random_state=42
)

# Function to count samples per class
def count_samples_per_class(indices, dataset, label_dict):
    """
    Print how many of the selected samples belong to each class.

    Parameters:
    - indices: iterable of int, positions in ``dataset`` to count.
    - dataset: indexable collection whose items expose their label through
      ``item.y.item()``.
    - label_dict: dict, mapping of class names to numeric labels.

    Classes are printed in the order in which their label is first seen.
    A label with no entry in ``label_dict`` is reported as "Unknown" instead
    of aborting the report.
    """
    class_counts = defaultdict(int)
    for idx in indices:
        class_counts[dataset[idx].y.item()] += 1

    # Invert the name -> label mapping once instead of searching it for every
    # class; the first name wins if two names share a label.
    label_names = {}
    for name, value in label_dict.items():
        label_names.setdefault(value, name)

    print("Number of data points per class:")
    for label, count in class_counts.items():
        class_name = label_names.get(label, "Unknown")
        print(f"{class_name} (Label {label}): {count} samples")

# Count samples per class in the training set (80% of the balanced dataset)
print("Training Set (80% of balanced dataset):")
count_samples_per_class(train_indices, dataset, label_dict)

# Count samples per class in the testing set (20% of the balanced dataset)
print("\nTesting Set (20% of balanced dataset):")
count_samples_per_class(test_indices, dataset, label_dict)

# Create train and test subsets
train_dataset = torch.utils.data.Subset(dataset, train_indices)
test_dataset = torch.utils.data.Subset(dataset, test_indices)

# Create DataLoaders; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [None]:
# Load a sample data point from the dataset
# NOTE(review): index 31 is the 32nd graph, not the first one; this lookup
# needs at least 32 graphs in the dataset — confirm whether index 0 was intended.
sample_data = dataset[31]  # Access the graph stored at index 31

# Get the input dimension by checking the number of columns in data.x
input_dim = sample_data.x.size(1)
print(f"Input dimension (number of node features): {input_dim}")


In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool

class GNNClassifier(torch.nn.Module):
    """Graph classifier: two GCN layers, mean pooling and a linear output layer."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Parameters:
        - input_dim: int, input size of the first graph convolution.
        - hidden_dim: int, output size of both graph convolutions.
        - output_dim: int, size of the final linear layer's output.
        """
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc1 = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        """Return log-softmax scores (over dim 1) for the pooled graphs in ``data``."""
        hidden = data.x
        # Each convolution is followed by a ReLU
        for conv in (self.conv1, self.conv2):
            hidden = F.relu(conv(hidden, data.edge_index))
        # Graph-level pooling: average node embeddings per graph in the batch
        pooled = global_mean_pool(hidden, data.batch)
        return F.log_softmax(self.fc1(pooled), dim=1)

# Define model parameters
# The sizes are derived from the data and the label mapping instead of being
# hard-coded, so the model always matches the loaded graphs and classes.
input_dim = dataset[0].x.size(1)  # Node feature size (number of columns in data.x)
hidden_dim = 64
output_dim = len(label_dict)  # Number of classes (benign, adware, etc.)

model = GNNClassifier(input_dim, hidden_dim, output_dim)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# GNNClassifier.forward already returns log-probabilities (log_softmax), which
# is the input NLLLoss expects; CrossEntropyLoss would apply log_softmax again.
criterion = torch.nn.NLLLoss()


In [None]:
def train(model, loader, optimizer, criterion, device):
    """
    Run one training epoch and return the mean loss per trained batch.

    Parameters:
    - model: module called as ``model(batch)`` to produce the predictions.
    - loader: iterable of batches that provide ``.to(device)`` and labels ``.y``.
    - optimizer: optimizer that updates the model's parameters.
    - criterion: loss function called as ``criterion(out, batch.y)``.
    - device: device every batch is moved to before the forward pass.

    Returns:
    - float, sum of the batch losses divided by the number of batches that
      were actually trained on; 0.0 if no batch was trained on.

    Batches whose number of predictions differs from their number of labels
    are skipped with a warning and do not count towards the average.
    """
    import warnings

    model.train()
    total_loss = 0.0
    trained_batches = 0
    for data in loader:
        data = data.to(device)
        out = model(data)

        if out.size(0) != data.y.size(0):
            # Best-effort: keep training, but make the skipped batch visible
            # instead of hiding a likely data problem.
            warnings.warn(
                f"Skipping batch: output batch size {out.size(0)} "
                f"does not match target batch size {data.y.size(0)}"
            )
            continue

        optimizer.zero_grad()
        loss = criterion(out, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        trained_batches += 1

    # Average over trained batches only, so skipped batches do not dilute the
    # loss and an empty loader does not divide by zero.
    return total_loss / trained_batches if trained_batches else 0.0


In [None]:
def test(model, loader, device):
    """
    Return the fraction of samples in ``loader`` that the model classifies correctly.

    Parameters:
    - model: module called as ``model(batch)``; the prediction is the argmax
      over dim 1 of its output.
    - loader: iterable of batches with ``.to(device)`` and labels ``.y``; it
      must also expose ``loader.dataset`` for the total sample count.
    - device: device every batch is moved to before the forward pass.

    Returns:
    - float, correct predictions divided by ``len(loader.dataset)``; 0.0 when
      the dataset is empty.
    """
    model.eval()
    correct = 0
    # Inference only: without gradient tracking no autograd graph is built
    # for each batch.
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            pred = model(data).argmax(dim=1)
            correct += (pred == data.y).sum().item()

    total = len(loader.dataset)
    return correct / total if total else 0.0

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = model.to(device)

# Train for the configured number of epochs, reporting loss and test accuracy
# after each one
epochs = 1
for epoch in range(epochs):
    train_loss = train(
        model=model,
        loader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
    )
    test_accuracy = test(model=model, loader=test_loader, device=device)
    print(f'Epoch {epoch+1}, Loss: {train_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')


In [None]:
from sklearn.metrics import classification_report

def evaluate_model(model, loader, device):
    """
    Print a per-class classification report for the model on ``loader``.

    Parameters:
    - model: module called as ``model(batch)``; the prediction is the argmax
      over dim 1 of its output.
    - loader: iterable of batches with ``.to(device)`` and labels ``.y``.
    - device: device every batch is moved to before the forward pass.

    Class names and labels are taken from the module-level ``label_dict``.
    """
    model.eval()
    all_preds = []
    all_labels = []

    # Inference only: without gradient tracking no autograd graph is built
    # for each batch.
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            out = model(data)
            preds = out.argmax(dim=1).cpu().numpy()
            labels = data.y.cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)

    print("Classification Report:")
    # Passing `labels` pins every row to its class name; without it the report
    # raises when a class appears in neither the targets nor the predictions.
    print(classification_report(
        all_labels,
        all_preds,
        labels=list(label_dict.values()),
        target_names=list(label_dict.keys()),
    ))

# Run evaluation on the training data and on the held-out test data.
# The training report shows how well the model fits the data it has seen;
# only the test report measures performance on unseen graphs.
print("Training set:")
evaluate_model(model, train_loader, device)
print("\nTest set:")
evaluate_model(model, test_loader, device)


In [None]:
# Persist only the model's state_dict rather than the whole model object
# (recommended method)
torch.save(obj=model.state_dict(), f="gnn_model.pth")
print("Model saved as gnn_model.pth")


In [None]:
# Create a new instance of the model with the same layer sizes as the one
# that was saved
model = GNNClassifier(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)

# Load the saved state_dict into the model. map_location lets a checkpoint
# saved on a GPU be restored on a machine that only has a CPU.
model.load_state_dict(torch.load("gnn_model.pth", map_location=device))
# Move the model to the device the batches are sent to, so inference does not
# fail with a device mismatch.
model = model.to(device)
print("Model loaded from gnn_model.pth")

# If you plan to use the model for inference, set it to evaluation mode
model.eval()
