In [None]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import tensorflow as tf
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from datasets import Dataset, load_dataset
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import (AutoTokenizer, pipeline, AutoModelForSequenceClassification, TrainingArguments,
                          Trainer, DataCollatorWithPadding)
from sklearn.metrics import (accuracy_score, recall_score, precision_score, log_loss, roc_auc_score,
                             roc_curve, precision_recall_curve)

from keras.optimizers import Adam
from keras.models import Sequential
from keras.utils import to_categorical
from keras.metrics import Precision, Recall, AUC
from keras.layers import LSTM, Dense, Embedding, Conv1D, GlobalMaxPooling1D, SpatialDropout1D, TextVectorization

In [None]:
# Line colours shared by the comparison plots: entry i is used for the i-th curve drawn.
colors = ['orange', 'gold', 'mediumturquoise', 'lightblue', 'midnightblue']

## **Phishing Datasets**

In [None]:
def import_dataset_from_hf(name):
  """Load configuration `name` of the dataset at ../phishing-dataset and return its 'train' split as a pandas DataFrame."""
  dataset = load_dataset("../phishing-dataset", name, trust_remote_code=True)
  train_split = dataset['train']
  return train_split.to_pandas()

In [None]:
# One DataFrame per dataset configuration (the 'train' split returned by import_dataset_from_hf).
texts = import_dataset_from_hf("texts")
urls = import_dataset_from_hf("urls")
webs = import_dataset_from_hf("webs")
combined = import_dataset_from_hf("combined_reduced")

We keep only 5% of the URL samples, because that is how the combined phishing dataset was constructed.

In [None]:
# Stratified subsample: the 5% "train" part is kept and the 95% "test" part is discarded.
# NOTE(review): this rebinds `urls`, so re-running the cell subsamples the already-reduced frame again.
urls, _ = train_test_split(urls, test_size=0.95, stratify=urls['label'], random_state=42)

Splitting into train and test sets ...

In [None]:
def split_dataset(df):
  """Split the 'text' and 'label' columns of `df` into train and test parts.

  The split is stratified on the label, holds out 20% for testing and uses a
  fixed random_state of 42.

  Returns:
    The sequence produced by train_test_split: x_train, x_test, y_train, y_test.
  """
  features, target = df['text'], df['label']
  return train_test_split(features, target, test_size=0.2, random_state=42, stratify=target)

In [None]:
# Each call yields (x_train, x_test, y_train, y_test) for one dataset.
xtrain_text, xtest_text, ytrain_text, ytest_text = split_dataset(texts)
xtrain_url, xtest_url, ytrain_url, ytest_url = split_dataset(urls)
xtrain_web, xtest_web, ytrain_web, ytest_web = split_dataset(webs)
xtrain_comb, xtest_comb, ytrain_comb, ytest_comb = split_dataset(combined)

## **Vectorizing data with TF-IDF**

In [None]:
# Vocabulary cap, used both as TfidfVectorizer max_features and as the
# TextVectorization max_tokens / Embedding input size of the LSTM-CNN model.
MAX_WORDS_NUM = 2000
# n-gram setting passed to both the TF-IDF vectorizer and the TextVectorization layer.
N_GRAMS = (1, 2)

def vectorize_dataset(train, test):
  """Turn raw train/test texts into TF-IDF matrices.

  A new TfidfVectorizer is created on every call and fitted on `train` only;
  `test` is transformed with the vocabulary and weights learned from `train`.

  Returns:
    (train_matrix, test_matrix)
  """
  vectorizer = TfidfVectorizer(
      stop_words='english',
      ngram_range=N_GRAMS,
      max_features=MAX_WORDS_NUM,
      max_df=0.25,
  )
  # Learn the vocabulary from the training texts and encode them in one step
  train_matrix = vectorizer.fit_transform(train)
  # Encode the test texts with the already-fitted vectorizer
  test_matrix = vectorizer.transform(test)
  return train_matrix, test_matrix

In [None]:
# TF-IDF matrices per dataset; vectorize_dataset fits a separate vectorizer for each one.
xtrain_text_vect, xtest_text_vect = vectorize_dataset(xtrain_text, xtest_text)
xtrain_url_vect, xtest_url_vect = vectorize_dataset(xtrain_url, xtest_url)
xtrain_web_vect, xtest_web_vect = vectorize_dataset(xtrain_web, xtest_web)
xtrain_comb_vect, xtest_comb_vect = vectorize_dataset(xtrain_comb, xtest_comb)

## **Model Definitions**

### XGBoost

In [None]:
# Hyper-parameters of the XGBoost classifier that xgboost_results refits on each dataset.
xgb_params = {
    'objective': 'binary:logistic',
    'n_estimators': 100,
    'learning_rate': 0.1,
    'max_depth': 12,
    'min_child_weight': 2,
    'gamma': 0.2,
    'subsample': 0.8,
    'colsample_bytree': 0.7,
}

xgb = XGBClassifier(**xgb_params)

### Multinomial Naive Bayes (MNB)

In [None]:
# Multinomial Naive Bayes estimator (alpha=0.01); mnb_results refits this same object on each dataset.
mnb = MultinomialNB(alpha=0.01)

### LSTM-CNN

In [None]:
# Number of token ids per sample produced by the TextVectorization layer.
MAX_SEQUENCE_LENGTH = 100
# Size of the vector the Embedding layer assigns to each token id.
EMBEDDING_DIM = 10

def compile_lstm_cnn_model(text):
  """Build and compile a new LSTM-CNN classifier that consumes raw strings.

  A TextVectorization layer is adapted on `text` and placed at the front of the
  network, so the returned model takes strings as input and ends in a 2-unit
  softmax layer.

  Args:
    text: corpus used to adapt the TextVectorization vocabulary.

  Returns:
    The compiled (untrained) Sequential model.
  """
  # String -> token-id sequences, vocabulary learned from the given corpus
  vectorizer = TextVectorization(
      max_tokens=MAX_WORDS_NUM,
      ngrams=N_GRAMS,
      output_sequence_length=MAX_SEQUENCE_LENGTH,
  )
  vectorizer.adapt(text)

  model = Sequential([
      tf.keras.Input(shape=(1,), dtype=tf.string),
      vectorizer,
      # each token id is represented by an EMBEDDING_DIM-length vector
      Embedding(MAX_WORDS_NUM, EMBEDDING_DIM, input_length=MAX_SEQUENCE_LENGTH),
      SpatialDropout1D(0.2),
      # return_sequences=True keeps the time axis that the Conv1D layer convolves over
      LSTM(100, return_sequences=True),
      Conv1D(50, kernel_size=3, activation='relu'),
      GlobalMaxPooling1D(),
      Dense(32),
      Dense(2, activation="softmax"),
  ])

  # binary_crossentropy is applied to the two softmax outputs; lstm_cnn_results
  # feeds one-hot labels (to_categorical) to match this 2-unit head.
  model.compile(
      loss="binary_crossentropy",
      optimizer=Adam(learning_rate=1e-4),
      metrics=['accuracy', Precision(), Recall(), AUC()],
  )

  return model

### BERT

In [None]:
# Checkpoint shared by the tokenizer and the classification model
BERT_CHECKPOINT = "bert-large-uncased"

# class ids <-> class names; label2id is the inverse of id2label
id2label = {0: "benign", 1: "phishing"}
label2id = {name: idx for idx, name in id2label.items()}

# tokenizer, plus a collator that pads each batch dynamically
bert_tokenizer = AutoTokenizer.from_pretrained(BERT_CHECKPOINT)
data_collator = DataCollatorWithPadding(tokenizer=bert_tokenizer)

# sequence-classification model with a 2-label head
bert = AutoModelForSequenceClassification.from_pretrained(
    BERT_CHECKPOINT,
    label2id=label2id,
    id2label=id2label,
    num_labels=2,
)

In [None]:
# Text-classification pipeline around the `bert` model object defined above; it is used
# as the baseline in the "Pre-trained BERT base" cells.
# NOTE(review): `bert` is loaded from the "bert-large-uncased" checkpoint although the
# variable is called `bert_base` — confirm which model size is intended.
# truncation=True truncates over-long inputs; device=0 places the pipeline on device index 0.
bert_base = pipeline(
    task='text-classification',
    model=bert,
    tokenizer=bert_tokenizer,
    truncation=True,
    device=0
)

In [None]:
# BERT already fine-tuned on phishing detection, loaded from a local directory.
# The task is stated explicitly, as in the other pipelines of this notebook:
# when `task` is omitted the pipeline has to infer it from the model identifier,
# which cannot be relied on for a local path.
bert_finetuned = pipeline(
    task='text-classification',
    model='../bert-finetuned-phishing',
    tokenizer=bert_tokenizer,
    truncation=True,
    device=0
)

## **Training models**

### XGBoost

In [None]:
def xgboost_results(xtrain, xtest, ytrain, dataset):
  """Fit the shared `xgb` classifier on the training data and predict on `xtest`.

  Args:
    xtrain, ytrain: training features and labels.
    xtest: test features.
    dataset: dataset name, used only in the progress messages.

  Returns:
    [predicted labels, column 1 of predict_proba] for `xtest`.
  """
  print(f"Training XGBoost on {dataset} dataset ...")
  xgb.fit(xtrain, ytrain)
  print("\n<--- Finished, returning predictions")
  # probability matrix first, then the hard labels
  class_proba = xgb.predict_proba(xtest)
  hard_labels = xgb.predict(xtest)
  return [hard_labels, class_proba[:, 1]]

In [None]:
%%time
# Every call below refits the same `xgb` object, so it only holds the most recent fit;
# the per-dataset predictions are kept in the returned [pred, predproba] lists.
xgb_text_results = xgboost_results(xtrain_text_vect, xtest_text_vect, ytrain_text, 'text')

In [None]:
%%time
xgb_url_results = xgboost_results(xtrain_url_vect, xtest_url_vect, ytrain_url, 'url')

In [None]:
%%time
xgb_web_results = xgboost_results(xtrain_web_vect, xtest_web_vect, ytrain_web, 'web')

In [None]:
%%time
xgb_comb_results = xgboost_results(xtrain_comb_vect, xtest_comb_vect, ytrain_comb, 'combined')

### MNB

In [None]:
def mnb_results(xtrain, xtest, ytrain, dataset):
  """Fit the shared `mnb` classifier on the training data and predict on `xtest`.

  Args:
    xtrain, ytrain: training features and labels.
    xtest: test features.
    dataset: dataset name, used only in the progress messages.

  Returns:
    [predicted labels, column 1 of predict_proba] for `xtest`.
  """
  print(f"Training MNB on {dataset} dataset ...")
  mnb.fit(xtrain, ytrain)
  print("\n<--- Finished, returning predictions")
  predicted_labels = mnb.predict(xtest)
  positive_scores = mnb.predict_proba(xtest)[:, 1]
  return [predicted_labels, positive_scores]

In [None]:
%%time
# Every call below refits the same `mnb` object on the TF-IDF matrices of one dataset.
mnb_text_results = mnb_results(xtrain_text_vect, xtest_text_vect, ytrain_text, 'text')

In [None]:
%%time
mnb_url_results = mnb_results(xtrain_url_vect, xtest_url_vect, ytrain_url, 'url')

In [None]:
%%time
mnb_web_results = mnb_results(xtrain_web_vect, xtest_web_vect, ytrain_web, 'web')

In [None]:
%%time
mnb_comb_results = mnb_results(xtrain_comb_vect, xtest_comb_vect, ytrain_comb, 'combined')

### LSTM-CNN

In [None]:
def lstm_cnn_results(xtrain, xtest, ytrain, dataset, epochs=4, batch_size=32):
  """Train a new LSTM-CNN model on raw texts and predict on `xtest`.

  Args:
    xtrain: training texts; also used to adapt the model's text vectorizer.
    xtest: test texts.
    ytrain: integer class labels (0/1), one-hot encoded before fitting.
    dataset: dataset name, used only in the progress messages.
    epochs: number of training epochs.
    batch_size: training batch size.

  Returns:
    [predicted class ids, score of class 1] for `xtest`.
  """
  print(f"Training LSTM-CNN on {dataset} dataset ...")
  # a new model is compiled for every call, so runs do not share weights
  nn = compile_lstm_cnn_model(xtrain)
  nn.fit(
      xtrain,
      to_categorical(ytrain, num_classes=2),
      epochs=epochs,
      batch_size=batch_size
  )
  print("\n<--- Finished, returning predictions")
  # Run inference once and derive both outputs from the same score matrix,
  # instead of passing the whole test set through the network twice.
  class_scores = nn.predict(xtest)
  nn_pred = tf.argmax(class_scores, axis=1)
  nn_predproba = class_scores[:, 1]
  return [nn_pred, nn_predproba]

In [None]:
%%time
# lstm_cnn_results compiles and trains a new model per call; the raw (non-vectorized)
# texts are passed in, and the epoch count is set per dataset (default 4).
nn_text_results = lstm_cnn_results(xtrain_text, xtest_text, ytrain_text, 'text')

In [None]:
%%time
nn_url_results = lstm_cnn_results(xtrain_url, xtest_url, ytrain_url, 'url', epochs=12)

In [None]:
%%time
nn_web_results = lstm_cnn_results(xtrain_web, xtest_web, ytrain_web, 'web', epochs=7)

In [None]:
%%time
nn_comb_results = lstm_cnn_results(xtrain_comb, xtest_comb, ytrain_comb, 'combined', epochs=7)

### BERT

In [None]:
def get_predict_proba(label, score):
  """
  Convert a (label, score) pair into the probability of the positive class,
  i.e. phishing: the complement of the score when the label is 'benign',
  the score itself for any other label.
  """
  return 1 - score if label == 'benign' else score

def bert_results(pipe, pipe_name, xtest, dataset):
  """Run a text-classification pipeline over `xtest` and collect its predictions.

  Args:
    pipe: callable pipeline returning one {'label', 'score'} dict per input text.
    pipe_name: pipeline name, used only in the progress message.
    xtest: pandas Series of texts; it is converted to a frame and read back
      through its 'text' column.
    dataset: dataset name, used only in the progress message.

  Returns:
    [class ids looked up in label2id, phishing-class probabilities].
  """
  print(f"Making inferences on {dataset} dataset by {pipe_name} ...")
  texts = Dataset.from_pandas(xtest.to_frame())['text']
  outputs = list(tqdm(pipe(texts)))
  pred = [label2id[out['label']] for out in outputs]
  predproba = [get_predict_proba(out['label'], out['score']) for out in outputs]
  print("\n<--- Finished inference, returning predictions")
  return [pred, predproba]

#### Pre-trained BERT base

In [None]:
%%time
# Inference only: the `bert_base` pipeline is applied to the test split of each dataset.
# NOTE(review): the model behind `bert_base` is loaded from "bert-large-uncased" and is not
# trained before these cells — presumably its classification head is untrained; confirm
# this is the intended baseline.
bert_base_text_results = bert_results(bert_base, 'pre-trained BERT base', xtest_text, 'text')

In [None]:
%%time
bert_base_url_results = bert_results(bert_base, 'pre-trained BERT base', xtest_url, 'url')

In [None]:
%%time
bert_base_web_results = bert_results(bert_base, 'pre-trained BERT base', xtest_web, 'web')

In [None]:
%%time
bert_base_comb_results = bert_results(bert_base, 'pre-trained BERT base', xtest_comb, 'combined')

#### BERT for training

In [None]:
import torch

def preprocess_function(example):
    """Tokenize the 'text' entry of `example` with the shared BERT tokenizer, truncation enabled."""
    text = example['text']
    return bert_tokenizer(text, truncation=True)

def trained_bert_results(xtrain, xtest, ytrain, ytest, dataset, epochs=1, learning_rate=2e-5,
                         checkpoint="bert-large-uncased"):
  """Fine-tune a fresh BERT classifier on one dataset and predict on its test split.

  A new model is loaded from `checkpoint` on every call. The module-level `bert`
  object is neither trained nor deleted, so the function can be called for
  several datasets in a row, each run starts from the same pre-trained weights,
  and the model used by the `bert_base` pipeline is left untouched.

  Args:
    xtrain, ytrain: training texts and labels (pandas Series).
    xtest, ytest: test texts and labels (pandas Series); passed to the Trainer
      as evaluation set and used for the final inference.
    dataset: dataset name; used in messages and in the output directory name.
    epochs: number of training epochs.
    learning_rate: optimizer learning rate.
    checkpoint: pre-trained checkpoint to start from.

  Returns:
    [pred, predproba] as produced by bert_results on `xtest`.
  """
  output_dir = f"./bert-finetuned-phishing-{dataset}"

  # fresh pre-trained weights for this run
  model = AutoModelForSequenceClassification.from_pretrained(
      checkpoint,
      num_labels=2,
      id2label=id2label,
      label2id=label2id
  )

  # text + label columns side by side, tokenized in batches
  train_ds = Dataset.from_pandas(pd.concat([xtrain, ytrain], axis=1).reset_index(drop=True))
  eval_ds = Dataset.from_pandas(pd.concat([xtest, ytest], axis=1).reset_index(drop=True))
  train_tokenized = train_ds.map(preprocess_function, batched=True)
  eval_tokenized = eval_ds.map(preprocess_function, batched=True)

  training_args = TrainingArguments(
      output_dir=output_dir,
      learning_rate=learning_rate,
      per_device_train_batch_size=4,
      per_device_eval_batch_size=4,
      num_train_epochs=epochs,
      evaluation_strategy='epoch',
      save_strategy='no',
      weight_decay=0.01
  )

  trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=train_tokenized,
      eval_dataset=eval_tokenized,
      tokenizer=bert_tokenizer,
      data_collator=data_collator
  )

  print(f"Training BERT on {dataset} dataset ...")

  trainer.train()

  # save model
  trainer.save_model(output_dir)

  # free gpu memory: drop the local references before emptying the CUDA cache
  del trainer, model
  torch.cuda.empty_cache()

  print("\n<-- Finished training, performing inference ...")

  # reload the saved weights into an inference pipeline
  bert_trained = pipeline(
      task='text-classification',
      model=output_dir,
      tokenizer=bert_tokenizer,
      truncation=True,
      device=0
  )

  return bert_results(bert_trained, 'trained BERT', xtest, dataset)

In [None]:
%%time
# Fine-tune on each single-source dataset; trained_bert_results saves the model under
# ./bert-finetuned-phishing-<dataset> and returns predictions on that dataset's test split.
bert_trained_text_results = trained_bert_results(xtrain_text, xtest_text, ytrain_text, ytest_text, 'text')

In [None]:
%%time
bert_trained_url_results = trained_bert_results(xtrain_url, xtest_url, ytrain_url, ytest_url, 'url')

In [None]:
%%time
bert_trained_web_results = trained_bert_results(xtrain_web, xtest_web, ytrain_web, ytest_web, 'web')

#### BERT finetuned for phishing detection

In [None]:
%%time
# No training here: the combined test split is scored with the `bert_finetuned` pipeline
# (model loaded from ../bert-finetuned-phishing).
bert_tuned_comb_results = bert_results(bert_finetuned, 'BERT finetuned for phishing detection', xtest_comb, 'combined')

## **Models' Classification Performance Comparison**

In [None]:
def plot_roc_auc_curve(models_predproba, ytest, results):
  """Plot the ROC curve of every model in one figure.

  Args:
    models_predproba: iterable of positive-class score vectors, one per model.
    ytest: true labels shared by all models.
    results: DataFrame whose index supplies the legend names and whose 'auc'
      column supplies the AUC shown in the legend; row i belongs to curve i.
  """
  plt.figure()

  auc_by_position = results['auc']
  for i, model_pred_proba in enumerate(models_predproba):
    fpr, tpr, _ = roc_curve(ytest, model_pred_proba)
    plt.plot(
        fpr,
        tpr,
        # .iloc: the frame is indexed by model name, so row i must be read by position
        label=f"{results.index.values[i]} AUC={auc_by_position.iloc[i]:.2f}",
        # cycle through the palette so more curves than colours does not raise
        color=colors[i % len(colors)]
    )

  # diagonal reference line from (0, 0) to (1, 1)
  plt.plot([0, 1], [0, 1], "k--")
  plt.xlabel("False Positive Rate")
  plt.ylabel("True Positive Rate")
  plt.title("CURVAS ROC-AUC")
  plt.legend(loc='lower right')
  plt.show()

In [None]:
def plot_pre_rec_curve(models_predproba, ytest, results):
  """Plot the precision-recall curve of every model in one figure.

  Args:
    models_predproba: iterable of positive-class score vectors, one per model.
    ytest: true labels shared by all models.
    results: DataFrame whose index supplies the legend names; row i belongs to curve i.
  """
  plt.figure()

  for i, model_pred_proba in enumerate(models_predproba):
    precision, recall, _ = precision_recall_curve(ytest, model_pred_proba)
    plt.plot(
        recall,
        precision,
        label=f"{results.index.values[i]}",
        # cycle through the palette so more curves than colours does not raise
        color=colors[i % len(colors)]
    )

  plt.xlabel("Recall")
  plt.ylabel("Precision")
  plt.title("CURVAS PRECISION-RECALL")
  plt.legend(loc='lower left')
  plt.show()

In [None]:
def metrics_evaluation(models_results, ytest):
  """Score every model's predictions against the same true labels.

  Args:
    models_results: iterable of [pred, predproba] pairs, one per model.
    ytest: true labels.

  Returns:
    A list with one dict per model holding 'accuracy', 'precision', 'recall'
    (computed from pred) and 'auc' (computed from predproba).
  """
  def _score(pred, predproba):
    # metrics of a single model
    return {
        'accuracy': accuracy_score(ytest, pred),
        'precision': precision_score(ytest, pred),
        'recall': recall_score(ytest, pred),
        'auc': roc_auc_score(ytest, predproba)
    }

  return [_score(result[0], result[1]) for result in models_results]

### Text Classification

In [None]:
# text models predictions: [pred, predproba] pairs, in the same order as the index labels below
models_text_results = [xgb_text_results, mnb_text_results, nn_text_results, bert_base_text_results, bert_trained_text_results]

In [None]:
# one row of metrics per model
text_results = pd.DataFrame(
    metrics_evaluation(models_text_results, ytest_text),
    index = ['XGBoost', 'MNB', 'LSTM-CNN', 'BERT-Base', 'BERT-Finetuned']
)

text_results

In [None]:
# row[1] is each model's positive-class probability vector
plot_roc_auc_curve([row[1] for row in models_text_results], ytest_text, text_results)

In [None]:
plot_pre_rec_curve([row[1] for row in models_text_results], ytest_text, text_results)

### URL Classification

In [None]:
# url models predictions: [pred, predproba] pairs, in the same order as the index labels below
models_url_results = [xgb_url_results, mnb_url_results, nn_url_results, bert_base_url_results, bert_trained_url_results]

In [None]:
# one row of metrics per model
url_results = pd.DataFrame(
    metrics_evaluation(models_url_results, ytest_url),
    index = ['XGBoost', 'MNB', 'LSTM-CNN', 'BERT-Base', 'BERT-Finetuned']
)

url_results

In [None]:
# row[1] is each model's positive-class probability vector
plot_roc_auc_curve([row[1] for row in models_url_results], ytest_url, url_results)

In [None]:
plot_pre_rec_curve([row[1] for row in models_url_results], ytest_url, url_results)

### Website Classification

In [None]:
# web models predictions: [pred, predproba] pairs, in the same order as the index labels below
models_web_results = [xgb_web_results, mnb_web_results, nn_web_results, bert_base_web_results, bert_trained_web_results]

In [None]:
# one row of metrics per model
web_results = pd.DataFrame(
    metrics_evaluation(models_web_results, ytest_web),
    index = ['XGBoost', 'MNB', 'LSTM-CNN', 'BERT-Base', 'BERT-Finetuned']
)

web_results

In [None]:
# row[1] is each model's positive-class probability vector
plot_roc_auc_curve([row[1] for row in models_web_results], ytest_web, web_results)

In [None]:
plot_pre_rec_curve([row[1] for row in models_web_results], ytest_web, web_results)

### Combined Phishing Dataset Classification

In [None]:
# combined dataset models predictions: [pred, predproba] pairs, in the same order as the index labels below.
# The last entry comes from the `bert_finetuned` pipeline rather than from trained_bert_results.
models_comb_results = [xgb_comb_results, mnb_comb_results, nn_comb_results, bert_base_comb_results, bert_tuned_comb_results]

In [None]:
# one row of metrics per model
comb_results = pd.DataFrame(
    metrics_evaluation(models_comb_results, ytest_comb),
    index = ['XGBoost', 'MNB', 'LSTM-CNN', 'BERT-Base', 'BERT-Finetuned']
)

comb_results

In [None]:
# row[1] is each model's positive-class probability vector
plot_roc_auc_curve([row[1] for row in models_comb_results], ytest_comb, comb_results)

In [None]:
plot_pre_rec_curve([row[1] for row in models_comb_results], ytest_comb, comb_results)

## **BERT Finetuned Performance**

In [None]:
def bert_finetuned_inference(xtest):
  """Run the `bert_finetuned` pipeline over `xtest` (a pandas Series of texts).

  Returns:
    [class ids looked up in label2id, phishing-class probabilities].
  """
  texts = Dataset.from_pandas(xtest.to_frame())['text']
  outputs = list(tqdm(bert_finetuned(texts)))
  class_ids = [label2id[out['label']] for out in outputs]
  phishing_proba = [get_predict_proba(out['label'], out['score']) for out in outputs]
  return [class_ids, phishing_proba]

def bert_finetuned_metrics_eval(inferences_results, ytests):
  """Score each inference result against its own true labels.

  Args:
    inferences_results: sequence of [pred, predproba] pairs, one per dataset.
    ytests: sequence of true-label vectors; ytests[i] belongs to inferences_results[i].

  Returns:
    A list with one dict per dataset holding 'accuracy', 'precision', 'recall' and 'auc'.
  """
  def _score(ytrue, pred, predproba):
    # metrics of a single dataset
    return {
        'accuracy': accuracy_score(ytrue, pred),
        'precision': precision_score(ytrue, pred),
        'recall': recall_score(ytrue, pred),
        'auc': roc_auc_score(ytrue, predproba)
    }

  return [
      _score(ytests[position], result[0], result[1])
      for position, result in enumerate(inferences_results)
  ]

In [None]:
# One [pred, predproba] pair per test split, in the order text, url, web, combined.
bert_finetuned_inferences = []
for xtest in [xtest_text, xtest_url, xtest_web, xtest_comb]:
  bert_finetuned_inferences.append(bert_finetuned_inference(xtest))

In [None]:
# One row of metrics per test split; the label list is in the same order as the inferences above.
bert_finetuned_results = pd.DataFrame(
    bert_finetuned_metrics_eval(bert_finetuned_inferences, [ytest_text, ytest_url, ytest_web, ytest_comb]),
    index = ['Texto', 'URL', 'Sitios web', 'Combinado']
)

bert_finetuned_results

In [None]:
def plot_roc_auc_curve(bert_inferences_predproba, ytests, results):
  """Plot one ROC curve per inference result in a single figure.

  This name is also defined earlier in the notebook with a single shared label
  vector as second argument; running this cell replaces that definition in the
  kernel. To keep those earlier positional calls working when they are re-run,
  `ytests` may be either one label vector per curve or one vector shared by all.

  Args:
    bert_inferences_predproba: positive-class score vectors, one per curve.
    ytests: list/tuple with one true-label vector per curve, or a single
      label vector used for every curve.
    results: DataFrame whose index supplies the legend names and whose 'auc'
      column supplies the AUC shown in the legend; row i belongs to curve i.
  """
  bert_inferences_predproba = list(bert_inferences_predproba)

  # a list/tuple whose first element is itself a vector means "one label vector per curve"
  per_curve_labels = (
      isinstance(ytests, (list, tuple)) and len(ytests) > 0 and np.ndim(ytests[0]) > 0
  )
  if not per_curve_labels:
    ytests = [ytests] * len(bert_inferences_predproba)

  plt.figure()

  auc_by_position = results['auc']
  for i, inference_pred_proba in enumerate(bert_inferences_predproba):
    fpr, tpr, _ = roc_curve(ytests[i], inference_pred_proba)
    plt.plot(
        fpr,
        tpr,
        # .iloc: the frame is indexed by name, so row i must be read by position
        label=f"{results.index.values[i]} AUC={auc_by_position.iloc[i]:.2f}",
        # cycle through the palette so more curves than colours does not raise
        color=colors[i % len(colors)]
    )

  # diagonal reference line from (0, 0) to (1, 1)
  plt.plot([0, 1], [0, 1], "k--")
  plt.xlabel("False Positive Rate")
  plt.ylabel("True Positive Rate")
  plt.title("CURVAS ROC-AUC")
  plt.legend(loc='lower right')
  plt.show()

plot_roc_auc_curve(
    bert_inferences_predproba=[row[1] for row in bert_finetuned_inferences],
    ytests=[ytest_text, ytest_url, ytest_web, ytest_comb],
    results=bert_finetuned_results
)

In [None]:
def plot_pre_rec_curve(bert_inferences_predproba, ytests, results):
  """Plot one precision-recall curve per inference result in a single figure.

  This name is also defined earlier in the notebook with a single shared label
  vector as second argument; running this cell replaces that definition in the
  kernel. To keep those earlier positional calls working when they are re-run,
  `ytests` may be either one label vector per curve or one vector shared by all.

  Args:
    bert_inferences_predproba: positive-class score vectors, one per curve.
    ytests: list/tuple with one true-label vector per curve, or a single
      label vector used for every curve.
    results: DataFrame whose index supplies the legend names; row i belongs to curve i.
  """
  bert_inferences_predproba = list(bert_inferences_predproba)

  # a list/tuple whose first element is itself a vector means "one label vector per curve"
  per_curve_labels = (
      isinstance(ytests, (list, tuple)) and len(ytests) > 0 and np.ndim(ytests[0]) > 0
  )
  if not per_curve_labels:
    ytests = [ytests] * len(bert_inferences_predproba)

  plt.figure()

  for i, inference_predproba in enumerate(bert_inferences_predproba):
    precision, recall, _ = precision_recall_curve(ytests[i], inference_predproba)
    plt.plot(
        recall,
        precision,
        label=f"{results.index.values[i]}",
        # cycle through the palette so more curves than colours does not raise
        color=colors[i % len(colors)]
    )

  plt.xlabel("Recall")
  plt.ylabel("Precision")
  plt.title("CURVAS PRECISION-RECALL")
  plt.legend(loc='lower left')
  plt.show()

plot_pre_rec_curve(
    bert_inferences_predproba=[row[1] for row in bert_finetuned_inferences],
    ytests=[ytest_text, ytest_url, ytest_web, ytest_comb],
    results=bert_finetuned_results
)