# Email RAG Pipeline

This notebook demonstrates a simple RAG (Retrieval-Augmented Generation) pipeline for processing downloaded Outlook emails.


In [1]:
import os
import email
import json
from pathlib import Path
from typing import Dict, List, Any
import pandas as pd

# For vector operations (you'll need to install these)
import numpy as np
#from sentence_transformers import SentenceTransformer
#import faiss


## 1. Load Email Files from Local Directory

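Load your downloaded Outlook email files from a local directory. Files ending in `.eml`, `.msg`, or `.pst` are discovered, but only `.eml` and `.msg` files are parsed in the following sections; `.pst` archives are listed and then reported as an unsupported format.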


In [2]:
# Set the path to your email directory
email_directory = "data/raw_emails"  # Change this to your email folder path

# Lower-case file suffixes that are treated as email files
EMAIL_EXTENSIONS = ('.eml', '.msg', '.pst')


def find_email_files(directory, extensions=EMAIL_EXTENSIONS) -> List[Path]:
    """Return the email files directly inside `directory`, sorted by path.

    Args:
        directory: Folder to scan (not recursive).
        extensions: File suffixes to accept, including the leading dot.

    Returns:
        A sorted list of `Path` objects whose suffix matches one of
        `extensions`, compared case-insensitively.
    """
    wanted = {ext.lower() for ext in extensions}
    # Compare suffixes case-insensitively so "MAIL.EML" is found on
    # case-sensitive filesystems; sort so the Email-N labels assigned below
    # are stable from run to run instead of following filesystem order.
    return sorted(
        candidate
        for candidate in Path(directory).iterdir()
        if candidate.is_file() and candidate.suffix.lower() in wanted
    )


# Load email files from local directory
email_paths = {}
if os.path.isdir(email_directory):
    # Find all email files in the directory
    email_files = find_email_files(email_directory)

    # Create a mapping of email files ("Email-1", "Email-2", ... -> path string)
    for i, file_path in enumerate(email_files):
        email_paths[f"Email-{i+1}"] = str(file_path)

    print(f"Found {len(email_paths)} email files in {email_directory}:")
    for name, path in email_paths.items():
        print(f"  {name}: {os.path.basename(path)}")
else:
    print(f"Directory {email_directory} not found!")
    print("Please create the directory and add your email files, or update the 'email_directory' variable above.")
    print("Example: email_directory = '/path/to/your/email/folder'")


Found 1 email files in data/raw_emails:
  Email-1: First Merchants Bank Royal Oak_ Invitation to bid on First Merchants Bank.eml


## 2. Email Parsing Functions


In [3]:
def _decode_header_value(raw_value) -> str:
    """Return a header value as plain text, decoding RFC 2047 encoded-words."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    if not raw_value:
        return ''
    try:
        return str(make_header(decode_header(raw_value)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # Malformed encoded-word or unknown charset: keep the raw header text
        return str(raw_value)


def _decode_part_text(part) -> str:
    """Decode a non-multipart MIME part to text, honouring its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ''
    # Try the declared charset first, then strict UTF-8; only drop bytes as a
    # last resort so correctly-labelled non-UTF-8 mail is not mangled.
    for charset in (part.get_content_charset(), 'utf-8'):
        if not charset:
            continue
        try:
            return payload.decode(charset)
        except (LookupError, UnicodeError):
            continue
    return payload.decode('utf-8', errors='ignore')


def parse_eml_file(file_path: str) -> Dict[str, Any]:
    """Parse an .eml file into its main headers and a text body.

    Args:
        file_path: Path of the .eml file to read.

    Returns:
        A dict with the keys 'subject', 'from', 'to', 'date' and 'body'.
        Header values are decoded strings ('' when the header is absent).
        For multipart messages the body is the first non-attachment
        text/plain part, falling back to the first non-empty text/html part;
        it is '' when neither exists.
    """
    with open(file_path, 'rb') as f:
        msg = email.message_from_bytes(f.read())

    # Extract email metadata
    email_data = {
        'subject': _decode_header_value(msg.get('Subject', '')),
        'from': _decode_header_value(msg.get('From', '')),
        'to': _decode_header_value(msg.get('To', '')),
        'date': _decode_header_value(msg.get('Date', '')),
        'body': ''
    }

    # Single-part message: the message itself carries the body
    if not msg.is_multipart():
        email_data['body'] = _decode_part_text(msg)
        return email_data

    html_fallback = ''
    for part in msg.walk():
        # Containers have no text of their own; attached files are not the body
        if part.is_multipart() or part.get_content_disposition() == 'attachment':
            continue

        content_type = part.get_content_type()
        if content_type == "text/plain":
            # Plain text always wins over any HTML seen so far
            email_data['body'] = _decode_part_text(part)
            return email_data
        if content_type == "text/html" and not html_fallback:
            html_fallback = _decode_part_text(part)

    # Fallback to HTML if no plain text
    email_data['body'] = html_fallback
    return email_data

def parse_msg_file(file_path: str) -> Dict[str, Any]:
    """Read an Outlook .msg file through the optional `msg_parser` package.

    Returns a dict with the keys 'subject', 'from', 'to', 'date' and 'body',
    where missing values become ''. When the package cannot be imported, an
    install hint is printed and None is returned instead.
    """
    try:
        # Imported lazily so the notebook still runs without the dependency
        from msg_parser import MsOxMessage

        message = MsOxMessage(file_path)

        return {
            'subject': message.subject if message.subject else '',
            'from': message.sender if message.sender else '',
            'to': message.to if message.to else '',
            'date': '' if not message.date else str(message.date),
            'body': message.body if message.body else '',
        }
    except ImportError:
        print("python-msg-parser not installed. Install with: pip install python-msg-parser")
        return None

def parse_email_file(file_path: str) -> Dict[str, Any]:
    """Dispatch to the parser that matches the file's (case-insensitive) suffix.

    '.eml' goes to parse_eml_file and '.msg' to parse_msg_file. Any other
    suffix prints an "unsupported" notice and yields None.
    """
    suffix = Path(file_path).suffix.lower()

    if suffix == '.msg':
        return parse_msg_file(file_path)
    if suffix == '.eml':
        return parse_eml_file(file_path)

    # No parser is available for this suffix
    print(f"Unsupported file format: {suffix}")
    return None


## 3. Extract Text from Loaded Emails


In [4]:
# Build the text and metadata lookups for every discovered email file
email_texts = {}
email_metadata = {}

for email_name, filename in email_paths.items():
    print(f"Processing {filename}...")

    email_data = parse_email_file(filename)

    # A falsy result means the file could not be parsed; report it and move on
    if not email_data:
        print(f"  Failed to parse {filename}")
        continue

    # Keep the header fields and remember which file they came from
    email_metadata[email_name] = {
        **{field: email_data[field] for field in ('subject', 'from', 'to', 'date')},
        'filename': filename,
    }

    # The subject is prepended so it is processed together with the body
    full_text = "Subject: {}\n\n{}".format(email_data['subject'], email_data['body'])
    email_texts[email_name] = full_text

    word_count = len(full_text.split())
    print(f"  Extracted {word_count} words from {filename}")
    print(f"  Subject: {email_data['subject'][:50]}...")

print(f"\nSuccessfully processed {len(email_texts)} emails")


Processing data/raw_emails/First Merchants Bank Royal Oak_ Invitation to bid on First Merchants Bank.eml...
  Extracted 569 words from data/raw_emails/First Merchants Bank Royal Oak_ Invitation to bid on First Merchants Bank.eml
  Subject: First Merchants Bank Royal Oak: Invitation to bid ...

Successfully processed 1 emails


## 4. Display Email Summary


In [5]:
# Tabulate the parsed metadata, one row per email (skipped when nothing parsed)
if email_metadata:
    df = pd.DataFrame.from_dict(email_metadata, orient='index')

    print("Email Summary:")
    print(df.loc[:, ['subject', 'from', 'date']].to_string())

    # Whitespace-delimited word count of each email's combined text
    print("\nWord Counts:")
    for email_name, text in email_texts.items():
        word_count = len(text.split())
        print(f"  {email_name}: {word_count} words")


Email Summary:
                                                                           subject                                                                        from                             date
Email-1  First Merchants Bank Royal Oak: Invitation to bid on First Merchants Bank  "Brad Kecskemeti (PCI Industries, Inc)" <notifications@update.procore.com>  Wed, 24 Sep 2025 15:03:18 +0000

Word Counts:
  Email-1: 569 words


## 5. Prepare for RAG Pipeline

The extracted email texts are now ready for further processing in a RAG pipeline:
- Text chunking
- Vector embeddings
- Vector store creation
- Retrieval and generation


In [6]:
# Example: Simple text chunking for RAG
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into overlapping chunks of whitespace-delimited words.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared by consecutive chunks; must satisfy
            0 <= overlap < chunk_size.

    Returns:
        The chunks in order, each a single-space-joined run of words.
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: If chunk_size or overlap is out of range.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # overlap >= chunk_size would make the stride zero or negative (range()
    # then fails or silently yields nothing); a negative overlap would skip words.
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    stride = chunk_size - overlap
    chunks = []

    for start in range(0, len(words), stride):
        chunks.append(' '.join(words[start:start + chunk_size]))

        # This chunk already reaches the last word; stop so that no trailing
        # chunk made up only of overlap words is emitted.
        if start + chunk_size >= len(words):
            break

    return chunks

# Chunk every email's text and report how many chunks each one produced
email_chunks = {}
for email_name, text in email_texts.items():
    email_chunks[email_name] = chunks = chunk_text(text)
    print(f"{email_name}: {len(chunks)} chunks")

print(f"\nTotal chunks created: {sum(map(len, email_chunks.values()))}")


Email-1: 2 chunks

Total chunks created: 2


## 6. Next Steps for Full RAG Pipeline

To complete the RAG pipeline, you would typically:

1. **Generate Embeddings**: Use a model like Sentence-BERT to create vector embeddings for each chunk
2. **Create Vector Store**: Store embeddings in a vector database (FAISS, Pinecone, etc.)
3. **Implement Retrieval**: Create a function to find relevant chunks based on query similarity
4. **Add Generation**: Use an LLM to generate responses based on retrieved chunks

Example code structure:
```python
# Flatten the per-email chunks so that every email is searchable,
# not just the last one processed
all_chunks = [chunk for chunks in email_chunks.values() for chunk in chunks]

# Generate embeddings (L2-normalized so that inner product equals cosine similarity)
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(all_chunks, normalize_embeddings=True)

# Create vector store
index = faiss.IndexFlatIP(embeddings.shape[1])
index.add(embeddings)

# Retrieve relevant chunks
def retrieve_chunks(query: str, k: int = 5):
    """Return up to k chunks most similar to the query, best match first."""
    query_embedding = model.encode([query], normalize_embeddings=True)
    # Never request more neighbours than the index holds: FAISS pads missing
    # results with -1, which would otherwise index the last chunk by accident.
    k = min(k, index.ntotal)
    scores, indices = index.search(query_embedding, k)
    return [all_chunks[i] for i in indices[0] if i != -1]
```
