## Enron Email Data Loading and Profiling

### Import the required dependencies

In [None]:
import re
import pandas as pd
import os
import numpy as np
from datetime import datetime
from IPython.display import display, clear_output
from email.utils import parsedate_to_datetime

### Define the helper functions

In [None]:
def _extract_email_metadata(filename):
    """
    this is a helper function which extracts the metadata from an email file with Regex
    Only the header section of the file (everything before the first blank line) is
    searched, so header-like lines quoted in the message body (for example in a
    forwarded email) are not mistaken for the metadata of the email itself
    parameters:
        filename: the filename of the email file
    returns:
        meta_output: a dictionary containing the metadata, a value is None when
                     the corresponding header could not be found
    """
    single_line = re.MULTILINE
    # these headers can be folded over several lines, so "." must also match
    # newlines and the value ends right before the next expected header
    folded = re.MULTILINE | re.DOTALL
    header_patterns = {
        'Message-ID': (r"^Message-ID:[^\S\r\n]+(.*)$", single_line),
        'Date': (r"^Date:[^\S\r\n]+(.*)$", single_line),
        'From': (r"^From:[^\S\r\n]+(.*?)(?=[\r\n]^To|[\r\n]^Subject|[\r\n]^Cc|[\r\n]^Mime-Version)", folded),
        'To': (r"^From:[^\S\r\n]+\S*?[\r\n]{1}^To:[^\S\r\n]+(.*?)(?=[\r\n]^Subject|[\r\n]^Cc|[\r\n]^Mime-Version)", folded),
        'Cc': (r"^Cc:[^\S\r\n]+(.*?)(?=[\r\n]^Mime-Version)", folded),
        'Bcc': (r"^Bcc:[^\S\r\n]+(.*?)(?=[\r\n]^X-From)", folded),
        'Subject': (r"^Subject:[^\S\r\n]+(.*?)(?=[\r\n]^Cc|[\r\n]^Mime-Version)", folded),
        'Mime-Version': (r"^Mime-Version:[^\S\r\n]+(.*)$", single_line),
        'Content-Type': (r"^Content-Type:[^\S\r\n]+(.*)$", single_line),
        'Content-Transfer-Encoding': (r"^Content-Transfer-Encoding:[^\S\r\n]+(.*)$", single_line),
        'X-From': (r"^X-From:[^\S\r\n]+(.*)$", single_line),
        'X-To': (r"^X-To:[^\S\r\n]+(.*)$", single_line),
        'X-cc': (r"^X-cc:[^\S\r\n]+(.*)$", single_line),
        'X-bcc': (r"^X-bcc:[^\S\r\n]+(.*)$", single_line),
        'X-Folder': (r"^X-Folder:[^\S\r\n]+(.*)$", single_line),
        'X-Origin': (r"^X-Origin:[^\S\r\n]+(.*)$", single_line),
        'X-FileName': (r"^X-FileName:[^\S\r\n]+(.*)$", single_line),
    }

    # undecodable bytes are dropped instead of failing the whole file
    with open(filename, 'r', encoding='utf-8', errors='ignore') as file:
        content = file.read()

    # the header section ends at the first blank line; leading blank lines are
    # skipped first so that they are not mistaken for the end of the headers
    # (text mode already translated "\r\n" and "\r" into "\n")
    headers = content.lstrip('\r\n').split('\n\n', 1)[0]

    meta_output = {}
    for field, (pattern, flags) in header_patterns.items():
        match = re.search(pattern, headers, flags=flags)
        meta_output[field] = match.group(1) if match else None

    return meta_output
    

In [None]:
def _get_path_after_target(path, target="maildir"):
    """
    This is a helper function to extract the part of the path that starts two
    levels below the target folder (i.e. it skips the folder right after it)
    for example if the input is "maildir/lay-k/somefolder" the output will be "somefolder"
    if the input is "maildir/lay-k" the output will be np.nan

    parameters:
        path: the path to extract the trailing part from
        target: the name of the folder that anchors the extraction
    returns:
        the remaining path joined with the OS separator, or np.nan when the
        target is not in the path or nothing follows the folder after it
    """
    components = path.split(os.sep)

    # without the anchor folder there is nothing to extract
    try:
        anchor = components.index(target)
    except ValueError:
        return np.nan

    # skip the anchor itself and the folder right after it
    tail = components[anchor + 2:]
    if not tail:
        return np.nan

    return os.path.join(*tail)

In [None]:
def _get_folder_after_target(path, target="maildir"):
    """
    This is a helper function to extract the name of the folder that directly
    follows the target folder in a path
    for example if the input is "maildir/lay-k/somefolder" the output will be "lay-k"
    if the input is "maildir" the output will be np.nan

    parameters:
        path: the path to extract the folder name from
        target: the name of the folder that precedes the wanted folder
    returns:
        the folder name right after the target, or np.nan when the target is
        not in the path or is its last component
    """
    components = path.split(os.sep)

    if target not in components:
        return np.nan

    # position of the component that directly follows the first occurrence of the target
    follower = components.index(target) + 1
    return components[follower] if follower < len(components) else np.nan

In [None]:
def _reset_structured_df():
    """
    This is a helper function which creates a new, empty structured dataframe
    It is used while iterating through the Enron email dataset: after a subset
    of the data has been saved, the dataframe is replaced with a fresh empty
    one so that the rows which were already saved are not kept in memory

    parameters:
        None
    returns:
        df_output: an empty dataframe with the columns of the structured data
    """
    # where the email was found in the maildir filesystem
    filesystem_columns = [
        'maildir_user',
        'maildir_folder',
        'maildir_file_name',
        'maildir_path',
    ]
    # the metadata fields extracted from the email file
    header_columns = [
        'Message-ID',
        'Date',
        'From',
        'To',
        'Cc',
        'Bcc',
        'Subject',
        'Mime-Version',
        'Content-Type',
        'Content-Transfer-Encoding',
        'X-From',
        'X-To',
        'X-cc',
        'X-bcc',
        'X-Folder',
        'X-Origin',
        'X-FileName',
    ]
    # the outcome of importing the file
    status_columns = ['import_result']

    df_output = pd.DataFrame(columns=filesystem_columns + header_columns + status_columns)
    return df_output

In [None]:
def _tidy_up_comma_delimited_strings(s):
    """
    This is a helper function to tidy up comma delimited strings
    by removing all whitespace (including newlines and carriage returns)
    from every element and then sorting the elements alphabetically

    parameters:
        s: the string to tidy up, any value which is not a string
           (e.g. None or nan) is returned unchanged
    returns:
        the tidied up string, with the elements joined by a single comma
    """
    # isinstance also accepts subclasses of str, which an exact type comparison skips
    if not isinstance(s, str):
        return s
    # \s already covers \r and \n
    return ','.join(sorted(re.sub(r'\s+', '', element) for element in s.split(',')))

### Load the email data

In [None]:
# Define the path to the input and output directories and files
INPUT = 'input'
MAILDIR = 'maildir'
OUTPUT = 'output'
STRUCTURED_DATA = 'structured_data_'
PROFILING_OUTPUT = 'profiling_output_'
IMPORT_SUCCESS = 'SUCCESS'

# all paths are resolved against the parent of the current working directory
project_root = os.path.abspath('..')
# the timestamp is taken once, so that the structured data files and the
# profiling output written by the same run carry the same timestamp
run_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

input_dir = os.path.join(project_root, INPUT, MAILDIR)
output_dir = os.path.join(project_root, OUTPUT)
# prefix of the structured data files, a suffix and ".csv" are appended when saving
structured_data_output_path = os.path.join(output_dir, STRUCTURED_DATA + run_timestamp)
profiling_output_path = os.path.join(output_dir, PROFILING_OUTPUT + run_timestamp + '.csv')


In [None]:
# (THIS BLOCK READS ALL FILES IN THE ENRON MAILDIR AND CAN TAKE MORE THAN 30 MINUTES)
# walk through input_dir and store, for every file, the folder right below "maildir" as user,
# the remaining directory levels as folder, the file name as file_name, and the metadata
# extracted from the content of the file
# the output is the structured Enron email data, saved in the output folder of this project,
# a new csv file is written every time the user changes
#
# the rows are collected as plain dictionaries and converted to a dataframe only when they
# are saved, because enlarging a dataframe cell by cell with df.loc is very slow
# errors are handled per file, so that a failing file is kept as an ERROR row of its own
# and the remaining files of the same folder are still imported
loc_i = 0  # number of rows collected for the current user
start_time = datetime.now()  # start of the current batch of 10000 files
processed_count = 0
this_user = None
df = _reset_structured_df()
structured_columns = list(df.columns)
records = []
for root, dirs, files in os.walk(input_dir):
    if not files:  # only scan the folder if there are files in it
        continue
    next_user = _get_folder_after_target(root)
    folder_tail = _get_path_after_target(root)
    if next_user != this_user:  # save the collected rows to a separate csv file if the user is different
        if records:
            df = pd.DataFrame(records, columns=structured_columns)
            df.to_csv(f"{structured_data_output_path}_{this_user}_{loc_i}.csv", index=False)
            records = []  # release the memory of the saved rows
        this_user = next_user
        loc_i = 0
    for file in files:
        file_path = os.path.join(root, file)
        # the maildir filesystem information of the file
        record = {
            'maildir_user': next_user,
            'maildir_folder': folder_tail,
            'maildir_file_name': file,
            'maildir_path': file_path,
        }
        try:
            record.update(_extract_email_metadata(file_path))  # extract the metadata from each email
            record['import_result'] = IMPORT_SUCCESS  # mark the record as SUCCESS for debugging
        except Exception as e:  # keep track of the error and continue with the next file
            record['import_result'] = f"ERROR:{e} | ROOT:{root} | FILE:{file}"
        records.append(record)
        loc_i += 1
        processed_count += 1
        if processed_count % 10000 == 0:  # some real-time prompt because the process takes 30+ minutes
            clear_output(wait=True)
            display(f"{processed_count} files imported, the last 10000 took {datetime.now() - start_time}")
            start_time = datetime.now()
if records:  # when the process is finished, the rows of the last user also need to be saved
    df = pd.DataFrame(records, columns=structured_columns)
    df.to_csv(f"{structured_data_output_path}_{this_user}_{loc_i}.csv", index=False)


In [None]:
# Run this block to rebuild df from structured email data that was saved
# earlier as csv files
# Please note: the files are looked up in the output folder of the project
# (including its sub-folders) !
STRUCTURED_DATA_FN_PATTERN = '^structured_data_' # regex pattern which the names of the saved csv files have to match
structured_data_paths = []
for root, dirs, files in os.walk(output_dir):
    # keep the path of every file whose name matches the pattern
    structured_data_paths.extend(
        os.path.join(root, file)
        for file in files
        if re.match(STRUCTURED_DATA_FN_PATTERN, file) is not None
    )
# read every matching file into its own dataframe ...
list_of_df = [pd.read_csv(path, index_col=None, header=0) for path in structured_data_paths]
# ... and stack them into the full email data with a fresh index
df = pd.concat(list_of_df, axis=0, ignore_index=True)

### Pre-profiling: Apply simple conversions to the lists of email addresses

In [None]:
# tidy up the comma delimited emails in the From, To, Cc and Bcc fields: whitespace is removed
# and the addresses are sorted alphabetically, this makes sure the duplicate values of the To
# Cc and Bcc lists can be properly detected regardless of the order of the elements
for address_column in ['From', 'To', 'Cc', 'Bcc']:
    df[address_column] = df[address_column].apply(_tidy_up_comma_delimited_strings)


def _parse_email_date(value):
    """
    This is a helper function to parse the Date header of an email

    parameters:
        value: the raw Date value, any value which is not a string is returned unchanged
    returns:
        the datetime parsed by email.utils.parsedate_to_datetime,
        or pd.NaT when the string cannot be parsed as a date
    """
    if not isinstance(value, str):
        return value
    try:
        return parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # depending on the Python version either exception is raised for a malformed
        # date, a single bad header should not abort the conversion of the whole column
        return pd.NaT


# Also parse the Date values into datetime objects so they are comparable
# (they keep the UTC offset of the header, the conversion to UTC is done later
# with pd.to_datetime(..., utc=True))
df['Date'] = df['Date'].apply(_parse_email_date)

### Data profiling

In [None]:
# We start the profiling with an overview of the dataset column types
print(df.dtypes)
# and we also would like to know some basic statistics for each column in the dataset
print(df.describe(include='all'))
# And we want to have a look at if empty/nan values exist in each column
print(df.isna().sum())
# And we want to have a look at if empty strings exist in each column
# (element-wise comparison with DataFrame.eq, DataFrame.applymap is deprecated in recent pandas versions)
print(df.eq('').sum())
# Check if there are duplicated records in the dataframe
print(df.duplicated().sum())

In [None]:
# From the overview above, we can deduce that the following columns may be useful for our analysis tasks:
# Message-ID, Date, From, To, Cc, Bcc, Subject
# However, the Message-ID column does not help to identify the same email: copies of one email
# which are stored in different people's folders are treated here as the same email when they share
# the same combination of From, To, Cc, Bcc, Date and Subject, so the Message-ID is dropped
# Grouping by this combination gives one row per email and the number of stored copies of it
message_key_columns = ['From', 'To', 'Cc', 'Bcc', 'Date', 'Subject']
df_extracted = (
    df.groupby(message_key_columns, dropna=False)
    .size()
    .reset_index(name='count')
)
print(df_extracted.shape)
# For comparison: the number of distinct Message-ID values over all stored files
print(df['Message-ID'].nunique())

In [None]:
# Apply some basic conversions to the selected fields so that they can be
# compared and profiled more thoroughly
# Lower-case the email address fields so they can be properly deduplicated
# The Subject is deliberately left as it is: its upper/lower cases carry
# actual information which we don't want to lose
for address_column in ('From', 'To', 'Cc', 'Bcc'):
    df_extracted[address_column] = df_extracted[address_column].str.lower()
# Convert the Date column to pandas datetimes in UTC so it can be properly deduplicated
df_extracted['Date'] = pd.to_datetime(df_extracted['Date'], utc=True)
print(df_extracted.head())

In [None]:
# overview of the extracted columns: their types, their basic statistics
# and the number of missing values in each of them
extracted_overviews = (
    df_extracted.dtypes,
    df_extracted.describe(include='all'),
    df_extracted.isna().sum(),
)
for overview in extracted_overviews:
    print(overview)

In [None]:
# The overview above shows that every email has a From value and a Date, while
# the To, Cc, Bcc and Subject values can be missing
# Next we look at the actual value distributions and validate the values
# NOTE(review): inside the first character class "+-<" is a range (from "+" to "<"), so it
# also accepts "," "/" ":" and ";" - confirm whether a literal "-" was intended
email_regex_pattern = r"^[a-z0-9.'_%+-<]+@[a-z0-9.-]+\.[a-z]{2,}>*?$"
# From : should be valid emails
from_addresses = df_extracted['From']
print(from_addresses.value_counts())
# number of From values which match the email pattern
print(from_addresses.str.contains(email_regex_pattern).sum())
# Only less than 10 invalid emails are in From

In [None]:
# Date : should be within the viable operating years of Enron (~1998-2002)
email_dates = df_extracted['Date']
print(email_dates.max())
print(email_dates.min())
# count the emails per calendar year on a copy which carries the year as an extra column
df_extracted_by_year = df_extracted.assign(year=email_dates.dt.year)
emails_per_year = df_extracted_by_year.groupby(['year'], dropna=False).size()
print(emails_per_year.reset_index(name='count'))
# There are some emails (~400) with dates outside of the viable operating years of Enron

In [None]:
# To, Cc and Bcc : these should all be emails
# These fields can hold several comma-delimited email addresses, so to check every
# address against our regex rule the lists have to be exploded into rows first
# 1. one row per (From, recipient field) pair, the field name goes to message_type
df_extracted_email_to = (
    df_extracted
    .melt(id_vars=['From'], value_vars=['To', 'Cc', 'Bcc'])
    .rename(columns={'variable': 'message_type', 'value': 'message_to'})
)
# 2. drop the pairs without any recipient
has_recipient = ~df_extracted_email_to['message_to'].isna()
df_extracted_email_to = df_extracted_email_to[has_recipient].copy().reset_index(drop=True)
# 3. split the comma-delimited strings into lists and give every address its own row
df_extracted_email_to['message_to'] = df_extracted_email_to['message_to'].apply(
    lambda recipients: recipients.split(',') if type(recipients) == str else recipients
)
df_extracted_email_to = df_extracted_email_to.explode('message_to').reset_index(drop=True)
print(df_extracted_email_to.isna().sum())
print(df_extracted_email_to.describe())
# number of recipient addresses which match the email pattern
print(df_extracted_email_to['message_to'].str.contains(email_regex_pattern).sum())
# From the result we can see that only a small number of invalid email addresses exist in the To, Cc and Bcc lists
# (~1800 invalid instances out of 1.6M instances)


In [None]:
# let's also look at the domains of these emails (in From, To, Cc and Bcc)
# the domain is everything from the last "@" to the end of the address
domain_regex_pattern = r'.+(@.+)$'
domain_columns = {'From': 'From_domain', 'message_to': 'message_to_domain'}
for address_column, domain_column in domain_columns.items():
    df_extracted_email_to[domain_column] = df_extracted_email_to[address_column].str.extract(domain_regex_pattern)
for domain_column in domain_columns.values():
    print(df_extracted_email_to[domain_column].value_counts())
# From the result we can see that the majority of the emails are sent between email addresses
# in the enron.com domain. Also due to the difference in how the email is stored, some of
# the enron email addresses are of the format e.g. "legal <.smith@enron.com>".
# a further specific check on this format reveals that the email addresses within <> do not exist
# as in the regular name@domain format, so it is safe to keep them and use them as they are to identify
# unique senders/receivers
check_domain_pattern = r'^.+<.+@.+\.[a-z]{2,}>$'
for address_column in domain_columns:
    addresses = df_extracted_email_to[address_column]
    # the addresses which are written in the "name <address>" format
    bracketed_addresses = addresses[addresses.str.contains(check_domain_pattern)]
    print(bracketed_addresses.shape)
    print(bracketed_addresses.value_counts())
# Subject : anything can be used as a subject so no validation is needed