In [None]:
import email
import os
import re

import html2text
import pandas as pd
from IPython.display import clear_output

# This notebook extracts cleaned text from my personal emails, which were exported in EML (.eml) format.
# An EML file stores the complete message: headers, styling, attachments, etc.

In [None]:
# Folder that holds the exported .eml files.
DATA_FOLDER = "raw_eml_data"

# Terms to redact from the email files. remove_personal_infos matches them
# regardless of case, so the casing used here does not matter.
PERSONAL_INFOS = ["SENSITIVE", "CONFIDENTIAL", "PRIVATE", "PERSONAL", "SECRET"]

In [None]:
def remove_personal_infos(personal_infos: list, data_folder: str = "raw_data") -> None:
    """
    Redact sensitive terms (for example a name or phone number) from .eml files.

    Every file ending in ".eml" directly inside ``data_folder`` is scanned. Each
    term is matched case-insensitively and replaced by the literal "redacted";
    the rest of the file keeps its original bytes and casing. Files without
    any match are not rewritten.

    Args:
        personal_infos: Terms to redact. Empty strings are skipped.
        data_folder: Directory that contains the .eml files.
    """
    # Compile once, outside the file loop. re.escape keeps characters such as
    # "." or "+" (common in addresses and phone numbers) literal.
    patterns = [
        re.compile(re.escape(info), re.IGNORECASE)
        for info in personal_infos
        if info  # an empty term would match between every character
    ]
    eml_files = [file for file in os.listdir(data_folder) if file.endswith('.eml')]

    for file in eml_files:
        path = os.path.join(data_folder, file)
        with open(path, 'rb') as f:
            # surrogateescape lets bytes that are not valid UTF-8 survive the
            # decode/encode round trip instead of being silently dropped.
            content = f.read().decode('utf-8', errors='surrogateescape')

        counter = 0
        for pattern in patterns:
            # Feed each substitution into the next one so that every term is
            # redacted, not only the last one that matched.
            content, hits = pattern.subn("redacted", content)
            counter += hits

        if counter > 0:
            with open(path, 'wb') as f:
                f.write(content.encode('utf-8', errors='surrogateescape'))
            print(f"Removed {counter} occurrence(s) from \"{file}\".")
        else:
            clear_output(wait=True)
            print(f"No changes made to \"{file}\".")

    clear_output(wait=True)
    print("Done.")


# Overwrites the matching .eml files in DATA_FOLDER in place, so keep a copy of the originals.
remove_personal_infos(PERSONAL_INFOS, DATA_FOLDER)

In [None]:
# Shared html2text converter; parse_eml calls h.handle() on every decoded message body.
h = html2text.HTML2Text()

def _decode_part(part):
    """Decode one message part and run it through html2text; return None if it has no payload."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None

    charset = part.get_content_charset() or 'utf-8'
    try:
        decoded = payload.decode(charset, 'ignore')
    except LookupError:
        # The declared charset has no Python codec (unknown or misspelled
        # label): fall back to UTF-8 instead of aborting the whole run.
        decoded = payload.decode('utf-8', 'ignore')

    return h.handle(decoded)


def parse_eml(raw_email) -> str:
    """
    Extract the textual content of a parsed email.

    For a multipart message, every ``text/plain`` and ``text/html`` part is
    decoded, passed through the shared ``h`` html2text converter, and the
    results are joined with single spaces. A non-multipart message has its
    payload decoded and converted the same way, whatever its content type.

    Args:
        raw_email: A parsed message object, as returned by the ``email`` module.

    Returns:
        The converted text with leading and trailing whitespace stripped.
    """
    if raw_email.is_multipart():
        text_parts = []
        for part in raw_email.walk():
            if part.get_content_type() in ('text/plain', 'text/html'):
                part_text = _decode_part(part)
                if part_text is not None:
                    text_parts.append(part_text)
        text = ' '.join(text_parts)
    else:
        text = _decode_part(raw_email) or ''

    return text.strip()

In [16]:
def process_all_files(export_csv: bool = False, data_folder: str = "raw_data") -> pd.DataFrame:
    """
    Parse every .eml file in ``data_folder`` into a cleaned text dataset.

    Each file is parsed with ``parse_eml``. Newlines, "|" and "-" are then
    removed from the text, and rows that are empty or contain "github" are
    dropped.

    Args:
        export_csv: When truthy, also write the result (without its index) to
            "email_dataset.csv" in the current working directory.
        data_folder: Directory that contains the .eml files.

    Returns:
        A DataFrame with a ``label`` column (always 1) and a ``text`` column.
    """
    eml_files = [file for file in os.listdir(data_folder) if file.endswith('.eml')]

    texts = []
    labels = []

    for counter, file in enumerate(eml_files):
        print(f"Processing file {counter + 1} / {len(eml_files)}")
        clear_output(wait=True)
        # Read from the folder that was listed, not from the global DATA_FOLDER.
        path = os.path.join(data_folder, file)
        # Parse from bytes: text mode depends on the platform default encoding
        # and fails or garbles the body when the mail is not valid UTF-8.
        with open(path, 'rb') as f:
            raw_email = email.message_from_binary_file(f)
        texts.append(parse_eml(raw_email))
        labels.append(1)

    clear_output(wait=True)
    print("Done.")

    df = pd.DataFrame({
        "label": labels,
        "text": texts
    })

    # Remove unwanted chars. regex=False is required: "|" is a regex metacharacter.
    # NOTE(review): deleting "\n" (instead of replacing it with a space) glues the
    # words on either side of a line break together - confirm this is intended.
    for char in ["\n", "|", "-"]:
        df["text"] = df["text"].str.replace(char, "", regex=False)

    df["text"] = df["text"].str.strip()

    # Drop every mail that contains "github" (case-sensitive match).
    df = df[~df['text'].str.contains("github", regex=False)]

    df = df[df['text'].str.strip() != '']

    # Remove rows where 'text' column is NaN
    df = df.dropna(subset=['text'])

    if export_csv:
        df.to_csv("email_dataset.csv", index=False)

    return df

# Read from the folder configured at the top of the notebook instead of the
# function's "raw_data" default; no CSV is written because export_csv stays False.
df = process_all_files(data_folder=DATA_FOLDER)

Done.
