In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import seaborn as sns
import copy
import pandas as pd
from collections import Counter
from urllib.parse import urlparse
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset

In [None]:
# -------------------- Tokenizer and Vocab Builder --------------------
def build_char_vocab(df, max_len=200):
    """Build a character-to-id vocabulary from the URLs in ``df['text']``.

    Only the first ``max_len`` characters of each URL are counted. Ids 0 and 1
    are reserved for ``'<PAD>'`` and ``'<UNK>'``; the remaining characters get
    consecutive ids starting at 2, most frequent character first.

    Args:
        df: Mapping-like object whose ``'text'`` entry iterates over URL strings.
        max_len: Number of leading characters of each URL that are counted.

    Returns:
        dict mapping each character (and the two special tokens) to an int id.
    """
    frequency = Counter(ch for url in df['text'] for ch in url[:max_len])

    vocab = {'<PAD>': 0, '<UNK>': 1}
    next_id = 2
    for ch, _count in frequency.most_common():
        vocab[ch] = next_id
        next_id += 1
    return vocab

def tokenize_char_url(url, vocab, max_len=200):
    """Encode a URL as a fixed-length tensor of character ids.

    The URL is truncated to ``max_len`` characters; characters missing from
    ``vocab`` map to ``vocab['<UNK>']`` and short URLs are right-padded with
    ``vocab['<PAD>']``.

    Args:
        url: URL string to encode.
        vocab: Character-to-id mapping containing ``'<PAD>'`` and ``'<UNK>'``.
        max_len: Length of the returned sequence.

    Returns:
        1-D ``torch.Tensor`` holding ``max_len`` ids.
    """
    ids = []
    for ch in url[:max_len]:
        ids.append(vocab.get(ch, vocab['<UNK>']))

    shortfall = max_len - len(ids)
    if shortfall > 0:
        ids.extend([vocab['<PAD>']] * shortfall)
    return torch.tensor(ids)

def tokenize_graph_url(url, max_tokens=30):
    """Split a URL into structural tokens: scheme, host labels, path segments.

    Empty pieces are dropped, so a trailing slash, a doubled slash in the path
    or a trailing dot in the host never produce an empty-string token.

    Args:
        url: URL string to tokenize.
        max_tokens: Upper bound on the number of tokens returned (default 30).

    Returns:
        List of at most ``max_tokens`` non-empty string tokens, in the order
        scheme, host labels, path segments. An empty list is returned when
        ``urlparse`` rejects the URL.
    """
    try:
        parsed = urlparse(url)
        scheme, hostname, path = parsed.scheme, parsed.hostname, parsed.path
    except ValueError:
        # urlparse raises ValueError for some malformed inputs (for example an
        # unbalanced "[" in the network location). Yield no tokens so that one
        # bad URL does not abort processing of the whole dataset.
        return []

    tokens = []
    if scheme:
        tokens.append(scheme)
    if hostname:
        tokens.extend(label for label in hostname.split('.') if label)
    if path:
        tokens.extend(segment for segment in path.split('/') if segment)
    return tokens[:max_tokens]

def build_graph_vocab(df):
    """Build a token-to-id vocabulary for the graph view of the URLs.

    Every URL in ``df['text']`` is tokenized with ``tokenize_graph_url``.
    Id 0 is reserved for ``'<UNK>'``; each distinct token then receives the
    next id, in the order the tokens are first encountered.

    Args:
        df: Mapping-like object whose ``'text'`` entry iterates over URL strings.

    Returns:
        dict mapping ``'<UNK>'`` and every observed token to an int id.
    """
    # dict.fromkeys keeps first-seen order and discards repeats.
    first_seen = dict.fromkeys(
        token
        for url in df['text']
        for token in tokenize_graph_url(url)
    )

    vocab = {'<UNK>': 0}
    vocab.update({token: idx for idx, token in enumerate(first_seen, start=1)})
    return vocab

def url_to_graph(url, label, vocab):
    """Convert a URL into a chain graph over its structural tokens.

    Each token from ``tokenize_graph_url`` becomes one node whose single
    feature is the token's vocabulary id (``vocab['<UNK>']`` for unseen
    tokens). Consecutive tokens are linked by a directed edge ``i -> i + 1``.

    Args:
        url: URL string to convert.
        label: Target value stored on the graph as a float tensor of shape [1].
        vocab: Token-to-id mapping containing ``'<UNK>'``.

    Returns:
        A ``Data`` object with ``x`` of shape [num_nodes, 1], ``edge_index``
        of shape [2, num_nodes - 1] and ``y`` of shape [1], or ``None`` when
        the URL yields fewer than two tokens.
    """
    node_ids = []
    for token in tokenize_graph_url(url):
        node_ids.append(vocab.get(token, vocab['<UNK>']))

    num_nodes = len(node_ids)
    if num_nodes < 2:
        return None

    # One (source, target) pair per pair of neighbouring tokens.
    chain_pairs = [[src, src + 1] for src in range(num_nodes - 1)]
    return Data(
        x=torch.tensor(node_ids, dtype=torch.long).unsqueeze(1),
        edge_index=torch.tensor(chain_pairs, dtype=torch.long).t(),
        y=torch.tensor([label], dtype=torch.float),
    )


In [None]:
# -------------------- Fusion Dataset --------------------
class FusionDataset(Dataset):
    """Paired character-sequence / token-graph view of every URL in a frame.

    Each item is a ``(char_tensor, graph_data)`` tuple built from one row's
    ``'text'`` and ``'label'`` values. Rows for which ``url_to_graph`` returns
    ``None`` are left out, so the dataset may be shorter than ``df``.
    """

    def __init__(self, df, char_vocab, graph_vocab, max_len=200):
        """Eagerly encode all rows of ``df``.

        Args:
            df: Frame with a ``'text'`` column (URLs) and a ``'label'`` column.
            char_vocab: Character vocabulary passed to ``tokenize_char_url``.
            graph_vocab: Token vocabulary passed to ``url_to_graph``.
            max_len: Character sequence length passed to ``tokenize_char_url``.
        """
        samples = []
        for url, label in zip(df['text'], df['label']):
            char_tensor = tokenize_char_url(url, char_vocab, max_len)
            graph_data = url_to_graph(url, label, graph_vocab)
            if graph_data is None:
                continue
            samples.append((char_tensor, graph_data))
        self.data = samples

    def __len__(self):
        """Return the number of retained samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(char_tensor, graph_data)`` pair stored at ``idx``."""
        return self.data[idx]

In [None]:
# -------------------- CharCNN Module --------------------
class CharCNN(nn.Module):
    """Character-level CNN encoder producing a 64-dimensional vector per sequence.

    Pipeline: embedding -> two width-3 convolutions with ReLU -> max over the
    sequence axis -> linear projection.
    """

    def __init__(self, vocab_size, embed_dim=32, seq_len=200):
        """Create the encoder layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embed_dim: Size of each character embedding.
            seq_len: Accepted for interface compatibility; not used by any layer.
        """
        super().__init__()
        hidden_channels = 128
        out_features = 64
        # Index 0 is the padding id, so its embedding stays fixed at zero.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.conv1 = nn.Conv1d(embed_dim, hidden_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(hidden_channels, hidden_channels, kernel_size=3, padding=1)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(hidden_channels, out_features)

    def forward(self, x):
        """Encode a batch of id sequences ``[B, L]`` into features ``[B, 64]``."""
        embedded = self.embedding(x)               # [B, L, E]
        features = embedded.permute(0, 2, 1)       # [B, E, L] - Conv1d wants channels first
        features = torch.relu(self.conv1(features))
        features = torch.relu(self.conv2(features))
        pooled = self.pool(features).squeeze(-1)   # [B, 128]
        return self.fc(pooled)                     # [B, 64]

# -------------------- GNN Module --------------------
class URLGNN(nn.Module):
    """Graph encoder: token embedding, two GCN layers, mean pooling per graph."""

    def __init__(self, vocab_size, embed_dim=64):
        """Create the encoder layers.

        Args:
            vocab_size: Number of rows in the token embedding table.
            embed_dim: Size of each token embedding.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.conv1 = GCNConv(embed_dim, 128)
        self.conv2 = GCNConv(128, 64)

    def forward(self, data):
        """Encode a batch of graphs into one 64-dimensional vector per graph.

        ``data`` must expose ``x`` (node token ids with a trailing singleton
        dimension), ``edge_index`` and ``batch``.
        """
        edges = data.edge_index
        hidden = self.embedding(data.x.squeeze(1))
        for conv in (self.conv1, self.conv2):
            hidden = F.relu(conv(hidden, edges))
        # Average node features within each graph of the batch.
        return global_mean_pool(hidden, data.batch)  # [B, 64]

# -------------------- Fusion Model --------------------
class FusionModel(nn.Module):
    """Binary classifier fusing the character CNN and the token GNN encoders.

    The two 64-dimensional encoder outputs are concatenated, passed through a
    hidden layer with ReLU and dropout, and mapped to one sigmoid probability
    per sample.
    """

    def __init__(self, cnn_vocab_size, gnn_vocab_size):
        """Create both encoders and the classification head.

        Args:
            cnn_vocab_size: Vocabulary size for the ``CharCNN`` embedding.
            gnn_vocab_size: Vocabulary size for the ``URLGNN`` embedding.
        """
        super().__init__()
        self.cnn = CharCNN(cnn_vocab_size)
        self.gnn = URLGNN(gnn_vocab_size)
        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(128, 64)  # 64 (CNN) + 64 (GNN) concatenated features
        self.fc2 = nn.Linear(64, 1)

    def forward(self, char_input, graph_data):
        """Return one probability per sample as a tensor of shape ``[B]``.

        Args:
            char_input: Batch of character id sequences for the CNN branch.
            graph_data: Batched graph object for the GNN branch.
        """
        x1 = self.cnn(char_input)
        x2 = self.gnn(graph_data)
        x = torch.cat([x1, x2], dim=1)
        x = self.dropout(F.relu(self.fc1(x)))
        # Squeeze only the trailing feature dimension. A bare squeeze() would
        # also drop the batch dimension for a batch of one, yielding a 0-d
        # tensor that no longer matches the [1]-shaped target.
        return torch.sigmoid(self.fc2(x)).squeeze(-1)

In [None]:
def train_fusion(model, train_loader, val_loader, optimizer, criterion, scheduler=None, epochs=30, patience=5,
                 device=None, checkpoint_path="best_fusion_model.pt"):
    """Train ``model`` with early stopping on validation F1.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation F1 improves, the weights are snapshotted (and optionally saved
    to disk) together with that epoch's validation predictions. Training stops
    after ``patience`` consecutive epochs without improvement, and the best
    weights are loaded back into ``model`` before returning.

    Args:
        model: Module called as ``model(char_input, graph_data)`` and returning
            one probability per sample.
        train_loader: Iterable of ``(char_input, graph_data)`` training batches.
        val_loader: Iterable of ``(char_input, graph_data)`` validation batches.
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss called as ``criterion(probabilities, targets)``.
        scheduler: Optional LR scheduler. ``ReduceLROnPlateau`` is stepped with
            the validation F1; any other scheduler is stepped without arguments.
        epochs: Maximum number of epochs.
        patience: Number of non-improving epochs tolerated before stopping.
        device: Device batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).
        checkpoint_path: File the best ``state_dict`` is saved to; pass ``None``
            to skip saving.

    Returns:
        ``(model, history)``. ``history`` holds per-epoch lists ``train_loss``,
        ``val_loss``, ``train_acc`` and ``val_acc``; ``val_preds``,
        ``val_probs`` and ``val_labels`` from the best epoch (so they describe
        the returned weights); and ``best_epoch`` (1-based, ``None`` if no
        epoch ran).
    """
    if device is None:
        # Follow the model instead of depending on a notebook-global `device`.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_wts = copy.deepcopy(model.state_dict())
    # Start below any possible F1 so the first evaluated epoch always becomes
    # the initial best, even when its F1 is exactly 0.
    best_f1 = float("-inf")
    patience_counter = 0
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': [],
               'val_preds': [], 'val_probs': [], 'val_labels': [], 'best_epoch': None}

    for epoch in range(epochs):
        # ---- Training pass ----
        model.train()
        train_loss, correct, total = 0.0, 0, 0
        for char_input, graph_data in tqdm(train_loader, desc=f"Epoch {epoch+1} [Train]"):
            char_input = char_input.to(device)
            graph_data = graph_data.to(device)
            targets = graph_data.y.reshape(-1)
            optimizer.zero_grad()
            # reshape(-1) keeps a batch of one as shape [1] even if the model
            # returns a 0-d tensor, so output and target shapes always agree.
            out = model(char_input, graph_data).reshape(-1)
            loss = criterion(out, targets)
            loss.backward()
            optimizer.step()
            # The loss is a batch mean; weight it by batch size for a dataset mean.
            train_loss += loss.item() * graph_data.num_graphs
            preds = (out > 0.5).int()
            correct += (preds == targets.int()).sum().item()
            total += graph_data.num_graphs
        train_acc = correct / max(total, 1)
        history['train_loss'].append(train_loss / max(total, 1))
        history['train_acc'].append(train_acc)

        # ---- Validation pass ----
        model.eval()
        val_loss, val_total = 0.0, 0
        val_preds, val_probs, val_labels = [], [], []
        with torch.no_grad():
            for char_input, graph_data in tqdm(val_loader, desc=f"Epoch {epoch+1} [Val]"):
                char_input = char_input.to(device)
                graph_data = graph_data.to(device)
                targets = graph_data.y.reshape(-1)
                out = model(char_input, graph_data).reshape(-1)
                loss = criterion(out, targets)
                val_loss += loss.item() * graph_data.num_graphs
                val_total += graph_data.num_graphs
                val_probs += out.cpu().numpy().tolist()
                val_preds += (out > 0.5).int().cpu().numpy().tolist()
                val_labels += targets.int().cpu().numpy().tolist()
        val_acc = accuracy_score(val_labels, val_preds)
        val_f1 = f1_score(val_labels, val_preds)

        history['val_loss'].append(val_loss / max(val_total, 1))
        history['val_acc'].append(val_acc)

        print(f"\nEpoch {epoch+1}: Train Loss={history['train_loss'][-1]:.4f}, Acc={train_acc:.4f} | Val Loss={history['val_loss'][-1]:.4f}, Acc={val_acc:.4f}, F1={val_f1:.4f}")

        if scheduler is not None:
            # Only ReduceLROnPlateau takes the monitored metric as an argument.
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_f1)
            else:
                scheduler.step()

        if val_f1 > best_f1:
            best_f1 = val_f1
            best_model_wts = copy.deepcopy(model.state_dict())
            patience_counter = 0
            # Keep the predictions that belong to the weights being returned,
            # so later confusion-matrix / ROC plots describe the best model.
            history['val_preds'] = val_preds
            history['val_probs'] = val_probs
            history['val_labels'] = val_labels
            history['best_epoch'] = epoch + 1
            if checkpoint_path is not None:
                torch.save(model.state_dict(), checkpoint_path)
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("\n⛔ Early stopping triggered.")
                break

    model.load_state_dict(best_model_wts)
    return model, history


In [None]:
# -------------------- Plotting --------------------
def plot_metrics(history):
    """Show training vs. validation loss and accuracy curves side by side.

    Args:
        history: Mapping with per-epoch sequences under ``'train_loss'``,
            ``'val_loss'``, ``'train_acc'`` and ``'val_acc'``.
    """
    # (panel title, train key, train legend label, val key, val legend label)
    panels = (
        ('Loss', 'train_loss', 'Train Loss', 'val_loss', 'Val Loss'),
        ('Accuracy', 'train_acc', 'Train Acc', 'val_acc', 'Val Acc'),
    )

    plt.figure(figsize=(12, 5))
    for position, (title, train_key, train_label, val_key, val_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history[train_key], label=train_label)
        plt.plot(history[val_key], label=val_label)
        plt.legend()
        plt.title(title)
        plt.xlabel('Epoch')
    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred):
    """Show the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    Both axes are labelled ``'Legit'`` / ``'Phish'``; rows are the actual
    classes and columns the predicted ones.
    """
    class_names = ['Legit', 'Phish']
    counts = confusion_matrix(y_true, y_pred)
    sns.heatmap(
        counts,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.tight_layout()
    plt.show()

def plot_roc_curve(y_true, y_probs):
    """Show the ROC curve for scores ``y_probs`` against labels ``y_true``.

    The area under the curve is reported in the legend, and a dashed gray
    diagonal is drawn from (0, 0) to (1, 1) for reference.
    """
    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, y_probs)
    area = auc(false_pos_rate, true_pos_rate)

    plt.plot(false_pos_rate, true_pos_rate, label=f"AUC = {area:.2f}")
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.title("ROC Curve")
    plt.legend()
    plt.tight_layout()
    plt.show()

# -------------------- Device --------------------
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


In [None]:
if __name__ == '__main__':
    # End-to-end run: load URLs, build vocabularies on the training split only,
    # train the fusion model, then plot the learning curves and validation results.
    import os  # local import: only this entry-point cell needs it

    SEED = 42
    # Dataset location. Set the PHISHING_URLS_JSON environment variable to run
    # on another machine; the default is the original author's local path.
    data_path = os.environ.get("PHISHING_URLS_JSON", r"E:\Phising_detection\dataset\urls\urls.json")

    # Seed torch as well as the split, so weight initialisation and batch
    # shuffling are repeatable across Restart & Run All.
    torch.manual_seed(SEED)

    df = pd.read_json(data_path)

    train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=SEED)

    # Vocabularies come from the training split only, so validation URLs can
    # contain unseen characters/tokens that map to <UNK>.
    char_vocab = build_char_vocab(train_df)
    graph_vocab = build_graph_vocab(train_df)

    train_data = FusionDataset(train_df, char_vocab, graph_vocab)
    val_data = FusionDataset(val_df, char_vocab, graph_vocab)

    # DataLoader here is the torch_geometric loader imported at the top of the
    # notebook; it batches the (char_tensor, graph) tuples yielded by the
    # datasets, so no intermediate list copy or second import is needed.
    train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=64)

    # `device` is defined once in the earlier "Device" cell.
    model = FusionModel(cnn_vocab_size=len(char_vocab), gnn_vocab_size=len(graph_vocab)).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    # mode='max' because the scheduler is stepped with validation F1.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=2)
    criterion = nn.BCELoss()

    model, history = train_fusion(model, train_loader, val_loader, optimizer, criterion, scheduler)

    plot_metrics(history)
    plot_confusion_matrix(history['val_labels'], history['val_preds'])
    plot_roc_curve(history['val_labels'], history['val_probs'])