In [7]:
# HGNN-style Spam/Ham classifier: a GCNConv layer (PyTorch Geometric) over a bipartite feature-node / sample-node graph

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pickle
from torch_geometric.nn import GCNConv
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report

# Load the dataset, keeping only rows labelled 'ham' or 'spam'
df = pd.read_csv("spam_detector/final_spam.csv")
df = df[df['Labels'].isin(['ham', 'spam'])].reset_index(drop=True)

# Load the pickled vectorizer (the label encoder is fitted fresh below, not loaded).
# NOTE(review): pickle.load can execute arbitrary code -- only use a trusted file here.
with open("vectorizer_fixed.pkl", "rb") as f:
    vectorizer = pickle.load(f)

# Integer-encode the labels and vectorize the messages into a feature array
label_enc = LabelEncoder()
y = label_enc.fit_transform(df['Labels'])
X = vectorizer.transform(df['Message']).toarray()

# Train/test split: 20% held out, fixed seed for reproducibility
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# Convert the splits to tensors: float features, long (class index) labels
X_train_tensor, X_test_tensor = (
    torch.tensor(features, dtype=torch.float) for features in (X_train, X_test)
)
y_train_tensor, y_test_tensor = (
    torch.tensor(labels, dtype=torch.long) for labels in (y_train, y_test)
)

# Hypergraph builder
def build_hypergraph(X):
    """Build the feature-node / sample-node graph for a sample-by-feature matrix.

    Node layout: indices ``0 .. num_features - 1`` are feature nodes and
    indices ``num_features .. num_features + num_samples - 1`` are sample
    nodes. One directed edge ``feature -> sample`` is emitted for every
    non-zero entry of ``X``.

    Args:
        X: 2-D tensor of shape ``(num_samples, num_features)``.

    Returns:
        A tuple ``(x_all, edge_index)`` where ``x_all`` is an identity matrix
        (one row per feature node) stacked on top of ``X`` (one row per
        sample node), and ``edge_index`` is a ``torch.long`` tensor of shape
        ``(2, num_edges)`` whose first row holds feature-node indices and
        whose second row holds sample-node indices.
    """
    _num_samples, num_features = X.shape

    # torch.nonzero returns (row, col) pairs in row-major order, which is the
    # same edge order a sample-by-sample scan produces, and it handles rows
    # with zero or exactly one active feature without special cases.
    nonzero_pairs = torch.nonzero(X)
    feature_nodes = nonzero_pairs[:, 1]
    sample_nodes = nonzero_pairs[:, 0] + num_features  # shift past feature nodes
    edge_index = torch.stack([feature_nodes, sample_nodes], dim=0)

    # Feature nodes get one-hot features; sample nodes keep their own rows.
    # Create the identity block on X's device so the concatenation is valid
    # wherever X lives.
    x_feat = torch.eye(num_features, device=X.device)
    x_all = torch.cat([x_feat, X], dim=0)

    return x_all, edge_index

# Build the training graph: stacked feature-node/sample-node features plus feature -> sample edges
X_all, edge_index = build_hypergraph(X_train_tensor)

# Define HGNN model
class HGNN(nn.Module):
    """Node classifier: Linear -> ReLU -> GCNConv -> Linear.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Width of the hidden representation used by the
            first linear layer and the graph convolution.
        out_channels: Number of output values produced per node.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        # Layers are declared in the order they are applied in forward().
        self.lin1 = nn.Linear(in_channels, hidden_channels)
        self.gcn1 = GCNConv(hidden_channels, hidden_channels)
        self.lin2 = nn.Linear(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Return one ``out_channels``-sized output row per node in ``x``."""
        hidden = F.relu(self.lin1(x))
        # Message passing along edge_index; no activation is applied afterwards.
        propagated = self.gcn1(hidden, edge_index)
        return self.lin2(propagated)

# Training setup: move the model, graph and labels to the selected device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = HGNN(in_channels=X_all.size(1), hidden_channels=64, out_channels=2).to(device)
X_all = X_all.to(device)
edge_index = edge_index.to(device)
y_train_tensor = y_train_tensor.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Feature nodes occupy the first rows of X_all, one per feature column, so the
# sample-node offset is the width of the feature matrix. Taking it from the
# data (rather than vectorizer.max_features) stays correct when the
# vectorizer has no cap (max_features is None) or when the fitted vocabulary
# is smaller than the cap.
num_features = X_train_tensor.size(1)

# Training loop: full-batch updates over the whole training graph
for epoch in range(20):
    model.train()
    optimizer.zero_grad()
    out = model(X_all, edge_index)
    out_samples = out[num_features:]  # outputs only for sample nodes
    loss = criterion(out_samples, y_train_tensor)
    loss.backward()
    optimizer.step()
    # Accuracy is computed from the outputs produced before this epoch's step
    pred = out_samples.argmax(dim=1)
    acc = (pred == y_train_tensor).sum().item() / y_train_tensor.size(0)
    print(f"Epoch {epoch+1}: Loss = {loss.item():.4f}, Accuracy = {acc:.4f}")

# Evaluation on a graph built from the held-out samples only
model.eval()
with torch.no_grad():
    # Build test hypergraph
    X_all_test, edge_index_test = build_hypergraph(X_test_tensor)
    X_all_test = X_all_test.to(device)
    edge_index_test = edge_index_test.to(device)
    y_test_tensor = y_test_tensor.to(device)

    out_test = model(X_all_test, edge_index_test)
    # Sample nodes follow the feature nodes. Take the offset from the test
    # matrix width instead of vectorizer.max_features, which may be None or
    # larger than the actual number of feature columns.
    out_samples_test = out_test[X_test_tensor.size(1):]
    pred_test = out_samples_test.argmax(dim=1)
    acc_test = (pred_test == y_test_tensor).sum().item() / y_test_tensor.size(0)
    print(f"\nTest Accuracy: {acc_test:.4f}")

    # Confusion Matrix & Report (converted to NumPy arrays for scikit-learn)
    y_true = y_test_tensor.cpu().numpy()
    y_pred = pred_test.cpu().numpy()
    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, y_pred))
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=label_enc.classes_))

# Save the model weights and the fitted label encoder
torch.save(model.state_dict(), "hgnn_spam_classifier.pt")
with open("label_encoder.pkl", "wb") as f:
    pickle.dump(label_enc, f)

print("\nModel and label encoder saved successfully.")


Epoch 1: Loss = 0.6953, Accuracy = 0.3977
Epoch 2: Loss = 0.5791, Accuracy = 0.8341
Epoch 3: Loss = 0.4859, Accuracy = 0.8341
Epoch 4: Loss = 0.4447, Accuracy = 0.8341
Epoch 5: Loss = 0.4464, Accuracy = 0.8341
Epoch 6: Loss = 0.4148, Accuracy = 0.8341
Epoch 7: Loss = 0.3732, Accuracy = 0.8341
Epoch 8: Loss = 0.3433, Accuracy = 0.8341
Epoch 9: Loss = 0.3246, Accuracy = 0.8341
Epoch 10: Loss = 0.3052, Accuracy = 0.8440
Epoch 11: Loss = 0.2785, Accuracy = 0.8673
Epoch 12: Loss = 0.2461, Accuracy = 0.9030
Epoch 13: Loss = 0.2143, Accuracy = 0.9210
Epoch 14: Loss = 0.1908, Accuracy = 0.9267
Epoch 15: Loss = 0.1809, Accuracy = 0.9287
Epoch 16: Loss = 0.1824, Accuracy = 0.9307
Epoch 17: Loss = 0.1860, Accuracy = 0.9309
Epoch 18: Loss = 0.1866, Accuracy = 0.9309
Epoch 19: Loss = 0.1854, Accuracy = 0.9299
Epoch 20: Loss = 0.1833, Accuracy = 0.9301

Test Accuracy: 0.9231

Confusion Matrix:
[[999  43]
 [ 52 141]]

Classification Report:
              precision    recall  f1-score   support

     