# Requirements

In [9]:
import imaplib
import email
from email import policy
import os
import pandas as pd
from email.header import decode_header
import re
from bs4 import BeautifulSoup
import csv

In [10]:
import sys
# Put the absolute path of ../../src on sys.path before importing the helper
# below (presumably that is where the helper_functions package lives).
sys.path.append(os.path.abspath('../../src'))
from helper_functions.path_resolver import DynamicPathResolver

In [19]:
# Build the project path resolver, passing README.md as its marker
# (presumably the file that identifies the project root - confirm in the helper).
dpr = DynamicPathResolver(marker="README.md")

# Paths used by the cells below: the raw data directory and the raw, cleaned
# and labeled mail CSVs of the "own" mail dataset.
raw_dir = dpr.path.data.raw._path
raw_data_path = dpr.path.data.raw.data_mail.own.mails_raw_csv
cleaned_data_path = dpr.path.data.raw.data_mail.own.mails_cleaned_csv
labeled_data_path = dpr.path.data.raw.data_mail.own.mails_labeled_csv

Project Root: c:\Users\ilian\Documents\Projects\git_projects\university\phishing_detection


# Mails from S., M., A.

### Mail extraction

##### IMAP connect

In [None]:
# IMAP host and the account credentials used by the login cell.
# NOTE(review): the credentials are blank in this file; fill them in locally
# and avoid committing real values.
IMAP_SERVER = "imap.web.de"
EMAIL_ACCOUNT = ''
EMAIL_PASSWORD = ''

In [5]:
# Open an SSL connection to the IMAP server and log in. `mail` is the
# module-level connection that fetch_emails() operates on.
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)

('OK', [b'LOGIN completed'])

##### Functions

In [4]:
def extract_email_address(sender):
    """Return the address enclosed in angle brackets within *sender*.

    When *sender* contains no ``<...>`` section it is returned unchanged.
    """
    bracketed = re.search(r'<(.+?)>', sender)
    if bracketed is None:
        return sender
    return bracketed.group(1)

In [5]:
def decode_mime_header(header_value):
    """Decode a MIME (RFC 2047) encoded header value into a plain string.

    Args:
        header_value: Raw header value as returned by the message object;
            may be None or empty.

    Returns:
        The decoded header text, or "Unknown" when *header_value* is falsy.
    """
    if not header_value:
        return "Unknown"

    decoded_text = []
    for part, encoding in decode_header(header_value):
        if not isinstance(part, bytes):
            # Headers without any encoded word come back as a plain str.
            decoded_text.append(part)
            continue
        try:
            decoded_text.append(part.decode(encoding or "utf-8", errors="replace"))
        except LookupError:
            # The header names a charset Python does not know; fall back to UTF-8.
            decoded_text.append(part.decode("utf-8", errors="replace"))

    # decode_header() keeps the whitespace that belongs to unencoded chunks and
    # drops the whitespace between adjacent encoded words, so the chunks must
    # be concatenated as-is; joining with " " would inject extra spaces.
    return "".join(decoded_text)

In [6]:
def clean_html_body(html_body):
    """Parse *html_body* with BeautifulSoup and return only its text.

    Text fragments are stripped and joined with newline characters.
    """
    return BeautifulSoup(html_body, "html.parser").get_text(separator="\n", strip=True)

In [7]:
def _decode_text_payload(part):
    """Return the transfer-decoded payload of *part* as text, using its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        # Container parts (and some malformed ones) have no decodable payload.
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Declared charset is unknown to Python; fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def fetch_emails(folder, label):
    """Fetch every message in an IMAP folder and return one row per message.

    Uses the module-level ``mail`` connection together with
    ``decode_mime_header`` and ``extract_email_address``.

    Args:
        folder: Name of the mailbox folder to select (e.g. "INBOX").
        label: Value stored in the first column of every returned row.

    Returns:
        A list of ``[label, date, sender, subject, body]`` lists. Empty when
        the search fails or the folder contains no messages.

    Raises:
        imaplib.IMAP4.error: If the folder cannot be selected.
    """
    status, _ = mail.select(folder)
    if status != "OK":
        raise imaplib.IMAP4.error(f"Could not select folder {folder!r}")

    status, messages = mail.search(None, "ALL")
    if status != "OK" or not messages or not messages[0]:
        return []

    email_data = []
    for email_id in messages[0].split():
        status, msg_data = mail.fetch(email_id, "(RFC822)")
        if status != "OK":
            continue

        for response_part in msg_data:
            # Only the tuple entries of the response carry message bytes.
            if not isinstance(response_part, tuple):
                continue

            msg = email.message_from_bytes(response_part[1])

            # Decode headers (the subject is decoded once and reused)
            decoded_subject = decode_mime_header(msg["Subject"])
            print(f"Raw Subject: {msg['Subject']}")
            print(f"Decoded Subject: {decoded_subject}")

            subject = decoded_subject or "No Subject"
            sender = extract_email_address(decode_mime_header(msg["From"]))
            date = msg["Date"]
            body = ""

            if msg.is_multipart():
                # Take the first text/plain part that is not an attachment.
                for part in msg.walk():
                    disposition = str(part.get("Content-Disposition", "")).lower()
                    if part.get_content_type() == "text/plain" and "attachment" not in disposition:
                        body = _decode_text_payload(part)
                        break
            else:
                # Single-part messages are taken as-is, whatever their content type.
                body = _decode_text_payload(msg)

            email_data.append([label, date, sender, subject, body])

    return email_data


In [8]:
def save_to_csv(emails_data, path):
    """Append *emails_data* to the CSV file at *path*.

    Args:
        emails_data: Rows of ``[Label, Date, Sender, Subject, Body]``.
        path: CSV file to append to; it is created when it does not exist.

    Returns:
        The DataFrame built from *emails_data*.
    """
    column_names = ["Label", "Date", "Sender", "Subject", "Body"]
    frame = pd.DataFrame(emails_data, columns=column_names)

    # The header row is written only when this call creates the file.
    write_header = not os.path.isfile(path)

    csv_options = {
        "mode": 'a',
        "header": write_header,
        "index": False,
        "encoding": "utf-8",
        "quoting": csv.QUOTE_MINIMAL,
        "quotechar": '"',
    }
    frame.to_csv(path, **csv_options)

    return frame

##### Save extracted

In [12]:
# Switch for the extraction cell: the IMAP download only runs when this is True.
get_all = False

In [None]:
# Download the mails and append them to the raw CSV; runs only when get_all is True.
# NOTE(review): "INBOX" is fetched twice (label -1 and label 0) and both results
# are appended to raw_data_path, so every inbox mail would be stored twice with
# different labels - confirm this is intended.
# NOTE(review): mail.logout() is only reached inside this branch, so the
# connection stays open when get_all is False.
if get_all:
    # Unlabeled
    unlabeled_emails = fetch_emails("INBOX", -1)
    df_unlabeled = save_to_csv(unlabeled_emails, raw_data_path)
    print(f"Saved {len(df_unlabeled)} unlabeled emails to mails_raw.csv")

    # Legit
    legit_emails = fetch_emails("INBOX", 0)
    df_legit = save_to_csv(legit_emails, raw_data_path)
    print(f"Saved {len(df_legit)} legit emails to mails_raw.csv")

    # Spam
    spam_emails = fetch_emails("Spam", 1) 
    df_spam = save_to_csv(spam_emails, raw_data_path)
    print(f"Saved {len(df_spam)} spam emails to mails_raw.csv")

    mail.logout()

##### Load raw

In [13]:
# Load the raw mail CSV and print the row count and the per-label distribution.
raw_df = pd.read_csv(raw_data_path)
print(f"Total emails: {len(raw_df)}")
print(raw_df["Label"].value_counts()) 

Total emails: 6382
Label
-1    6310
 1      51
 0      21
Name: count, dtype: int64


### Clean mails

##### Remove html

In [20]:
def clean_and_save(df, save_path, filter):
    """Strip HTML markup from the ``Body`` column of *df* and save it as CSV.

    Args:
        df: DataFrame with a ``Body`` column. It is modified in place.
        save_path: Destination path of the cleaned CSV.
        filter: When falsy the function does nothing - the frame is neither
            cleaned nor written.
    """
    def clean_html(text):
        # Empty bodies are read back from CSV as NaN; str(NaN) would put the
        # literal text "nan" into the cleaned dataset.
        if pd.isna(text):
            return ""
        return BeautifulSoup(str(text), "html.parser").get_text(separator="\n", strip=True)

    if not filter:
        return

    df["Body"] = df["Body"].apply(clean_html)
    df.to_csv(save_path, index=False, encoding="utf-8")
    print(f"Cleaned dataset saved as {save_path}")


In [21]:
# Clean the Body column of the raw frame (in place) and write it to the cleaned-data path.
clean_and_save(raw_df, cleaned_data_path, filter=True)

  return BeautifulSoup(str(text), "html.parser").get_text(separator="\n", strip=True)
  return BeautifulSoup(str(text), "html.parser").get_text(separator="\n", strip=True)


Cleaned dataset saved as C:\Users\ilian\Documents\Projects\git_projects\university\phishing_detection\data\raw\data_mail\own\mails_cleaned.csv


##### Load cleaned

In [40]:
# Load the cleaned CSV and print the row count and label distribution.
# NOTE(review): the raw data uses the column "Label" while this cell reads
# "label"; the step that lower-cases the column names is not visible in this
# notebook - confirm the file on disk matches.
cleaned_df = pd.read_csv(cleaned_data_path)
print(f"Total emails: {len(cleaned_df)}")
print(cleaned_df["label"].value_counts()) 

Total emails: 6382
label
-1    6310
 1      51
 0      21
Name: count, dtype: int64


### Label

##### Load to label

In [None]:
# Load the CSV that is being labeled and print the row count and label distribution.
to_label_df = pd.read_csv(labeled_data_path)
print(f"Total emails: {len(to_label_df)}")
print(to_label_df["label"].value_counts()) 

Total emails: 6382
label
-1    6310
 1      51
 0      21
Name: count, dtype: int64


##### Auto label

In [42]:
def auto_label_emails(df, legit_senders, spam_senders, save_path):
    """Label mails by exact sender match and save the result as CSV.

    Args:
        df: DataFrame with ``sender`` and ``label`` columns; modified in place.
        legit_senders: Sender addresses to label 0 (legit).
        spam_senders: Sender addresses to label 1 (spam). Spam wins when an
            address appears in both lists, because it is applied last.
        save_path: Destination path of the labeled CSV.

    Returns:
        The same DataFrame, with the ``label`` column updated.
    """
    legit_mask = df["sender"].isin(legit_senders)
    spam_mask = df["sender"].isin(spam_senders)

    # Label legit emails (0)
    df.loc[legit_mask, "label"] = 0
    # Report only the rows this call labeled, not every row that already
    # carried the label before.
    legit_count = int((legit_mask & ~spam_mask).sum())
    print(f"✅ Auto-labeled {legit_count} emails as LEGIT.")

    # Label spam emails (1)
    df.loc[spam_mask, "label"] = 1
    spam_count = int(spam_mask.sum())
    print(f"🚨 Auto-labeled {spam_count} emails as SPAM.")

    # Save
    df.to_csv(save_path, index=False, encoding="utf-8")
    print(f"🎉 All emails labeled and saved at: {save_path}")

    return df

In [43]:
# Sender addresses that auto_label_emails() labels as legit (0); matching is exact.
known_legit_senders = [
    "service@paypal.com",
    "noreply@amazon.de",
    "support@web.de",
    "noreply@mail.kleinanzeigen.de",
    "neu@mailings.web.de",
    "mail@newsletterdirekt24.eu",
    "info@newsletter.agrar-fachversand.com",
    "newsletter@yatego.com",
    "noreply@ebay-kleinanzeigen.de",
    "info@bestprovita.com",
    "ebay@reply.ebay.de",
    "ebay@ebay.com",
    "info@customer.autobutler.de",
    "info@agrar-fachversand.com",
]

In [44]:
# Sender addresses that auto_label_emails() labels as spam (1); matching is exact.
known_spam_senders = [
    "info@nl.you-buy.net",
    "info@waytohearts.live",
    "admin@treffegirls.com",
    "noreply@mylove.ru",
    "noreply@liebesfun.de",
    "info@beboo.ru",
    "best-pair@fotostrana.ru",
    "user-events@fotostrana.ru",
    "gifts@fotostrana.ru",
    "noreply@fotostrana.ru",
    "status@fotostrana.ru",
    "noreply@znakomstva.ru",
    "events@lafh.org",
    "skg@lafh.org",
    "info@fastheart-connect.eu",
    "dave@cadamedia.ie",
    "k_nadin@i.ua",
    "redaktion@newsletter.lonelyplanet.de",
    "Mailer-Daemon@diplom54.ru",
    "info@magic-mob.com",
    "info@sendmenews.live",
    "info@flashingpanties.eu",
    "z@proton-m03.sarbc.ru",
    "service@ga.acmaildist.com",
    "Thomas.ehrlich5481616@rifsgo.xyz",
    "Stefan-Schubert6786126@meggod.xyz",
    "Stefan-Schubert1081211@sortgo.xyz",
    "Dieter.Kirchner4913521@ankito.xyz",
    "admin@gibmirsex.com",
    "info@hsr-europe.eu",
    "info@sluttyhour.live",
    "noreply@fpz.de",
    "info@promaster-msg.live",
    "dominik-schroeder@ziegel.de",
    "marina@kouch-stilist.ru",
    "noreply@swing-zone.com",
    "meeting@fotostrana.ru",
]

In [39]:
# Apply both sender lists to the loaded frame, write it to the labeled CSV,
# and print the resulting label distribution.
to_label_df = auto_label_emails(to_label_df, known_legit_senders, known_spam_senders, labeled_data_path)
print(to_label_df["label"].value_counts()) 

✅ Auto-labeled 1413 emails as LEGIT.
🚨 Auto-labeled 784 emails as SPAM.
🎉 All emails labeled and saved at: C:\Users\ilian\Documents\Projects\git_projects\university\phishing_detection\data\raw\data_mail\own\mails_labeled.csv
label
-1    4185
 0    1413
 1     784
Name: count, dtype: int64


##### Manual label

In [None]:
# Interactive labeling: show sender and subject of every still-unlabeled mail
# and apply the answer to all mails from that sender.
prompted = 0
for index, row in to_label_df[to_label_df["label"] == -1].iterrows():
    # The filtered frame is a snapshot taken before the loop. An earlier answer
    # may already have labeled every mail from this sender, so check the live
    # frame and do not ask again.
    if to_label_df.at[index, "label"] != -1:
        continue

    print("\n" + "=" * 50)
    print(f"📩 Sender: {row['sender']}")
    print(f"📜 Subject: {row['subject']}")

    label = input("Label this email (0 = legit, 1 = spam, -1 = skip): ")
    prompted += 1

    try:
        label = int(label)
    except ValueError:
        print("Skipping email...")
    else:
        if label in (0, 1):
            sender = row['sender']
            to_label_df.loc[to_label_df['sender'] == sender, 'label'] = label
            print(f"✅ All emails from {sender} have been labeled as {'LEGIT' if label == 0 else 'SPAM'}.")

    # Checkpoint after every 10 answers (the row index is unrelated to progress).
    if prompted % 10 == 0:
        to_label_df.to_csv(labeled_data_path, index=False, encoding="utf-8")
        print("✅ Progress saved.")

# Persist the answers given since the last checkpoint.
to_label_df.to_csv(labeled_data_path, index=False, encoding="utf-8")
print("✅ Progress saved.")



📩 Sender: info@beboo.ru
📜 Subject: Anna, 45 -  Хотите встретиться?
✅ All emails from info@beboo.ru have been labeled as SPAM.

📩 Sender: user-events@fotostrana.ru
📜 Subject: sergej, тебя хотят познакомить с одной девушкой...
✅ All emails from user-events@fotostrana.ru have been labeled as LEGIT.

📩 Sender: neu@mailings.web.de
📜 Subject: 4 GB LTE-Tarif nur 6,99 € mtl.¹
Skipping email...

📩 Sender: newsletter@yatego.com
📜 Subject: Ihre Zähne immer erstklassig versorgt
✅ All emails from newsletter@yatego.com have been labeled as SPAM.

📩 Sender: noreply@ebay-kleinanzeigen.de
📜 Subject: Neue Treffer zu Ihrer Suche "Verschenken - eternitplatten in Ganz
 Deutschland"
✅ All emails from noreply@ebay-kleinanzeigen.de have been labeled as LEGIT.

📩 Sender: gifts@fotostrana.ru
📜 Subject: 🎁 У вас появился еще одна тайная поклонница!
✅ All emails from gifts@fot

# Mails from J. Mail Server

In [11]:
from email.parser import BytesParser

In [13]:
def extract_text_from_html(html_content):
    """Return the text of *html_content* with all markup removed.

    Text fragments are stripped and separated by newlines.
    """
    parsed = BeautifulSoup(html_content, 'html.parser')
    plain_text = parsed.get_text(separator='\n', strip=True)
    return plain_text


def _decode_body_bytes(payload, declared_charset, fallback_encodings):
    """Decode *payload* strictly, trying the declared charset before the fallbacks."""
    candidates = ([declared_charset] if declared_charset else []) + list(fallback_encodings)
    for candidate in candidates:
        try:
            return payload.decode(candidate)
        except (UnicodeDecodeError, LookupError):
            # Wrong or unknown encoding: try the next candidate.
            continue
    # Nothing decoded cleanly; keep whatever is readable.
    return payload.decode('utf-8', errors='replace')


def extract_email_data(email_file):
    """Parse one stored e-mail file and return its headers and plain-text body.

    The body is taken from the first ``text/plain`` part that is not an
    attachment (nested multiparts included). It is decoded with the charset the
    part declares; when that is missing or wrong, UTF-8, Windows-1252 and
    ISO-8859-1 are tried in that order.

    Args:
        email_file: Path of the message file.

    Returns:
        A tuple ``(sender, subject, to, date, body)``. Missing headers are
        None, and body is None when no non-empty plain-text part exists. All
        five values are None when the file cannot be read or parsed.
    """
    encodings = ['utf-8', 'windows-1252', 'iso-8859-1']

    # Broad on purpose: this is the per-file boundary of a batch run, and one
    # unreadable or malformed message must not abort the whole folder.
    try:
        with open(email_file, 'rb') as f:
            msg = BytesParser(policy=policy.default).parse(f)

        sender = msg.get("From", None)
        subject = msg.get("Subject", None)
        to = msg.get("To", None)
        date = msg.get("Date", None)

        text_part = None
        if msg.is_multipart():
            for part in msg.walk():
                if (part.get_content_type() == 'text/plain'
                        and part.get_content_disposition() != 'attachment'):
                    text_part = part
                    break
        elif msg.get_content_type() == 'text/plain':
            text_part = msg

        email_text = ""
        if text_part is not None:
            payload = text_part.get_payload(decode=True)
            if payload:
                email_text = _decode_body_bytes(
                    payload, text_part.get_content_charset(), encodings
                )
    except Exception as e:
        print(f"Failed to read {email_file}: {e}")
        return None, None, None, None, None

    # Normalise line endings before trimming.
    email_text = email_text.replace('\r\n', '\n').replace('\r', '\n').strip()
    return sender, subject, to, date, email_text or None

In [14]:
def process_spam_folder(spam_folder_path, output_csv_path):
    """Collect every mail file below a folder into a single CSV.

    Each file found by walking *spam_folder_path* is parsed with
    ``extract_email_data``; values that are missing are written as empty cells.

    Args:
        spam_folder_path: Folder that is searched recursively for mail files.
        output_csv_path: CSV file to write (overwritten if it exists). Its
            parent directory is created when necessary.

    Returns:
        None. Nothing is written when *spam_folder_path* does not exist.
    """
    if not os.path.exists(spam_folder_path):
        print(f"Folder not found: {spam_folder_path}")
        return

    email_data = []
    for root, _dirs, files in os.walk(spam_folder_path):
        for file in files:
            fields = extract_email_data(os.path.join(root, file))
            email_data.append([value if value else "" for value in fields])

    # A bare file name has no directory component, and os.makedirs('') raises.
    output_dir = os.path.dirname(output_csv_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_csv_path, mode='w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Sender', 'Subject', 'Receiver', 'Date', 'Body'])
        writer.writerows(email_data)

    print(f"Data saved in: {output_csv_path}")

In [15]:
def label_spam_csv(csv):
    """Set the ``Label`` column of a CSV file to 1 and rewrite the file in place.

    Args:
        csv: Path of the CSV file to read and overwrite.
    """
    labeled = pd.read_csv(csv).assign(Label=1)
    labeled.to_csv(csv, index=False, encoding='utf-8')

    print(f"Labeled CSV saved to: {csv}")

##### Execute on VM

In [None]:
# Folder holding the exported mail files. A raw string needs single
# backslashes; doubling them puts two separators into the path.
spam_folder = r"C:\Users\ilian\Downloads\spam"
# Destination CSV, taken from the `dpr` resolver created earlier in this
# notebook (no `paths` object is defined in it).
output_csv = dpr.path.data.raw.data_mail.own.jannis_mail_csv

In [17]:
# Write a small sample message into the spam folder so the pipeline can be tried.
# The body contains the raw bytes \xe4, \xfc and \xf6, and the message has no
# Content-Type header, i.e. it declares no charset.
os.makedirs(spam_folder, exist_ok=True)
with open(os.path.join(spam_folder, 'test_sample.eml'), 'wb') as f:
    f.write(b"From: test@example.com\nTo: recipient@example.com\nSubject: Test Email with Umlauts\nDate: Fri, 16 Feb 2025 12:34:56 +0000\n\nHello, this is a test email with special characters like \xe4, \xfc, and \xf6.\nBest regards,\nTester")


In [18]:
# Parse every file below spam_folder and write the rows to output_csv.
process_spam_folder(spam_folder, output_csv)

Data saved in: c:\Users\ilian\Documents\Projects\git_projects\university\phishing_detection\src\data_preperation\data\raw\jannis_mail.csv


In [27]:
# Add Label = 1 to every row of the collected CSV (rewrites the file in place).
label_spam_csv(output_csv)

Labeled CSV saved to: c:\Users\ilian\Documents\Projects\git_projects\university\phishing_detection\data\raw\data_mail\own\jannis_mail.csv
