In [1]:
import imaplib
import email
from email.header import decode_header
import webbrowser
import os
import re
import uuid
import sqlite3
import requests
from datetime import datetime

In [2]:
# account credentials
username = "daniel.higgins@yahoo.com"
password = "serbiwxklglgammm"
server = "imap.mail.yahoo.com"

In [3]:
#imap.list()

In [4]:
# Check if the current imap is logged in
def is_logged_in(imap):
    try:
        status, _ = imap.select("INBOX")
        if status == 'OK':
            return True
    except imaplib.IMAP4.error:
        pass
    return False

# Login to the email server
def imap_login(username, password, server):
    imap = imaplib.IMAP4_SSL(server)
    if not is_logged_in(imap):
        imap.login(username, password)
    return imap

# Get the unread messages from the given mailbox
def get_unread_message_ids(imap, mailbox="Inbox"):
    status, _ = imap.select(mailbox)
    status, unseen_messages = imap.search(None, "UNSEEN")
    message_ids = []
    if unseen_messages[0]:
        message_ids = unseen_messages[0].split()

    return message_ids

# Get the last n messages from the given mailbox
def get_last_n_message_ids(imap, n=-1, mailbox="Inbox"):
    # Set n = -1 to return all the elements in the mailbox
    imap.select(mailbox)
    status, messages = imap.search(None, "ALL")
    message_ids = []
    last_n_messages_ids = []
    if messages:
        message_ids = messages[0].split()
        last_n_messages_ids = message_ids if n == -1 else message_ids[-n:]
    return last_n_messages_ids

# Given a message_id read the specified message
def read_message(imap, message_id):
    message = None
    try:
        _, msg = imap.fetch(message_id, "(RFC822)")
        for response in msg:
            if isinstance(response, tuple):
                message = email.message_from_bytes(response[1])
    except Exception as e:
            print(f"Error on {message_id} while creating message body:{e}") 
    return message

# Given a message extract the given field from the message
def decode_header_field(message, field="Subject"):
    header_full_string = ""
    if message == None:
        return header_full_string
        
    field_value = message.get(field)
    if field_value != None:
        header_list = decode_header(field_value)
        for header in header_list:
            header_msg, encoding = header
            if isinstance(header_msg, bytes):
                try:
                    encoding = 'utf-8' if encoding is None else encoding
                    header_string = header_msg.decode(encoding)
                except Exception: 
                    #print(f"Error decoding string: {encoding} decoding failed")
                    try:
                        header_string = header_msg.decode('utf-8')
                    except UnicodeDecodeError: 
                        print(f"Failed a second time \n {header_msg}")
                        pass
            else:
                header_string = header_msg

            header_full_string += header_string
    
    # Only return prinatable charaters
    header_full_chars = [char for char in header_full_string if 32 <= ord(char) <= 126]
    header_full_string = ''.join(header_full_chars)
    return header_full_string
                

# Mark and email message as unread    
def mark_msg_as_unread(imap, message_id):
    imap.store(message_id, "-FLAGS", "\\Seen")

# Create a mailbox if it does not exists    
def create_mailbox(imap, mailbox_name):
    status, _ = imap.select(mailbox_name)
    if status == 'OK':
        pass # Mailbox exists
    else:
        # Create the mailbox
        try:
            status, _ = imap.create(mailbox_name)
            if status != 'OK':
                print(f"Failed to create mailbox '{mailbox_name}'. Please check if the name is valid.")
        except Exception as e:
            print("Error while creating message body:",e)    

            
# Extract any plain text and any HTML from the message body            
def get_message_body(message):
    body = ""
    try:
        if message.is_multipart():
            for part in message.walk():
                content_type = part.get_content_type()
                if content_type == 'text/plain':
                    body += part.get_payload(decode=True).decode()
                elif content_type == 'text/html':
                    body += part.get_payload(decode=True).decode()
        else:
            body = message.get_payload(decode=True).decode()
    except Exception as e:
        print("Error while getting message body:",e)
            
    return body

# Move a message to a different mailbox
def move_message(imap, message_id, uid, to_mailbox):
    # Move the message
    #imap.copy(message_id, to_mailbox)
    imap.uid("COPY", uid, to_mailbox)
    # Delete the original message
    imap.store(message_id, '+FLAGS', '\\Deleted')
    imap.expunge()
    
# Given an HTML fragment extract all anchor links <a href=...>    
def collect_links(body):
    matches = []
    try:
        # Regular expression to match URLs in <a href> tags
        url_regex = r'<a\s+(?:[^>]*?\s+)?href=[\'"](.*?)[\'"]'
        matches = re.findall(url_regex, body)
    except Exception as e:
        print("Error while collecting links:",e)

    return matches

# Given a string of text extract any valid email (expect only 1 email in the input_string)
def extract_email(input_string):
    found_email = None
    if input_string is None:
        return None
    try:
        email_regex = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"
        match = re.search(email_regex, input_string)
        if match:
            found_email = match.group()
    except Exception as e:
        print("Error while collecting links:",e)
    return found_email

# Given a vaild email seperate the username and the domain name
def extract_domain(email):
    username, domain = None, None
    if email is None:
        return username, domain
    try:
        username, domain = email.split('@', 1)
        domain_parts = domain.split('.')
        if len(domain_parts) < 2:
            top_level_domain =  None
        else:
            top_level_domain =  '.'.join(domain_parts[-2:])
    except ValueError:
        username, domain = None, None
        
    return username, top_level_domain


# Given a string of text extract the text that is not the email 
def extract_name(input_string):
    if input_string is None:
        return None
    # Use a regular expression to remove anything between "<" and ">"
    name = re.sub(r'<.*?>', '', input_string)
    # Trim the result
    name = name.strip()
    return name

# Check if the given domain has an associated website
def domain_has_website(domain_name, timeout=5):
    if domain_name is None:
        return False
    url = f'http://{domain_name}'
    try:
        response = requests.head(url, timeout=timeout)
        return response.status_code in range(200, 400)
    except requests.exceptions.RequestException:
        return False

# Convert imap date string to data obj    
def convert_to_date(date_str):
    try:
        format_string = '%d %b %Y %H:%M:%S %z'
        if date_str[3] == ',':
            format_string = '%a, ' + format_string
        if date_str[-1] == ')':
            date_str = re.sub(r'\s+\([A-Z]+\)$', '', date_str)
        date_obj = datetime.strptime(date_str, format_string)
        return date_obj
    except ValueError:
        return None
    

In [5]:
# Connect to the SQLite database
def create_email_db(db_name, reset_db=False):
    if reset_db:
        if os.path.exists(db_name):
            os.remove(db_name)
            
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    
    # Create the emails table
    cursor.execute('''CREATE TABLE IF NOT EXISTS email
                  (uuid TEXT PRIMARY KEY, date date, sender_name TEXT, sender_email TEXT, reply_to TEXT, subject TEXT, message_id TEXT, has_website INTEGER)''')

    # Create the message_body_links table with a self-generated primary key and a foreign key constraint
    cursor.execute('''CREATE TABLE IF NOT EXISTS email_body_link
                  (id INTEGER PRIMARY KEY AUTOINCREMENT, uuid TEXT, url TEXT, FOREIGN KEY (uuid) REFERENCES email(uuid))''')
    return conn

def sqlite_date_format(date_obj):
    return date_obj.strftime('%Y-%m-%d %H:%M:%S')

def from_imap_sqlite_dt(date_str):
    ret_val = None
    date_obj = convert_to_date(date_str)
    if date_obj != None:
        ret_val = date_obj.strftime('%Y-%m-%d %H:%M:%S')
    return ret_val

def insert_into_email(conn, record):
    cursor = conn.cursor()
    cursor.execute('INSERT INTO email (uuid, date, sender_name, sender_email, subject, message_id, has_website) VALUES (?, ?, ?, ?, ?, ?, ?)', record)
    conn.commit()

def insert_into_message_body_link(conn, uuid, urls):
    cursor = conn.cursor()
    for url in urls:
        cursor.execute('INSERT INTO email_body_link (uuid, url) VALUES (?, ?)', (uuid, url))
    conn.commit()

In [6]:
def update_email_list_db(mailbox, reset_db=True):
    if mailbox=="Whitelist":
        email_list_db="./input_data/email_wl.db"
    elif mailbox=="Blacklist":
        email_list_db="./input_data/email_bl.db"
    else:
        print("Only Whitelist or Blacklist are allowed")
        return
    
    conn = create_email_db(email_list_db, reset_db)
    imap = imap_login(username, password, server)

    # not providing n parameter will get ALL messages in the mailbox
    message_ids = get_last_n_message_ids(imap, mailbox=mailbox)

    for message_id in message_ids:
        print(f"{message_id}",end=" ")
        msg = read_message(imap, message_id)
        msg_uuid        = str(uuid.uuid4())
        msg_date        = decode_header_field(msg, "Date")
        msg_sqlite_date = from_imap_sqlite_dt(msg_date)
        msg_from        = decode_header_field(msg, "From")
        msg_from_name   = extract_name(msg_from)
        msg_from_email  = extract_email(msg_from)
        msg_subject     = decode_header_field(msg, "Subject")
        msg_id          = decode_header_field(msg, "Message-ID")
        print(f"{msg_date=} {msg_sqlite_date=}")
        user, domain   = extract_domain(msg_from_email)
        has_website    = domain_has_website(domain,timeout=2)
        msg_body       = get_message_body(msg)
        msg_body_links = collect_links(msg_body)
        record = (msg_uuid, msg_sqlite_date, msg_from_name, msg_from_email, msg_subject, msg_id, has_website)
        insert_into_email(conn, record)
        insert_into_message_body_link(conn, msg_uuid, msg_body_links)

    conn.close()    
    


In [17]:
#####################################################################################################################
## BUILD WHITE LIST / BLACK LIST ##

update_email_list_db("Whitelist", reset_db=True)

b'1' msg_date='Wed, 21 Feb 2024 14:14:04 -0600' msg_sqlite_date='2024-02-21 14:14:04'
b'2' msg_date='Wed, 21 Feb 2024 09:52:34 -0600' msg_sqlite_date='2024-02-21 09:52:34'
b'3' msg_date='Wed, 21 Feb 2024 15:58:07 +0000' msg_sqlite_date='2024-02-21 15:58:07'
b'4' msg_date='Wed, 21 Feb 2024 09:50:01 -0500' msg_sqlite_date='2024-02-21 09:50:01'
b'5' msg_date='Wed, 21 Feb 2024 09:26:16 -0500' msg_sqlite_date='2024-02-21 09:26:16'
b'6' msg_date='Wed, 21 Feb 2024 05:40:10 -0800' msg_sqlite_date='2024-02-21 05:40:10'
b'7' msg_date='Mon, 12 Feb 2024 15:10:45 +0000' msg_sqlite_date='2024-02-12 15:10:45'
b'8' msg_date='Mon, 12 Feb 2024 17:25:27 +0000 (UTC)' msg_sqlite_date='2024-02-12 17:25:27'
b'9' msg_date='Fri, 16 Feb 2024 16:39:00 -0500' msg_sqlite_date='2024-02-16 16:39:00'
b'10' msg_date='Mon, 12 Feb 2024 13:05:36 +0000 (UTC)' msg_sqlite_date='2024-02-12 13:05:36'
b'11' msg_date='Mon, 29 Jan 2024 17:00:25 +0000' msg_sqlite_date='2024-01-29 17:00:25'
b'12' msg_date='Fri, 26 Jan 2024 18:58:1

In [8]:
# Print Name and Email
imap = imap_login(username, password, server)
message_ids = get_last_n_message_ids(imap, 1, mailbox="new_junk")
for message_id in message_ids:
    msg = read_message(imap, message_id)
    msg_from       = decode_header_field(msg, "From")
    msg_from_name  = extract_name(msg_from)
    msg_from_email = extract_email(msg_from)
    print(f"{msg_from_name=}")
    print(f"{msg_from_email=}")

msg_from_name='Devastating Disasters'
msg_from_email='uleideprimapresa@develop-map.com'


In [9]:
# Cosine sililarity finction
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

stop_words=["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves",
            "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", 
            "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", 
            "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", 
            "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", 
            "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", 
            "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", 
            "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", 
            "not", "only", "own", "same", "so", "than", "too", "very", "can", "will", "just", "should", "now"]

def clean_phrase(stop_words, phrase):
    # Convert the phrase to lowercase
    cleaned_phrase = phrase.lower()
    
    # Remove stop words
    for word in stop_words:
        # Use regex to match whole words only
        cleaned_phrase = re.sub(rf'\b{re.escape(word.lower())}\b', '', cleaned_phrase)
        
    # Remove special characters
    special_characters = ['"', ',', '|', '@', '#', '^', '*', ' -']
    for char in special_characters:
        cleaned_phrase = cleaned_phrase.replace(char, '')
    
    # Remove extra spaces
    cleaned_phrase = ' '.join(cleaned_phrase.split())
    
    return cleaned_phrase


def cosine_similarity_score(s1, s2):
    # Create TF-IDF vectorizer
    vectorizer = TfidfVectorizer()
    
    # Fit-transform the input strings to TF-IDF vectors
    tfidf_matrix = vectorizer.fit_transform([s1, s2])
    
    # Calculate the cosine similarity between the TF-IDF vectors
    cosine_sim = cosine_similarity(tfidf_matrix[0], tfidf_matrix[1])[0][0]
    
    # Return the cosine similarity score (as a probability between 0 and 1)
    return cosine_sim

def sender_name_similarity(compare_name, email_sender_names):
    max_similarity=0
    max_similarity_sender_name = None
    compare_name = clean_phrase(stop_words, compare_name)
    for sender_name in email_sender_names:
        similarity = cosine_similarity_score(compare_name, sender_name)
        if similarity > 0:
            print(f"{similarity} {compare_name} {sender_name}")
        if similarity > max_similarity:
            max_similarity = similarity
            max_similarity_sender_name = sender_name
    if ends_with_ad(compare_name) == 1:
        max_similarity, max_similarity_sender_name = (1, "ends_with_ad")
    if marketing_words(compare_name) == 1:
        max_similarity, max_similarity_sender_name = (1, "marketing_words")
    return max_similarity, max_similarity_sender_name

def ends_with_ad(compare_name):
    ret_val = 0
    if compare_name[-3:] == ' ad':
        ret_val = 1
    return ret_val

def marketing_words(compare_name):
    ret_val = 0
    marketing_words = ['affiliate', 'ad partner']
    for marketing_word in marketing_words:
        if marketing_word in compare_name:
            ret_val = 1
    return ret_val
    

In [15]:

import sqlite3
import pandas as pd

def get_email_df(email_db):
    conn = sqlite3.connect(email_db)
    query = 'SELECT * FROM email'
    email_df = pd.read_sql_query(query, conn)
    conn.close()
    return email_df

def get_domain_list(email_df):
    email_df['domain'] = email_df['sender_email'].apply(lambda x: extract_domain(x)[1])
    unique_domains = email_df['domain'].unique()
    return unique_domains

def get_similarity_names(email_sender_names):
    similarity_names = []
    for email_sender_name in email_sender_names:
        similarity_name = clean_phrase(stop_words, email_sender_name)
        similarity_names.append(similarity_name)
    return similarity_names

email_wl_df = get_email_df('./input_data/email_wl.db')
email_bl_df = get_email_df('./input_data/email_bl.db')

email_bl_emails = email_bl_df.sender_email.unique()
email_wl_emails = email_wl_df.sender_email.unique()
email_bl_sender_names = email_bl_df.sender_name.unique()
email_bl_similarity_names = get_similarity_names(email_bl_sender_names)
email_wl_sender_names = email_wl_df.sender_name.unique()
email_bl_domains = get_domain_list(email_bl_df)
email_wl_domains = get_domain_list(email_wl_df)
#email_bl_similarity_names

In [11]:
def test_sender_name(sender_name, email_bl_sender_names, email_wl_sender_names):
    email_class = 0
    if sender_name in email_wl_sender_names:
        email_class = 1
    elif sender_name in email_bl_sender_names:
        email_class = -1
    return email_class

def test_sender_email(sender_email, email_bl_emails, email_wl_emails):
    email_class = 0
    if sender_email in email_wl_emails:
        email_class = 1
    elif sender_email in email_bl_emails:
        email_class = -1
    return email_class

def test_sender_domain(sender_domain, email_bl_domains, email_wl_domains):
    email_class = 0
    if sender_domain in email_wl_domains:
        email_class = 1
    elif sender_domain in email_bl_domains:
        email_class = -1
    return email_class


In [21]:
#####################################################################################################################
## RUN DAILY ##
imap = imap_login(username, password, server)
#message_ids = get_last_n_message_ids(imap, 25, mailbox="Inbox")
message_ids = get_unread_message_ids(imap, mailbox="Inbox")
move_message_ids = []

for message_id in reversed(message_ids):
    print(f"{message_id}",end=" ")
    msg = read_message(imap, message_id)
    if msg != None:
        msg_from        = decode_header_field(msg, "From")
        msg_from_name   = extract_name(msg_from)
        msg_from_email  = extract_email(msg_from)
        user, domain    = extract_domain(msg_from_email)
        detected_from_name = test_sender_name(msg_from_name, email_bl_sender_names, email_wl_sender_names)
        detected_from_email = test_sender_email(msg_from_email, email_bl_emails, email_wl_emails)
        detected_from_domain = test_sender_domain(domain, email_bl_domains, email_wl_domains)
        detected_from_similarity = 0
        if detected_from_name + detected_from_email + detected_from_domain == 0:
            score, similarity_sender_name = sender_name_similarity(msg_from_name, email_bl_similarity_names)
            detected_from_similarity = -1 if score >.5699 else 0
            if score >.579:
                print(f"{score}, {similarity_sender_name}")
                    
            
        print(f"{msg_from} {detected_from_name} {detected_from_email} {detected_from_domain} {detected_from_similarity}")
        detected = detected_from_name + detected_from_email + detected_from_domain + detected_from_similarity
        if detected < 0:
            status, data = imap.fetch(message_id, "(UID)")
            uid = data[0].split()[-1].decode()
            uid = uid.rstrip(")")
            move_message_ids.append(('new_junk',message_id, uid))
        elif detected == 0:
            status, data = imap.fetch(message_id, "(UID)")
            uid = data[0].split()[-1].decode()
            uid = uid.rstrip(")")
            move_message_ids.append(('marketing',message_id, uid))
        else:
            mark_msg_as_unread(imap, message_id)

move_message_ids = sorted(move_message_ids, key=lambda x: x[1], reverse=True)
for to_mailbox, message_id, uid in move_message_ids:
    move_message(imap, message_id, uid, to_mailbox)
        
    

In [None]:
#####################################################################################################################
import smtplib

def send_email(sender_email, receiver_email, subject, body, smtp_server, smtp_port, smtp_username, smtp_password):
    # Create the message content
    message = f"Subject: {subject}\n\n{body}"

    try:
        # Connect to the SMTP server
        server = smtplib.SMTP_SSL(smtp_server, smtp_port)

        # Login to the SMTP server with username and password
        server.login(smtp_username, smtp_password)

        # Send the email
        server.sendmail(sender_email, receiver_email, message)

        # Close the connection to the SMTP server
        server.quit()

        print("Email sent successfully!")
    except smtplib.SMTPAuthenticationError:
        print("SMTP Authentication failed. Check your username and password.")
    except smtplib.SMTPException as e:
        print(f"An error occurred while sending the email: {e}")

# Replace the following with your actual email and server information
sender_email = "daniel.higgins@yahoo.com"
receiver_email = "dphiggins@gmail.com"
subject = "Test Email"
body = "Hello, this is a test email sent from Python!"
smtp_server = "smtp.mail.yahoo.com"
smtp_port = 465  # For SSL/TLS connection
smtp_username = "daniel.higgins@yahoo.com"
smtp_password = "serbiwxklglgammm"

send_email(sender_email, receiver_email, subject, body, smtp_server, smtp_port, smtp_username, smtp_password)


In [13]:
import math
from collections import Counter

def tfidf_vectorizer(text):
    words = text.split()
    tf = Counter(words)
    tfidf = {}
    for word, freq in tf.items():
        tfidf[word] = freq / len(words)
    return tfidf

def cosine_similarity(s1, s2):
    vec1 = tfidf_vectorizer(s1)
    vec2 = tfidf_vectorizer(s2)
    
    common_words = set(vec1.keys()) & set(vec2.keys())
    
    dot_product = sum(vec1[word] * vec2[word] for word in common_words)
    magnitude_vec1 = math.sqrt(sum(vec1[word] ** 2 for word in vec1))
    magnitude_vec2 = math.sqrt(sum(vec2[word] ** 2 for word in vec2))
    
    if magnitude_vec1 * magnitude_vec2 == 0:
        return 0  # To avoid division by zero
    else:
        return dot_product / (magnitude_vec1 * magnitude_vec2)

# Example usage
string1 = "Charles Schwab & Co., Inc."
string2 = "Charles Schwab"
score = cosine_similarity(string1, string2)
print("Cosine Similarity Score:", score)


Cosine Similarity Score: 0.6324555320336758


In [14]:
def bl_sender_name_similarity(compare_name):
    max_similarity=0
    max_similarity_sender_name = None
    for sender_name in email_bl_sender_names:
        similarity = cosine_similarity_score(compare_name, sender_name)
        if similarity > max_similarity:
            similarity = max_similarity
        
    print(f"{similarity} {sender_name}")
    

In [15]:
imap = imap_login(username, password, server)
message_ids = get_last_n_message_ids(imap, 1, mailbox="Inbox")
status, data = imap.fetch(message_ids[0], "(UID)")
uid = data[0].split()[-1].decode()
uid = uid.rstrip(")")
print("Status:", status)
print("Data:", data)
print("UID", uid)
imap.uid("COPY", uid, to_mailbox)

Status: OK
Data: [b'10000 (UID 701503)']
UID 701503


NameError: name 'to_mailbox' is not defined

In [697]:
to_mailbox

'marketing'