In [1]:
import pandas as pd
import email
from email import policy
from email.parser import BytesParser
import os
import pandas as pd
import threading
import queue
from nltk.stem import WordNetLemmatizer
import nltk
import re

In [2]:
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\fan\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\fan\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

In [9]:
def clean_text(text):
    # Use regular expressions to remove escape characters
    text = re.sub(r'\\[a-zA-Z]', '', text)
    # Remove extra spaces and line breaks
    text = text.strip()
    text = re.sub(r'\s+', ' ', text)  # Replace extra whitespace with a single space

    # Lemmatization
    lemmatizer = WordNetLemmatizer()
    words = text.split()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
    
    # Returns the lemmatized text
    return ' '.join(lemmatized_words).replace('\n', ' ')


In [10]:
def parse_email_from_file(file_path):
    """
    Parse the subject, sender, recipients, and body of an email，Enter the path to the mail file.
    Parameter:
    file_path (str): Path to the mail file

    Return:
    list: Contains the cleaned list of subject, sender, to, and body
    """
    
    # Read mail file contents
    with open(file_path, 'rb') as f:
        email_content = f.read()

    # Parsing mail using BytesParser
    msg = BytesParser(policy=policy.default).parsebytes(email_content)

    # Get basic information about emails
    subject = msg['Subject'] if 'Subject' in msg else '(No Subject)'
    sender = msg['From'] if 'From' in msg else '(No Sender)'
    to = msg['To'] if 'To' in msg else '(No Recipients)'

    # Parsing the email body
    body = ""
    if msg.is_multipart():
        # Handling multipart messages
        for part in msg.iter_parts():
            if part.get_content_type() == 'text/plain':
                body = part.get_payload(decode=True).decode()
                break
    else:
        # Handling single-part messages
        body = msg.get_payload(decode=True).decode()

    # Clean subject, sender, to, body
    subject_cleaned = clean_text(subject)
    sender_cleaned = clean_text(sender)
    to_cleaned = clean_text(to)
    body_cleaned = clean_text(body)

    # Return the parsing result
    return [subject_cleaned, sender_cleaned, to_cleaned, body_cleaned]

In [3]:
mail_dir_first = os.listdir('./maildir')
mail_dir_seconds = []

In [4]:
for item in mail_dir_first:
    second = os.listdir(os.path.join('maildir', item))
    second_abs_dir = [os.path.join('maildir', item, x) for x in second]
    mail_dir_seconds += second_abs_dir

In [5]:
files = []
for item in mail_dir_seconds:
    try:
        email_files = os.listdir(item)
        for email in email_files:
            files.append(os.path.join(item, email))
    except Exception:
        pass

In [18]:
len(files)

491331

In [19]:
import tqdm
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuring Logging
logging.basicConfig(filename='email_parsing_errors.log', 
                    level=logging.ERROR, 
                    format='%(asctime)s %(levelname)s: %(message)s')

# Multithreaded function processes each file
def process_file(item):
    try:
        email_ = parse_email_from_file(item)
        return email_
    except Exception as e:
        # Record error information to a log file
        logging.error(f"Error processing file {item}: {str(e)}")
        return None

# Multithreading
def process_emails_concurrently(files, max_workers=4):
    result = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = {executor.submit(process_file, file): file for file in files}
        # Using tqdm to display progress
        for future in tqdm.tqdm(as_completed(futures), total=len(futures)):
            email_ = future.result()
            if email_ is not None:
                result.append(email_)
    return result

# Example call
result = process_emails_concurrently(files, max_workers=4)
# Output
print(f"Successfully processed {len(result)} files.")

100%|██████████| 491331/491331 [37:33<00:00, 218.00it/s] 


Successfully processed 490845 files.


In [20]:
result[0]

['Re: Newsgroups',
 'billc@greenbuilder.com',
 'strawbale@crest.org',
 ">What other cool newsgroups are available for u alternative thinkers? >Rammed Earth, Cob, etc? > We have a list of our favorite at http://www.greenbuilder.com/general/discussion.html (and we're open to more suggestions) BC Bill Christensen billc@greenbuilder.com Green Homes For Sale/Lease: http://www.greenbuilder.com/realestate/ Green Building Pro Directory: http://www.greenbuilder.com/directory/ Sustainable Bldg Calendar: http://www.greenbuilder.com/calendar/ Sustainable Bldg Bookstore: http://www.greenbuilder.com/bookstore"]

In [22]:
df = pd.DataFrame(result, columns=['subject', 'sender', 'recipients', 'body'])
df.to_csv('./emails.csv', index=False)

Unnamed: 0,0,1,2,3
0,Consolidated positions: Issues & To Do list,phillip.allen@enron.com,keith.holst@enron.com,---------------------- Forwarded by Phillip K ...
1,Consolidated positions: Issues & To Do list,phillip.allen@enron.com,keith.holst@enron.com,---------------------- Forwarded by Phillip K ...
2,,phillip.allen@enron.com,david.delainey@enron.com,"Dave, Here are the name of the west desk membe..."
3,Re: 2001 Margin Plan,phillip.allen@enron.com,paula.harris@enron.com,"Paula, 35 million is fine Phillip"
4,"Var, Reporting and Resources Meeting",phillip.allen@enron.com,ina.rangel@enron.com,---------------------- Forwarded by Phillip K ...
