# PhishHook - A Phishing URL Detector
#### By: Aryaan Khan and Bradley Lewis

## Import Libraries

In [1]:
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split

from urllib.parse import urlparse, parse_qs

## Extract URL Features

In [2]:
def extract_url_features(url):
    features = {}
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)

    # Basic features from URL components
    features['length_url'] = len(url)
    features['length_hostname'] = len(parsed_url.netloc)
    features['ip'] = 1 if parsed_url.hostname and parsed_url.hostname.replace('.', '').isdigit() else 0
    features['nb_dots'] = url.count('.')
    features['nb_hyphens'] = url.count('-')
    features['nb_at'] = url.count('@')
    features['nb_qm'] = url.count('?')
    features['nb_and'] = url.count('&')
    features['nb_or'] = url.count('|')
    features['nb_eq'] = len(query_params)
    features['nb_underscore'] = url.count('_')
    features['nb_tilde'] = url.count('~')
    features['nb_percent'] = url.count('%')
    features['nb_slash'] = url.count('/')
    features['nb_star'] = url.count('*')
    features['nb_colon'] = url.count(':')
    features['nb_comma'] = url.count(',')
    features['nb_semicolumn'] = url.count(';')
    features['nb_dollar'] = url.count('$')
    features['nb_space'] = url.count(' ')
    features['nb_dslash'] = url.count('//')

    return features

## Load the Data

In [3]:
# Load the dataset
dataset = pd.read_csv("phishing_data.csv")

# Assuming extract_url_features is already defined and properly imported
features = [extract_url_features(url) for url in dataset['URL']]
features_df = pd.DataFrame(features)

# Assuming the label column is named 'Label' and needs conversion from text to binary
labels = (dataset['Label'] == 'bad').astype(int)

## Create the Dataloader

In [4]:
# Check if MPS is available
if torch.backends.mps.is_available():
    print("MPS is available!")
    # Set the device to MPS
    device = torch.device("mps")
else:
    print("MPS not available, using CPU")
    device = torch.device("cpu")

# Initially split the data into temporary training data and final test data
X_temp, X_test, y_temp, y_test = train_test_split(features_df, labels, test_size=0.2)

# Split the temporary training data into final training data and validation data
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.25)  # 0.25 x 0.8 = 0.2

# Convert data to tensors and transfer to the specified device
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1).to(device)
X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32).view(-1, 1).to(device)
X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1).to(device)

# Create Tensor datasets for all sets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create DataLoaders for all datasets
train_loader = DataLoader(train_dataset, batch_size=30000, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=10000, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=10000, shuffle=False)

# Confirm the DataLoader details
print(train_loader.dataset.tensors[0].shape, train_loader.dataset.tensors[1].shape)
print(val_loader.dataset.tensors[0].shape, val_loader.dataset.tensors[1].shape)
print(test_loader.dataset.tensors[0].shape, test_loader.dataset.tensors[1].shape)

MPS is available!
torch.Size([329607, 21]) torch.Size([329607, 1])
torch.Size([109869, 21]) torch.Size([109869, 1])
torch.Size([109870, 21]) torch.Size([109870, 1])


## Define the Neural Network

In [5]:
class PhishHookNet(nn.Module):
    def __init__(self, input_size):
        super(PhishHookNet, self).__init__()
        self.layer1 = nn.Linear(input_size, 128)
        self.layer2 = nn.Linear(128, 256)
        self.layer3 = nn.Linear(256, 64)
        self.output = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = self.dropout(x)
        x = torch.relu(self.layer2(x))
        x = self.dropout(x)
        x = torch.relu(self.layer3(x))
        x = torch.sigmoid(self.output(x))
        return x

## Define the Training and Validation Loops

In [6]:
# Function to train the model
def train(model, train_loader, optimizer, criterion):
    model.train()  # Set the model to training mode
    total_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()  # Clear gradients
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * inputs.size(0)
    return total_loss / len(train_loader.dataset)

# Function to validate the model
def validate(model, val_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    total = 0
    correct = 0
    with torch.no_grad():  # No need to track gradients
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * inputs.size(0)
            predicted = outputs.round()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    avg_loss = total_loss / len(val_loader.dataset)
    accuracy = correct / total * 100
    return avg_loss, accuracy

## Define the Early Stopping Condition Class

In [7]:
class EarlyStopping:
    def __init__(self, patience=5, verbose=False, delta=0):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
        """
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), 'phishing_url_model.pth')
        self.val_loss_min = val_loss


## Train the Model

In [8]:
# Assuming the model and dataset are already defined
model = PhishHookNet(input_size=X_train.shape[1]).to(device)  # Adjust input size based on actual features

# Loss function, optimizer, and early stopping
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
early_stopping = EarlyStopping(verbose=True, delta=0.001)

# Training loop with model saving based on validation loss improvement
num_epochs = 50
best_val_loss = float('inf')
for epoch in range(num_epochs):
    train_loss = train(model, train_loader, optimizer, criterion)
    val_loss, accuracy = validate(model, val_loader, criterion)
    
    # Early stopping
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

    print(f'Epoch {epoch+1}: train_loss = {train_loss:.4f}, val_loss = {val_loss:.4f}')

Validation loss decreased (inf --> 0.672610).  Saving model ...
Epoch 1: train_loss = 1.0449, val_loss = 0.6726
Validation loss decreased (0.672610 --> 0.631430).  Saving model ...
Epoch 2: train_loss = 0.6617, val_loss = 0.6314
Validation loss decreased (0.631430 --> 0.548653).  Saving model ...
Epoch 3: train_loss = 0.5915, val_loss = 0.5487
Validation loss decreased (0.548653 --> 0.513974).  Saving model ...
Epoch 4: train_loss = 0.5387, val_loss = 0.5140
Validation loss decreased (0.513974 --> 0.499698).  Saving model ...
Epoch 5: train_loss = 0.5127, val_loss = 0.4997
Validation loss decreased (0.499698 --> 0.487050).  Saving model ...
Epoch 6: train_loss = 0.4992, val_loss = 0.4870
Validation loss decreased (0.487050 --> 0.475671).  Saving model ...
Epoch 7: train_loss = 0.4880, val_loss = 0.4757
Validation loss decreased (0.475671 --> 0.470374).  Saving model ...
Epoch 8: train_loss = 0.4801, val_loss = 0.4704
EarlyStopping counter: 1 out of 5
Epoch 9: train_loss = 0.4801, val_l

## Evaluate the Model

In [9]:
# Load the best model back
model.load_state_dict(torch.load('phishing_url_model.pth'))

# Evaluate the model on the test dataset
model.eval()

# Calculate accuracy
train_accuracy = validate(model, train_loader, criterion)[1]
val_accuracy = validate(model, val_loader, criterion)[1]
test_accuracy = validate(model, test_loader, criterion)[1]

# Print the results
print(f'Training accuracy: {train_accuracy:.2f}%')
print(f'Validation accuracy: {val_accuracy:.2f}%')
print(f'Test accuracy: {test_accuracy:.2f}%')

Training accuracy: 80.08%
Validation accuracy: 80.40%
Test accuracy: 80.15%
