In [None]:
%pip install -r ../../requirements.txt 

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import re, html
from email import policy
from email.parser import Parser
from bs4 import BeautifulSoup 
from pathlib import Path
from tqdm import tqdm

In [None]:
df = pd.read_csv("../data/raw/emails.csv")
# pd.set_option('display.max_colwidth', None)
# df.head()

In [None]:
print(df.iloc[22,1])

The message columns clearly have a lot of information, let's parse and clean them before doing any further analysis

In [None]:
import re

# -------- layer 1: header clean‑up --------
_SPLIT_KEYS = [
    'Message-ID: ', 'Date: ', 'From: ', 'To: ', 'Subject: ', 'Cc: ',
    'Mime-Version: ', 'Content-Type: ', 'Content-Transfer-Encoding: ',
    'Bcc: ', 'X-From: ', 'X-To: ', 'X-cc: ', 'X-bcc: ', 'X-Folder: ',
    'X-Origin: ', 'X-FileName: '
]
_SPLIT_PATTERN = '|'.join(_SPLIT_KEYS)

def clean_headers(raw_msg: str) -> str:
    """
    1) Remove header duplicates that break splitting
    2) Ensure all 18 keys exist (add blank 'To:' if missing)
    Returns a repaired RFC‑822 string.
    """
    txt = (
        raw_msg
        .replace(' Date: ',    ' Date- ')     # duplicate tokens
        .replace(' Subject: ', ' Subject2: ')
        .replace(' To: ',      ' To- ')
        .replace(' (Subject: ',' (Subject- ')
    )
    # Add a blank 'To:' line for messages that go From -> Subject directly
    if re.search(r'\nSubject: ', txt) and not re.search(r'\nTo: ', txt):
        txt = txt.replace('\nSubject: ', '\nTo: \nSubject: ')

    # Add a blank 'Cc:' line for messages that go From -> To -> Subject directly
    if re.search(r'\nTo: ', txt) and not re.search(r'\nCc: ', txt):
        txt = txt.replace('\nTo: ', '\nCc: \nTo: ')
    
    # Add a blank 'Bcc:' line for messages that go From -> To -> Cc -> Subject directly
    if re.search(r'\nCc: ', txt) and not re.search(r'\nBcc: ', txt):
        txt = txt.replace('\nCc: ', '\nBcc: \nCc: ')
    
    return txt


In [None]:
# --- layer 2: clean the BODY ---------------------------------
SIG_RE   = re.compile(r'(?m)^\s*--\s*$')                                 # “-- ” sig separator
FWD_RE = re.compile(
    r"""(?imx)                     # i: ignore‑case, m: ^$ per‑line, x: verbose
    ^\s*-{2,}\s*                   # line starts with ≥2 dashes
    (?:                            # non‑capturing group
         forwarded\s+message       # “Forwarded message”
       | forwarded\s+by            # “Forwarded by …”
       | original\s+message        # “Original Message”
    )
    .*?$                           # rest of line
    """
)
QUOTE_RE = re.compile(r'(?m)^\s*>.*$')                                   # quoted reply lines

def drop_html(raw: str) -> str:
    """Strip HTML tags and decode HTML entities."""
    soup = BeautifulSoup(raw, "lxml")
    return soup.get_text(" ", strip=True)

def clean_content(body: str) -> dict:
    """
    Returns a dict with:
      clean_body       : purified main text (no quoted replies, sigs, forwards, HTML)
      forward_tail     : text from the first forwarded‑chain marker onward (may be '')
      has_other_content: True if attachment/banner separator '-------------' seen
      is_forwarded     : True if FWD_RE matched (forward chain existed)
    """
    # 1️⃣ split on first forward marker
    m = FWD_RE.search(body)
    if m:
        body_main  = body[:m.start()]
        forward_tail = body[m.start():]
        is_fwd = True
    else:
        body_main  = body
        forward_tail = ""
        is_fwd = False

    # 2️⃣ now handle banners *inside the main part only*
    has_other = "-------------" in body_main
    if has_other:
        body_main = body_main.split("-------------", 1)[0]

    # -- 2) strip HTML from both parts ------------------------
    body_main  = drop_html(body_main)
    forward_tail = drop_html(forward_tail)

    # -- 3) remove quoted replies & signatures ----------------
    body_main = QUOTE_RE.sub("", body_main)
    body_main = SIG_RE.split(body_main)[0]

    # -- 4) normalise whitespace & entities -------------------
    body_main = html.unescape(body_main)
    body_main = re.sub(r"\s+", " ", body_main).strip()

    forward_tail = html.unescape(forward_tail)
    forward_tail = re.sub(r"\s+", " ", forward_tail).strip()

    return {
        "clean_body": body_main,
        # "forward_tail": forward_tail,      # re-include later for future experiments
        "has_other_content": has_other,
        "is_forwarded": is_fwd
    }


In [None]:
parser = Parser(policy=policy.default)

def parse_email(raw_msg: str) -> dict:
    # header repair ➜ parse
    msg = parser.parsestr(clean_headers(raw_msg))

    body_text = next(
        (part.get_payload(decode=True).decode(errors='ignore')
         for part in (msg.walk() if msg.is_multipart() else [msg])
         if part.get_content_type() == 'text/plain'),
        ''
    )

    cleaned = clean_content(body_text)

    return {
        "from":   msg["from"],  "to": msg["to"],     "cc":  msg["cc"],
        "bcc":    msg["bcc"],   "subject": msg["subject"],
        "date":   msg["date"],  "msg_id": msg["message-id"],
        **cleaned
    }

In [None]:
parsed = df['message'].apply(parse_email)
parsed_df = pd.DataFrame(parsed.tolist())

In [None]:
parsed_df.head(10)

In [None]:
parsed_df.info()
parsed_df.isnull().sum()


There are 3 emails that still arent formatted like the others, let's drop them.

In [None]:
parsed_df = parsed_df[parsed_df["to"].notna()]
parsed_df = parsed_df[parsed_df["cc"].notna()]
parsed_df = parsed_df[parsed_df["bcc"].notna()]

In [None]:
parsed_df.info()
parsed_df.isnull().sum()

Great! Now we have a cleaned DataFrame with the parsed email data. Let's keep exploring the data further and then apply it to the whole dataset.

In [None]:
parsed_df['from'].value_counts().head(10)


In [None]:
parsed_df['to'].value_counts().head(10)

In [None]:
# Ensure 'date' is converted to datetime before using .dt accessor
if not pd.api.types.is_datetime64_any_dtype(parsed_df['date']):
	parsed_df['date'] = pd.to_datetime(parsed_df['date'], errors='coerce')
# parsed_df['year_month'] = parsed_df['date'].dt.to_period('M')
# parsed_df['year_month'].value_counts().sort_index().plot()


In [None]:
parsed_df.info()

In [None]:
parsed_df.head()

The data looks clean enough, lets export it in chunks for training!

In [None]:
chunk_iter = pd.read_csv(
    "../data/raw/emails.csv",
    usecols=['message'],
    chunksize=25000
)
for i, chunk in enumerate(chunk_iter):
    parsed = chunk['message'].apply(parse_email)
    parsed_df = pd.DataFrame(parsed.tolist())

    parsed_df = parsed_df[parsed_df["to"].notna()]
    parsed_df = parsed_df[parsed_df["cc"].notna()]
    parsed_df = parsed_df[parsed_df["bcc"].notna()]
    
    if not pd.api.types.is_datetime64_any_dtype(parsed_df['date']):
        parsed_df['date'] = pd.to_datetime(parsed_df['date'], errors='coerce')

    parsed_df.to_parquet(
        f"../data/interim/cleaned_emails_part{i:03d}.parquet",
        index=False
    )
