In [11]:
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from email_reply_parser import EmailReplyParser
from email.utils import parsedate_tz, mktime_tz

import ast
import datetime
import mailbox
import ntpath
import os
import quopri
import re
import rules
import sys
import time
import unicodecsv as csv

# converts seconds since epoch to mm/dd/yyyy string
def get_date(second_since_epoch, date_format):
    if second_since_epoch is None:
        return None
    time_tuple = parsedate_tz(email["date"])
    utc_seconds_since_epoch = mktime_tz(time_tuple)
    datetime_obj = datetime.datetime.fromtimestamp(utc_seconds_since_epoch)
    return datetime_obj.strftime(date_format)

# clean content
def clean_content(content):
    # decode message from "quoted printable" format
    content = quopri.decodestring(content)

    # try to strip HTML tags
    # if errors happen in BeautifulSoup (for unknown encodings), then bail
    try:
        soup = BeautifulSoup(content, "html.parser", from_encoding="iso-8859-1")
    except Exception as e:
        return ''
    return ''.join(soup.findAll(text=True))

# get contents of email
def get_content(email):
    parts = []

    for part in email.walk():
        if part.get_content_maintype() == 'multipart':
            continue

        content = part.get_payload(decode=True)

        part_contents = ""
        if content is None:
            part_contents = ""
        else:
            part_contents = EmailReplyParser.parse_reply(clean_content(content))

        parts.append(part_contents)

    return parts[0]

# get all emails in field
def get_emails_clean(field):
    # find all matches with format <user@example.com> or user@example.com
    matches = re.findall(r'\<?([a-zA-Z0-9_\-\.]+@[a-zA-Z0-9_\-\.]+\.[a-zA-Z]{2,5})\>?', str(field))
    if matches:
        emails_cleaned = []
        for match in matches:
            emails_cleaned.append(match.lower())
        unique_emails = list(set(emails_cleaned))
        return sorted(unique_emails, key=str.lower)
    else:
        return []

# entry point
if __name__ == '__main__':
    argv = sys.argv

    if len(argv) != 2:
        print('usage: mbox_parser.py [path_to_mbox]')
        # In a Jupyter Notebook, set the mbox_file explicitly
        mbox_file = 'C:\\Users\\Here\\Desktop\\Disertatie-Final\\4. Cod\\thesis-phishing-email-detection-main\\phishing_dataset\\private-phishing4.mbox'  # Replace with the actual path to your mbox file
    else:
        # load environment settings
        load_dotenv(verbose=True)

        mbox_file = argv[1]
        file_name = ntpath.basename(mbox_file).lower()
        # Sanitize the export file name to remove invalid characters
        export_file_name = re.sub(r'[<>:"/\\|?*]', '_', mbox_file) + ".csv"
        export_file = open(export_file_name, "wb")

        # get owner(s) of the mbox
        owners = []
        if os.path.exists(".owners"):
            with open('.owners', 'r') as ownerlist:
                contents = ownerlist.read()
                owner_dict = ast.literal_eval(contents)
            # find owners
            for owners_array_key in owner_dict:
                if owners_array_key in file_name:
                    for owner_key in owner_dict[owners_array_key]:
                        owners.append(owner_key)

        # get domain blacklist
        blacklist_domains = []
        if os.path.exists(".blacklist"):
            with open('.blacklist', 'r') as blacklist:
                blacklist_domains = [domain.rstrip() for domain in blacklist.readlines()]

        # create CSV with header row
        writer = csv.writer(export_file, encoding='utf-8')
        writer.writerow(["flagged", "date", "description", "from", "to", "cc", "subject", "content", "time (minutes)"])

        # create row count
        row_written = 0

        for email in mailbox.mbox(mbox_file):
            # capture default content
            date = get_date(email["date"], os.getenv("DATE_FORMAT"))
            sent_from = get_emails_clean(email["from"])
            sent_to = get_emails_clean(email["to"])
            cc = get_emails_clean(email["cc"])
            subject = re.sub('[\n\t\r]', ' -- ', str(email["subject"]))
            contents = get_content(email)

            # apply rules to default content
            row = rules.apply_rules(date, sent_from, sent_to, cc, subject, contents, owners, blacklist_domains)

            # write the row
            writer.writerow(row)
            row_written += 1

        # report
        report = "generated " + export_file_name + " for " + str(row_written) + " messages"
        report += " (" + str(rules.cant_convert_count) + " could not convert; "
        report += str(rules.blacklist_count) + " blacklisted)"
        print(report)

        export_file.close()

OSError: [Errno 22] Invalid argument: 'c:\\Users\\Here\\Desktop\\Disertatie-Final\\4. Cod\\thesis-phishing-email-detection-main\\--f=c:\\Users\\Here\\AppData\\Roaming\\jupyter\\runtime\\kernel-v36a6a238d1130df4d166fd48a40e606443967080d.json'

In [12]:
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from email_reply_parser import EmailReplyParser
from email.utils import parsedate_to_datetime, mktime_tz

import ast
import datetime
import mailbox
import ntpath
import os
import quopri
import re
import rules
import sys
import time
import unicodecsv as csv

# Convertește secunde de la epoch în șir de caracteres în format mm/dd/yyyy
def get_date(email, date_format):
    time_tuple = parsedate_to_datetime(email.get("date")).timetuple()
    utc_seconds_since_epoch = mktime_tz(time_tuple)
    datetime_obj = datetime.datetime.fromtimestamp(utc_seconds_since_epoch)
    return datetime_obj.strftime(date_format)

# Curăță conținutul
def clean_content(content):
    # Decodează mesajul din formatul "quoted printable"
    content = quopri.decodestring(content)

    # Încearcă să elimine etichetele HTML
    try:
        soup = BeautifulSoup(content, "html.parser", from_encoding="iso-8859-1")
    except Exception as e:
        return ''
    return ''.join(soup.findAll(text=True))

# Obține conținutul email-ului
def get_content(email):
    parts = []

    for part in email.walk():
        if part.get_content_maintype() == 'multipart':
            continue

        content = part.get_payload(decode=True)

        part_contents = ""
        if content is None:
            part_contents = ""
        else:
            part_contents = EmailReplyParser().parse_reply(clean_content(content))

        parts.append(part_contents)

    return parts[0]

# Obține toate email-urile din câmp
def get_emails_clean(field):
    # Găsește toate potrivirile cu formatul <user@example.com> sau user@example.com
    matches = re.findall(r'\<?([a-zA-Z0-9_\-\.]+@[a-zA-Z0-9_\-\.]+\.[a-zA-Z]{2,5})\>?', str(field))
    if matches:
        emails_cleaned = []
        for match in matches:
            emails_cleaned.append(match.lower())
        unique_emails = list(set(emails_cleaned))
        return sorted(unique_emails, key=str.lower)
    else:
        return []

# Punct de intrare
# Punct de intrare
if __name__ == '__main__':
    argv = sys.argv

    # Ignorați argumentele suplimentare care încep cu `--`
    argv = [arg for arg in argv if not arg.startswith('--')]

    if len(argv) != 2:
        print('usage: mbox_parser.py [path_to_mbox]')
        # Într-un Jupyter Notebook, setați fișierul mbox explicit
        mbox_file = 'C:\\Users\\Here\\Desktop\\Disertatie-Final\\4. Cod\\thesis-phishing-email-detection-main\\phishing_dataset\\private-phishing4.mbox'  # Înlocuiți cu calea reală către fișierul mbox
    else:
        # Încărcați setările de mediu
        load_dotenv(verbose=True)

        mbox_file = argv[1]
        file_name = ntpath.basename(mbox_file).lower()
        # Sanitizați numele fișierului de export pentru a elimina caracterele nevalide
        export_file_name = re.sub(r'[<>:"/\\|?*]', '_', file_name) + ".csv"
        export_file = open(export_file_name, "wb")

        # Obțineți proprietarii mbox-ului
        owners = []
        if os.path.exists(".owners"):
            with open('.owners', 'r') as ownerlist:
                contents = ownerlist.read()
                owner_dict = ast.literal_eval(contents)
            # Găsiți proprietarii
            for owners_array_key in owner_dict:
                if owners_array_key in file_name:
                    for owner_key in owner_dict[owners_array_key]:
                        owners.append(owner_key)

        # Obțineți lista neagră a domeniilor
        blacklist_domains = []
        if os.path.exists(".blacklist"):
            with open('.blacklist', 'r') as blacklist:
                blacklist_domains = [domain.rstrip() for domain in blacklist.readlines()]

        # Creează CSV cu rândul de antet
        writer = csv.writer(export_file, encoding='utf-8')
        writer.writerow(["flagged", "date", "description", "from", "to", "cc", "subject", "content", "time (minutes)"])

        # Creează numărătoare de rânduri
        row_written = 0

        for email in mailbox.mbox(mbox_file):
            # Capturați conținutul implicit
            date = get_date(email, os.getenv("DATE_FORMAT"))
            sent_from = get_emails_clean(email["from"])
            sent_to = get_emails_clean(email["to"])
            cc = get_emails_clean(email["cc"])
            subject = re.sub('[\n\t\r]', ' -- ', str(email["subject"]))
            contents = get_content(email)

            # Aplicați reguli la conținutul implicit
            row = rules.apply_rules(date, sent_from, sent_to, cc, subject, contents, owners, blacklist_domains)

            # Scrieți rândul
            writer.writerow(row)
            row_written += 1

        # Raport
        report = "generated " + export_file_name + " for " + str(row_written) + " messages"
        report += " (" + str(rules.cant_convert_count) + " could not convert; "
        report += str(rules.blacklist_count) + " blacklisted)"
        print(report)

        export_file.close()


usage: mbox_parser.py [path_to_mbox]


In [13]:
import os
import re
import ast
import csv
import quopri
import mailbox
import datetime
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from email_reply_parser import EmailReplyParser
from email.utils import parsedate_to_datetime

# Funcția pentru conversia datei
def get_date(email, date_format="%m/%d/%Y"):
    date_str = email.get("date")
    if not date_str:  # Dacă nu există data, returnăm un placeholder
        return "Unknown"

    try:
        parsed_date = parsedate_to_datetime(date_str)
        if parsed_date is None:
            return "Unknown"
        return parsed_date.strftime(date_format)
    except Exception:
        return "Unknown"

# Funcția pentru curățarea conținutului HTML
def clean_content(content):
    try:
        # Încearcă să decodeze conținutul dacă este în format "quoted-printable"
        content = quopri.decodestring(content).decode(errors="ignore")
    except (ValueError, AttributeError):
        # Dacă nu se poate decoda, tratază conținutul ca șir de caractere
        content = content.decode(errors="ignore")

    try:
        soup = BeautifulSoup(content, "html.parser")
    except Exception:
        return ''
    return ''.join(soup.stripped_strings)

# Funcția pentru obținerea conținutului email-ului
def get_content(email):
    for part in email.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        content = part.get_payload(decode=True)
        return EmailReplyParser.parse_reply(clean_content(content)) if content else ""

# Funcția pentru extragerea adreselor de email
def get_emails_clean(field):
    matches = re.findall(r'\<?([a-zA-Z0-9_\-\.]+@[a-zA-Z0-9_\-\.]+\.[a-zA-Z]{2,5})\>?', str(field))
    return sorted(set(match.lower() for match in matches)) if matches else []

# Încarcă variabilele de mediu
load_dotenv(verbose=True)

# Setează calea fișierului MBOX (modifică după necesități)
mbox_file = "C:\\Users\\Here\\Desktop\\Disertatie-Final\\4. Cod\\thesis-phishing-email-detection-main\\phishing_dataset\\private-phishing4.mbox"

# Setează fișierul CSV de export
export_file_name = os.path.splitext(os.path.basename(mbox_file))[0] + ".csv"

# Deschide fișierul CSV pentru scriere
with open(export_file_name, "w", newline='', encoding="utf-8-sig") as export_file:
    writer = csv.writer(export_file, quoting=csv.QUOTE_NONNUMERIC)
    writer.writerow(["date", "from", "to", "cc", "subject", "content"])

    # Parcurge email-urile și le scrie în CSV
    for email in mailbox.mbox(mbox_file):
        date = get_date(email)
        sent_from = get_emails_clean(email["from"])
        sent_to = get_emails_clean(email["to"])
        cc = get_emails_clean(email["cc"])
        subject = re.sub('[\n\t\r]', ' -- ', str(email["subject"]))
        contents = get_content(email)

        writer.writerow([date, ", ".join(sent_from), ", ".join(sent_to), ", ".join(cc), subject, contents])

print(f"Fișierul CSV '{export_file_name}' a fost generat cu succes!")


Error: need to escape, but no escapechar set