In [1]:
import os
import numpy as np
import base64
import psycopg2
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from datetime import datetime, timedelta
from openai import OpenAI, AsyncOpenAI
from bs4 import BeautifulSoup
import re

import base64
import asyncio

In [2]:
import nest_asyncio
nest_asyncio.apply()  # Allows `asyncio.run()` inside Jupyter

In [3]:
from dotenv import load_dotenv # used to load env variables
load_dotenv()

True

In [4]:
# Define your time range
START_DATE = "2025-02-20"  # Change to your desired start date (YYYY-MM-DD)
END_DATE = "2025-03-05"    # Change to your desired end date (YYYY-MM-DD)

# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']


# def to_datetime(date):
#     '''Convert date time string into date time object'''
#     date_format = '%a, %d %b %Y %H:%M:%S %z'
#     return datetime.strptime(date.replace(' (UTC)', '') , date_format)


# Connect to Gmail API
def authenticate_gmail():
    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)
    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file("google_creds.json", SCOPES)
        creds = flow.run_local_server(port=0)
        with open("token.json", "w") as token:
            token.write(creds.to_json())
    return build("gmail", "v1", credentials=creds)

def date_to_unix(date_str):
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    return int(dt.timestamp())  # Convert to Unix timestamp (seconds)

# Get emails within the time range
# def get_emails(service):
#     query = f"after:{date_to_unix(START_DATE)} before:{date_to_unix(END_DATE)}"
#     results = service.users().messages().list(userId="me", q=query, maxResults=None).execute()
#     messages = results.get("messages", [])
        
#     emails = []
#     for msg in messages:
#         msg_data = service.users().messages().get(userId="me", id=msg["id"]).execute()
#         payload = msg_data.get("payload", {})
        
#         # Extract headers
#         headers = {header["name"]: header["value"] for header in payload.get("headers", [])}
#         sender = headers.get("From", "Unknown Sender")
#         subject = headers.get("Subject", "No Subject")
#         date = headers.get("Date", "Unknown Date")
        
#         # Extract email content
#         body = ""
#         if "parts" in payload:
#             for part in payload["parts"]:
#                 if part["mimeType"] == "text/plain":
#                     body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8")
#         elif "body" in payload and "data" in payload["body"]:
#             body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8")
            
        
#         if body != '':
#             emails.append({'sender':sender, 'subject':subject, 'date':date, 'body':body})
        
#     return emails


def get_all_messages(service, query):
    """Fetch all message IDs using pagination."""
    messages = []
    request = service.users().messages().list(userId="me", q=query, maxResults=500)
    
    while request:
        response = request.execute()
        messages.extend(response.get("messages", []))
        request = service.users().messages().list_next(request, response)  # Get next page

    return messages


def get_emails(service):
    """Download emails within a specific date range and extract relevant fields."""
    query = f"after:{date_to_unix(START_DATE)} before:{date_to_unix(END_DATE)}"
    
    messages = get_all_messages(service, query)  # Use paginated function

    emails = []
    for msg in messages:
        msg_data = service.users().messages().get(userId="me", id=msg["id"]).execute()
        payload = msg_data.get("payload", {})
        
        # Extract headers
        headers = {header["name"]: header["value"] for header in payload.get("headers", [])}
        sender = headers.get("From", "Unknown Sender")
        subject = headers.get("Subject", "No Subject")
        date = headers.get("Date", "Unknown Date")
        
        # Extract email content
        body = ""
        if "parts" in payload:
            for part in payload["parts"]:
                if part["mimeType"] == "text/plain":
                    body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8")
        elif "body" in payload and "data" in payload["body"]:
            body = base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8")

        if body.strip():  # Ensure body is not empty
            emails.append({'sender': sender, 'subject': subject, 'date': date, 'body': body})

    return emails


In [12]:
def to_datetime(date_str):
    """Convert email timestamps into a standardized datetime object or return None if unknown."""

    if isinstance(date_str, datetime):
        return date_str  # Already a datetime object, return as is

    # if not isinstance(date_str, str):
    #     raise TypeError(f"Expected a string or datetime, but got {type(date_str)}: {date_str}")

    # Handle unknown date cases
    if date_str.strip().lower() in {"unknown date", "", "none", "null"}:
        return None

    # Remove any timezone abbreviations in parentheses (e.g., (UTC), (PST), etc.)
    date_str = re.sub(r"\s*\([^)]*\)\s*", "", date_str)

    # Replace "GMT" with "+0000" for timezone parsing
    date_str = date_str.replace("GMT", "+0000")

    # Possible datetime formats to try
    formats = [
        "%a, %d %b %Y %H:%M:%S %z",  # With weekday and numeric timezone (e.g., +0000)
        "%d %b %Y %H:%M:%S %z",      # Without weekday, with numeric timezone
        "%a, %d %b %Y %H:%M:%S",     # With weekday, no timezone
        "%d %b %Y %H:%M:%S"          # Without weekday, no timezone
    ]

    # Try parsing with different formats
    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
            return dt
        except ValueError:
            continue  # Try the next format
    
    return None  # Return None if all formats fail



def remove_unicode_characters(text):
    # Remove non-breaking spaces (\xa0) and zero-width non-joiners (\u200c)
    cleaned_text = re.sub(r'[\xa0\u200c]', '', text)
    return cleaned_text

def strip_html(html_content):
    soup = BeautifulSoup(html_content, "html.parser")
    return soup.get_text()  # Extract plain text

def script_newline(text):
    cleaned_text = re.sub(r'[\r\n]', '', text)
    return cleaned_text

def remove_links(text):
    # Remove URLs (http://, https://, www)
    return re.sub(r'http[s]?://\S+|www\.\S+', '', text)

def replace_multiple_spaces(text):
    return re.sub(r'\s{2,}', ' ', text)

def clean_text(text):
    text = strip_html(text)
    text = remove_unicode_characters(text)
    text = script_newline(text)
    text = remove_links(text)
    text = replace_multiple_spaces(text)
    return text 

def clean_emails(emails):
    for email in emails:
        email['body'] = clean_text(email['body'])
        email['date'] = to_datetime(email['date'])
        
    return emails

In [6]:
# 1) brew install postgresql
# 2) brew services start postgresql

# 3) now that postgres is running in the terminal create table 
# CREATE DATABASE emails_db;
# CREATE USER your_user WITH PASSWORD 'your_password';
# GRANT ALL PRIVILEGES ON DATABASE emails_db TO your_user;


In [5]:
# client = OpenAI()

# # Function to vectorize text using OpenAI API
# def vectorize_text(text):
#     response = client.embeddings.create(
#         model="text-embedding-3-small",  
#         input=text[:8192] # this embedding model has a token limit of 8192
#     )
#     return response.data[0].embedding


# def vectorize_email_body(emails):
#     for email in emails:
#         # Vectorize the email body using OpenAI's model
#         body_vector = vectorize_text(email['body'])  # Vectorize the body of the email
#         body_vector_bytes = np.array(body_vector, dtype=np.float32).tobytes()  # Convert to bytes
#         email['body_vector'] = body_vector_bytes
        
#     return emails


client = AsyncOpenAI()  # Use AsyncOpenAI for async requests

# Asynchronous function to vectorize text using OpenAI API
async def vectorize_text(text):
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text[:8192]  # Ensure token limit compliance
    )
    return response.data[0].embedding

# Asynchronous function to vectorize email bodies
async def vectorize_email_body(emails):
    tasks = []
    
    for email in emails:
        task = vectorize_text(email['body'])  # Create async task
        tasks.append(task)

    # Gather all embedding results concurrently
    embeddings = await asyncio.gather(*tasks)

    # Convert embeddings to bytes and store in emails
    for email, body_vector in zip(emails, embeddings):
        body_vector_bytes = np.array(body_vector, dtype=np.float32).tobytes()
        email['body_vector'] = body_vector_bytes
    
    return emails


In [6]:
# DB Configuration (Assuming you already have this)
DB_CONFIG = {
    'dbname': 'emails_db',
    'user': 'alexanderbarriga03',
    'password': 'water03',
    'host': 'localhost',
    'port': '5432'
}

def store_emails_in_db(emails):
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    # Modify table to store vector as BYTEA (binary data)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS emails (
            id SERIAL PRIMARY KEY,
            sender TEXT,
            subject TEXT,
            timestamp TEXT, -- sender's timestamp 
            body TEXT,
            body_vector BYTEA  -- To store the vector in bytes 
        );
    """)
    conn.commit()

    for email in emails:        
        # Insert the email data into the database, including the vector
        cursor.execute("""
            INSERT INTO emails (sender, subject, timestamp, body, body_vector)
            VALUES (%s, %s, %s, %s, %s)
        """, (email['sender'], email['subject'], email['date'], email['body'], email['body_vector'])
        )
        
    conn.commit()
    cursor.close()
    conn.close()  


In [7]:
def get_emails_from_db():
    # Retrieve vector from PostgreSQL and decode it
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM emails")
    emails = cursor.fetchall()

    data = []
    for email in emails:
        emails_dict = {}
        emails_dict['id'] = email[0]
        emails_dict['sender'] = email[1]
        emails_dict['subject'] = email[2]
        emails_dict['timestamp'] = email[3]
        emails_dict['body'] = email[4]
        emails_dict['body_vector'] = np.frombuffer(email[5], dtype=np.float32)
        data.append(emails_dict)

    cursor.close()
    conn.close()
    
    return data

----

### Run Email Pipeline

In [8]:
service = authenticate_gmail() # connect to gmail api

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=416359460616-ab2i28mbi70bai1k4bmu323v2j45qho6.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A59260%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fgmail.readonly&state=1e7emtZEwv7vlbE1EhttLmWXclcG08&access_type=offline


In [9]:
emails = get_emails(service) # download emails 

In [13]:
len(emails)

932

In [14]:
emails = clean_emails(emails) # remove useless text 

In [25]:
# emails = vectorize_email_body(emails) # run in series: transform email bodies from text into vector bytes 

In [15]:
async def main(emails):
    emails = await vectorize_email_body(emails)  # Await the coroutine
    return emails 
    
emails = await main(emails)  # Use `await` instead of `asyncio.run()`

In [None]:
#store_emails_in_db(emails) # emails stored in db
#emails = get_emails_from_db() # load emails from db

In [16]:
emails[:3]

[{'sender': 'LinkedIn Job Alerts <jobalerts-noreply@linkedin.com>',
  'subject': '“data scientist”: Pinterest - Data Scientist and more',
  'date': datetime.datetime(2025, 3, 5, 6, 58, 23, tzinfo=datetime.timezone.utc),
  'body': 'Your job alert for data scientist in Berkeley18 new jobs match your preferences. Data ScientistPinterestSan Francisco, CAFast growingView job: Risk Data ScientistRipplingSan Francisco, CAFast growingView job: Senior Data Scientist/Applied Scientist, AI ProductsAttentiveSan Francisco, CA24 school alumniView job: Founding Data Scientist (Causal Inference)GreylockSan Francisco, CA1 company alumApply with resume & profileView job: Data Scientist / Senior Data Scientist - StudioRobloxSan Mateo, CAFast growingView job: Founding Data Scientist (Hybrid DS/SWE)GreylockSan Francisco, CA1 company alumApply with resume & profileView job: See all jobs on LinkedIn: email was intended for Alexander Barriga, M.S. (Data Scientist | Python, SQL, AWS, ML, NLP, AI | Ex-NASA)Lear

In [17]:
len(emails)

932

In [18]:
store_emails_in_db(emails) # emails stored in db

In [19]:
load_emails = get_emails_from_db() # load emails from db

In [20]:
len(load_emails)

1086

In [21]:
load_emails

[{'id': 8,
  'sender': 'Meetup <info@meetup.com>',
  'subject': 'Amelia Mink posted in Data Science for Sustainability',
  'timestamp': '2025-02-17 23:31:19-08',
  'body': 'Amelia Mink posted in Data Science for SustainabilityAmelia Mink started a discussion in Data Science for Sustainability.Amelia Mink More AI, More Spending? Let’s Change ThatScaling AI shouldn’t mean skyrocketing costs.Discover how to cut inefficiencies & boost performance in this live session on Feb 19 at 2PM ET.🔗 Reserve your spot View discussionUnsubscribe from these type of emails. Manage your Email Notification Preferences. Read our Privacy Policy. Meetup LLC, POB 4668 #37895 New York NY USA 10163.',
  'body_vector': array([ 0.00786488,  0.02192589,  0.0491499 , ..., -0.02138709,
          0.03355484, -0.01344738], shape=(1536,), dtype=float32)},
 {'id': 9,
  'sender': 'Gargi K Kand <gargikand@user.luma-mail.com>',
  'subject': 'You are invited to Gen Z Founders Meetup 💃🎉',
  'timestamp': '2025-02-17 23:00:46-0