In [None]:
import ast
import email
import hashlib
import imaplib
import os
import re
import getpass
from datetime import datetime
from collections import defaultdict, Counter


## General Set Up
You will be prompted for your username, so that it is never stored in the script.

You will be prompted for your password, so that it is never stored in the script.

> **_Note_**: Most people will have two-factor authentication (2FA) enabled, so you will need an app password (a personal access token) to authenticate. You can create one in your Google Account > Security > 2FA > App Passwords.

In [None]:
# Prompt for the credentials at run time so they are never stored in the
# notebook. getpass is used for both prompts, so neither value is echoed.
user_name = getpass.getpass(prompt="Enter your Gmail address: ")
password = getpass.getpass(prompt="Enter your app password: ")
# Root folder the attachments are saved under (spelling of "output" fixed).
save_path = "./output"


## Search Operations and Query Syntax for Gmail IMAP
The Gmail IMAP service lets you search for emails using the same query syntax as the Gmail web interface, through the X-GM-RAW [1] extension of the SEARCH command.

This is especially useful for searching for emails with attachments as it will also search within the content of the attachments to find files.

In my case I wanted to search for an explicit string within quotes, so the sample provided below includes syntax for that arrangement.

You can modify the query to suit your needs.

Other options could include:
    - before:yyyy-mm-dd (before the specified date)
    - after:yyyy-mm-dd (after the specified date)
    - larger:20M (larger than 20MB)
    - smaller:20M (smaller than 20MB)
    - filename:pdf (search for pdf files)
    etc.

[1] https://developers.google.com/gmail/imap/imap-extensions#extension_of_the_search_command_x-gm-raw


In [None]:
# Search criterion handed to IMAP SEARCH; X-GM-RAW carries a Gmail-style query.
# Each doubled backslash yields \" in the final string, i.e. an escaped quote
# around the search phrase inside the outer double-quoted X-GM-RAW argument.
query = '(X-GM-RAW "has:attachment \\" <your search query> \\"")'


### Folder Specification
Uncomment the helper call below to list the folders available for your account, then choose the folder to search in. (Run the cell that defines `find_folders`, under Function Definitions, first.) In my case I wanted to search all mail items, including sent mail, so I used the \[Gmail\]\/All Mail folder in the format provided below.

> **_Note:_** This could also be '"[Google Mail]/All Mail"' - it was for my secondary account

In [None]:
# find_folders(user_name, password)


In [None]:
# Mailbox name passed to IMAP SELECT; the inner double quotes are part of
# the value itself.
folder = '"[Gmail]/All Mail"'


#### Function Definitions

In [None]:
def save_state(resume_file, user_name, save_path):
    """Write the current run settings to ``resume_file``.

    The file is overwritten with two ``key = value`` lines: ``user_name``
    first, ``save_path`` second.

    Args:
        resume_file: Path of the state file to create or overwrite.
        user_name: Account name to record.
        save_path: Attachment output folder to record.
    """
    # Opening in "w" mode already creates the file, so it does not need to
    # be re-opened in append mode afterwards just to make sure it exists.
    with open(resume_file, "w") as f:
        f.write(f"user_name = {user_name}\nsave_path = {save_path}\n")


In [None]:
def _state_value(lines, index):
    """Return the text after the first ``" = "`` on ``lines[index]``, or None."""
    if index >= len(lines):
        return None
    # partition (not split) keeps values that themselves contain " = " intact.
    _, separator, value = lines[index].partition(" = ")
    return value if separator else None


def recover(resume_file, processed_id_file):
    """Load the state left behind by a previous run, if there is any.

    Args:
        resume_file: Path of the state file. Its first line is read as the
            user name and its second line as the save path, each in
            ``key = value`` form.
        processed_id_file: Path of the file holding the IDs of messages that
            were already handled, stored as comma-terminated Python literals
            (for example ``b'12',``).

    Returns:
        A ``(user_name, save_path, processed_ids)`` tuple. ``user_name`` and
        ``save_path`` are ``None`` when ``resume_file`` does not exist or the
        corresponding line is missing or malformed (an empty file included);
        ``processed_ids`` is a set that is empty when nothing was recorded.

    Side effects:
        Both files are created (empty) when they do not exist yet.
    """
    user_name, save_path = None, None
    processed_msg_ids = set()
    if os.path.exists(resume_file):
        print("Recovering last state...")
        if os.path.exists(processed_id_file):
            with open(processed_id_file) as f:
                for raw_id in f.read().split(","):
                    raw_id = raw_id.strip()
                    # Skip the empty piece after the trailing comma.
                    if raw_id:
                        # literal_eval only accepts Python literals, so it
                        # rebuilds the stored value without executing code.
                        processed_msg_ids.add(ast.literal_eval(raw_id))
        with open(resume_file) as f:
            state_lines = f.read().split("\n")
        user_name = _state_value(state_lines, 0)
        save_path = _state_value(state_lines, 1)
    else:
        print("No Recovery file found.")
    # Make sure both files exist from here on, without touching their content.
    for path in (processed_id_file, resume_file):
        with open(path, "a"):
            pass
    return user_name, save_path, processed_msg_ids


In [None]:
def decode_mime_words(s):
    """Decode MIME encoded words in a header value to a plain ``str``.

    Args:
        s: Header text that may contain encoded words such as
            ``=?utf-8?b?...?=``.

    Returns:
        The decoded text. Parts without a declared charset are read as
        UTF-8. Bytes that cannot be decoded are replaced with U+FFFD, and a
        charset label Python does not know falls back to UTF-8, so a
        malformed header never raises here.
    """
    # Imported locally: a bare ``import email`` does not by itself guarantee
    # that the ``email.header`` submodule has been loaded.
    from email.header import decode_header

    pieces = []
    for word, charset in decode_header(s):
        if isinstance(word, bytes):
            try:
                word = word.decode(charset or "utf-8", errors="replace")
            except LookupError:
                # Unknown charset label: keep whatever is readable.
                word = word.decode("utf-8", errors="replace")
        pieces.append(word)
    return "".join(pieces)


In [None]:
def generate_mail_messages(
    gmail_user_name,
    password,
    processed_id_file,
    processed_ids,
    folder,
    query,
    max_attempts=3,
):
    """Yield each message in ``folder`` that matches ``query`` and is unprocessed.

    Args:
        gmail_user_name: Account used to log in to ``imap.gmail.com``.
        password: Password (or app password) for that account.
        processed_id_file: File each handled message ID is appended to,
            followed by a comma.
        processed_ids: Set of message IDs to skip; handled IDs are added to it.
        folder: Mailbox name passed to IMAP SELECT.
        query: Search criterion passed to IMAP SEARCH.
        max_attempts: How many times a fetch is tried before giving up on
            a message.

    Yields:
        The parsed ``email.message.Message`` for each fetched mail.

    Raises:
        imaplib.IMAP4.error: If the folder cannot be selected.
        Exception: If the search does not return ``OK``.
    """
    with imaplib.IMAP4_SSL("imap.gmail.com") as imap_session:
        imap_session.login(gmail_user_name, password)
        # NOTE(review): the mailbox is opened read-write, so fetching RFC822
        # may mark the messages as read; select(folder, readonly=True) would
        # avoid that - confirm which behaviour is wanted.
        select_typ, select_data = imap_session.select(folder)
        if select_typ != "OK":
            # Fail here with a clear message; otherwise the SEARCH below
            # fails with an unrelated-looking "illegal in state" error.
            raise imaplib.IMAP4.error(
                f"Error selecting folder {folder}: {select_data}"
            )
        session_typ, data = imap_session.search(None, query)
        print("Searching Inbox for emails with attachments.")
        if session_typ != "OK":
            print("Error searching Inbox.")
            raise Exception("Error searching Inbox.")
        for msg_id in data[0].split():
            print(f"Processing mail {msg_id}")
            if msg_id in processed_ids:
                continue
            print(f"Fetching mail {msg_id}")
            for attempt in range(1, max_attempts + 1):
                msg_typ, message_parts = imap_session.fetch(msg_id, "(RFC822)")
                if msg_typ != "OK":
                    print(
                        f"Error fetching mail {msg_id}, attempt {attempt}/{max_attempts}"
                    )
                    continue
                yield email.message_from_bytes(message_parts[0][1])
                # Execution resumes here only when the consumer asks for the
                # next message, so an ID is recorded after the consumer has
                # finished with the message it was handed.
                processed_ids.add(msg_id)
                with open(processed_id_file, "a") as resume:
                    resume.write(f"{msg_id},")
                break
            else:
                # The loop ended without a successful fetch.
                print(f"Failed to fetch mail {msg_id} after {max_attempts} attempts.")


In [None]:
def build_directory(directory):
    """Create ``directory`` and any missing parent folders.

    Does nothing when the directory already exists. ``exist_ok=True``
    replaces the separate existence check, which could race with another
    process creating the same folder between the check and the creation.

    Args:
        directory: Path of the folder to create.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    os.makedirs(directory, exist_ok=True)


In [None]:
# Compiled once; the final label accepts two or more letters so addresses
# with long top-level domains (".photography") are not cut short.
_EMAIL_ADDRESS_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")


def extract_email_address(from_line):
    """Return the first e-mail address found in ``from_line``.

    Args:
        from_line: Value of a ``From`` header, e.g. ``"Bob <bob@example.com>"``.
            ``None`` and empty values are accepted.

    Returns:
        The matched address, or ``None`` when there is no input or no
        address in it.
    """
    if not from_line:
        return None
    # str() also covers header values that are not plain strings.
    match = _EMAIL_ADDRESS_RE.search(str(from_line))
    return match.group(0) if match else None


In [None]:
def by_date_domain(save_folder, date, domain):
    """Create and return the ``<save_folder>/<year>/<month>/<domain>`` folder.

    Args:
        save_folder: Root folder for saved attachments.
        date: ``datetime`` of the message; formatted as ``/%Y/%m_%b/``
            (for example ``/2023/01_Jan/``).
        domain: Sender domain used as the last path component. ``None`` or
            an empty string is replaced with ``"unknown_sender_domain"``,
            so the result never ends in a bare ``/``.

    Returns:
        The path of the folder, which exists when the function returns.
    """
    sender_domain = domain or "unknown_sender_domain"
    path_date = date.strftime("/%Y/%m_%b/")
    new_save_folder = f"{save_folder}{path_date}{sender_domain}"
    build_directory(new_save_folder)
    return new_save_folder


In [None]:
def save_attachments(message, directory, file_name_counter, file_name_hashes):
    """Save every named attachment of ``message`` below ``directory``.

    Files are stored in the folder returned by ``by_date_domain`` and named
    ``<YYYY_MM_DD>_<sender address>_<original name>``. A payload that was
    already stored under the same name (same MD5 digest) is skipped; a
    different payload with the same name gets a ``(v.N)`` suffix.

    Args:
        message: Parsed ``email.message.Message``.
        directory: Root folder for saved attachments.
        file_name_counter: Mapping of file name to the number of distinct
            payloads stored under it; missing keys must count as 0
            (e.g. ``collections.Counter``). Updated in place.
        file_name_hashes: Mapping of file name to the set of payload digests
            seen for it; missing keys must default to an empty set
            (e.g. ``defaultdict(set)``). Updated in place.

    Missing or unparseable headers do not abort the run: without a usable
    ``Date`` the message is filed under 1970-01-01, and without a usable
    ``From`` address ``unknown_sender`` is used in the file name.
    """
    # Imported locally: a bare ``import email`` does not by itself guarantee
    # that the ``email.utils`` submodule has been loaded.
    from email.utils import mktime_tz, parsedate_tz

    # Headers may be absent (None); normalise to text before parsing.
    msg_from = str(message["From"] or "")
    msg_date = str(message["Date"] or "")

    parsed_date = parsedate_tz(msg_date) if msg_date else None
    try:
        date = datetime.fromtimestamp(mktime_tz(parsed_date))
    except (TypeError, OverflowError, ValueError, OSError):
        # No date, or one that cannot be converted: use a fixed placeholder
        # so the attachment is still saved.
        date = datetime(1970, 1, 1)
    file_safe_date = date.strftime("%Y_%m_%d")

    msg_domain = (
        msg_from.split("@")[-1].split(".")[0].replace(">", "")
        if "@" in msg_from
        else None
    )
    email_address = extract_email_address(msg_from)
    if email_address:
        file_safe_address = email_address.replace("@", "_").replace(".", "_")
    else:
        file_safe_address = "unknown_sender"

    directory = by_date_domain(directory, date, msg_domain)

    for part in message.walk():
        # get_content_disposition() returns the lower-cased disposition
        # without its parameters, so "Attachment; filename=..." is accepted.
        disposition = part.get_content_disposition()
        if part.get_content_maintype() == "multipart" or (
            disposition is not None and disposition not in ("attachment", "inline")
        ):
            continue

        file_name = part.get_filename()
        if not file_name:
            print("\tNo filename detected for this part.")
            continue

        file_name = decode_mime_words(file_name)
        # Replace characters that are unsafe in file names, then drop any
        # line breaks left over from folded headers.
        file_name = re.sub(r'[\\/*?:"<>|]', "_", file_name)
        file_name = "".join(file_name.splitlines())
        file_name = f"{file_safe_date}_{file_safe_address}_{file_name}"

        payload = part.get_payload(decode=True)
        if not payload:
            print("\tNo payload detected for this part.")
            continue

        # The digest is only used to recognise identical content.
        x_hash = hashlib.md5(payload).hexdigest()
        if x_hash in file_name_hashes[file_name]:
            print(f"\tDuplicate detected, skipping: {file_name}")
            continue

        file_name_counter[file_name] += 1
        version = file_name_counter[file_name]
        if version > 1:
            file_str, file_extension = os.path.splitext(file_name)
            new_file_name = f"{file_str}(v.{version}){file_extension}"
        else:
            new_file_name = file_name
        print(f"\tStoring: {new_file_name}")
        file_name_hashes[file_name].add(x_hash)

        file_path = os.path.join(directory, new_file_name)
        if os.path.exists(file_path):
            # Never overwrite a file that is already in the destination.
            print(f"\tExists in destination: {new_file_name}")
            continue
        with open(file_path, "wb") as fp:
            fp.write(payload)


In [None]:
def find_folders(gmail_user_name, password):
    """Print the mailbox listing reported by the Gmail IMAP server.

    Args:
        gmail_user_name: Account used to log in to ``imap.gmail.com``.
        password: Password (or app password) for that account.
    """
    # The context manager logs out and closes the connection even when
    # login() or list() raises, so the session is never left open.
    with imaplib.IMAP4_SSL("imap.gmail.com") as imap_session:
        imap_session.login(gmail_user_name, password)
        print("Fetching folders...")
        folders = imap_session.list()
        print(folders)


##### Main Operation Executes Below

In [None]:
# Bookkeeping shared by all save_attachments calls in this run: how many
# distinct payloads were stored under each file name, and the digests of
# the payloads already seen for each file name.
file_name_counter = Counter()
file_name_hashes = defaultdict(set)
# State files; the relative names resolve against the working directory.
resume_file = "resume.txt"
processed_id_file = "processed_ids.txt"

# NOTE(review): the state is written before it is read back, so recover()
# returns the user_name/save_path that were just saved; only the processed
# IDs can carry over from an earlier run - confirm this order is intended.
save_state(resume_file, user_name, save_path)

user_name, save_path, processed_ids = recover(resume_file, processed_id_file)

# Fetch every matching message that has not been processed yet and store
# its attachments under save_path.
for msg in generate_mail_messages(
    gmail_user_name=user_name,
    password=password,
    processed_id_file=processed_id_file,
    processed_ids=processed_ids,
    folder=folder,
    query=query,
):
    save_attachments(msg, save_path, file_name_counter, file_name_hashes)

# Reached only when the loop finished without raising: the resume state is
# removed.
os.remove(processed_id_file)
os.remove(resume_file)
