In [1]:
import mailbox
import bs4
import os
import re
import numpy as np
import pandas as pd
from email.message import Message
from itertools import chain
from operator import itemgetter
import ipaddress
from urllib.parse import urlparse
import codecs
import warnings
import base64
from html.parser import HTMLParser
from urlextract import URLExtract
from typing import Generator, Union, Tuple, List, Dict, Any

In [2]:
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')

In [3]:
# Directory holding the input mbox files and the exported database.
# Built with os.path.join so the separator is correct on every platform
# (a literal backslash is only a path separator on Windows).
RESOURCES_PATH = os.path.join(os.pardir, 'resources')

# Fetch data from the mbox files

In [4]:
class HTMLFinder(HTMLParser):
    """
    Class to parse the HTML and extract attributes specific to the email content.

    Attributes
    ----------
    start_tags : List[str]
        List containing opening HTML tags, in the order they were parsed.
    end_tags : List[str]
        List containing closing HTML tags, in the order they were parsed.
    attributes : List[List[Tuple[str, Union[str, None]]]]
        Attribute (name, value) pairs of every opening tag; entry ``i`` belongs
        to ``start_tags[i]``. The value is None for attributes without a value.
    contains_html : bool
        Flag indicating valid HTML content.
    contains_js : bool
        Flag indicating valid JavaScript content.
    contains_css : bool
        Flag indicating valid CSS content.
    images: List[str]
        List of images in the HTML.

    Methods
    -------
    handle_starttag(tag, attrs)
        Append new opening tags and attributes.

    handle_endtag(self, tag)
        Append new closing tags.

    contains_content_type(content_type)
        Return boolean flag indicating MIME content-type presence.
    """

    # Tags whose presence marks the parsed input as HTML.
    # https://helpdesk.bitrix24.com/open/14099114/
    # https://zapier.com/help/doc/what-html-tags-are-supported-in-gmail
    _ALLOWED_TAGS = frozenset([
        "a", "b", "br", "big", "blockquote", "caption", "code", "del", "div",
        "dt", "dd", "font", "h1", "h2", "h3", "h4", "h5", "h6", "hr", "i",
        "img", "ins", "li", "map", "ol", "p", "pre", "s", "small", "strong",
        "span", "sub", "sup", "table", "tbody", "td", "tfoot", "th", "thead",
        "tr", "u", "ul", "php", "html", "head", "body", "meta", "title",
        "style", "link", "abbr", "acronym", "address", "area", "bdo", "button",
        "center", "cite", "col", "colgroup", "dfn", "dir", "dl", "em",
        "fieldset", "form", "input", "kbd", "label", "legend", "menu",
        "optgroup", "option", "q", "samp", "select", "strike", "textarea",
        "tt", "var",
    ])

    # File extensions recognised as images (a tuple so it can be passed to str.endswith).
    # https://developer.mozilla.org/en-US/docs/Web/Media/Formats/Image_types
    _IMG_EXTENSIONS = (
        ".apng", ".avif", ".gif", ".jpg", ".jpeg", ".jfif", ".pjpeg", ".pjp",
        ".png", ".svg", ".webp", ".bmp", ".ico", ".cur", ".tif", ".tiff",
    )

    def __init__(self) -> None:
        """
        Constructs all the necessary attributes for the custom HTMLParser object.
        """
        # Plain super() runs HTMLParser.__init__, which creates the parser state
        # (rawdata, convert_charrefs, ...) that feed() relies on.
        super().__init__()
        self.start_tags = []
        self.end_tags = []
        self.attributes = []

    @property
    def contains_html(self) -> bool:
        """
        Flag indicating valid HTML content.

        Returns
        -------
        contains_html : bool
            True if at least one parsed opening tag is a known HTML tag, else False.
        """
        return not self._ALLOWED_TAGS.isdisjoint(self.start_tags)

    @property
    def contains_js(self) -> bool:
        """
        Flag indicating presence of JavaScript code inside the HTML.

        Returns
        -------
        contains_js : bool
            True if any parsed tag carries type="text/javascript", else False.
        """
        return self.contains_content_type("text/javascript")

    @property
    def contains_css(self) -> bool:
        """
        Flag indicating presence of CSS code inside the HTML.

        Returns
        -------
        contains_css : bool
            True if any parsed tag carries type="text/css", else False.
        """
        return self.contains_content_type("text/css")

    @property
    def images(self) -> List[str]:
        """
        List of images in the HTML.

        Returns
        -------
        images : List[str]
            Values of the src/srcset attributes of <img> and <source> tags that
            end with a known image extension (case-insensitive).
        """
        images = []
        for tag, attrs in zip(self.start_tags, self.attributes):
            if tag not in ("img", "source"):
                continue
            for attrib, val in attrs:
                # val is None for valueless attributes such as <img src>.
                if (
                    attrib in ("src", "srcset")
                    and val
                    and val.lower().endswith(self._IMG_EXTENSIONS)
                ):
                    images.append(val)
        return images

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Union[str, None]]]) -> None:
        """
        Append new opening tag and attributes.

        Parameters
        ----------
        tag : str
            Opening HTML tag.
        attrs : List[Tuple[str, Union[str, None]]]
            (name, value) pairs of the HTML attributes related to the tag.
        """
        self.start_tags.append(tag)
        self.attributes.append(attrs)

    def handle_endtag(self, tag: str) -> None:
        """
        Append new closing tag.

        Parameters
        ----------
        tag : str
            Closing HTML tag.
        """
        self.end_tags.append(tag)

    def contains_content_type(self, content_type: str) -> bool:
        """
        Return boolean flag indicating MIME content-type presence.

        Parameters
        ----------
        content_type : str
            MIME content-type.

        Returns
        -------
        contains_content_type : bool
            True if any parsed tag has a ``type`` attribute exactly equal to
            content_type, else False.
        """
        return any(
            attrib == "type" and val == content_type
            for attrib, val in chain.from_iterable(self.attributes)
        )

In [5]:
class PhishyMatcher:
    """
    Class providing various methods to help extract attributes that indicate the phishing nature of the textual data, such as the body of an email message.

    Attributes
    ----------
    URL_regex : re.Pattern
        Regex used for URL address extraction.
    IP_regex : re.Pattern
        Regex pattern used for IP address extraction.

    Methods
    -------
    find_IPs(text)
        Find IP addresses within the provided text.
    extract_URLs_from_text(text)
        Find URL links within the provided text.
    extract_URLs_from_HTML(html)
        Extract URLs from the HTML code.
    get_html_text(html)
        Get text from the HTML code.
    clean_html(html)
        Remove any '3D' attribute prefix from the HTML code.
    clean_text(text)
        Remove multiple overlapping whitespace characters from text.
    clean_message_tags(text)
        Remove any whitespaces within the message attributes (denoted as <>).
    is_valid_IP(ip)
        Check if given IP is a valid IPv4 or IPv6 address.

    Usage
    -----
    Inherit from the PhishyMatcher class.
    """

    # Patterns are compiled once at class creation instead of on every property access.
    # https://daringfireball.net/2010/07/improved_regex_for_matching_urls
    # -> https://gerrit.wikimedia.org/r/c/mediawiki/extensions/Collection/OfflineContentGenerator/latex_renderer/+/170329/1/lib/index.js
    _URL_PATTERN = re.compile(
        r"""\b((?:[a-z][\w\-]+:(?:\/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}\/)(?:[^\s()<>]|\((?:[^\s()<>]|(?:\([^\s()<>]+\)))*\))+(?:\((?:[^\s()<>]|(?:\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))""",
        re.IGNORECASE,
    )
    _IPV4 = r"(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    _IPV6 = r"(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))"
    _IP_PATTERN = re.compile(_IPV4 + "|" + _IPV6, re.IGNORECASE)

    # Shared URLExtract instance, created on first use; building one per call is
    # needlessly expensive when thousands of messages are parsed.
    _url_extractor = None

    @property
    def URL_regex(self) -> re.Pattern:
        """Compiled, case-insensitive pattern matching URL-like substrings."""
        return self._URL_PATTERN

    @property
    def IP_regex(self) -> re.Pattern:
        """Compiled, case-insensitive pattern matching IPv4 or IPv6 address candidates."""
        return self._IP_PATTERN

    def find_IPs(self, text: str) -> List[str]:
        """
        Find IP addresses within the provided text.

        Parameters
        ----------
        text : str
            Input text.

        Returns
        -------
        IPs : List[str]
            Regex matches (in order of appearance) that also pass is_valid_IP.
        """
        candidates = self.IP_regex.findall(self.clean_text(text))
        return [ip for ip in candidates if self.is_valid_IP(ip)]

    def extract_URLs_from_text(self, text: str) -> List[str]:
        """
        Find URL addresses within the provided text.

        Parameters
        ----------
        text : str
            Input text.

        Returns
        -------
        URLs : List[str]
            List of URL addresses found by URLExtract, or by URL_regex when
            URLExtract raises UnicodeDecodeError.
        """
        text = self.clean_message_tags(self.clean_text(text))
        try:
            if PhishyMatcher._url_extractor is None:
                PhishyMatcher._url_extractor = URLExtract()
            return PhishyMatcher._url_extractor.find_urls(text)
        except UnicodeDecodeError:
            return [url for url in self.URL_regex.findall(text) if url]

    def extract_URLs_from_HTML(self, html: str) -> List[str]:
        """
        Extract URLs from the HTML code.

        Parameters
        ----------
        html : str
            Input HTML code.

        Returns
        -------
        URLs : List[str]
            List of URL addresses taken from ``href`` attributes that match URL_regex.
        """
        # https://stackoverflow.com/questions/6976053/xss-which-html-tags-and-attributes-can-trigger-javascript-events
        url_attribs = [
            "action",
            "archive",
            "background",
            "cite",
            "classid",
            "codebase",
            "data",
            "dsync",
            "dynsrc",
            "formaction",
            "href",
            "icon",
            "longdesc",
            "lowsrc",
            "manifest",
            "poster",
            "profile",
            "src",
            "usemap",
        ]

        urls = []
        soup = bs4.BeautifulSoup(self.clean_text(html), "html.parser")
        for attr in url_attribs:
            for tag in soup.select(f"[{attr}]"):
                value = tag[attr]
                # BeautifulSoup returns a list for multi-valued attributes;
                # those, and in-page anchors ("#..."), are not URLs.
                if not isinstance(value, str) or value.startswith("#"):
                    continue
                url = self.clean_html(value)
                # NOTE(review): every URL-bearing attribute is visited, but only
                # href values are reported — confirm this restriction is intended.
                if attr == 'href' and self.URL_regex.match(url):
                    urls.append(url)

        return urls

    @staticmethod
    def get_html_text(html: str) -> str:
        """
        Get text from the HTML code.

        Parameters
        ----------
        html : str
            String containing HTML snippet.

        Returns
        -------
        text : str
            Text of the HTML body, pieces joined by single spaces; '' when the
            parsed document has no body.
        """
        try:
            return bs4.BeautifulSoup(html, 'lxml').body.get_text(' ', strip=True)
        except AttributeError:  # message content is empty
            return ''

    @staticmethod
    def clean_html(html: str) -> str:
        """
        Remove any '3D' attribute prefix from the HTML code.

        Parameters
        ----------
        html : str
            Input HTML code (typically a single attribute value).

        Returns
        -------
        clean_html : str
            Input without a leading '3D' and without surrounding double quotes.
        """
        # NOTE(review): the '3D' prefix is presumably what is left of a
        # quoted-printable '=3D' — confirm.
        # DOTALL plus the lazy group keep the result identical for single-line
        # input (one trailing newline is still dropped) while values containing
        # embedded newlines no longer make the match fail.
        match = re.match(r"^(?:3D)?(.*?)\n?$", html, re.DOTALL)
        return match.group(1).strip('"')

    @staticmethod
    def clean_text(text: str) -> str:
        """
        Remove multiple overlapping whitespace characters from text.

        Parameters
        ----------
        text : str
            Input text.

        Returns
        -------
        clean_text : str
            Text with every whitespace run collapsed to one space and the ends trimmed.
        """
        return " ".join(text.split())

    @staticmethod
    def clean_message_tags(text: str) -> str:
        """
        Remove any whitespaces within the message attributes (denoted as <>).

        Parameters
        ----------
        text : str
            Input text.

        Returns
        -------
        clean_text : str
            Text in which every ``<...>`` span has its inner whitespace removed.
        """
        return re.sub(
            pattern=r"<.*?>",
            repl=lambda m: re.sub(r"\s+", "", m.group()),
            string=text
        )

    @staticmethod
    def is_valid_IP(ip: str) -> bool:
        """
        Check if given IP is a valid IPv4 or IPv6 address.

        Parameters
        ----------
        ip : str
            IP address.

        Returns
        -------
        flag : bool
            True if ip parses as an IPv4/IPv6 address and its integer value is
            at least 576 (IPv4) or 1280 (IPv6). Otherwise, False.
        """
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return False
        # NOTE(review): the thresholds were originally labelled as required
        # packet sizes (576 bytes for IPv4, 1280 for IPv6). They are compared
        # with the integer value of the address, so in effect they discard
        # numerically tiny matches such as 0.0.0.1 or ::1 — confirm the intent.
        minimum = 576 if isinstance(addr, ipaddress.IPv4Address) else 1280
        return int(addr) >= minimum

In [6]:
class MboxParser(PhishyMatcher, HTMLFinder):
    """
    Class to parse email data from the mboxMessage objects (specifically used by the Gmail services).

    Attributes
    ----------
    keys : List[str]
        List of keys to use for parsing essential email attributes.
    email_data : mailbox.mboxMessage
        mboxMessage object storing email data.
    parsed_email_data : Dict[str, Any]
        Dictionary storing parsed email data.

    Methods
    -------
    read_email_payload()
        Read email payload and parse content data.
    _get_email_messages(email_payload)
        Generator yielding each message separate from the payload.
    _extract_email_data(msg)
        Extract email data from the email message.
    _parse_content_disposition(content_disposition)
        Split a Content-Disposition header into disposition and filename.

    Usage
    -----
    While iterating over multiple mboxMessage objects, yield parsed_email_data property and convert concatenated dictionaries into a pandas DataFrame.
    """

    # Type hinting for email message
    EmailMessage = Union[Message, str]
    # Type hinting for email payload
    EmailPayload = List[Message]
    # Type hinting for email data (Content-Type, Content-Encoding, Content-Disposition, Filename, Text)
    EmailData = Tuple[str, str, str, str, str]

    def __init__(self, email_data: mailbox.mboxMessage):
        """
        Constructs all the necessary attributes for the MboxParser object.

        Parameters
        ----------
        email_data : mailbox.mboxMessage
            mboxMessage object storing email data.

        Raises
        ------
        TypeError is raised if email_data parameter is not an mailbox.mboxMessage instance.
        """
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError('Variable must be type mailbox.mboxMessage')
        # First call: the initialiser that follows PhishyMatcher in the MRO
        # (HTMLFinder — creates the tag buffers).
        # Second call: the one that follows HTMLFinder (HTMLParser — creates the
        # parser state feed() needs). Keeping both guarantees the parser state
        # exists regardless of how HTMLFinder chains its own initialiser.
        super(PhishyMatcher, self).__init__()
        super(HTMLFinder, self).__init__()
        self.keys = ['Message-ID', 'Date', 'From', 'To', 'Subject', 'Content-Length', 'X-Virus-Scanned', 'X-Priority', 'X-Spam-Score']
        self.email_data = email_data

    @property
    def parsed_email_data(self) -> Dict[str, Any]:
        """
        Dictionary storing parsed email data.

        The payload is re-parsed on every access. The HTML parser state is
        cleared first, so repeated accesses return the same result.

        Returns
        -------
        parsed_email_data : Dict[str, Any]
            Dictionary, providing keyed-access to email message attributes.
        """
        # Start from a clean parser: without this, a second access would feed the
        # same HTML again and duplicate every collected tag and image.
        self.reset()
        self.start_tags, self.end_tags, self.attributes = [], [], []

        parsed_data = {
            **{k: self.email_data.get(k, None) for k in self.keys},
            'Attached Files': [],
            'Attachments': 0,
            'URL Links': [],
            'URLs': 0,
            'IP Addresses': [],
            'IPs': 0,
            'Images Embedded': [],
            'Images': 0,
            'Encoding': 'NA',
            'Is HTML': False,
            'Is JavaScript': False,
            'Is CSS': False,
            'Raw Message': '',
            'Extracted Text': '',
        }

        # A missing Message-ID header becomes the string 'None' here.
        parsed_data['Message-ID'] = str(parsed_data['Message-ID']).strip('<>')

        def extract_attributes(raw_msg: str, encoding: str, is_html: bool = False) -> None:
            """
            Extract relevant attributes from the raw message content and update parsed_data in-place.

            Parameters
            ----------
            raw_msg : str
                Raw text of the message part.
            encoding : str
                Content-Transfer-Encoding of the message part.
            is_html : bool
                Flag indicating presence of HTML content in the message.
            """
            if is_html:
                urls = self.extract_URLs_from_HTML(raw_msg)
            else:
                urls = self.extract_URLs_from_text(raw_msg)
            parsed_data['URL Links'] = urls
            parsed_data['URLs'] = len(urls)

            ips = self.find_IPs(raw_msg)
            parsed_data['IP Addresses'] = ips
            parsed_data['IPs'] = len(ips)

            parsed_data['Encoding'] = encoding
            parsed_data['Raw Message'] = raw_msg

        # Parse data from email payload
        payload = self.read_email_payload()

        # Flag if there is plain text in the payload
        is_plain = any(
            data[0] == 'text/plain' and data[-1]
            for data in payload
        )

        # Flag if there is formatted text in the payload
        is_formatted = any(
            data[0] in ['text/html', 'NA'] and data[-1]
            for data in payload
        )

        # Flag if formatted text was parsed already
        is_parsed = False

        for content_type, encoding, disposition, filename, raw_msg in payload:
            # Differentiate between attached files and embedded images
            # Embedded images are digested within the HTML code
            # (disposition types are case-insensitive header values)
            if disposition.lower() == 'attachment':
                parsed_data['Attachments'] += 1
                parsed_data['Attached Files'].append(filename)

            if not raw_msg:
                continue

            if content_type == 'text/javascript':
                parsed_data['Is JavaScript'] = True

            elif content_type == 'text/css':
                parsed_data['Is CSS'] = True

            elif content_type == 'text/plain':
                parsed_data['Extracted Text'] = raw_msg
                # URLs/IPs come from the plain part only when no formatted part exists.
                if not is_formatted:
                    extract_attributes(raw_msg, encoding, is_html=False)

            elif content_type in ['text/html', 'NA'] and not is_parsed:
                # Only the first formatted part is analysed.
                self.feed(raw_msg)
                images = self.images
                parsed_data['Is HTML'] = self.contains_html
                # OR with the current value so a flag set by an earlier
                # text/javascript or text/css part is not reset to False.
                parsed_data['Is JavaScript'] = parsed_data['Is JavaScript'] or self.contains_js
                parsed_data['Is CSS'] = parsed_data['Is CSS'] or self.contains_css
                parsed_data['Images Embedded'] = images
                parsed_data['Images'] = len(images)

                extract_attributes(raw_msg, encoding, is_html=True)

                if not is_plain:
                    msg_text = self.get_html_text(raw_msg)
                    parsed_data['Extracted Text'] = msg_text if msg_text else raw_msg

                is_parsed = True

        return parsed_data

    def read_email_payload(self) -> List[EmailData]:
        """
        Read email payload and parse content data.

        Returns
        -------
        email_payload : List[EmailData]
            List of parsed content data extracted from the email payload.
        """
        email_payload = self.email_data.get_payload()
        if self.email_data.is_multipart():
            email_messages = list(self._get_email_messages(email_payload))
        else:
            # Non-multipart payloads are passed on as-is (a plain string).
            email_messages = [email_payload]
        return [self._extract_email_data(msg) for msg in email_messages]

    def _get_email_messages(self, email_payload: EmailPayload) -> Generator[EmailMessage, None, None]:
        """
        Generator yielding each message separate from the payload.

        Parameters
        ----------
        email_payload : EmailPayload
            Payload retrieved from the mboxMessage object.

        Yields
        ------
        message : EmailMessage
            Singular (non-multipart) message, nested containers flattened depth-first.
        """
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    @staticmethod
    def _parse_content_disposition(content_disposition: str) -> Tuple[str, str]:
        """
        Split a Content-Disposition header into disposition and filename.

        Parameters
        ----------
        content_disposition : str
            Header value, or 'NA' when the header is absent.

        Returns
        -------
        content_fields : Tuple[str, str]
            Content-disposition and filename; 'NA' stands for a missing value.
            Only double-quoted filenames are recognised.
        """
        if content_disposition == 'NA':
            return 'NA', 'NA'
        filename_regex = r'[^</*?"\\>:|]+'
        any_char = r'[\S\s]*'
        match = re.match(
            fr'^(\w+)(?:;{any_char}filename{any_char}={any_char}"({filename_regex})")?',
            str(content_disposition),
        )
        if match is None:
            # Header present but not starting with a disposition token.
            return 'NA', 'NA'
        disposition, filename = match.groups()
        return disposition, filename if filename else 'NA'

    def _extract_email_data(self, msg: EmailMessage) -> EmailData:
        """
        Extract all email data fields from the email message.

        Parameters
        ----------
        msg : EmailMessage
            Singular message.

        Returns
        -------
        email_fields : EmailData
            Content-type, content-encoding, content-disposition, filename, and text.
            Text is '' for non-text parts and for base64-encoded text parts.
        """
        if isinstance(msg, str):
            # Raw string payload: no headers are available for it.
            return ('NA', 'NA', 'NA', 'NA', msg.strip())

        content_type = msg.get_content_type()
        encoding = msg.get('Content-Transfer-Encoding', 'NA')
        disposition, filename = self._parse_content_disposition(
            msg.get('Content-Disposition', 'NA')
        )
        # Encoding names are case-insensitive, so compare in lower case.
        is_base64 = 'base64' in str(encoding).lower()
        if content_type.startswith('text') and not is_base64:
            msg_text = msg.get_payload().strip()
        else:
            msg_text = ''
        return (content_type, encoding, disposition, filename, msg_text)

In [13]:
def parse_data_from_mbox(
    mbox_path: str, is_phishy: bool
) -> Generator[Dict[str, Any], None, None]:
    """
    Generator yielding dictionaries containing data parsed from the mbox file.

    Parameters
    ----------
    mbox_path : str
        Path to the mbox file
    is_phishy : bool
        Label assigned to the emails from the mbox

    Yields
    ------
    data : Dict[str, Any]
        Dictionary storing data parsed from one email of the mbox file, plus
        the "Is Phishy" label.

    Usage
    -----
    Efficiently create a DataFrame from the output Generator of dictionaries.

    Raises
    ------
    FileNotFoundError
        File must be an existing .mbox file. Because this is a generator, the
        check runs when the first item is requested, not when it is called.

    """
    if not (mbox_path.lower().endswith('.mbox') and os.path.exists(mbox_path)):
        raise FileNotFoundError(f'Cannot find mbox file {mbox_path}')

    # create=False: never create an empty mailbox as a side effect of reading.
    mbox = mailbox.mbox(mbox_path, create=False)
    try:
        for email_obj in mbox:
            message_obj = MboxParser(email_obj)
            yield {**message_obj.parsed_email_data, "Is Phishy": is_phishy}
    finally:
        # Release the file handle even when the consumer stops early or parsing fails.
        mbox.close()

**Remark**: the best option is to parse the mbox files in Google Colab, or to add a Windows Defender exclusion for the files being parsed. Otherwise, the phishing emails are not parsed.

In [14]:
# Locations of the three mbox archives inside RESOURCES_PATH.
ENRON_MBOX_PATH, PHISHY_MBOX_PATH, PHISHY3_MBOX_PATH = (
    os.path.join(RESOURCES_PATH, mbox_name)
    for mbox_name in ('emails-enron.mbox', 'emails-phishing.mbox', 'phishing3.mbox')
)

In [15]:
# (path, is_phishy) pairs. Materialised as a list because a bare zip iterator is
# exhausted after one pass, which would leave nothing to parse if the consuming
# cell is re-run on its own.
mbox_zip = list(zip(
    [ENRON_MBOX_PATH, PHISHY_MBOX_PATH, PHISHY3_MBOX_PATH],
    [False, True, True],
))

In [16]:
# One DataFrame per mbox file, stacked into a single frame.
# ignore_index=True gives every email a unique row label; otherwise each source
# frame would restart at 0 and the labels would repeat.
emails_df = pd.concat(
    [
        pd.DataFrame(parse_data_from_mbox(path, label))
        for path, label in mbox_zip
    ],
    ignore_index=True,
)

## Export data parsed from mbox files

In [None]:
# Destination of the exported CSV database.
CSV_OUT_PATH = os.path.join(
    RESOURCES_PATH,
    'database',
    'emails-database.csv',
)

In [None]:
# Make sure the output directory exists; to_csv does not create missing folders.
os.makedirs(os.path.dirname(CSV_OUT_PATH), exist_ok=True)
emails_df.to_csv(CSV_OUT_PATH, index=False)
print(f'[!] Database saved as {os.path.abspath(CSV_OUT_PATH)}')

[!] Database saved as d:\PyCharm Professional\Projects\Integrated phishing detection for IMAP servers\resources\database\emails-database.csv
