In [1]:
import re
#import sys
import imaplib
import os
import argparse
from configobj import ConfigObj
from getpass import getpass
# import email
#from collections import defaultdict
from tqdm.notebook import tqdm
#import pandas as pd
import logging
from pathlib import Path

In [2]:
# Route log records at INFO level and above to a file in the working
# directory; filemode "a" appends, so earlier runs are kept in the same file.
logging.basicConfig(
    level=logging.INFO,
    filename='icloud-cleaner-oo.log',
    filemode="a",
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)s:%(message)s',
)

In [3]:
class ICloudCleaner:
    """Delete every message sent by given addresses from an IMAP mailbox.

    Connection settings are read from a ConfigObj file; the keys used are
    ``imap_server``, ``imap_port``, ``username`` and ``password``. Constructing
    an instance logs in immediately and selects the mailbox.
    """

    def __init__(self, config_file):
        """Load ``config_file``, make sure a password is available and connect.

        Raises:
            Exception: whatever ``_connect`` raises when the login fails.
        """
        self.config = ConfigObj(config_file)
        self.email_connection = None
        self._ensure_password()
        self._connect()
        logging.info("New cleaning job starting...")

    def _ensure_password(self):
        """Prompt for the password when the config has none (missing or empty)."""
        # .get() so that a config file without a "password" key prompts the
        # user instead of failing with KeyError.
        if not self.config.get("password"):
            self.config["password"] = getpass("Enter your email password: ")
            logging.info("Password obtained from user input.")
        else:
            logging.info("Using password from config.ini.")

    def _connect(self, mailbox="INBOX"):
        """Open an SSL IMAP connection, log in and select ``mailbox``.

        Any failure is logged and re-raised unchanged.
        """
        try:
            self.email_connection = imaplib.IMAP4_SSL(self.config["imap_server"], self.config["imap_port"])
            self.email_connection.login(self.config["username"], self.config["password"])
            self.email_connection.select(mailbox)
            logging.info(f"Successfully connected to {self.config['username']}@icloud.com - {mailbox}")
        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            raise

    def close(self):
        """Log out of the IMAP server; safe to call more than once."""
        connection = self.email_connection
        if connection is None:
            return
        try:
            connection.logout()
        except Exception as e:
            # Best effort: a failed logout must not mask the caller's own error.
            logging.warning(f"Error while logging out: {e}")
        finally:
            self.email_connection = None

    def search_emails(self, sender):
        """Return the message sequence numbers of all mail from ``sender``.

        Returns:
            A list of ``bytes`` message numbers, or ``None`` when nothing
            matches or the search fails (failures are logged).
        """
        try:
            # Backslash and double quote are special inside an IMAP quoted
            # string; escape them so the address cannot break the command.
            quoted_sender = str(sender).replace("\\", "\\\\").replace('"', '\\"')
            status, data = self.email_connection.search(None, f'(FROM "{quoted_sender}")')
            if status != "OK":
                # On a "NO" reply data holds an error text, not message numbers.
                logging.error(f"Error searching emails from {sender}: {data}")
                return None
            mail_ids = data[0] if data else None
            return mail_ids.split() if mail_ids else None
        except Exception as e:
            logging.error(f"Error searching emails from {sender}: {e}")
            return None

    def set_deleted(self, email_uid):
        """Flag the message with UID ``email_uid`` as ``\\Deleted``.

        Returns:
            ``True`` when the server accepted the flag, ``False`` otherwise
            (failures are logged, never raised).
        """
        try:
            # str(b"12") would yield "b'12'", so decode bytes explicitly.
            if isinstance(email_uid, (bytes, bytearray)):
                uid_text = bytes(email_uid).decode("ascii")
            else:
                uid_text = str(email_uid)
            uid_text = uid_text.strip()
            status, data = self.email_connection.uid("STORE", uid_text, "+FLAGS", "(\\Deleted)")
            if status != "OK":
                logging.error(f"Error setting email UID {email_uid} as deleted: {data}")
                return False
            logging.info(f"Email UID {email_uid} marked for deletion.")
            return True
        except Exception as e:
            logging.error(f"Error setting email UID {email_uid} as deleted: {e}")
            return False

    def fetch_uid(self, email_id):
        """Return the UID (digits only, as ``str``) of message number ``email_id``.

        Returns ``None`` when the response carries no UID or the fetch fails
        (failures are logged).
        """
        try:
            _, response = self.email_connection.fetch(email_id, "UID")
            for part in response or []:
                if not isinstance(part, (bytes, bytearray)):
                    continue  # e.g. None when the message does not exist
                found = re.search(r"\bUID\s+(\d+)", bytes(part).decode("utf-8", errors="replace"))
                if found:
                    return found.group(1)
            return None
        except Exception as e:
            logging.error(f"Error fetching UID for email ID {email_id}: {e}")
            return None

    def clean_mailbox(self, target_emails):
        """Delete all mail from each address in ``target_emails``.

        Invalid addresses are skipped with a warning. For every valid address
        the matching messages are flagged and expunged, then the search is
        repeated until nothing is left or a pass makes no progress.

        Returns:
            The total number of messages deleted across all addresses.
        """
        total_emails_count = 0
        for target_email in tqdm(target_emails, desc="Total emails", total=len(target_emails)):
            if not self.validate_input_email(target_email):
                logging.warning(f"The email '{target_email}' is not valid")
                continue

            total_deleted_emails = 0
            pending = self.search_emails(target_email) or []

            while pending:
                marked = 0
                for message_id in tqdm(pending, total=len(pending), desc=target_email):
                    uid = self.fetch_uid(message_id)
                    if uid and self.set_deleted(uid):
                        marked += 1

                if marked:
                    self.email_connection.expunge()

                remaining = self.search_emails(target_email) or []
                # Count only what was both flagged and actually gone afterwards.
                removed = min(marked, len(pending) - len(remaining))
                if removed <= 0:
                    # Nothing could be flagged or the expunge had no effect:
                    # stop instead of retrying the same messages forever.
                    logging.error(
                        f"No progress deleting emails for {target_email}; "
                        f"{len(remaining)} message(s) remain."
                    )
                    break

                total_deleted_emails += removed
                logging.info(f"Deleted {removed} email(s) for {target_email}")
                pending = remaining

            logging.info(f"Cleanup for {target_email} was successful. Deleted {total_deleted_emails} email(s)")
            total_emails_count += total_deleted_emails

        logging.info(f"The cleanup was successful. Deleted {total_emails_count} email(s)")
        return total_emails_count

    @staticmethod
    def validate_input_email(email_address):
        """Return ``True`` when ``email_address`` looks like ``local@domain.tld``.

        The whole string must match, so trailing junk such as a second ``@``
        is rejected; non-string input (e.g. ``None``) is rejected as well.
        """
        if isinstance(email_address, str) and re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email_address):
            return True
        logging.warning(f"Invalid email format: {email_address}")
        return False

    @staticmethod
    def import_emails_from_file(filename):
        """Read one address per line from ``filename``.

        Surrounding whitespace is stripped and blank lines are dropped.
        Returns an empty list (and logs an error) when the file is missing.
        """
        if not os.path.isfile(filename):
            logging.error(f"The file {filename} doesn't exist.")
            return []
        with open(filename, 'r') as file:
            emails = [line.strip() for line in file if line.strip()]
        logging.info(f"Imported {len(emails)} emails from {filename}.")
        return emails

In [4]:
# Command-line interface: the targets come either from a single sender
# address (--email) or from a text file listing one address per line (--file).
parser = argparse.ArgumentParser(
    description="Delete all incoming emails from a sender email address",
)
parser.add_argument(
    "--email",
    help="The sender whose messages should be deleted",
)
parser.add_argument(
    "--file",
    help="A file which contains the target emails. Each email should be on a separate line.",
)

_StoreAction(option_strings=['--file'], dest='file', nargs=None, const=None, default=None, type=None, choices=None, required=False, help='A file which contains the target emails. Each email should be on a separate line.', metavar=None)

In [5]:
# Notebook stand-in for the command line: pass the arguments explicitly,
# pointing --file at target_email_address.txt in the current working directory.
args = parser.parse_args(["--file", (Path.cwd() / "target_email_address.txt").as_posix()])

# Two separate problems are reported. The original single `and` chain could
# only reach Path(args.file) when args.file was None, which raises TypeError,
# and it never reported a file that does not exist.
if args.email is None and args.file is None:
    print("You need to specify a target email or a file. Use --help for details")
elif args.file is not None and not Path(args.file).exists():
    print(f"The file {args.file} doesn't exist. Use --help for details")

In [6]:
# Run the cleanup. `cleaner` is pre-set to None so the `finally` clause can
# tell whether a connection was opened before something failed.
cleaner = None
try:
    cleaner = ICloudCleaner("config.ini")
    if args.file:
        target_emails = cleaner.import_emails_from_file(args.file)
    else:
        # Without --email either, there is nothing to clean; avoid [None].
        target_emails = [args.email] if args.email else []
    total_emails_count = cleaner.clean_mailbox(target_emails)
    logging.info(f"Total emails deleted: {total_emails_count}")
    print(f"Total emails deleted: {total_emails_count}")
except Exception as e:
    logging.exception(f"An error occurred during the cleaning process: {e}")
    # The traceback goes to the log; also surface the failure in the notebook.
    print(f"An error occurred during the cleaning process: {e}")
finally:
    # Always release the IMAP session, even when the cleanup failed midway.
    if cleaner is not None and cleaner.email_connection is not None:
        try:
            cleaner.email_connection.logout()
        except Exception as e:
            logging.warning(f"Error while logging out: {e}")

Total emails:   0%|          | 0/251 [00:00<?, ?it/s]

Total emails deleted: 0
