In [None]:
!pip install torch
!pip install datasets
!pip install tensorflow
!pip install numpy
!pip install keras
!pip install sklearn

In [None]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import tensorflow as tf
import matplotlib.pyplot as plt
import torch
from datasets import Dataset, load_dataset
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import (BertTokenizer, BertModel,AutoTokenizer, pipeline, AutoModelForSequenceClassification, TrainingArguments,
                          Trainer, DataCollatorWithPadding)
from sklearn.metrics import (accuracy_score, recall_score, precision_score, log_loss, roc_auc_score,
                             roc_curve, precision_recall_curve)
from keras.optimizers import Adam
from keras.models import Sequential
from keras.utils import to_categorical
from keras.metrics import Precision, Recall, AUC
from keras.layers import LSTM, Dense, Embedding, Conv1D, GlobalMaxPooling1D, SpatialDropout1D, TextVectorization

In [None]:
# Tokenizer matching the DistilBERT checkpoint used throughout the notebook.
distilbert_tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Mappings between integer class ids and their human-readable names.
id2label = {0: "benign", 1: "phishing"}
label2id = {"benign": 0, "phishing": 1}

# Collator that pads the examples of a batch when they are collated.
# Fixed: this previously referenced the undefined name `bert_tokenizer` (NameError).
data_collator = DataCollatorWithPadding(tokenizer=distilbert_tokenizer)

# Pretrained DistilBERT with a 2-class sequence-classification head.
# The label mappings are stored in the model config so pipelines report
# "benign"/"phishing" instead of generic label names.
distilbert = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=2,
    id2label=id2label,
    label2id=label2id
)

In [None]:
def import_data(type):
  """Load one configuration of `ealvaradob/phishing-dataset` as a pandas DataFrame.

  Args:
    type: Configuration name forwarded to `load_dataset` (the notebook uses
      "texts", "urls", "webs" and "combined_reduced").

  Returns:
    The 'train' split of the loaded dataset, converted with `to_pandas()`.
  """
  # NOTE: the parameter name shadows the built-in `type`; kept so existing callers still work.
  dataset_dict = load_dataset(
      "ealvaradob/phishing-dataset",
      type,
      trust_remote_code=True,
  )
  train_split = dataset_dict['train']
  return train_split.to_pandas()

In [None]:
def split_dataset(df):
  """Split a frame into stratified train/test texts and labels.

  Args:
    df: Frame with a 'text' column (inputs) and a 'label' column (targets).

  Returns:
    The result of `train_test_split`, in the order
    train texts, test texts, train labels, test labels.
  """
  features = df['text']
  targets = df['label']
  # 20% of the rows go to the test set; the seed is fixed for reproducibility.
  split_options = dict(stratify=targets, test_size=0.2, random_state=42)
  return train_test_split(features, targets, **split_options)

In [None]:
# Load every dataset configuration used in the notebook, in the same order as before.
texts, urls, webs, combined = (
    import_data(config_name)
    for config_name in ("texts", "urls", "webs", "combined_reduced")
)
# Keep only a stratified 5% sample of the URL data; the other 95% is discarded.
urls, _ = train_test_split(urls, test_size=0.95, stratify=urls['label'], random_state=42)


In [None]:
# Stratified 80/20 train/test split of each dataset (see split_dataset).
# Naming: x* are the text inputs, y* the labels; the suffix names the dataset.
xtrain_text, xtest_text, ytrain_text, ytest_text = split_dataset(texts)
xtrain_url, xtest_url, ytrain_url, ytest_url = split_dataset(urls)
xtrain_web, xtest_web, ytrain_web, ytest_web = split_dataset(webs)
xtrain_comb, xtest_comb, ytrain_comb, ytest_comb = split_dataset(combined)

In [None]:
# Settings for the TF-IDF vectorizer built in vectorize_dataset.
MAX_WORDS_NUM = 50000  # passed as `max_features`
N_GRAMS = (1, 2)  # passed as `ngram_range` (unigrams and bigrams)

def vectorize_dataset(train, test):
  """Convert raw train/test texts into TF-IDF feature matrices.

  The vectorizer is fitted on `train` only and then applied to `test`,
  so no vocabulary or document-frequency information comes from the test set.

  Args:
    train: Iterable of training texts.
    test: Iterable of test texts.

  Returns:
    A `(train_vect, test_vect)` tuple of TF-IDF matrices.
  """
  vectorizer_options = {
      'ngram_range': N_GRAMS,
      'max_df': 0.25,
      'stop_words': 'english',
      'max_features': MAX_WORDS_NUM,
  }
  vectorizer = TfidfVectorizer(**vectorizer_options)
  train_matrix = vectorizer.fit_transform(train)
  return train_matrix, vectorizer.transform(test)

In [None]:
# TF-IDF features for the text, URL and web datasets.
# Each call fits its own vectorizer on that dataset's training split.
# The combined dataset is not vectorized in this cell.
xtrain_text_vec, xtest_text_vec = vectorize_dataset(xtrain_text, xtest_text)
xtrain_url_vec, xtest_url_vec = vectorize_dataset(xtrain_url, xtest_url)
xtrain_web_vec, xtest_web_vec = vectorize_dataset(xtrain_web, xtest_web)

In [None]:

# Device index for the pipeline: first GPU when CUDA is available, otherwise -1 (CPU).
if torch.cuda.is_available():
  device = 0
else:
  device = -1

# Inference pipeline around the DistilBERT model loaded above (not fine-tuned in this notebook yet).
distilbert_base = pipeline(
    'text-classification',
    model=distilbert,
    tokenizer=distilbert_tokenizer,
    truncation=True,
    device=device,
)

In [None]:
def get_predict_proba(label, score):
  """
  Convert a (label, score) prediction into the probability of the positive
  class, i.e. phishing.

  The score is the confidence in the predicted label, so a 'benign'
  prediction is flipped to `1 - score`; any other label returns the score as is.
  """
  predicted_positive = label != 'benign'
  return score if predicted_positive else 1 - score

def distilbert_results(pipe, pipe_name, xtest, dataset):
  """Run a text-classification pipeline over the test texts.

  Args:
    pipe: Callable pipeline; each of its outputs is read through the
      'label' and 'score' keys.
    pipe_name: Model name, used only in the progress message.
    xtest: pandas Series of texts. It is converted with `to_frame()` and read
      back through the 'text' column, so the Series must be named 'text'.
    dataset: Dataset name, used only in the progress message.

  Returns:
    `[pred, predproba]`: the predicted class ids (looked up in `label2id`)
    and the phishing probabilities (see get_predict_proba).
  """
  print(f"Making inferences on {dataset} dataset by {pipe_name} ...")
  inputs = Dataset.from_pandas(xtest.to_frame())['text']
  outputs = pipe(inputs)
  pred = []
  predproba = []
  for prediction in tqdm(outputs):
    predicted_label, confidence = prediction['label'], prediction['score']
    pred.append(label2id[predicted_label])
    predproba.append(get_predict_proba(predicted_label, confidence))
  return [pred, predproba]

In [None]:
def preprocess_function(example):
    """Tokenize the 'text' field of an example (or batch) with the DistilBERT tokenizer.

    Truncation is enabled; the tokenizer output is returned unchanged.
    """
    texts_to_encode = example['text']
    return distilbert_tokenizer(texts_to_encode, truncation=True)

def trained_distilbert_results(xtrain, xtest, ytrain, ytest, dataset, epochs=1, learning_rate=2e-5):
  """Fine-tune DistilBERT on one dataset, save it, and run inference on the test split.

  Args:
    xtrain: pandas Series of training texts (named 'text').
    xtest: pandas Series of test texts (named 'text').
    ytrain: pandas Series of training labels (named 'label').
    ytest: pandas Series of test labels (named 'label').
    dataset: Short dataset name; used in log messages and in the output directory.
    epochs: Number of training epochs.
    learning_rate: Learning rate passed to `TrainingArguments`.

  Returns:
    `[pred, predproba]` as returned by `distilbert_results` for the
    fine-tuned model on `xtest`.
  """
  output_dir = f"./distilbert-finetuned-phishing-{dataset}"

  # Start from a fresh copy of the pretrained checkpoint. The original code
  # referenced an undefined name `bert`; reusing the global `distilbert` would
  # fine-tune the model held by the base pipeline in place and make repeated
  # calls continue from previously trained weights.
  model = AutoModelForSequenceClassification.from_pretrained(
      "distilbert-base-uncased",
      num_labels=2,
      id2label=id2label,
      label2id=label2id
  )

  # Rebuild text+label frames with a clean index so no index column leaks into the datasets.
  train_ds = Dataset.from_pandas(pd.concat([xtrain, ytrain], axis=1).reset_index(drop=True))
  eval_ds = Dataset.from_pandas(pd.concat([xtest, ytest], axis=1).reset_index(drop=True))
  train_tokenized = train_ds.map(preprocess_function, batched=True)
  eval_tokenized = eval_ds.map(preprocess_function, batched=True)

  training_args = TrainingArguments(
      output_dir=output_dir,
      learning_rate=learning_rate,
      per_device_train_batch_size=20,
      per_device_eval_batch_size=20,
      num_train_epochs=epochs,
      evaluation_strategy='epoch',
      save_strategy='no',
      weight_decay=0.01
  )

  # NOTE: the test split is used as the per-epoch evaluation set.
  trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=train_tokenized,
      eval_dataset=eval_tokenized,
      tokenizer=distilbert_tokenizer,
      data_collator=data_collator
  )

  print(f"Training distilBERT on {dataset} dataset ...")

  trainer.train()

  # save model
  trainer.save_model(output_dir)

  # free gpu memory: drop the only references to the training model before emptying the cache
  del model, trainer
  torch.cuda.empty_cache()

  print("\n<-- Finished training, performing inference ...")

  # Reload the saved model for inference; fall back to CPU (-1) when no GPU
  # is available instead of hard-coding device 0.
  distilbert_trained = pipeline(
      task='text-classification',
      model=output_dir,
      tokenizer=distilbert_tokenizer,
      truncation=True,
      device=0 if torch.cuda.is_available() else -1
  )

  # Fixed: the helper is named `distilbert_results` (was the undefined `bert_results`).
  return distilbert_results(distilbert_trained, 'trained distilBERT', xtest, dataset)

In [None]:
# Baseline: predictions of the DistilBERT pipeline that has not been fine-tuned, on the text test split.
# Fixed: the helper is named `distilbert_results` (was the undefined `bert_results`).
distilbert_base_text_results = distilbert_results(distilbert_base, 'distilBERT base', xtest_text, 'text')

In [None]:
# Release cached, unused GPU memory before the fine-tuning cell below.
torch.cuda.empty_cache()

In [None]:
# Fine-tune on the text training split and predict on the text test split.
# Fixed: the function is named `trained_distilbert_results` (was the undefined `trained_bert_results`).
distilbert_trained_text_results = trained_distilbert_results(xtrain_text, xtest_text, ytrain_text, ytest_text, 'text')

In [None]:
def plot_roc_auc_curve(models_predproba, ytest, results):
  """Plot one ROC curve per model on a shared figure.

  Args:
    models_predproba: Sequence of positive-class scores, one entry per model,
      in the same order as the rows of `results`.
    ytest: True binary labels.
    results: DataFrame whose index holds the model names and which has an
      'auc' column; both are used for the legend.
  """
  plt.figure()

  for i, model_pred_proba in enumerate(models_predproba):
      fpr, tpr, _ = roc_curve(ytest,  model_pred_proba)
      model_name = results.index.values[i]
      # Use explicit positional access: `results['auc'][i]` on a string-labelled
      # index relies on a deprecated integer fallback in pandas.
      model_auc = results['auc'].iloc[i]
      plt.plot(fpr, tpr, label=f"{model_name} AUC={model_auc:.2f}")

  # Diagonal reference line from (0, 0) to (1, 1).
  plt.plot([0, 1], [0, 1], "k--")
  plt.xlabel("False Positive Rate")
  plt.ylabel("True Positive Rate")
  plt.title("ROC-AUC")
  plt.legend(loc='lower right')
  plt.show()

In [None]:
def plot_pre_rec_curve(models_predproba, ytest, results):
  """Plot one precision-recall curve per model on a shared figure.

  NOTE: a later cell in this notebook defines `plot_pre_rec_curve` again;
  once that cell has run, this definition is shadowed.

  Args:
    models_predproba: Sequence of positive-class scores, one entry per model,
      in the same order as the rows of `results`.
    ytest: True binary labels.
    results: DataFrame whose index holds the model names used in the legend.
  """
  plt.figure()

  for position, scores in enumerate(models_predproba):
      prec, rec, _ = precision_recall_curve(ytest, scores)
      legend_label = f"{results.index.values[position]}"
      plt.plot(rec, prec, label=legend_label)

  plt.xlabel("Recall")
  plt.ylabel("Precision")
  plt.title("PRECISION-RECALL")
  plt.legend(loc='lower left')
  plt.show()

In [None]:
def metrics_evaluation(models_results, ytest):
  """Compute accuracy, precision, recall and ROC-AUC for each model.

  Args:
    models_results: Sequence of per-model results where item 0 holds the
      predicted labels and item 1 the positive-class probabilities.
    ytest: True labels.

  Returns:
    A list with one dict per model, keyed by
    'accuracy', 'precision', 'recall' and 'auc'.
  """
  def _score(pred, predproba):
    # Label-based metrics use the hard predictions; AUC uses the probabilities.
    return {
        'accuracy': accuracy_score(ytest, pred),
        'precision': precision_score(ytest, pred),
        'recall': recall_score(ytest, pred),
        'auc': roc_auc_score(ytest, predproba),
    }

  return [_score(result[0], result[1]) for result in models_results]

In [None]:
# Per-model [pred, predproba] results of the non-fine-tuned models (one entry per model).
base_model_results=[distilbert_base_text_results]

In [None]:
# Per-model [pred, predproba] results of the fine-tuned models (one entry per model).
trained_model_results = [distilbert_trained_text_results]

In [None]:
# Metrics table for the non-fine-tuned model on the text test split.
# Fixed: this cell previously evaluated `trained_model_results` and labelled
# the row 'distilBERT-Trained', so the base table duplicated the trained one.
base_text_results = pd.DataFrame(
    metrics_evaluation(base_model_results, ytest_text),
    index = ['distilBERT-Base']
)

base_text_results

In [None]:
# Metrics table for the fine-tuned model on the text test split, one row per model.
trained_text_results = pd.DataFrame(
    data=metrics_evaluation(trained_model_results, ytest_text),
    index=['distilBERT-Trained'],
)

trained_text_results

In [None]:
def plot_pre_rec_curve(models_predproba, ytest, results):
  """Plot one precision-recall curve per model, coloured from the global `colors` list.

  This redefines the `plot_pre_rec_curve` from an earlier cell, adding
  per-model line colours.

  Args:
    models_predproba: Sequence of positive-class scores, one entry per model,
      in the same order as the rows of `results`.
    ytest: True binary labels.
    results: DataFrame whose index holds the model names used in the legend.
  """
  plt.figure()

  for i, model_pred_proba in enumerate(models_predproba):
      precision, recall, _ = precision_recall_curve(ytest,  model_pred_proba)
      # Cycle through the palette so more models than colours no longer raises
      # IndexError; an empty palette falls back to matplotlib's default colours.
      line_color = colors[i % len(colors)] if colors else None
      plt.plot(recall, precision, label=f"{results.index.values[i]}", color=line_color)

  plt.xlabel("Recall")
  plt.ylabel("Precision")
  plt.title("PRECISION-RECALL")
  plt.legend(loc='lower left')
  plt.show()

In [None]:
# Line colours read by plot_pre_rec_curve, indexed by model position.
colors = ['orange']

In [None]:
# ROC curve of the non-fine-tuned model.
# Fixed: probabilities come from the raw [pred, predproba] results (iterating the
# metrics DataFrame yields column names), and the undefined `text_results` is
# replaced by the matching metrics table.
plot_roc_auc_curve([row[1] for row in base_model_results], ytest_text, base_text_results)

In [None]:
# ROC curve of the fine-tuned model.
# Fixed: probabilities come from the raw [pred, predproba] results (iterating the
# metrics DataFrame yields column names), and the undefined `text_results` is
# replaced by the matching metrics table.
plot_roc_auc_curve([row[1] for row in trained_model_results], ytest_text, trained_text_results)

In [None]:
# Precision-recall curve of the non-fine-tuned model.
# Fixed: probabilities come from the raw [pred, predproba] results (iterating the
# metrics DataFrame yields column names), and the undefined `text_results` is
# replaced by the matching metrics table.
plot_pre_rec_curve([row[1] for row in base_model_results], ytest_text, base_text_results)

In [None]:
# Precision-recall curve of the fine-tuned model.
# Fixed: probabilities come from the raw [pred, predproba] results (iterating the
# metrics DataFrame yields column names), and the undefined `text_results` is
# replaced by the matching metrics table.
plot_pre_rec_curve([row[1] for row in trained_model_results], ytest_text, trained_text_results)