# Email Categorization and Summarization



### Install dependencies

In [None]:
%pip install -r requirements.txt

### List Outlook Folders

In [None]:
import win32com.client

# MAPI namespace of the Outlook application, obtained through COM automation.
# The folder helpers and the listing code in this cell all go through this object.
outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")

def list_folders(parent_folder, indent=""):
    """Print the folder tree below ``parent_folder``, depth first.

    Args:
        parent_folder: Object exposing a ``Folders`` iterable whose items have
            ``Name``, ``EntryID``, ``Class`` and their own ``Folders``.
        indent: Prefix printed before each line; grows by two spaces per level.
    """
    nested_indent = indent + "  "
    for child in parent_folder.Folders:
        line = f"{indent}Folder: {child.Name}, EntryID: {child.EntryID}, Class: {child.Class}"
        print(line)
        # Descend into this child's own subfolders before moving to its sibling
        list_folders(child, nested_indent)

def get_folder_by_id(entra_id):
    """Look up an Outlook folder by its EntryID via the module-level ``outlook`` namespace.

    Args:
        entra_id: EntryID string of the folder, passed to ``GetFolderFromID``.

    Returns:
        Whatever ``outlook.GetFolderFromID`` returns for that ID.
    """
    folder = outlook.GetFolderFromID(entra_id)
    return folder


print("Getting email folders...")

# Get the root folder collection (iterated below as one entry per account)
root_folder = outlook.Folders

# List all folders: print each account name, then its folder tree with EntryIDs.
# The printed EntryIDs are the values get_folder_by_id() expects.
for account in root_folder:
    print(f"Account: {account.Name}")
    list_folders(account)


### Get Configuration Values

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file.
# This runs before Config is defined because Config reads os.getenv() in its class body.
load_dotenv()


def _split_env(name):
    """Return the comma-separated environment variable ``name`` as a list.

    Each item is stripped of surrounding whitespace; positions are preserved.
    An unset or blank variable yields an empty list instead of raising.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return []
    return [item.strip() for item in raw.split(',')]


class Config:
    """Settings read from environment variables when the class body executes.

    Missing variables no longer crash the import with an opaque
    ``TypeError``/``AttributeError``: list settings fall back to ``[]``,
    ``LLM_TEMPERATURE`` to ``0.0`` and ``MOVE_EMAILS`` to ``False``.
    Comma-separated values tolerate whitespace around items (``"Work, Personal"``).
    """

    # ChromaDB
    CHROMA_DB_PATH = os.getenv('CHROMA_DB_PATH')
    CHROMA_PERSIST_DIR = os.getenv('CHROMA_PERSIST_DIR')

    # Embeddings
    EMBEDDING_BASE_URL = os.getenv('EMBEDDING_BASE_URL')
    EMBEDDING_API = os.getenv('EMBEDDING_API')
    EMBEDDING_MODEL = os.getenv('EMBEDDING_MODEL')

    # LLM
    LLM_BASE_URL = os.getenv('LLM_BASE_URL')
    LLM_MODEL = os.getenv('LLM_MODEL')
    # Unset or empty falls back to 0.0 rather than raising on float(None)
    LLM_TEMPERATURE = float(os.getenv('LLM_TEMPERATURE') or 0.0)
    JSON_OUTPUT_PATH = os.getenv('JSON_OUTPUT_PATH')

    # Email retrieval folders (integer folder IDs); empty items such as a
    # trailing comma are skipped so int('') is never attempted
    EMAIL_RETRIEVAL_FOLDERS = [int(item) for item in _split_env('EMAIL_RETRIEVAL_FOLDER_IDS') if item]

    # Folder categories and IDs; the two lists are paired by position
    CATEGORIZE_FOLDER_ID = _split_env('CATEGORIZE_FOLDER_IDS')
    FOLDER_CATEGORIES = _split_env('FOLDER_CATEGORIES')
    FOLDER_CATEGORIES_DICT = dict(zip(FOLDER_CATEGORIES, CATEGORIZE_FOLDER_ID))
    # Only the literal "true" (any case) enables moving; unset means False
    MOVE_EMAILS = (os.getenv('MOVE_EMAILS') or '').strip().lower() == 'true'


config = Config()

### Define Chat Model, Embedding Model, and Chains

In [None]:
from langchain_community.embeddings import OllamaEmbeddings
from langchain.chains.summarize import load_summarize_chain
from langchain_ollama import ChatOllama
from langchain_core.pydantic_v1 import BaseModel, Field

# Load models and chains
llm = ChatOllama(base_url=config.LLM_BASE_URL, model=config.LLM_MODEL, temperature=config.LLM_TEMPERATURE)
# NOTE(review): embeddings are requested from LLM_BASE_URL; Config also defines
# EMBEDDING_BASE_URL, which is not used here — confirm whether that is intentional.
embeddings = OllamaEmbeddings(base_url=config.LLM_BASE_URL, model=config.EMBEDDING_MODEL)
# Map-reduce summarization chain built without custom prompts
summarize_chain = load_summarize_chain(llm, chain_type="map_reduce")
# Collection name reused by the vector-store cells later in the notebook
collection_name = "categorize-emails"

# Classification categories, joined into one string for the schema description below
categories_string = ", ".join(config.FOLDER_CATEGORIES)
print(f"Categories: {categories_string}")

# Schema for structured output
# The four fields below are what the classification chain fills in for each email.
# `category` is constrained (by its description) to the configured categories;
# the other three are free text.
class Email(BaseModel):
    # A plain string literal, not an f-string: an f-string in this position is
    # not a docstring, so the class would have no __doc__ at all.
    """This class represents an email"""

    category: str = Field(description=f"The classification category (must be one of: {categories_string})")
    subject: str = Field(description="The subject of the email")
    summary: str = Field(description="The summary of the email")
    action_item: str = Field(description="The action item of the email")

# Structured output classification chain
# Invoking it with email text yields an Email instance (see its use in the processing cell)
classification_chain = llm.with_structured_output(Email)


### Get Emails for Date Range

In [None]:
from datetime import datetime
import win32com.client

# Outlook instance: MAPI namespace obtained through COM automation.
# Re-created in this cell so it can run without the folder-listing cell.
outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")

def _extract_addresses(recipients):
    """Return ``recipients`` as a single '; '-separated address string.

    A plain string is returned unchanged. Any other iterable is expected to
    yield objects with an ``Address`` attribute; items without one are skipped.
    Falsy input (``None``, empty string) yields ''.
    """
    if not recipients:
        return ''
    if isinstance(recipients, str):
        return recipients
    return '; '.join(
        recipient.Address for recipient in recipients if hasattr(recipient, 'Address'))


def _coerce_to_date(value):
    """Turn a 'YYYY-MM-DD' string or a datetime into a plain date; pass dates through."""
    if isinstance(value, str):
        return datetime.strptime(value, "%Y-%m-%d").date()
    if isinstance(value, datetime):
        return value.date()
    return value


def get_emails(folder_id, progress=None, start_date="2024-09-26", end_date="2025-09-08"):
    """Collect emails from an Outlook default folder received within a date range.

    Args:
        folder_id: Folder constant passed to ``outlook.GetDefaultFolder``.
        progress: Optional object whose ``value`` attribute is incremented once
            per collected email.
        start_date: First day to include, as a 'YYYY-MM-DD' string, ``date`` or
            ``datetime``. Defaults to the range previously hard-coded for testing.
        end_date: Last day to include (inclusive), same accepted types.

    Returns:
        A list of dicts, one per matching email, with the keys Folder, Subject,
        Sender, Received Date, Received Time, Sent, From, To, CC, BCC, Body and
        EntryID. Returns ``[]`` if the folder cannot be opened.

    Raises:
        ValueError: If a date string is not in 'YYYY-MM-DD' format.
    """
    start_date = _coerce_to_date(start_date)
    end_date = _coerce_to_date(end_date)

    folder_name = get_folder_name(folder_id)
    print(f"Fetching emails from {folder_name}")
    try:
        folder = outlook.GetDefaultFolder(folder_id)
    except Exception as e:
        print(f"Error accessing Outlook: {e}")
        return []

    email_data = []

    for email in folder.Items:
        # Best effort: one unreadable item must not abort the whole folder
        try:
            # Items without a ReceivedTime cannot be date-filtered; skip them
            if not hasattr(email, 'ReceivedTime'):
                continue

            received_datetime = email.ReceivedTime
            received_date = received_datetime.date()
            received_time = received_datetime.strftime("%H:%M:%S")

            # Filter emails by the requested (inclusive) date range
            if not (start_date <= received_date <= end_date):
                continue

            to = _extract_addresses(getattr(email, 'To', None))
            cc = _extract_addresses(getattr(email, 'CC', None))
            bcc = _extract_addresses(getattr(email, 'BCC', None))
            sent = email.SentOn.strftime('%Y-%m-%d %H:%M:%S') if hasattr(email, 'SentOn') and email.SentOn else None
            from_address = email.Sender.Address if hasattr(email, 'Sender') and email.Sender else None

            email_data.append({
                "Folder": folder_name,
                "Subject": email.Subject,
                "Sender": email.SenderName,
                "Received Date": received_date.isoformat(),
                "Received Time": received_time,
                "Sent": sent,
                "From": from_address,
                "To": to,
                "CC": cc,
                "BCC": bcc,
                "Body": email.Body,
                "EntryID": email.EntryID,
            })

            # Update progress
            if progress:
                progress.value += 1

            print(f"Processed email with subject: {email.Subject}")

        except Exception as e:
            print(f"Error processing an email from {folder_name}: {e}")
            continue

    print(f"Finished fetching {len(email_data)} emails from {folder_name}")
    return email_data


### Define Helper Functions for Classifying, Summarizing, and Moving Emails

In [None]:
import uuid
import re
from langchain_core.documents import Document

# Get outlook folder by EntryID
def get_folder_by_id(entra_id):
    """Resolve an Outlook folder from its EntryID using the module-level ``outlook`` namespace.

    Args:
        entra_id: EntryID string handed to ``GetFolderFromID``.

    Returns:
        The result of ``outlook.GetFolderFromID(entra_id)``.
    """
    resolved = outlook.GetFolderFromID(entra_id)
    return resolved

# Get folder name by ID
def get_folder_name(folder_id):
    """Translate a numeric folder ID into a display name.

    Known IDs are 6 (Inbox), 5 (Sent Items) and 3 (Deleted Items); anything
    else is reported as "Unknown Folder".
    """
    known_names = {3: "Deleted Items", 5: "Sent Items", 6: "Inbox"}
    try:
        return known_names[folder_id]
    except KeyError:
        return "Unknown Folder"

# Get folder ID for category
def map_category_to_folder(category):
    """Return the folder ID configured for ``category``, or ``None`` if there is none.

    The lookup table is ``config.FOLDER_CATEGORIES_DICT``.
    """
    category_to_folder = config.FOLDER_CATEGORIES_DICT
    return category_to_folder.get(category)

# Move email to folder based on category
def move_email_to_folder(metadata):
    """Move one email into the folder configured for its category.

    Args:
        metadata: Mapping with at least ``Category``, ``EntryID`` and ``Subject``.

    The intended move is always printed; the email is only actually moved when
    ``config.MOVE_EMAILS`` is true. Every failure is reported on stdout and
    swallowed, so this function never raises and always returns ``None``.
    """
    try:
        # Category -> configured destination folder ID
        folder_id = map_category_to_folder(metadata['Category'])
        if folder_id is None:
            print(f"Error moving email to folder: No folder found for category: {metadata['Category']}")
            return

        # Folder ID -> Outlook folder object
        dest_folder = get_folder_by_id(folder_id)
        if dest_folder is None:
            print(f"Error moving email to folder: No folder found for ID: {folder_id}")
            return

        # Fetch the message itself by its EntryID
        email = outlook.GetItemFromID(metadata["EntryID"])

        print(f"***Moving email with subject***: {metadata['Subject']} to folder: {dest_folder.Name}\n")

        # With moving disabled, the message above is all that happens
        if not config.MOVE_EMAILS:
            return
        email.Move(dest_folder)

    except Exception as e:
        print(f"Error moving email to folder: {e}")

# Generate embedding for email body
def generate_embedding(document):
    """Embed a document's text and wrap the result in a new ``Document``.

    Args:
        document: Object with ``page_content`` (the text to embed) and a
            ``metadata`` mapping that contains ``Subject``.

    Returns:
        A new ``Document`` carrying a fresh hex ID, the embedding, the same text
        and the same metadata; ``None`` if anything in the process raises.
    """
    text = document.page_content  # the full email body is what gets embedded

    try:
        vector = embeddings.embed_query(text)
        print(f"Generated embedding for email with subject: {document.metadata['Subject']}\n")
        embedded_doc = Document(
            id=uuid.uuid1().hex,
            embedding=vector,
            page_content=text,
            metadata=document.metadata,
        )
        return embedded_doc
    except Exception as e:
        print(f"Error generating embedding: {str(e)}")
        return None

# Remove URLs from email body
def remove_urls(email_body):
    """Return ``email_body`` with every URL removed.

    A URL is any run of non-whitespace characters that starts with
    ``http://``, ``https://`` or ``www.``. Surrounding whitespace is kept.
    """
    return re.sub(r'https?://\S+|www\.\S+', r'', email_body)

# Generate summaries, classify emails, and move to appropriate folders
def combine_email_texts_and_metadata(email_data):
    """Classify, summarize and file each email, returning one Document per success.

    Args:
        email_data: Iterable of email dicts with at least ``Subject``,
            ``Received Date``, ``EntryID`` and ``Body``.

    Returns:
        A list of ``Document`` objects whose ``page_content`` is the URL-stripped
        body and whose metadata holds the stringified email fields (except Body)
        plus ``Category``, ``ClassifySubject``, ``ActionItem`` and ``Summary``.

    Emails are processed independently: one with no classification result is
    skipped with a message, and any exception is printed and that email skipped.
    """
    combined_documents = []
    for email in email_data:
        try:
            print(f"Processing email with subject: {email['Subject']} - {email['Received Date']} - {email['EntryID']}\n")
            # Body is kept out of the metadata; it becomes the document text instead
            metadata = {k: str(v) for k, v in email.items() if k != "Body"}

            # Remove URLs from email body
            email_body = remove_urls(email["Body"])

            # Create a document to be summarized
            curr_doc = Document(page_content=email_body, metadata=metadata)

            # Classify the document
            email_classification = classification_chain.invoke(curr_doc.page_content)

            # Check for a missing result before reading any attribute from it;
            # otherwise the skip message below could never be reached
            if email_classification is None:
                print(f"Email with subject: {email['Subject']} was classified as 'None'. Skipping...\n")
                continue

            print(f"Classified email with subject: {email['Subject']} into category: {email_classification.category}")

            # Add the classifications to metadata
            metadata['Category'] = email_classification.category
            metadata['ClassifySubject'] = email_classification.subject
            metadata['ActionItem'] = email_classification.action_item

            # Generate summary
            summary = summarize_chain.invoke([curr_doc])['output_text']
            metadata['Summary'] = summary

            # Move the email to the appropriate folder
            move_email_to_folder(metadata)

            # Store the raw email content and the summary
            combined_documents.append(Document(page_content=email_body, metadata=metadata))

            print(f"Generated summary for email with subject: {email['Subject']}\n")
            print("=====================================================\n")

        except Exception as e:
            print(f"Error summarizing email: {e}")
            continue

    return combined_documents

# Get emails for the specified folders
def get_emails_for_folders(folder_ids):
    """Fetch emails from each folder in ``folder_ids`` and return them as one flat list.

    Folders are queried in the given order via ``get_emails``; the results are
    concatenated in that same order.
    """
    return [
        message
        for current_id in folder_ids
        for message in get_emails(current_id)
    ]


### Get emails for configured folders

In [None]:
# Retrieve emails from folders
print("Starting email processing...")
current_emails = get_emails_for_folders(config.EMAIL_RETRIEVAL_FOLDERS)
total_emails = len(current_emails)

# Nothing to classify or summarize, so stop here.
# NOTE(review): exit() inside a notebook presumably ends the kernel session rather
# than just this cell — confirm that is intended.
if total_emails == 0:
    print("No emails fetched.")
    exit()

print(f"Total Emails Fetched: {total_emails}")

### Refresh collections

In [None]:
from langchain_chroma import Chroma

# Open the collection and delete it so this run does not mix with earlier data.
# Nothing is created here; documents are written in the "Store Documents in ChromaDB" cell.
print("Refreshing collections...")
vector_store = Chroma(persist_directory=config.CHROMA_DB_PATH, collection_name=collection_name)
vector_store.delete_collection()
print("Collection deleted")

### Generate Summaries and Classify Emails

In [None]:
# Classify and summarize every fetched email (and move it when MOVE_EMAILS is enabled);
# the bare `documents` on the last line displays the result as the cell output
print("Summarize and classify emails...")
documents = combine_email_texts_and_metadata(current_emails)
documents

### Generate embeddings

In [None]:
# One entry per document; an entry is None when generate_embedding() failed for it.
# NOTE(review): in this notebook embedding_results is only used for the count printed
# in the next cell — the store call there receives `documents`, not these results.
print("Processing embeddings...")
embedding_results = [generate_embedding(document) for document in documents]
print("Embeddings processed")

### Store Documents in ChromaDB

In [None]:
# Write the classified documents into the collection, using `embeddings` as the embedding function
db = vector_store.from_documents(documents=documents,
                           embedding=embeddings,
                           collection_name=collection_name,
                           persist_directory=config.CHROMA_DB_PATH)
# The reported count is len(embedding_results), which has one entry per document (including failed ones)
print(f"Finished embedding process. There are {len(embedding_results)} documents in the collection: '{collection_name}'")

# Daily Summary Output

### Define Prompt Templates and Summarize Chain

In [None]:
from langchain_core.prompts import PromptTemplate

# Define prompt templates
# map_prompt is applied to each email individually ({text} = one email's content)
map_prompt = PromptTemplate.from_template(
    """
    Summarize the key points from the following email. 
    Do not include any introductory phrases. Start directly with the main points.
    
    Email content:
    {text}
    
    Key points:
    """)
# reduce_prompt merges the per-email summaries ({text} = the individual summaries)
reduce_prompt = PromptTemplate.from_template(
    """
    Combine the following email summaries into a concise overall summary. 
    Do not add any introductory phrases like 'Here is a summary' or 'In conclusion'. 
    Start directly with the key points, maintaining a bullet point format.
    
    Individual summaries:
    {text}
    
    Combined summary:
    """)

# Load the summarize chain
# This rebinds the global `summarize_chain` created earlier without custom prompts, so any
# function that reads that global uses this prompted version once this cell has run.
summarize_chain = load_summarize_chain(llm,
                                        chain_type="map_reduce",
                                        map_prompt=map_prompt,
                                        combine_prompt=reduce_prompt)

### Functions for Summarizing by Category

In [None]:
def summarize_category(doc_category):
    """Build the markdown section body for one email category.

    Args:
        doc_category: Category name; matched against the ``Category`` metadata
            field of the stored documents.

    Returns:
        ``"No emails for this category."`` when the search finds nothing.
        Otherwise one bullet line per distinct email (subject, summary and, unless
        it is the string "None", the action item), followed by a
        ``### <category> Summary`` heading and the combined summary.

    Uses the module-level ``vector_store`` and ``summarize_chain``.
    """
    # Fetch up to 100 stored documents tagged with this category
    category_docs = vector_store.similarity_search("", k=100, filter={"Category" : doc_category})

    if not category_docs:
        return "No emails for this category."

    # Produce one combined summary across all documents of the category
    doc_summary = summarize_chain.invoke({"input_documents": category_docs})
    print("Response: ", doc_summary)
    print("Summary: ", doc_summary["output_text"])

    # Track already summarized documents so each email is listed once
    seen_entry_ids = set()
    bullet_lines = []

    # Add the summaries of the individual documents
    for doc in category_docs:
        doc_metadata = doc.metadata
        print("Doc Metadata: ", doc_metadata)
        entry_id = doc_metadata["EntryID"]

        if entry_id in seen_entry_ids:
            continue
        seen_entry_ids.add(entry_id)

        bullet = f"* **{doc_metadata['Subject']}** - {doc_metadata['Summary']}"
        if doc_metadata["ActionItem"] != "None":
            bullet += f" - **Action Item:** {doc_metadata['ActionItem']}"
        # Always end the bullet with a newline; previously bullets without an
        # action item ran straight into the next bullet or the heading
        bullet_lines.append(bullet + "\n")

    # Add the combined summary, normalizing bullet glyphs to markdown asterisks
    combined_summary = doc_summary["output_text"].replace("•", "*")
    return "".join(bullet_lines) + f"### {doc_category} Summary\n{combined_summary}\n\n"


### Generate Summaries by Category

In [None]:
# Rebind vector_store to the collection, this time with an embedding function attached;
# summarize_category() reads this global when it searches
vector_store = Chroma(
    collection_name=collection_name,
    embedding_function=embeddings,
    persist_directory=config.CHROMA_DB_PATH
)

# Build one top-level markdown section per configured category
full_summary = ""
for category in config.FOLDER_CATEGORIES:
    category_summary = summarize_category(category)
    full_summary += f"# {category}\n{category_summary}\n\n"
    
full_summary

### Write to Markdown File

In [None]:
# Name the report after today's date (MM_DD_YYYY.md); `datetime` comes from the
# import in the "Get Emails for Date Range" cell
now = datetime.now()
date_string = now.strftime("%m_%d_%Y")
file_name = f"{date_string}.md"

# The relative path means the file lands in the current working directory;
# mode "w" replaces any report already written today
with open(file_name, "w", encoding="utf-8") as f:
    f.write(full_summary)