This notebook uses a Gradio front-end to demonstrate our previously-trained machine learning models classifying user-inputted text as Legitimate or Phishing.

**Run all the cells and a shareable Gradio link will be generated below.**

In [None]:
pip install gradio transformers



**Declare previous functions used for Classical Machine Learning Methods (Feature Extraction)**

In [None]:
import email
import email.policy
import mailbox
import os
import re
import sys
import urllib.request
from email.parser import BytesParser

from bs4 import BeautifulSoup # used to parse HTML content

# declare functions used in preprocessing data and feature extraction

# get subject string from an email.message object
def get_subject(message):
    """Return the message's Subject header, or a single space when it is absent.

    Args:
        message: object supporting ``message['Subject']`` lookup
            (e.g. an ``email.message.Message``).

    Returns:
        The Subject value, or ``' '`` when the lookup yields ``None``.
    """
    subject = message['Subject']  # look the header up once instead of twice
    return ' ' if subject is None else subject

# get body string from an email.message object
# multipart emails contribute the text of every non-attachment leaf part
def get_body(message):
    """Return the body text of an email message.

    A non-multipart message yields ``message.get_payload()`` unchanged. For a
    multipart message, every leaf part whose content disposition is not
    ``'attachment'`` is converted with ``str`` and the pieces are joined with
    a blank line between them.
    """
    if not message.is_multipart():
        return message.get_payload()

    # walk() also yields the multipart containers themselves; only leaves are kept
    leaf_texts = [
        str(get_body(part))
        for part in message.walk()
        if not (part.is_multipart() or part.get_content_disposition() == 'attachment')
    ]
    return '\n\n'.join(leaf_texts)

# use regex to find all urls in an email.message object's body
def get_urls(message):
    """Return a list of every http/https URL matched in the message body.

    Args:
        message: email message whose body is obtained with ``get_body``.

    Returns:
        list of matched URL strings (empty when nothing matches).
    """
    # raw string: the pattern contains `\(` and `\)`, which are invalid escape
    # sequences in an ordinary string literal and trigger a warning at compile time
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    return re.findall(url_pattern, get_body(message))

# use regex to find all email addresses in a string
def get_email_from_string(string):
    """Extract email addresses from ``string``.

    Returns:
        ``None`` when no address is found, the address itself (a ``str``) when
        exactly one is found, otherwise the list of all addresses found.
    """
    matches = re.findall(r'[a-zA-Z0-9+._-]+@[a-zA-Z0-9._-]+\.[a-zA-Z0-9_-]+', string)
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    return matches

# count the unicode word characters in a string
def count_chars(string):
    """Return how many single characters in ``string`` match the regex ``\\w``."""
    return sum(1 for _ in re.finditer(r'\w', string))

# list every run of consecutive unicode word characters in a string
def get_words(string):
    """Return the list of ``\\w+`` matches in ``string``, in order of appearance."""
    word_pattern = re.compile(r'\w+')
    return word_pattern.findall(string)

# count words in a string
def count_words(string):
    """Return the number of words that ``get_words`` finds in ``string``."""
    words = get_words(string)
    return len(words)

# count distinct words in a string
def count_distinct_words(string):
    """Return the number of unique words that ``get_words`` finds in ``string``.

    Words are compared exactly as matched, so different casings count separately.
    """
    unique_words = set(get_words(string))
    return len(unique_words)

# use regex to get the count of each functional word found in a string
def count_functional_words(string):
    """Map each entry of the module-level ``functional_words`` list to its match count.

    Every word is used as a regex pattern with ``re.findall``, so matching is
    case-sensitive and also counts occurrences inside longer words.

    Returns:
        dict of ``{functional word: number of matches in string}``.
    """
    return {word: len(re.findall(word, string)) for word in functional_words}

# extract features from every .txt file in a directory
def process_txt_files(path, phishing):
    """Extract features from every file in ``path`` whose name ends with ``txt``.

    Files are visited in ascending order of the integer that precedes the first
    ``.`` in the file name. Each file is parsed with the
    ``email.policy.SMTPUTF8`` policy and handed to ``process_message``.

    Args:
        path: directory containing the files.
        phishing: value forwarded to ``process_message`` as its second argument.

    Returns:
        list with one ``process_message`` result per file, in visiting order.

    Raises:
        ValueError: if a matching file name does not start with an integer.

    NOTE(review): the ``process_message`` defined later in this notebook accepts
    a single raw-text argument, so the two-argument call below matches a
    different signature - confirm which one is intended before using this.
    """
    txt_files = sorted(
        (name for name in os.listdir(path) if name.endswith('txt')),
        key=lambda name: int(name.split(".")[0]),
    )

    data = []
    for name in txt_files:
        # the context manager closes the handle, which was previously left open
        with open(os.path.join(path, name)) as handle:
            message = email.message_from_file(handle, policy=email.policy.SMTPUTF8)
        data.append(process_message(message, phishing))

    return data

# extract features from every message found in an .mbox file
def process_mbox(path, phishing):
    """Extract features from every message stored in the mbox file at ``path``.

    Messages are parsed with ``BytesParser`` using the ``email.policy.SMTPUTF8``
    policy and handed to ``process_message``.

    Args:
        path: location of the mbox file.
        phishing: value forwarded to ``process_message`` as its second argument.

    Returns:
        list with one ``process_message`` result per message.

    NOTE(review): the ``process_message`` defined later in this notebook accepts
    a single raw-text argument, so the two-argument call below matches a
    different signature - confirm which one is intended before using this.
    """
    mbox = mailbox.mbox(path, factory=BytesParser(policy=email.policy.SMTPUTF8).parse)
    try:
        return [process_message(message, phishing) for message in mbox]
    finally:
        # release the mailbox's underlying file, which was previously never closed
        mbox.close()

# count the total number of functional words found in a string
def total_functional_words(string):
    """Return the sum of all per-word counts from ``count_functional_words``."""
    total = 0
    for occurrences in count_functional_words(string).values():
        total += occurrences
    return total

# words commonly found in phishing emails; count_functional_words uses each
# entry as a regex pattern
functional_words = [
    "access",
    "account",
    "agree",
    "alert",
    "bank",
    "credit",
    "click",
    "confirm",
    "identity",
    "inconvenience",
    "information",
    "limited",
    "log",
    "password",
    "recently",
    "security",
]


# declare functions used to extract header and subject features

# concatenate all "field: value" header pairs into a single string
def header_to_string(message):
    """Render the message headers as ``"<key>: <value>\\n"`` lines in one string.

    For each key returned by ``message.keys()`` the value is fetched with
    ``message[key]``. A field whose value cannot be concatenated as text is
    skipped (best effort).

    Returns:
        The concatenated header text; ``""`` when there are no usable fields.
    """
    lines = []
    for key in message.keys():
        try:
            lines.append(str(key) + ": " + message[key] + "\n")
        except Exception:
            # skip unusable fields, but no longer swallow KeyboardInterrupt/SystemExit
            continue
    return "".join(lines)

# measure the size of the header string
def get_header_size(message):
    """Return ``sys.getsizeof`` of the string built by ``header_to_string``.

    Note: ``sys.getsizeof`` reports the size of the Python ``str`` object, which
    is not the same as the encoded byte length of the header text.
    """
    header_text = header_to_string(message)
    return sys.getsizeof(header_text)

# extract the email address that sent an email
def get_sender_email(message):
    """Return the address(es) parsed from the Sender header, else from From.

    The Sender header is used when it is present and non-empty; otherwise the
    From header is used. The result has the same shape as
    ``get_email_from_string`` (``None``, a single string, or a list).
    """
    source_header = message['Sender'] or message['From']
    return get_email_from_string(source_header)

# count the number of To header fields in the email
def count_to(message):
    """Return how many ``To`` header fields the message has (0 when none).

    This counts header fields, not individual addresses: one ``To`` field that
    lists several recipients counts as 1.
    """
    # get_all() returns the supplied default when the header is missing, so the
    # old catch-all except around len(None) is no longer needed
    return len(message.get_all('To', []))

# count the number of Received header fields in the email
def count_received(message):
    """Return how many ``Received`` header fields the message has (0 when none)."""
    # get_all() returns the supplied default when the header is missing, so the
    # old catch-all except around len(None) is no longer needed
    return len(message.get_all('Received', []))

# count the number of Cc header fields in the email
def count_cc(message):
    """Return how many ``Cc`` header fields the message has (0 when none).

    This counts header fields, not individual addresses.
    """
    # a single lookup with a default replaces the `== None` test and second get_all call
    return len(message.get_all('Cc', []))

# count the number of Bcc header fields in the email
def count_bcc(message):
    """Return how many ``Bcc`` header fields the message has (0 when none).

    This counts header fields, not individual addresses.
    """
    # a single lookup with a default replaces the `== None` test and second get_all call
    return len(message.get_all('Bcc', []))

# compare the email address domain in the Message-ID and Sender fields
def same_messageID_senderID(message):
    """Return whether the Message-ID domain equals the sender address domain.

    The domain is the text after the first ``@`` of the address extracted from
    the ``Message-ID`` header and from ``get_sender_email`` respectively.

    Returns:
        ``True``/``False`` for the comparison. ``True`` is also returned when
        either domain cannot be extracted (e.g. a header is missing or holds
        zero or several addresses).
    """
    try:
        messageID_domain = get_email_from_string(message['Message-ID']).split('@')[1]
        senderID_domain = get_sender_email(message).split('@')[1]
    except Exception:
        # best effort: treat unparsable headers as "same"; unlike a bare except
        # this lets KeyboardInterrupt/SystemExit propagate
        return True
    return messageID_domain == senderID_domain

# compare the addresses in the Return-Path and Sender/From fields
def same_return_sender(message):
    """Return whether the Return-Path address equals the sender address.

    Both sides come from ``get_email_from_string`` / ``get_sender_email``, so
    each may be ``None``, a string, or a list; they are compared with ``==``.

    Returns:
        ``True``/``False`` for the comparison. ``True`` is also returned when
        extracting either address raises (e.g. the Return-Path header is missing).
    """
    try:
        return_email = get_email_from_string(message['Return-Path'])
        sender_email = get_sender_email(message)
    except Exception:
        # best effort: treat unparsable headers as "same"; unlike a bare except
        # this lets KeyboardInterrupt/SystemExit propagate
        return True
    return return_email == sender_email

# count words found in the Subject field
def count_words_subject(message):
    """Return the word count of the Subject header, or 0 when it cannot be read.

    Non-ASCII characters are dropped from the subject (ASCII encode with
    ``'ignore'``) before the words are counted with ``count_words``.
    """
    try:
        return count_words(message['Subject'].encode('ascii', 'ignore').decode())
    except Exception:
        # e.g. no Subject header; unlike a bare except this lets
        # KeyboardInterrupt/SystemExit propagate
        return 0

# count distinct words found in the Subject field
def count_distinct_words_subject(message):
    """Return the distinct-word count of the Subject header, or 0 when it cannot be read.

    Non-ASCII characters are dropped from the subject (ASCII encode with
    ``'ignore'``) before counting with ``count_distinct_words``.
    """
    try:
        return count_distinct_words(message['Subject'].encode('ascii', 'ignore').decode())
    except Exception:
        # e.g. no Subject header; unlike a bare except this lets
        # KeyboardInterrupt/SystemExit propagate
        return 0

# count characters found in the Subject field
def count_chars_subject(message):
    """Return the word-character count of the Subject header, or 0 when it cannot be read.

    Non-ASCII characters are dropped from the subject (ASCII encode with
    ``'ignore'``) before counting with ``count_chars``.
    """
    try:
        return count_chars(message['Subject'].encode('ascii', 'ignore').decode())
    except Exception:
        # e.g. no Subject header; unlike a bare except this lets
        # KeyboardInterrupt/SystemExit propagate
        return 0

# calculate the richness of the Subject field based on the counted characters and words
def get_subject_richness(message):
    """Return subject words divided by subject characters, or 0 when there are no characters.

    Uses ``count_words_subject`` and ``count_chars_subject``.
    """
    char_count = count_chars_subject(message)
    if char_count == 0:
        # explicit guard replaces the bare except that was hiding the division by zero
        return 0
    return count_words_subject(message) / char_count

# count the number of functional words found in the Subject field
def count_functional_words_subject(message):
    """Return ``total_functional_words`` of the Subject header, or 0 when that fails.

    Unlike the other subject counters, the subject is passed through unchanged
    (no ASCII filtering).
    """
    try:
        return total_functional_words(message['Subject'])
    except Exception:
        # e.g. no Subject header; unlike a bare except this lets
        # KeyboardInterrupt/SystemExit propagate
        return 0

# check whether email is a reply
def get_is_reply(message):
    """Return True when the lower-cased Subject header starts with ``re:``.

    Returns False when the subject does not match or cannot be read
    (e.g. the header is missing).
    """
    try:
        return re.match(r"^re:", message['Subject'].lower()) is not None
    except Exception:
        # unlike a bare except this lets KeyboardInterrupt/SystemExit propagate
        return False

# check if email was forwarded
def get_is_forward(message):
    """Return True when the lower-cased Subject header starts with ``fwd:``.

    Returns False when the subject does not match or cannot be read
    (e.g. the header is missing).
    """
    try:
        return re.match(r"^fwd:", message['Subject'].lower()) is not None
    except Exception:
        # unlike a bare except this lets KeyboardInterrupt/SystemExit propagate
        return False

# declare functions used to extract body features

# list the content type of each of the email's leaf parts
# e.g. 'text/html', 'text/html', 'image/gif'
def get_content_type_list(message):
    """Return the content type of every non-multipart part, in ``walk()`` order."""
    return [
        part.get_content_type()
        for part in message.walk()
        if not part.is_multipart()
    ]

# list the content disposition of each of the email's leaf parts
# e.g. None, 'inline', 'attachment'
def get_content_disposition_list(message):
    """Return the content disposition of every non-multipart part, in ``walk()`` order.

    Parts without a Content-Disposition header contribute ``None``.
    """
    return [
        part.get_content_disposition()
        for part in message.walk()
        if not part.is_multipart()
    ]

# count number of attachments found in content disposition list
def count_attachments(message):
    """Return how many leaf parts have the content disposition ``'attachment'``."""
    dispositions = get_content_disposition_list(message)
    return dispositions.count('attachment')

# check whether body text contains any html
def body_has_html(message):
    """Return True when BeautifulSoup's ``html.parser`` finds at least one tag in the body."""
    soup = BeautifulSoup(get_body(message), "html.parser")
    first_tag = soup.find()
    return bool(first_tag)

# check whether body text contains html forms
def body_has_forms(message):
    """Return True when BeautifulSoup's ``html.parser`` finds a ``<form>`` tag in the body."""
    soup = BeautifulSoup(get_body(message), "html.parser")
    first_form = soup.find("form")
    return bool(first_form)

# count words found in the email body
def count_words_body(message):
    """Return ``count_words`` applied to the text from ``get_body``."""
    body_text = get_body(message)
    return count_words(body_text)

# count distinct words found in the email body
def count_distinct_words_body(message):
    """Return ``count_distinct_words`` applied to the text from ``get_body``."""
    body_text = get_body(message)
    return count_distinct_words(body_text)

# count characters found in the email body
def count_chars_body(message):
    """Return ``count_chars`` applied to the text from ``get_body``."""
    body_text = get_body(message)
    return count_chars(body_text)

# calculate the richness of the email body based on the counted characters and words
def get_body_richness(message):
    """Return body words divided by body characters, or 0 when that cannot be computed.

    Uses ``count_words_body`` and ``count_chars_body``; a body without any word
    characters (division by zero) yields 0.
    """
    try:
        return count_words_body(message) / count_chars_body(message)
    except Exception:
        # covers the empty-body division by zero and unreadable bodies; unlike a
        # bare except this lets KeyboardInterrupt/SystemExit propagate
        return 0

# count the number of functional words found in the email body
def count_functional_words_body(message):
    """Return ``total_functional_words`` of the body text, or 0 when that fails."""
    try:
        return total_functional_words(get_body(message))
    except Exception:
        # unlike a bare except this lets KeyboardInterrupt/SystemExit propagate
        return 0

In [None]:
# declare functions used to process email dataset files
import email

# parse raw email text and extract its header and body features
def process_message(string):
    """Parse ``string`` as an email and return its feature values as a list.

    Args:
        string: raw email text, parsed with ``email.message_from_string``.

    Returns:
        list of 21 feature values, ordered exactly as the extractor table below
        (13 header/subject features followed by 8 body features).
    """
    message = email.message_from_string(string)

    # (feature key, extractor) pairs; the table order fixes the order of the returned values
    extractors = [
        ('header-size', get_header_size),
        ('count-to', count_to),
        ('count-received', count_received),
        ('count-cc', count_cc),
        ('count-bcc', count_bcc),
        ('same-id-sender', same_messageID_senderID),
        ('same-return-sender', same_return_sender),
        ('subject-word-count', count_words_subject),
        ('subject-distinct-word-count', count_distinct_words_subject),
        ('subject-richness', get_subject_richness),
        ('subject-function-word-count', count_functional_words_subject),
        ('is-reply', get_is_reply),
        ('is-forward', get_is_forward),
        ('count-content-types', lambda msg: len(get_content_type_list(msg))),
        ('count-attachments', count_attachments),
        ('body-word-count', count_words_body),
        ('body-distint-word-count', count_distinct_words_body),  # key spelling (sic) kept as-is
        ('body-richness', get_body_richness),
        ('body-function-word-count', count_functional_words_body),
        ('has-html', body_has_html),
        ('has-form', body_has_forms),
    ]

    email_features = {key: extract(message) for key, extract in extractors}
    return list(email_features.values())


**Load the previously trained Machine Learning models for use with the Gradio back-end**

In [None]:
import urllib
import numpy as np
import pickle
from transformers import pipeline



# classical machine learning models

def _load_pickled_model(url):
    """Download the pickled object at ``url`` and return it unpickled.

    SECURITY: ``pickle.load`` can execute arbitrary code while deserialising.
    Only use this with URLs whose content you control and trust.
    """
    # the context manager closes the HTTP response, which was previously left open
    with urllib.request.urlopen(url) as response:
        return pickle.load(response)

model_nb = _load_pickled_model("https://anti-phish.s3.eu-west-1.amazonaws.com/models/Naive+Bayes+2023-07-15+22_00_35.889610")
model_svc = _load_pickled_model("https://anti-phish.s3.eu-west-1.amazonaws.com/models/Support+Vector+Classification+2023-07-15+22_04_53.337322")
model_knn = _load_pickled_model("https://anti-phish.s3.eu-west-1.amazonaws.com/models/k+Nearest+Neighbors+2023-07-15+22_05_52.149846")
model_dt = _load_pickled_model("https://anti-phish.s3.eu-west-1.amazonaws.com/models/Decision+Tree+2023-07-15+22_06_31.570606")
model_rf = _load_pickled_model("https://anti-phish.s3.eu-west-1.amazonaws.com/models/Random+Forest+2023-07-15+22_08_13.900783")

# distilBERT transformer model
model_distilbert = pipeline("sentiment-analysis", model="foghlaimeoir/phishing-DistilBERT")

# define a function to be used by the Submit button of the Gradio site
def sentiment_analysis(text):
    """Classify ``text`` with every loaded model.

    Args:
        text: email text entered in the Gradio textbox.

    Returns:
        6-tuple of predictions in the order Naive Bayes, Support Vector
        Classification, k-Nearest Neighbours, Decision Tree, Random Forest,
        DistilBERT. The first five are the first element of each model's
        ``predict`` output; the last is the transformer pipeline's label.
    """
    # a single row of extracted features, as a 2-D array for predict()
    extracted_features = np.reshape(process_message(text), (1, -1))

    classical_models = (model_nb, model_svc, model_knn, model_dt, model_rf)
    classical_results = [model.predict(extracted_features)[0] for model in classical_models]

    # truncation=True makes the tokenizer cut inputs that exceed the model's
    # maximum sequence length, so long emails no longer make the pipeline fail
    transformer_result = model_distilbert(text, truncation=True)[0]["label"]

    return (*classical_results, transformer_result)

No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'
Xformers is not installed correctly. If you want to use memory_efficient_attention to accelerate training use the following command to install Xformers
pip install xformers.


**Set up the Gradio Interface**

In [None]:
import gradio as gr

# specify the interface of the Gradio front-end
# gr.Textbox is used for both input and output: the gr.inputs.Textbox /
# gr.outputs.Textbox aliases are deprecated in Gradio 3.x and removed in Gradio 4
gradio_ui = gr.Interface(
    fn=sentiment_analysis,
    title="Phishing Email Detector",
    description="Enter sample text of an email and Submit to see if the classical models, or the fine-tuned DistilBERT model can detect a phishing attempt. A value of 1 = phishing.",
    inputs=gr.Textbox(label="Choose an example below or type email data here."),
    # one output box per value returned by sentiment_analysis, in the same order
    outputs=[
        gr.Textbox(label="Naive Bayes"),
        gr.Textbox(label="Support Vector Classification"),
        gr.Textbox(label="k-Nearest Neighbours"),
        gr.Textbox(label="Decision Tree"),
        gr.Textbox(label="Random Forest"),
        gr.Textbox(label="DistilBERT"),
    ],
    allow_flagging="never",
    examples=["top 20 netflix films", "buy 20 amazon games", "please reset password", "can you reply soon?", "This email is about scheduling an appointment next week. Can you reply soon?",
              "This is a phishing email. Please send your bank account details.",
              """
              *Verify this email address*
              You recently added a new email address to your subdomain.organization.edu! account, or initiated verification of an existing
              email address. To verify that you own this email address, simply click on the link below.
              *Verify "YOU @ subdomain.organization.edu" *
              Your email address was added to the subdomain.organization.edu! ID: â€ŽYOUâ€Ž. If this subdomain.organization.edu! ID does not belong
              to you, or you did not recently add your email address to this graphics.organization.edu! ID, you may permanently stop
              verification process. Cancel verification process.
              Verifying your email address ensures that you can securely retrieve your account information if your password is lost or
              stolen. You must verify your email address before you can use it on subdomain.organization.edu! services that require an email address.
              For your security, please keep your email address information up-to-date.
              *Not your account?*
              If you did not create this account, click here
              ------------------------------------------------------------------------
              Copyright Â© 2012 subdomain.organization.edu! Inc. All rights reserved.Copyright/IP Policy
              | Terms of Service
              """,
              """
              From: Jordan Kaplan <user@domain>
              Content-Type: multipart/alternative;
                boundary="Apple-Mail=_77DEC915-381E-4657-924B-EF562B2C7A00"
              X-Smtp-Server: C6C38CC7-AF1C-4C17-AB9E-879844FB845B
              Subject:
              Message-ID: <user@domain>
              X-Universally-Unique-Identifier: DC8E83D4-A41A-410A-B667-78F5FFBFC9FD
              Date: Fri, 20 May 2015 16:06:43 -0400
              To: "User" <user@domain>
              MIME-Version: 1.0

              call me

              Jordan Kaplan
              National Finance Director
              Organization
              (202) 488-5002 (o) | (312) 339-0224 (c)
              user@domain
              """],
)

  inputs=gr.inputs.Textbox(label="Choose an example below or type email data here."),
  inputs=gr.inputs.Textbox(label="Choose an example below or type email data here."),
  inputs=gr.inputs.Textbox(label="Choose an example below or type email data here."),
  gr.outputs.Textbox(label="Naive Bayes"),
  gr.outputs.Textbox(label="Support Vector Classification"),
  gr.outputs.Textbox(label="k-Nearest Neighbours"),
  gr.outputs.Textbox(label="Decision Tree"),
  gr.outputs.Textbox(label="Random Forest"),
  gr.outputs.Textbox(label="DistilBERT"),


In [None]:
# share=True requests a public share link so the demo can be reached from outside this machine.
# The link only works while this notebook is running; the recorded launch output reports a 72-hour expiry.
# debug=True keeps this cell running so that errors and logs are shown; set debug=False to turn that off.

gradio_ui.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://1aa05ed99284ceb59e.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)
