# **Malicious URLs Notebook**

## Step 1: Handle imports and import CSV file from shareable link

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import time
import re

from sklearn.dummy import DummyClassifier # baseline model
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.model_selection import (
    train_test_split,
    GridSearchCV
)
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    recall_score,
    precision_score,
    roc_auc_score,
    roc_curve,
    ConfusionMatrixDisplay
)
from sklearn.preprocessing import StandardScaler # data is normal distributed, hence using StandardScaler

from xgboost import XGBClassifier

from typing import Tuple, Union

from scipy.stats import kstest

from urllib.parse import urlparse
from tld import get_tld, is_tld

In [None]:
# Read csv file to dataframe
# NOTE(review): the Step 1 heading mentions a shareable link, but this reads a
# local path relative to the working directory — confirm the file is present.
df = pd.read_csv("malicious_phish.csv")
# Preview the first rows (rendered as the cell output)
df.head()

## Step 2: Gain understanding of the data

In [None]:
# Initial information about the dataset, i.e. number of columns, column names,
# non-null counts, and dtypes
df.info()

In [None]:
# Count the null values present in each column
df.isnull().sum()

In [None]:
# Number of rows per URL category, most frequent first; kept for the label
# encoding and the plots below, and displayed as the cell output
value_counts = df["type"].value_counts()
value_counts

In [None]:
# Assign each category found in df["type"] an integer code that follows the
# order of value_counts (position 0 is the most frequent category)
numerical_categories = {
    f"{category}": index for index, category in enumerate(value_counts.index)
}

# New "target" feature holding the integer code of every row's category
df["target"] = [numerical_categories[label] for label in df["type"]]
df.head()

In [None]:
# Nice little color palette that we can use throughout the notebook
colors = sns.color_palette("pastel")[0:5]

# Donut chart for the target variable ratios
plt.figure(figsize=(5, 5))

# Draw the wedges first: the white centre circle has to be added afterwards so
# that it is painted on top of them and actually produces the donut hole.
plt.pie(value_counts, labels=value_counts.index, colors=colors, autopct="%.0f%%")

centre_circle = plt.Circle((0, 0), 0.70, fc='white')
fig = plt.gcf()
fig.gca().add_artist(centre_circle)

plt.tight_layout()
plt.show()

In [None]:
# Supplementary bar plot of the same category counts
count_axes = sns.barplot(x=value_counts.index, y=value_counts, palette=colors)
count_axes.set_xlabel("target")
count_axes.set_ylabel("count")

## Step 3: Data preprocessing and feature engineering

In [None]:
# Function to map new feature based on whether ip address in url
# courtesy of https://www.kaggle.com/code/jingyanshang/url-s-feature-analysis/notebook
_IPV4_OCTET = r"([01]?\d\d?|2[0-4]\d|25[0-5])"
_HEX_OCTET = r"(0x[0-9a-fA-F]{1,2})"

# Compiled once at definition time; every alternative is a separate top-level
# branch of the alternation.
_IP_ADDRESS_PATTERN = re.compile(
    "|".join(
        (
            # Dotted-decimal IPv4 directly followed by "/", e.g. "192.168.0.1/"
            r"(" + r"\.".join([_IPV4_OCTET] * 4) + r"/)",
            # IPv4 with hexadecimal octets followed by "/", e.g. "0x7f.0x0.0x0.0x1/"
            r"(" + r"\.".join([_HEX_OCTET] * 4) + r"/)",
            # Fully expanded IPv6 (eight colon-separated hex groups)
            r"(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}",
            # Dotted quad followed by a port, e.g. "10.0.0.1:8080"
            r"([0-9]+(?:\.[0-9]+){3}:[0-9]+)",
            # Bare dotted quad with an optional "/NN" suffix
            r"((?:(?:\d|[01]?\d\d|2[0-4]\d|25[0-5])\.){3}"
            r"(?:25[0-5]|2[0-4]\d|[01]?\d\d|\d)(?:/\d{1,2})?)",
        )
    )
)


def url_is_ip_address(url: str) -> int:
    """Return 1 if *url* contains an IP address, otherwise 0.

    The search is unanchored, so an address anywhere in the string counts.

    Compared with the pattern this was adapted from, the hexadecimal-IPv4 and
    IPv6 alternatives are now separated by ``|``. Previously they were
    concatenated into one branch, so neither form could match on its own.
    """
    return 1 if _IP_ADDRESS_PATTERN.search(url) else 0

# Function to map new feature based on http[s] protocol
def https_secured(url: str) -> int:
    """Return 1 if *url* starts with ``https://``, otherwise 0.

    URLs that start with ``http://``, that carry no scheme, or that place the
    scheme anywhere but the very beginning all yield 0. A non-string input
    (for example a missing value) is reported on stdout and also yields 0.
    """
    try:
        # re.match only matches at the start of the string
        protocol = re.match(r"(https?)://", url)
    except TypeError as err:
        # Raised when url is not a string; keep the best-effort behaviour
        print(f"[Error]: {err}")
        return 0

    return 1 if protocol is not None and protocol.group(1) == "https" else 0

def count_digits(url: str) -> int:
    """Return how many characters of *url* satisfy ``str.isnumeric``."""
    return sum(1 for character in url if character.isnumeric())

def count_letters(url: str) -> int:
    """Return how many characters of *url* satisfy ``str.isalpha``."""
    return len([character for character in url if character.isalpha()])

In [None]:
def process_tld(url: str, fix_protos: bool = False) -> Tuple[str, str, str, str]:
    """Split *url* into its domain components using ``get_tld``.

    ``fix_protos`` is forwarded as ``fix_protocol``. The lookup is made with
    ``fail_silently=False``, so callers should be prepared for ``get_tld`` to
    raise on URLs it cannot resolve.

    Returns the tuple ``(subdomain, domain, tld, fld)`` read from the result
    object.
    """
    parsed = get_tld(url, as_object=True, fail_silently=False, fix_protocol=fix_protos)
    return parsed.subdomain, parsed.domain, parsed.tld, parsed.fld

def process_url_with_tld(row: pd.Series) -> Tuple[str, str, str, str]:
    """Return ``(subdomain, domain, tld, fld)`` for one dataframe row.

    Rows flagged as IP addresses (``row["is_ip"]`` not equal to 0) and rows
    for which the lookup raises any exception yield four ``None`` values.
    """
    missing = (None, None, None, None)

    try:
        if row["is_ip"] != 0:
            return missing
        return process_tld(row["url"], fix_protos=True)
    except Exception:
        # Best effort: a URL that cannot be parsed counts as "no domain parts"
        return missing

# Known URL-shortening services. Matching is case-sensitive and unanchored.
# Each name is listed once; "tinyurl" also covers "tinyurl.com".
_SHORTENING_SERVICES = (
    "bit.ly", "goo.gl", "shorte.st", "go2l.ink", "x.co", "ow.ly", "t.co",
    "tinyurl", "tr.im", "is.gd", "cli.gs", "yfrog.com", "migre.me", "ff.im",
    "tiny.cc", "url4.eu", "twit.ac", "su.pr", "twurl.nl", "snipurl.com",
    "short.to", "BudURL.com", "ping.fm", "post.ly", "Just.as", "bkite.com",
    "snipr.com", "fic.kr", "loopt.us", "doiop.com", "short.ie", "kl.am",
    "wp.me", "rubyurl.com", "om.ly", "to.ly", "bit.do", "lnkd.in", "db.tt",
    "qr.ae", "adf.ly", "bitly.com", "cur.lv", "ity.im", "q.gs", "po.st",
    "bc.vc", "twitthis.com", "u.to", "j.mp", "buzurl.com", "cutt.us", "u.bb",
    "yourls.org", "prettylinkpro.com", "scrnch.me", "filoops.info",
    "vzturl.com", "qr.net", "1url.com", "tweez.me", "v.gd", "link.zip.net",
)

# re.escape makes every "." literal; compiled once instead of on each call
_SHORTENING_PATTERN = re.compile(
    "|".join(re.escape(service) for service in _SHORTENING_SERVICES)
)


def contains_shortening_service(url: str) -> int:
    """Return 1 if *url* contains the name of a known shortening service, else 0.

    NOTE(review): because the search is unanchored, a service name occurring
    inside a longer host name (e.g. "t.co" within "reddit.com") also counts.
    This matches the original behaviour; tighten the pattern if that is not
    intended.
    """
    return 1 if _SHORTENING_PATTERN.search(url) else 0

# Function for returning length, even if text is missing
def count_len(text: str) -> int:
    """Return ``len(text)``, or 0 when *text* is missing.

    Missing means ``None`` or a float NaN; the latter is how absent strings
    appear once a column has passed through pandas.
    """
    if text is None:
        return 0
    # NaN is the only float that is not equal to itself
    if isinstance(text, float) and text != text:
        return 0
    return len(text)

In [None]:
start = time.time()

# Flag urls that embed an ip address
df["is_ip"] = df["url"].apply(url_is_ip_address)

# Flag urls that start with the https scheme
df["is_https_secured"] = df["url"].apply(https_secured)

# Total number of characters in the url string
df["url_length"] = df["url"].apply(len)

# Split every url into subdomain, domain, top-level and first-level domain
df[["subdomain", "domain", "tld", "fld"]] = df.apply(
    process_url_with_tld, axis=1, result_type="expand"
)

# Rows without a valid subdomain / tld / fld (mostly urls that are plain ip
# addresses, or urls that do not adhere to the standard) get a length of 0.
for part in ("subdomain", "tld", "fld"):
    df[f"{part}_len"] = df[part].apply(count_len)

# Count the digits in the url
df["digit_count"] = df["url"].apply(count_digits)

# Count the letters in the url
df["letter_count"] = df["url"].apply(count_letters)

# Count selected special characters, excluding the single-slash directory separator
special_characters = ['@', '?', '-', '=', '.', '#', '%', '+', '$', '!', '*', ',', '//']

# One occurrence-count column per special character, named after the character
for char in special_characters:
    df[char] = df["url"].apply(lambda url: url.count(char))

df["contains_shortening"] = df["url"].apply(contains_shortening_service)

# Total count of special characters in the url
df["special_count"] = df[special_characters].sum(axis=1)

print(f"time elapsed: {(time.time() - start):.2f} seconds")

df.head()

In [None]:
# Correlation heatmap of the engineered features.
# Only numeric columns are correlated: the frame still holds string columns
# (url, type, domain parts), and recent pandas versions raise when asked to
# correlate those instead of silently dropping them.
plt.figure(figsize=(12, 10))
sns.heatmap(
    df.select_dtypes(include="number").corr(),
    linewidths=.5,
    cmap=sns.color_palette("pastel"),
    annot=True,
)
plt.tight_layout()
plt.show()

In [None]:
# checkpoint: persist the dataframe with all engineered features (without the
# index column) so the later steps can reload it from disk
df.to_csv("final_malicious_phish.csv", index=False)

## Step 4: Create X and y subsets; Create training and testing subsets

In [None]:
# method for reducing memory size of dataframe (https://www.kaggle.com/code/arjanso/reducing-dataframe-memory-size-by-65/notebook)
_UNSIGNED_DTYPES = (np.uint8, np.uint16, np.uint32, np.uint64)
_SIGNED_DTYPES = (np.int8, np.int16, np.int32, np.int64)


def _smallest_integer_dtype(lowest: int, highest: int):
    """Return the narrowest NumPy integer dtype covering [lowest, highest], or None."""
    candidates = _UNSIGNED_DTYPES if lowest >= 0 else _SIGNED_DTYPES
    for candidate in candidates:
        limits = np.iinfo(candidate)
        if limits.min <= lowest and highest <= limits.max:
            return candidate
    return None


def _is_integer_valued(values: np.ndarray) -> bool:
    """Return True when every element is finite and has no fractional part."""
    if values.dtype.kind in "iub":
        return True
    return bool(np.isfinite(values).all() and (values == np.trunc(values)).all())


def reduce_memory(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast the numeric columns of *df* in place and return it.

    Integer-valued columns are cast to the narrowest integer dtype that holds
    their minimum and maximum (unsigned when the minimum is >= 0). All other
    float columns, including those containing NaN or infinities, are cast to
    float32. Non-numeric and empty columns are left untouched, as are
    integer-valued columns whose range fits no NumPy integer dtype.

    Per-column and overall memory figures are printed along the way.

    Fixes relative to the original recipe:
    * the integer test checks every element; summing the fractional parts let
      values such as 0.5 and -0.5 cancel out, which truncated the column;
    * NaN in an otherwise integer-valued column no longer aborts the cast;
    * dtype limits are inclusive, so e.g. a maximum of 255 fits in uint8;
    * the builtins ``min`` and ``max`` are no longer shadowed.
    """
    current_memory_usage = df.memory_usage().sum() / 1024 ** 2  # convert bytes to MB
    print(f"Memory of dataframe: {current_memory_usage} MB\n")

    for col in df.columns:
        values = df[col].to_numpy()

        # Only plain bool / integer / float data can be downcast
        if values.dtype.kind not in "iubf" or values.size == 0:
            continue

        print("**********************")
        print(f"Column: {col}")
        print("----------------------")
        print(f"Dtype (before): {df[col].dtype}")
        print(f"Memory (before): {df[col].memory_usage() / 1024 ** 2:.2f} MB")
        print("----------------------")

        if _is_integer_valued(values):
            target_dtype = _smallest_integer_dtype(int(values.min()), int(values.max()))
        else:
            target_dtype = np.float32

        if target_dtype is not None:
            df[col] = df[col].astype(target_dtype)

        print(f"Dtype (after): {df[col].dtype}")
        print(f"memory (after): {df[col].memory_usage() / 1024 ** 2:.2f} MB")
        print("**********************\n")

    new_memory_usage = df.memory_usage().sum() / 1024 ** 2
    print(f"Memory usage of {current_memory_usage:.2f} MB reduced to {new_memory_usage:.2f} MB")
    print(f"% reduced: {100 - (100 * new_memory_usage/current_memory_usage):.2f}%")

    return df

In [None]:
# Reload the checkpoint written at the end of Step 3
# NOTE(review): values that were None before saving (e.g. missing domain
# parts) presumably come back as NaN after the CSV round-trip — confirm.
df = pd.read_csv("final_malicious_phish.csv")
df.head()

In [None]:
# method to reduce dataframe by a fraction, while preserving target variable distribution
def shorten_dataframe(df: pd.DataFrame, column: str, fraction: float) -> pd.DataFrame:
    """Return a stratified random sample of *df*.

    Every distinct value of ``df[column]`` is sampled separately with
    ``frac=fraction`` and a fixed ``random_state`` of 42, visiting the values
    in ``value_counts`` order. The per-value samples are concatenated in that
    order and keep their original index labels. Progress is printed.

    An input without any value in *column* yields an empty frame that keeps
    the original columns.
    """
    print(f"Target column: {column}\nShorten to {fraction * 100}%\n")

    samples = []
    for index in df[column].value_counts().index:
        group = df[df[column] == index]
        sample = group.sample(frac=fraction, random_state=42)
        samples.append(sample)
        print(f"{index} reduced from {len(group)} to {len(sample)}")

    # Concatenate once at the end rather than growing a frame inside the loop;
    # this also avoids concatenating onto an empty placeholder frame.
    final_df = pd.concat(samples) if samples else df.iloc[0:0]

    print(f"\n{final_df[column].value_counts()}")

    return final_df

In [None]:
# Keep a 2% sample of every target class to make model tuning tractable
final_df = shorten_dataframe(df, column="target", fraction=0.02)

In [None]:
# change from multi-class to binary class, because either malicious or not:
# codes 2 and 3 are merged into code 1, while 0 stays as it is.
# The labels are derived from final_df itself rather than from the full df, so
# the result does not depend on implicit index alignment between two frames.
final_df["target"] = final_df["target"].replace([2, 3], 1)
final_df.url_length.value_counts()

In [None]:
# Downcast the numeric columns of the sample. reduce_memory assigns the
# converted columns back onto the frame it receives and also returns it; the
# return value is not stored and is only displayed as the cell output.
reduce_memory(final_df)

In [None]:
# Features: everything except the raw strings, the labels, and the textual
# domain parts. Target: the binary label.
y = final_df["target"]
X = final_df.drop(
    columns=["url", "type", "target", "subdomain", "domain", "tld", "fld"]
)

In [None]:
# Check data distribution for normality to determine how to scale the data
# Kolmogorov-Smirnov test: the null hypothesis is that the sample follows the
# reference distribution, so a p-value below 0.05 REJECTS normality.
# NOTE(review): 'norm' is the standard normal (mean 0, std 1) and the features
# are tested unscaled, so rejection is to be expected for most columns —
# consider standardising each column before testing.
for column in X.columns:
    ks_result = kstest(X[column], 'norm')  # computed once per column
    verdict = 'Not normal' if ks_result[1] < 0.05 else 'Normal'
    print(f"{column:20s}:\t{verdict:10s}:\t{ks_result}")

In [None]:
# check for null values in the feature matrix (the train/test split happens
# in the next cell, so this covers all sampled rows)
X.isna().sum()

In [None]:
# Stratified 80/20 split so both subsets keep the class ratio of y
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=True, stratify=y, random_state=42)

# Standardise the features to zero mean and unit variance
scaler = StandardScaler()

# The scaler learns its mean and variance from the training data only; the
# test data is transformed with those same statistics. Refitting on the test
# set would scale the two subsets differently and leak test information.
X_train_std = scaler.fit_transform(X_train)
X_test_std = scaler.transform(X_test)

In [None]:
# Class balance of the held-out test labels
y_test.value_counts()

## Step 5: Train machine learning models

In [None]:
from tabulate import tabulate
from sklearn.metrics import confusion_matrix

# helper function to print out evaluation metrics and confusion matrix
def classification_report(y_test: pd.Series, y_pred: pd.Series, y_pred_proba: np.ndarray):
    """Print evaluation metrics and plot the ROC curve and confusion matrix.

    Parameters
    ----------
    y_test : true binary labels.
    y_pred : predicted labels for the same rows.
    y_pred_proba : two-dimensional array of class scores; column 1 is used as
        the score of the positive class.

    Accuracy, F1, recall and precision are computed from the predicted
    labels. The ROC AUC is computed from the positive-class scores so that it
    agrees with the plotted ROC curve.
    """
    evaluation_methods = [f1_score, recall_score, precision_score]
    positive_scores = y_pred_proba[:, 1]

    print("\n****************************************************")
    print(f"accuracy: {accuracy_score(y_test, y_pred):.4f}")
    # Scores, not labels: AUC on hard predictions collapses the curve to a
    # single operating point.
    print(f"roc_auc_score: {roc_auc_score(y_test, positive_scores):.4f}")
    print("----------------------------------------------------")

    for method in evaluation_methods:
        print(f"{method.__name__:20s}:{method(y_test, y_pred, zero_division=0):.4f}")

    print("----------------------------------------------------")
    fpr, tpr, thresholds = roc_curve(y_test, positive_scores)
    # Diagonal reference line from (0, 0) to (1, 1)
    plt.plot([0, 1], [0, 1], linestyle='--')

    # plot the roc curve for the model
    plt.plot(fpr, tpr, marker='.')

    # show the plot
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title("ROC-curve")
    plt.show()
    print("----------------------------------------------------")
    cm = confusion_matrix(y_test, y_pred)

    matrix = sns.heatmap(cm, annot=True, cmap=colors, fmt="g", linewidths=0.5, linecolor="black")
    matrix.set_title("Confusion matrix")

    matrix.set_xlabel("\nPredicted values")
    matrix.set_ylabel("Actual values")

    matrix.xaxis.set_ticklabels(["Benign", "Malicious"])
    matrix.yaxis.set_ticklabels(["Benign", "Malicious"])

    plt.show()
    print("\n****************************************************")



### 5.1 Dummy classifier
The dummy classifier serves as the baseline model for comparison purposes. 
We use the `most_frequent` strategy so that it always predicts the most frequent label, in this case 0 (benign). For this estimator and all those that follow, we use a random seed of 42 to ensure that the results are reproducible.

In [None]:
# Baseline: always predict the most frequent training label
dummy_classifier = DummyClassifier(strategy="most_frequent", random_state=42)
dummy_classifier.fit(X_train_std, y_train)

# Hard predictions and class probabilities on the held-out set
dummy_test_pred_proba = dummy_classifier.predict_proba(X_test_std)
dummy_test_pred = dummy_classifier.predict(X_test_std)

classification_report(
    y_test,
    dummy_test_pred,
    dummy_test_pred_proba,
)

### 5.2 Random Forest
The random forest classifier is the first algorithm that we use for classification. We use a 10-fold cross-validated grid search to fine-tune selected parameters. The grid search fits one random forest classifier for every combination of the parameter values specified in the param_grid dictionary, multiplied by the number of folds. 
In this case 3 * 2 * 3 * 3 = 54 parameter combinations * 10 folds = 540 fits

In [None]:
# Candidate hyper-parameters for the random forest
param_grid = dict(
    n_estimators=[10, 50, 100],
    max_features=["log2", "sqrt"],
    max_depth=[2, 3, 5],
    max_leaf_nodes=[2, 4, 6],
)

random_forest = RandomForestClassifier(random_state=42)

# Exhaustive search over the grid with 10-fold cross-validation
rf_grid = GridSearchCV(estimator=random_forest, param_grid=param_grid, cv=10)
rf_grid.fit(X_train_std, y_train)

# Evaluate the best configuration found on the held-out set
rf_grid_pred_proba = rf_grid.best_estimator_.predict_proba(X_test_std)
rf_grid_pred = rf_grid.best_estimator_.predict(X_test_std)

classification_report(y_test, rf_grid_pred, rf_grid_pred_proba)

In [None]:
# Parameter combination selected by the random forest grid search
rf_grid.best_params_

### 5.3 Logistic Regression

In [None]:
# Candidate hyper-parameters for logistic regression: penalty type,
# regularisation constant C, and solver
param_grid = dict(
    penalty=["l1", "l2"],
    C=[0.0001, 0.001, 0.01, 0.1],
    solver=["liblinear", "saga"],
)

logistic_regression = LogisticRegression(random_state=42, max_iter=1000)

# Exhaustive search over the grid with 10-fold cross-validation
lr_grid = GridSearchCV(estimator=logistic_regression, param_grid=param_grid, cv=10)
lr_grid.fit(X_train_std, y_train)

# Evaluate the best configuration found on the held-out set
lr_grid_pred_proba = lr_grid.best_estimator_.predict_proba(X_test_std)
lr_grid_pred = lr_grid.best_estimator_.predict(X_test_std)

classification_report(y_test, lr_grid_pred, lr_grid_pred_proba)

In [None]:
# Parameter combination selected by the logistic regression grid search
lr_grid.best_params_

### 5.4 Multi Layer Perceptron

In [None]:
# Candidate hyper-parameters for the multi layer perceptron; the integer
# hidden_layer_sizes candidates run from 5 to 9
param_grid = dict(
    hidden_layer_sizes=list(range(5, 10)),
    activation=["tanh", "relu"],
    solver=["sgd", "adam"],
    alpha=[0.0001, 0.001, 0.01],
)

perceptron = MLPClassifier(random_state=42, max_iter=1000)

# Exhaustive search over the grid with 10-fold cross-validation
perceptron_grid = GridSearchCV(estimator=perceptron, param_grid=param_grid, cv=10)
perceptron_grid.fit(X_train_std, y_train)

# Evaluate the best configuration found on the held-out set
perceptron_grid_pred_proba = perceptron_grid.best_estimator_.predict_proba(X_test_std)
perceptron_grid_pred = perceptron_grid.best_estimator_.predict(X_test_std)

classification_report(y_test, perceptron_grid_pred, perceptron_grid_pred_proba)

In [None]:
# Parameter combination selected by the multi layer perceptron grid search
perceptron_grid.best_params_

### 5.5 XGBoost

In [None]:
# Candidate hyper-parameters for XGBoost
param_grid = dict(
    colsample_bytree=[0.7, 0.8],
    max_depth=[3, 4],
    min_child_weight=[4, 5],
    subsample=[0.6, 0.7, 0.8, 0.9, 1.0],
    gamma=[0.3, 0.4, 0.5],
)

xgboost = XGBClassifier(random_state=42)

# Exhaustive search over the grid with 10-fold cross-validation
xgb_grid = GridSearchCV(estimator=xgboost, param_grid=param_grid, cv=10)
xgb_grid.fit(X_train_std, y_train)

# Evaluate the best configuration found on the held-out set
xgb_grid_pred_proba = xgb_grid.best_estimator_.predict_proba(X_test_std)
xgb_grid_pred = xgb_grid.best_estimator_.predict(X_test_std)

classification_report(y_test, xgb_grid_pred, xgb_grid_pred_proba)

In [None]:
# Parameter combination selected by the XGBoost grid search
xgb_grid.best_params_

In [None]:
# Side-by-side confusion matrices of the tuned estimators on the test set
classifiers = [
    rf_grid.best_estimator_,
    lr_grid.best_estimator_,
    perceptron_grid.best_estimator_,
    xgb_grid.best_estimator_
]

fig, axs = plt.subplots(ncols=2, nrows=2, figsize=(10, 10))

# One subplot per estimator, titled with the estimator's class name
for estimator, ax in zip(classifiers, axs.flatten()):
    y_pred = estimator.predict(X_test_std)
    cm = confusion_matrix(y_test, y_pred)
    matrix = sns.heatmap(
        cm,
        cmap=colors,
        annot=True,
        fmt="g",
        linewidths=0.5,
        linecolor="black",
        cbar=False,
        ax=ax,
    )
    matrix.set_xlabel("\nPredicted values")
    matrix.set_ylabel("Actual values")

    matrix.xaxis.set_ticklabels(["Benign", "Malicious"])
    matrix.yaxis.set_ticklabels(["Benign", "Malicious"])

    ax.title.set_text(type(estimator).__name__)

# Keep axis labels on the outer subplots only. This has to happen before the
# figure is rendered; calling it after plt.show() leaves the output unchanged.
for ax in axs.flat:
    ax.label_outer()

plt.tight_layout()

plt.show()