In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
from sklearn.metrics import fbeta_score, classification_report, confusion_matrix, precision_recall_curve
from sklearn.model_selection import StratifiedShuffleSplit
from torch_geometric.nn import GCNConv

In [None]:
data = torch.load("../data/graphs/alemari_graph.pt")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data = data.to(device)

In [None]:
class GCN(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, num_layers=2, dropout=0.5, use_batchnorm=False):
        super(GCN, self).__init__()
        self.layers = nn.ModuleList()
        self.bns = nn.ModuleList() if use_batchnorm else None
        self.use_batchnorm = use_batchnorm
        self.dropout = nn.Dropout(dropout)

        self.layers.append(GCNConv(in_channels, hidden_channels))
        if use_batchnorm:
            self.bns.append(nn.BatchNorm1d(hidden_channels))

        for _ in range(num_layers - 2):
            self.layers.append(GCNConv(hidden_channels, hidden_channels))
            if use_batchnorm:
                self.bns.append(nn.BatchNorm1d(hidden_channels))

        self.layers.append(GCNConv(hidden_channels, out_channels))

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        for i, layer in enumerate(self.layers[:-1]):
            x = layer(x, edge_index)
            if self.use_batchnorm:
                x = self.bns[i](x)
            x = F.relu(x)
            x = self.dropout(x)
        return F.log_softmax(self.layers[-1](x, edge_index), dim=1)

In [None]:
def grid_search_gcn(data, m_grid, fp_exact=146, beta=2.0): #change fp exact -> 146 for 1%, 73 for 0.5%
    split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    train_idx, test_idx = next(split.split(torch.arange(len(data.y)), data.y.cpu()))
    train_mask = torch.tensor(train_idx, dtype=torch.long, device=device)
    test_mask = torch.tensor(test_idx, dtype=torch.long, device=device)
    torch.save(test_mask, "../configs/gcn_test_mask_4feat_146fp.pt")

    best_result = None
    best_f2 = -1

    arch_grid = [
        {"hidden_dim": 128, "lr": 0.001,  "weight_decay": 5e-4, "layers": 2, "dropout": 0.5, "bn": False},
        {"hidden_dim": 128, "lr": 0.0005, "weight_decay": 5e-4, "layers": 3, "dropout": 0.3, "bn": True},
        {"hidden_dim": 64,  "lr": 0.001,  "weight_decay": 1e-4, "layers": 2, "dropout": 0.2, "bn": False},
        {"hidden_dim": 128, "lr": 0.001,  "weight_decay": 1e-3, "layers": 4, "dropout": 0.6, "bn": True},
        {"hidden_dim": 256, "lr": 0.0005, "weight_decay": 1e-4, "layers": 3, "dropout": 0.4, "bn": True},
        {"hidden_dim": 128, "lr": 0.001,  "weight_decay": 0.0,  "layers": 3, "dropout": 0.0, "bn": False},
        {"hidden_dim": 64,  "lr": 0.001,  "weight_decay": 5e-4, "layers": 2, "dropout": 0.3, "bn": True},
        {"hidden_dim": 128, "lr": 0.0005, "weight_decay": 1e-3, "layers": 5, "dropout": 0.5, "bn": True}
    ]

    for arch in arch_grid:
        for m in m_grid:
            model = GCN(
                in_channels=data.num_features,
                hidden_channels=arch["hidden_dim"],
                out_channels=2,
                num_layers=arch["layers"],
                dropout=arch["dropout"],
                use_batchnorm=arch["bn"]
            ).to(device)

            optimizer = torch.optim.Adam(model.parameters(), lr=arch["lr"], weight_decay=arch["weight_decay"])
            class_weights = 1.0 / torch.bincount(data.y).float()
            class_weights[1] *= m
            criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))

            for _ in range(200):
                model.train()
                optimizer.zero_grad()
                out = model(data)
                loss = criterion(out[train_mask], data.y[train_mask])
                loss.backward()
                optimizer.step()

            model.eval()
            with torch.no_grad():
                logits = model(data)[test_mask]
                probs = torch.exp(logits)[:, 1].cpu().numpy()
                labels = data.y[test_mask].cpu().numpy()

            prec, rec, thr = precision_recall_curve(labels, probs)
            f2_arr = (1 + beta**2) * prec * rec / (beta**2 * prec + rec + 1e-8)

            for i in range(len(thr)):
                if thr[i] < 1e-6:
                    continue
                preds = (probs > thr[i]).astype(int)
                fp = ((labels == 0) & (preds == 1)).sum()
                if fp == fp_exact and f2_arr[i] > best_f2:
                    best_f2 = f2_arr[i]
                    best_result = {
                        "m": float(m),
                        "thr": float(thr[i]),
                        "Fbeta": float(f2_arr[i]),
                        "precision": float(prec[i]),
                        "recall": float(rec[i]),
                        "false_positives": int(fp),
                        "state_dict": model.state_dict(),
                        "arch": arch
                    }

    if best_result:
        torch.save(best_result["state_dict"], "../models/best_gcn_model_4feat_146fp.pth")
        with open("../configs/gcn_best_config_4feat_146fp.json", "w") as f:
            json.dump({k: v for k, v in best_result.items() if k != "state_dict"}, f, indent=2)
        print(f"Saved best GCN model with 4 features (FP={fp_exact}, F2={best_f2:.4f})")
        return best_result
    else:
        raise ValueError("No GCN configuration met the FP constraint.")

In [None]:
#run grid sweep
m_grid = np.round(np.arange(1.0, 4.0, 0.1), 2)
best_result = grid_search_gcn(data, m_grid)

In [None]:
arch = best_result["arch"]
model = GCNConv(
    in_channels=data.num_features,
    hidden_channels=arch["hidden_dim"],
    out_channels=2,
    num_layers=arch["layers"],
    dropout=arch["dropout"],
    use_batchnorm=arch["bn"]
).to(device)

model.load_state_dict(best_result["state_dict"])
model.eval()
test_mask = torch.load("../configs/gcn_test_mask_exact146.pt").to(device)

with torch.no_grad():
    logits = model(data)[test_mask]
    probs = torch.exp(logits)[:, 1].cpu().numpy()
    labels = data.y[test_mask].cpu().numpy()

#apply best threshold
threshold = best_result["thr"]
preds = (probs > threshold).astype(int)

#Classification report
print("\nClassification Report:")
print(classification_report(labels, preds, target_names=["Legit", "Phishing"]))
#compute F2-score
f2 = fbeta_score(labels, preds, beta=2.0)
print(f"\nF2-Score (beta=2.0): {f2:.4f}")


#Confusion Matrix
cm = confusion_matrix(labels, preds)
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Legit", "Phishing"], yticklabels=["Legit", "Phishing"])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix (Threshold = {:.4f})".format(threshold))
plt.show()

prec, rec, thr = precision_recall_curve(labels, probs)

plt.figure(figsize=(8,6))
plt.plot(rec, prec, label="PR Curve")
plt.scatter(best_result["recall"], best_result["precision"], color='red', label="Best Threshold = {:.4f}".format(threshold))
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()
plt.grid()
plt.show()