In [None]:
import mailparser
import os
import pandas as pd

# Directory to search
directory = 'C:\\Users\\ericb\\Desktop\\Research\\Primary@gmail.com\\Cleaned_Mail\\2023_test\\08\\'
output_csv = 'C:\\Users\\ericb\\Desktop\\Research\\542_Project\\data\\test\\warranted_data_test_output\\mailparser_test_output.csv'

# Column order of the output CSV
columns = ['filename', 'body', 'subject', 'text_plain', 'text_html', 'text_not_managed', 'defects', 'defects_categories']

# Rows are collected as plain dicts and turned into a DataFrame once, after the
# loop. DataFrame.append was removed in pandas 2.0, and growing a frame row by
# row copies the whole frame on every iteration.
records = []

print('Searching files in directory: ' + directory)

# Reference: other mailparser entry points
#   mailparser.parse_from_bytes(file)
#   mailparser.parse_from_file(file)
#   mailparser.parse_from_file_msg(file)
#   mailparser.parse_from_file_obj(file)
#   mailparser.parse_from_string(file)
#
# Reference: attributes available on the parsed mail object
#   mail.attachments        list of all attachments
#   mail.body
#   mail.date               datetime object in UTC
#   mail.defects            defect RFC not compliance
#   mail.defects_categories only defects categories
#   mail.delivered_to
#   mail.from_
#   mail.headers            dict of all headers
#   mail.mail               tokenized mail in a object
#   mail.message            email.message.Message object
#   mail.message_as_string  message as string
#   mail.message_id
#   mail.received
#   mail.subject
#   mail.text_plain         only text plain mail parts in a list
#   mail.text_html          only text html mail parts in a list
#   mail.text_not_managed   all not managed text (check the warning logs to find content subtype)
#   mail.to
#   mail.to_domains
#   mail.timezone           returns the timezone, offset from UTC
#   mail.mail_partial       returns only the mains parts of emails

# Loop through all files in the directory
for filename in os.listdir(directory):
    if filename.endswith('.txt'):  # Only files ending in .txt are parsed
        print('Processing file: ' + filename)

        # Parse the email
        mail = mailparser.parse_from_file(os.path.join(directory, filename))

        # Collect one row per email
        records.append({
            'filename': filename,
            'body': mail.body,
            'subject': mail.subject,
            'text_plain': mail.text_plain,
            'text_html': mail.text_html,
            'text_not_managed': mail.text_not_managed,
            'defects': str(mail.defects),  # Convert list to string
            'defects_categories': str(mail.defects_categories)  # Convert list to string
        })

# DataFrame holding the results; passing `columns` keeps the header even when
# no file matched, so the CSV is still written with a header row.
df = pd.DataFrame(records, columns=columns)

# Save the DataFrame to a CSV file
df.to_csv(output_csv, index=False)

print(f"Data saved to {output_csv}")

# TEST SCRIPT

In [None]:
import os
import pandas as pd
import email
import quopri
from bs4 import BeautifulSoup
import mailparser

def decode_payload(payload):
    """Quoted-printable-decode a payload and count its undecodable characters.

    Args:
        payload: Raw payload as ``bytes`` or ``str``, or ``None``.

    Returns:
        A ``(text, unknown_count)`` tuple. ``text`` is the payload decoded as
        UTF-8 with undecodable bytes replaced by U+FFFD, and ``unknown_count``
        is the number of U+FFFD characters in ``text``. Returns ``(None, 0)``
        when ``payload`` is ``None``.
    """
    if payload is None:
        return None, 0

    # quopri.decodestring rejects str input containing non-ASCII characters,
    # so text payloads are converted to bytes first.
    if isinstance(payload, str):
        payload = payload.encode('utf-8')

    # With errors='replace' decoding can never raise, so a list of fallback
    # encodings would never be consulted; UTF-8 is the only encoding used.
    # NOTE(review): the caller in this file passes get_payload(decode=True)
    # output, which the email docs describe as already transfer-decoded. This
    # second quoted-printable pass may alter literal "=XX" sequences; confirm
    # it is wanted.
    decoded_text = quopri.decodestring(payload).decode('utf-8', errors='replace')

    # Count the replacement character by escape so the check does not depend
    # on how this source file itself is encoded.
    return decoded_text, decoded_text.count('\ufffd')

def extract_info_from_email(file_path):
    """Parse one email file and return a flat dict of features for it.

    The file is parsed with mailparser; the resulting message string is parsed
    again with the standard ``email`` package so the MIME parts can be walked.
    Every decoded part that contains non-whitespace text is run through
    BeautifulSoup, and links, unsubscribe links and 1x1 images are counted
    across all of those parts. A single-part message is handled the same way,
    using ``mail.body`` as its only part.

    Args:
        file_path: Path of the email file to parse.

    Returns:
        dict with the keys 'filename', 'body', 'subject', 'text_plain',
        'text_html', 'text_not_managed', 'defects', 'defects_categories',
        'number of unsubscribe links', 'number of undecodable characters',
        'tracking pixel present', 'total links in email',
        'email size (bytes)' and 'dkim-signature'.
    """
    # Parsing the email using mailparser
    mail = mailparser.parse_from_file(file_path)
    msg = email.message_from_string(mail.message_as_string)

    if msg.is_multipart():
        # decode_payload maps a None payload to (None, 0), so parts without a
        # decodable payload contribute nothing below.
        decoded_parts = [decode_payload(part.get_payload(decode=True)) for part in msg.walk()]
    else:
        decoded_parts = [decode_payload(mail.body)]

    body = ''
    unknown_chars_count = 0
    unsubscribe_links = []
    total_links = 0
    tracking_pixel = False

    for part_body, unknown_count in decoded_parts:
        # `or ''` keeps body a str even when a part decoded to None
        body += part_body or ''
        unknown_chars_count += unknown_count
        if not (part_body and part_body.strip()):
            continue

        part_soup = BeautifulSoup(part_body, 'html5lib')
        anchors = part_soup.find_all('a')

        # Metrics are accumulated over every part: in a multipart message the
        # HTML markup is not necessarily in the first non-empty part.
        total_links += len(anchors)
        unsubscribe_links.extend(
            anchor['href']
            for anchor in anchors
            if anchor.has_attr('href') and "unsubscribe" in anchor.text.lower()
        )
        if part_soup.find('img', width='1', height='1') is not None:
            tracking_pixel = True

    dkim_signature = 'Present' if msg.get('DKIM-Signature') else 'Absent'

    return {
        'filename': os.path.basename(file_path),
        'body': body,
        'subject': mail.subject,
        'text_plain': mail.text_plain,
        'text_html': mail.text_html,
        'text_not_managed': mail.text_not_managed,
        'defects': str(mail.defects),
        'defects_categories': str(mail.defects_categories),
        'number of unsubscribe links': len(unsubscribe_links),
        'number of undecodable characters': unknown_chars_count,
        'tracking pixel present': tracking_pixel,
        'total links in email': total_links,
        # NOTE(review): len(body) counts decoded characters, not bytes; confirm
        # which measure the 'email size (bytes)' column is meant to hold.
        'email size (bytes)': len(body),
        'dkim-signature': dkim_signature
    }

# Directory to search
directory = 'C:\\Users\\ericb\\Desktop\\Research\\Primary@gmail.com\\Cleaned_Mail\\2023_test\\08\\'
output_csv = 'C:\\Users\\ericb\\Desktop\\Research\\542_Project\\data\\test\\warranted_data_test_output\\combined_output.csv'

infos = []

# Loop through all .txt files in the directory. os.listdir returns entries in
# arbitrary order, so the names are sorted to give the same row order on every run.
for filename in sorted(os.listdir(directory)):
    if filename.endswith('.txt'):
        file_path = os.path.join(directory, filename)
        infos.append(extract_info_from_email(file_path))

# Convert to DataFrame and save to CSV; only report success when a file was
# actually written.
if infos:
    df = pd.DataFrame(infos)
    df.to_csv(output_csv, index=False)
    print(f"Data saved to {output_csv}")
else:
    print(f"No .txt files found in {directory}; nothing was saved")


In [None]:
# The extracted data is now in `df`; preview its first rows before cleaning the
# textual columns for use in the model.
# NOTE(review): `df` must have been assigned by an earlier cell; confirm the
# notebook still runs top to bottom on a fresh kernel.
# print(df['body'][0])
df.head()

In [None]:
# Read the CSV at `output_csv` back into a separate DataFrame for cleaning.
# NOTE(review): empty cells presumably load as NaN rather than '', which the
# text-cleaning step must tolerate; verify.
df_to_clean = pd.read_csv(output_csv)

In [None]:
import re
import emoji
from bs4 import BeautifulSoup, NavigableString
import quopri
import base64

def replace_emojis(text):
    """Return `text` after passing it through emoji.demojize with empty delimiters."""
    empty_delimiters = ("", "")
    return emoji.demojize(text, delimiters=empty_delimiters)

def replace_urls_based_on_context(html_content):
    """Replace the content of every <a> tag with a placeholder label.

    The label is IMAGE_URL when the anchor contains an <img>, LINK_URL when
    the node returned by the anchor's `.next` is non-blank text, and
    BUTTON_URL otherwise. It is prefixed with UNSAFE_ when the href starts
    with 'http://'.

    Returns the modified document serialised back to a string.
    """
    document = BeautifulSoup(html_content, 'lxml')

    for anchor in document.find_all('a'):
        prefix = 'UNSAFE_' if anchor.get('href', '').startswith('http://') else ''

        if anchor.img:
            label = 'IMAGE_URL'
        elif isinstance(anchor.next, NavigableString) and anchor.next.strip():
            label = 'LINK_URL'
        else:
            label = 'BUTTON_URL'

        # Assigning .string replaces whatever the anchor previously contained
        anchor.string = f'{prefix}{label}'

    return str(document)

def replace_urls_in_text(text):
    """Replace bare URLs in `text` with placeholder tokens.

    http:// URLs become UNSAFE_LINK_URL and https:// URLs become LINK_URL.
    A URL runs up to the next whitespace character.
    """
    # Order matters: the http:// pass runs before the https:// pass.
    substitutions = (
        (r'http://\S+', 'UNSAFE_LINK_URL'),
        (r'https://\S+', 'LINK_URL'),
    )
    for pattern, placeholder in substitutions:
        text = re.sub(pattern, placeholder, text)
    return text

def decode_quoted_printable(input_data):
    """Decode quoted-printable data to text.

    Accepts bytes or str (a str is encoded to bytes first). The decoded bytes
    are interpreted as UTF-8, with undecodable bytes replaced.
    """
    raw_bytes = input_data if isinstance(input_data, bytes) else input_data.encode()
    decoded_bytes = quopri.decodestring(raw_bytes)
    return decoded_bytes.decode('utf-8', errors='replace')

def decode_base64(text):
    """Base64-decode `text` and return the result as UTF-8 text.

    Bytes that are not valid UTF-8 are replaced rather than raising.
    """
    decoded_bytes = base64.b64decode(text)
    return decoded_bytes.decode('utf-8', errors='replace')

def clean_text(raw_text):
    """Reduce raw email text (possibly HTML, possibly quoted-printable) to plain text.

    Steps: drop soft line breaks, quoted-printable-decode, parse as HTML,
    remove <style>/<script>, replace each <a> with a placeholder label, remove
    <img>, extract the text, convert emojis to names, replace bare URLs, strip
    leftover HTML entities and zero-width characters, and collapse whitespace.

    Args:
        raw_text: The text to clean. ``None`` and NaN (how pandas represents an
            empty CSV cell) are treated as empty; any other non-string value
            is converted with ``str()``.

    Returns:
        The cleaned text with leading and trailing whitespace stripped.
    """
    # Missing values would otherwise raise TypeError in re.sub below
    if raw_text is None or (isinstance(raw_text, float) and raw_text != raw_text):
        return ''
    if not isinstance(raw_text, str):
        raw_text = str(raw_text)

    # Remove soft line breaks ("=" at end of line), for LF and CRLF endings
    raw_text = re.sub(r'=\r?\n', '', raw_text)
    # Decode any quoted-printable text
    # NOTE(review): if the input was already quoted-printable-decoded upstream,
    # this pass may alter literal "=XX" sequences; confirm against the data.
    raw_text = quopri.decodestring(raw_text.encode()).decode('utf-8', errors='replace')

    # Create a BeautifulSoup object
    soup = BeautifulSoup(raw_text, 'lxml')

    # Remove style and script tags and their content. <img> tags are removed
    # only after the anchors are labelled, because the IMAGE URL label depends
    # on the image still being inside its anchor.
    for tag in soup(['style', 'script']):
        tag.decompose()

    # Replace the content of 'a' tags with a placeholder label
    for a_tag in soup.find_all('a'):
        href = a_tag.get('href', '')
        url_type = 'UNSAFE ' if href.startswith('http://') else ''
        if a_tag.img:
            a_tag.string = f'{url_type}IMAGE URL'
        elif isinstance(a_tag.next, NavigableString) and a_tag.next.strip():
            a_tag.string = f'{url_type}LINK URL'
        else:
            a_tag.string = f'{url_type}BUTTON URL'

    # Remove the images that were not inside an anchor
    for img_tag in soup('img'):
        img_tag.decompose()

    # Now proceed with extracting text and further cleaning
    text = soup.get_text(separator=' ', strip=True)
    text = replace_emojis(text)

    text = replace_urls_in_text(text)

    # Remove any remaining HTML encoded characters
    text = re.sub(r'&[a-zA-Z0-9#]+;', '', text)

    # Remove zero-width and directional marks: U+200B ZERO WIDTH SPACE,
    # U+200C ZWNJ, U+200D ZWJ, U+200E LEFT-TO-RIGHT MARK, U+200F RIGHT-TO-LEFT
    # MARK. This runs before whitespace is collapsed so that removing a mark
    # between two spaces does not leave a double space behind.
    text = re.sub('[\u200b-\u200f]', '', text)

    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)

    # Strip string of leading/trailing whitespace
    return text.strip()


In [None]:
# print(df_to_clean['body'][1])

# Clean both free-text columns with clean_text, overwriting each column in place
for text_column in ('subject', 'body'):
    df_to_clean[text_column] = df_to_clean[text_column].apply(clean_text)



In [None]:
# Preview the first rows of df_to_clean
df_to_clean.head()


In [None]:
# Print the second row's body as a spot check; guarded so a frame with fewer
# than two rows does not raise.
if len(df_to_clean) > 1:
    print(df_to_clean['body'].iloc[1])

# Common prefix of every output file; the column name and row number are appended
output_prefix = 'C:\\Users\\ericb\\Desktop\\Research\\542_Project\\data\\test\\warranted_data_test_output\\mailparser_test_output_'

# Save df_to_clean['body'][i] and df_to_clean['text_not_managed'][i] to one txt
# file per row.
for column in ('body', 'text_not_managed'):
    for i, value in enumerate(df_to_clean[column]):
        # Without an explicit encoding, open() uses the locale's code page,
        # which raises UnicodeEncodeError on characters it cannot represent.
        with open(f'{output_prefix}{column}{i}.txt', 'w', encoding='utf-8') as f:
            # A missing cell (NaN) is written as an empty file instead of
            # making f.write raise TypeError.
            f.write(value if isinstance(value, str) else '')
