# Email Ingestion & RAG Prebuild (Simplified)

This notebook is a **concise, production‑lean prebuild** of the email pipeline. Phased prototype noise has been stripped out in favor of clean, single‑responsibility components.

Architecture:
```
┌──────────────────────────┐
│ Email Source Connector(s)│  IMAP / (future) Gmail / Exchange
└───────────┬──────────────┘
            ▼
  ┌─────────────────────┐
  │ Email Normalizer    │  Parse + canonical record + hashing
  └──────────┬──────────┘
             ▼
┌─────────────────────────┐
│ Chunker + Embedder      │  Sentence chunk + vector (stub/real)
└───────────┬─────────────┘
            ▼
  ┌──────────────────────┐      ┌──────────────────────┐
  │ SQLite (metadata)    │◀────▶│ Vector Store (Milvus)│
  └──────────────────────┘      └──────────────────────┘
```

Core Principles:
- **Separation of concerns**: each class does one thing.
- **Stateless helpers** where possible; state isolated to Store / VectorStore.
- **Deterministic hashing** for idempotent ingestion.
- **Configuration centralized** (imports + config cells only).
- **Replaceable implementations**: connectors, embedder, vector store.
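
The "replaceable implementations" principle can be sketched with a structural interface. This is a minimal illustration, not the notebook's actual embedder: the names `Embedder`, `StubEmbedder`, and `embed_all` are hypothetical, and the stub's vectors are hash-derived and carry no semantic meaning.

```python
import hashlib
import math
from typing import List, Protocol


class Embedder(Protocol):
    """Hypothetical interface: anything exposing embed(text) -> list[float]."""

    def embed(self, text: str) -> List[float]: ...


class StubEmbedder:
    """Deterministic stand-in for a real model (assumption, for offline runs)."""

    def __init__(self, dim: int = 16) -> None:
        self.dim = dim

    def embed(self, text: str) -> List[float]:
        values: List[float] = []
        counter = 0
        # Stretch SHA-256 output until there are enough bytes for `dim` floats.
        while len(values) < self.dim:
            digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
            values.extend(b / 255.0 - 0.5 for b in digest)
            counter += 1
        values = values[: self.dim]
        norm = math.sqrt(sum(v * v for v in values)) or 1.0
        return [v / norm for v in values]  # scale to unit length


def embed_all(embedder: Embedder, texts: List[str]) -> List[List[float]]:
    """Depends only on the interface, so a real embedder can be swapped in."""
    return [embedder.embed(t) for t in texts]


vectors = embed_all(StubEmbedder(dim=16), ["hello", "hello", "world"])
```

Because the stub is deterministic, identical text always maps to the same vector, which keeps test runs reproducible.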

Cells (after imports/config):
1. Connectors & Fixtures
2. Normalizer & Hashing
3. Persistence Store (SQLite) + VectorStore stub
4. Chunker & Embedder
5. Orchestrator (run cycle demo)

Deprecated former phase cells are minimized or removed.


In [10]:
# Starting Code: Centralized Imports & Global Symbols
# Update this cell only when adding/removing libraries. Re-run it, then re-run downstream cells as needed.
from __future__ import annotations

# Standard Library
import hashlib, json, re, textwrap, math, random, sqlite3, time, os, ssl, imaplib, socket, email, quopri, base64
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional, Protocol, Iterable, Sequence, Tuple
from datetime import datetime, timedelta, timezone
import email.utils as eutils
from copy import deepcopy

# NOTE:
# - All configurable parameters & runtime tunables are defined in the dedicated Configuration cell (next cell).
# - This cell must remain side‑effect light (no network, DB, or socket timeouts). Those are applied in the config cell.
# - Add future dependency imports here and re-run this cell before using them elsewhere.

print("Environment setup complete (imports loaded).")

Environment setup complete (imports loaded).


In [None]:
# Configuration Cell: Centralized tunables & environment parameters
# Re-run this cell after any change; downstream cells should only reference these symbols (not hard-code values).

# Mailbox / Identity
PRIMARY_MAILBOX = os.getenv("PRIMARY_MAILBOX", "support@example.com")  # Aligned with app.Config

# Networking / IO (lightweight here; heavy network config deferred to app integration)
DEFAULT_SOCKET_TIMEOUT = 30  # seconds
socket.setdefaulttimeout(DEFAULT_SOCKET_TIMEOUT)

# IMAP Connector settings (placeholders for future real connector)
IMAP_HOST = os.getenv("IMAP_HOST", "imap.gmail.com")
IMAP_PORT = int(os.getenv("IMAP_PORT", "993"))
IMAP_USE_SSL = True
IMAP_USERNAME = os.getenv("IMAP_USERNAME", "user@example.com")
IMAP_PASSWORD = os.getenv("IMAP_PASSWORD", "CHANGE_ME")  # NEVER commit real secrets; use env vars in production
IMAP_MAILBOX = os.getenv("IMAP_MAILBOX", "INBOX")
IMAP_BATCH_LIMIT = 50  # soft cap per fetch cycle

# Email ingestion alignment with main app Config (placeholders / not all used in notebook yet)
EMAIL_ENABLED = False  # feature flag (app.Config.EMAIL_ENABLED)
EMAIL_PROVIDER = os.getenv("EMAIL_PROVIDER", "imap")
EMAIL_SYNC_SINCE_DAYS = int(os.getenv("EMAIL_SYNC_SINCE_DAYS", "30"))
EMAIL_SYNC_INTERVAL_SECONDS = int(os.getenv("EMAIL_SYNC_INTERVAL_SECONDS", "300"))
EMAIL_ALLOWED_MIME = os.getenv("EMAIL_ALLOWED_MIME", "pdf,docx,txt,md,csv")
EMAIL_MAX_ATTACH_SIZE_MB = int(os.getenv("EMAIL_MAX_ATTACH_SIZE_MB", "8"))

# Dedup / Hashing
CONTENT_HASH_RECIPE_VERSION = 1  # increment when compute_content_hash logic changes

# Enrichment toggles & limits (heuristics phase)
ENABLE_ENRICHMENT = True
ENABLE_KEYWORDS = True
ENABLE_SUMMARY = True
MAX_KEYWORDS = 5

# Chunking parameters (aligned with main app naming: EMAIL_CHUNK_SIZE / EMAIL_CHUNK_OVERLAP)
# In app these are character-based; here we use them as soft character targets for sentence accumulation.
EMAIL_CHUNK_SIZE = int(os.getenv("EMAIL_CHUNK_SIZE", "600"))
EMAIL_CHUNK_OVERLAP = int(os.getenv("EMAIL_CHUNK_OVERLAP", "60"))

OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# Embedding parameters (aligned with app: EMBEDDING_MODEL, VECTOR_DIM)
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "mxbai-embed-large")
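# Main app default VECTOR_DIM=384; the notebook defaults to a small simulation dim for speed and reuses the variable name.
# When a real embedder is used, VECTOR_DIM must match the model's output size (the run shown below overrides it to 1024 via env).
VECTOR_DIM = int(os.getenv("VECTOR_DIM", "16"))  # simulation dimension (app uses 384)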

DB_DIR = "databases"
os.makedirs(DB_DIR, exist_ok=True)
DB_MODE = os.path.join(DB_DIR, "knowledgebase.db")  # file-backed so state persists across runs

# Logging / Observability (lightweight placeholders)
LOG_LEVEL = "INFO"

# Stopwords (keyword extraction)
STOPWORDS = {"the","a","an","and","or","of","to","in","on","for","with","at","by","is","it","this","that"}

print("Configuration loaded (file DB mode, names aligned with main app):")
print({k: v for k, v in list(globals().items()) if k in [
    'IMAP_USERNAME','DB_MODE','CONTENT_HASH_RECIPE_VERSION','EMAIL_CHUNK_SIZE','VECTOR_DIM','EMBEDDING_MODEL'
]})

Configuration loaded (file DB mode, names aligned with main app):
{'CONTENT_HASH_RECIPE_VERSION': 1, 'DB_MODE': 'databases/knowledgebase.db', 'IMAP_USERNAME': 'tonyphilip@gmail.com', 'EMAIL_CHUNK_SIZE': 600, 'EMBEDDING_MODEL': 'mxbai-embed-large', 'VECTOR_DIM': 1024}


### Email Record Field Reference (Detailed)
Each processed email record (pre-DB) uses the following semantics:

| Field | Type | Purpose / Semantics | Notes / Future Considerations |
|-------|------|--------------------|-------------------------------|
| message_id | str | Globally unique RFC822 Message-ID | Uniqueness anchor; collisions rare but validated. |
| thread_id | str | Logical conversation/thread root | Simplistic: first reference or self; could upgrade via References chain collapse. |
| subject | str | Original Subject line | Normalization (prefix stripping) deferred for display vs. analytics. |
| from_addr | str | Sender (lowercased) | Single sender only; malformed multiple ignored. |
| to_addrs | list[str] | Primary recipients | Stored as JSON array in DB. |
| cc_addrs | list[str] | CC recipients | Optional JSON array. |
| date_utc | str (ISO) | Declared sent date/time (UTC) | Not validated vs Received; trust header for now. |
| received_utc | str (ISO) | Proxy for first-seen timestamp | Placeholder = date_utc; real pipeline would parse Received headers. |
| in_reply_to | str | Single Message-ID replied to | Null if not a reply. |
| references_ids | list[str] | Ordered ancestry of Message-IDs | Enables deeper thread reconstruction. |
| is_reply | int (0/1) | Heuristic flag for reply | Derived from subject OR In-Reply-To presence. |
| is_forward | int (0/1) | Heuristic flag for forward | Subject starts with fwd:/fw: (case-insensitive). |
| raw_size_bytes | int | Approx body size pre‑processing | Used for batching / limits. |
| body_text | str | Extracted canonical plain text body | HTML stripped / MIME selection to be added later. |
| body_html | str or null | Original HTML (if retained) | Currently None; later can store sanitized HTML. |
| language | str or null | ISO language code | Deferred: language detection integration. |
| has_attachments | int (0/1) | True if any attachments present | Quick filter for enrichment policies. |
| attachment_manifest | list[dict] | Lightweight metadata for attachments | JSON (filename,size,mime). No binary content stored. |
| processed | int (0/1) | Downstream processing completion marker | Set when embeddings/chunking done (future). |
| ingested_at | str (ISO) | DB insertion timestamp | DB default or set programmatically. |
| updated_at | str (ISO) | Last mutation timestamp | Null until updates occur. |
| content_hash | str | Deterministic dedupe hash | Subject + body + participants_hash + date_utc (current recipe). |
| summary | str | Short abstract summary | Stub now; LLM or heuristic later. |
| keywords | list[str] | Extracted keywords (naive or LLM) | Stored JSON array. |
| auto_topic | str | Heuristic / model inferred topic | May evolve with model versions. |
| manual_topic | str | Human override | Never overwritten automatically. |
| topic_confidence | float | Confidence score for auto_topic | Range 0..1; stub values now. |
| topic_version | int | Version of topic inference algorithm | Increment on model rule change. |
| error_state | str | Classification of last processing error | Null if healthy; used for retry logic. |
| direction | str | One of: inbound / outbound / cc_only / other | Derived relative to primary mailbox. |
| participants | list[str] | Sorted unique participants (from/to/cc) | Facilitates hash, search facets. |
| participants_hash | str | Hash of participants list | Stable across ordering differences. |
| to_primary | str or null | Echoes PRIMARY_MAILBOX if directly addressed | Aids direct vs peripheral classification. |

Guiding Principles:
- Keep raw fidelity (subject, addresses) while adding normalized derivatives (hashes, flags).
- Avoid premature heavy processing (LLM calls) until base ingestion stable.
- All enrichment fields are nullable and additive.
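
Since every stage expects the full set of keys from the table above, a small factory can keep the record shape stable. The sketch below is an assumption about how that could look, not part of the notebook; `make_empty_record` is a hypothetical helper whose defaults mirror the connector code (flags start at 0, list fields as empty lists, everything else as `None`).

```python
from typing import Any, Dict

# Field groups taken from the Field Reference table.
_LIST_FIELDS = ("to_addrs", "cc_addrs", "references_ids", "attachment_manifest", "participants")
_FLAG_FIELDS = ("is_reply", "is_forward", "has_attachments", "processed")
_NULLABLE_FIELDS = (
    "message_id", "thread_id", "subject", "from_addr", "date_utc", "received_utc",
    "in_reply_to", "raw_size_bytes", "body_text", "body_html", "language",
    "ingested_at", "updated_at", "content_hash", "summary", "keywords",
    "auto_topic", "manual_topic", "topic_confidence", "topic_version",
    "error_state", "direction", "participants_hash", "to_primary",
)


def make_empty_record(**overrides: Any) -> Dict[str, Any]:
    """Build a canonical record with defaults, then apply known overrides."""
    record: Dict[str, Any] = {name: None for name in _NULLABLE_FIELDS}
    record.update({name: 0 for name in _FLAG_FIELDS})
    record.update({name: [] for name in _LIST_FIELDS})  # fresh list per call
    unknown = set(overrides) - set(record)
    if unknown:
        raise KeyError(f"unknown record fields: {sorted(unknown)}")
    record.update(overrides)
    return record


record = make_empty_record(message_id="<demo@example.com>", subject="Hello")
```

Rejecting unknown keys catches typos such as `from_adr` early instead of silently widening the schema.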


In [21]:
# Connector Layer (Pluggable)

class EmailConnector:
    """
    Abstract connector responsible for retrieving raw email messages from a source
    (IMAP, Gmail API, MS Graph, etc.) and returning a list of dict records that
    conform to the canonical email schema used downstream.

    Implementations SHOULD populate as many fields as are naturally derivable
    from the source. Fields not yet computed (e.g., advanced enrichment or
    content hashes) MUST still appear with a value of None (or sensible default)
    to keep shape stable and simplify later processing stages.

    Expected record keys (aligned with the Field Reference table):
        message_id          : str
        thread_id           : str | None
        subject             : str | None
        from_addr           : str | None
        to_addrs            : list[str]
        cc_addrs            : list[str]
        date_utc            : str (ISO) | None
        received_utc        : str (ISO) | None (placeholder = date_utc)
        in_reply_to         : str | None
        references_ids      : list[str]
        is_reply            : int (0/1)
        is_forward          : int (0/1)
        raw_size_bytes      : int | None
        body_text           : str | None
        body_html           : str | None
        language            : str | None
        has_attachments     : int (0/1)
        attachment_manifest : list[dict]
        processed           : int (0/1)  (default 0 at ingestion)
        ingested_at         : str | None (set later by persistence)
        updated_at          : str | None (set later on mutation)
        content_hash        : str | None (filled by hashing stage)
        summary             : str | None
        keywords            : list[str] | None
        auto_topic          : str | None
        manual_topic        : str | None
        topic_confidence    : float | None
        topic_version       : int | None
        error_state         : str | None
        direction           : str | None  (derived later if not set)
        participants        : list[str]
        participants_hash   : str | None
        to_primary          : str | None

    Returns:
        list[dict]: list of email records (see above)
    """

    def fetch_emails(self, since_date=None) -> list[dict]:
        raise NotImplementedError("Concrete connectors must implement fetch_emails")
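
As an illustration of the connector contract, an in-memory fixture connector could look like the sketch below. `FixtureConnector` is a hypothetical name; in the notebook it would subclass `EmailConnector`, but it is written duck-typed here so the block runs on its own. The fixture records are abbreviated and do not carry the full canonical shape.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class FixtureConnector:
    """Hypothetical in-memory connector with the same fetch_emails signature."""

    def __init__(self, records: List[Dict[str, Any]]) -> None:
        self._records = records

    def fetch_emails(self, since_date: Optional[datetime] = None) -> List[Dict[str, Any]]:
        if since_date is None:
            return [dict(r) for r in self._records]
        kept = []
        for r in self._records:
            raw = r.get("date_utc")
            # Records without a date cannot be compared, so they are skipped here.
            if raw and datetime.fromisoformat(raw) >= since_date:
                kept.append(dict(r))  # shallow copy so callers cannot mutate fixtures
        return kept


fixtures = [
    {"message_id": "<old@example.com>", "date_utc": "2024-01-01T00:00:00+00:00"},
    {"message_id": "<new@example.com>", "date_utc": "2024-06-01T00:00:00+00:00"},
]
connector = FixtureConnector(fixtures)
recent = connector.fetch_emails(datetime(2024, 3, 1, tzinfo=timezone.utc))
```

A fixture like this lets the normalizer and orchestrator run without network access or credentials.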


In [26]:
# IMAP Connection

import imaplib, email, quopri, base64
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime, getaddresses

class IMAPConnector(EmailConnector):
    def __init__(self, host, username, password, mailbox="INBOX", batch_limit=IMAP_BATCH_LIMIT, port=IMAP_PORT):
        self.host = host
        self.port = port  # honor the configured IMAP_PORT
        self.username = username
        self.password = password
        self.mailbox = mailbox
        self.batch_limit = batch_limit

    def fetch_emails(self, since_date=None):
        conn = imaplib.IMAP4_SSL(self.host, self.port) if IMAP_USE_SSL else imaplib.IMAP4(self.host, self.port)
        conn.login(self.username, self.password)
        conn.select(self.mailbox)

        criteria = []
        if since_date:
            criteria.append(f'SINCE "{since_date.strftime("%d-%b-%Y")}"')
        search_query = "ALL" if not criteria else " ".join(criteria)
        status, messages = conn.search(None, search_query)
        if status != 'OK':
            conn.logout()
            return []
        email_ids = messages[0].split()
        if self.batch_limit:
            email_ids = email_ids[-self.batch_limit:]

        results = []
        for eid in email_ids:
            status, msg_data = conn.fetch(eid, "(RFC822)")
            if status != 'OK' or not msg_data or not msg_data[0]:
                continue
            try:
                msg = email.message_from_bytes(msg_data[0][1])
                results.append(self._parse_email(msg))
            except Exception as exc:
                # Fallback minimal record with error_state populated for visibility
                results.append({
                    'message_id': None,
                    'thread_id': None,
                    'subject': None,
                    'from_addr': None,
                    'to_addrs': [],
                    'cc_addrs': [],
                    'date_utc': None,
                    'received_utc': None,
                    'in_reply_to': None,
                    'references_ids': [],
                    'is_reply': 0,
                    'is_forward': 0,
                    'raw_size_bytes': None,
                    'body_text': None,
                    'body_html': None,
                    'language': None,
                    'has_attachments': 0,
                    'attachment_manifest': [],
                    'processed': 0,
                    'ingested_at': None,
                    'updated_at': None,
                    'content_hash': None,
                    'summary': None,
                    'keywords': None,
                    'auto_topic': None,
                    'manual_topic': None,
                    'topic_confidence': None,
                    'topic_version': None,
                    'error_state': f"parse_error: {exc.__class__.__name__}",
                    'direction': None,
                    'participants': [],
                    'participants_hash': None,
                    'to_primary': None,
                })
        conn.logout()
        return results

    def _decode_header_value(self, raw_val):
        if not raw_val:
            return None
        try:
            # make_header handles multiple encoded parts
            return str(make_header(decode_header(raw_val))).strip()
        except Exception:
            parts = decode_header(raw_val)
            decoded = []
            for text, enc in parts:
                if isinstance(text, bytes):
                    try:
                        decoded.append(text.decode(enc or 'utf-8', errors='ignore'))
                    except Exception:
                        decoded.append(text.decode('utf-8', errors='ignore'))
                else:
                    decoded.append(text)
            return "".join(decoded).strip()

    def _parse_email(self, msg):
        subject = self._decode_header_value(msg.get('Subject')) or None
        message_id = (msg.get('Message-ID') or '').strip() or None
        in_reply_to = (msg.get('In-Reply-To') or '').strip() or None
        references_raw = msg.get('References') or ''
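        # Keep angle brackets so reference IDs compare equal to message_id / in_reply_to,
        # which are stored with brackets; thread_id derivation relies on this.
        references_ids = [r for r in references_raw.split() if '@' in r] if references_raw else []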
        date_raw = msg.get('Date')
        date_utc = None
        if date_raw:
            try:
                dt = parsedate_to_datetime(date_raw)
                if dt and dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)
                date_utc = dt.astimezone(timezone.utc).isoformat()
            except Exception:
                date_utc = None
        received_utc = date_utc  # placeholder

        # Addresses
        from_addr = None
        raw_from = msg.get('From')
        if raw_from:
            addrs = getaddresses([raw_from])
            if addrs:
                from_addr = (addrs[0][1] or addrs[0][0]).lower()
        to_addrs = [addr.lower() for _, addr in getaddresses([msg.get('To') or '']) if addr]
        cc_addrs = [addr.lower() for _, addr in getaddresses([msg.get('Cc') or '']) if addr]

        # Reply / forward heuristics
        subj_lc = (subject or '').lower()
        is_reply = 1 if (in_reply_to or subj_lc.startswith('re:')) else 0
        is_forward = 1 if (subj_lc.startswith('fwd:') or subj_lc.startswith('fw:')) else 0

        # Body extraction (prefer text/plain, fallback to first text/html)
        body_text, body_html = None, None
        attachment_manifest = []
        has_attachments = 0
        raw_size_bytes = None

        if msg.is_multipart():
            for part in msg.walk():
                ctype = part.get_content_type()
                disp = (part.get('Content-Disposition') or '').lower()
                if ctype == 'text/plain' and 'attachment' not in disp and body_text is None:
                    try:
                        body_text = self._decode_part(part)
                    except Exception:
                        body_text = None
                elif ctype == 'text/html' and 'attachment' not in disp and body_html is None:
                    try:
                        body_html = self._decode_part(part)
                    except Exception:
                        body_html = None
                elif 'attachment' in disp or part.get_filename():
                    has_attachments = 1
                    fname = self._decode_header_value(part.get_filename()) if part.get_filename() else None
                    payload = part.get_payload(decode=True) or b''
                    attachment_manifest.append({
                        'filename': fname,
                        'size': len(payload),
                        'mime': ctype,
                    })
            # size heuristic = sum of part payload sizes
            size_acc = 0
            for part in msg.walk():
                try:
                    pl = part.get_payload(decode=True)
                    if pl:
                        size_acc += len(pl)
                except Exception:
                    pass
            raw_size_bytes = size_acc or None
        else:
            try:
                body_text = self._decode_part(msg)
            except Exception:
                body_text = None
            payload = msg.get_payload(decode=True) or b''
            raw_size_bytes = len(payload) if payload else None

        participants = sorted({p for p in ([from_addr] if from_addr else []) + to_addrs + cc_addrs})

        record = {
            'message_id': message_id,
            'thread_id': self._derive_thread_id(message_id, in_reply_to, references_ids),
            'subject': subject,
            'from_addr': from_addr,
            'to_addrs': to_addrs,
            'cc_addrs': cc_addrs,
            'date_utc': date_utc,
            'received_utc': received_utc,
            'in_reply_to': in_reply_to or None,
            'references_ids': references_ids,
            'is_reply': is_reply,
            'is_forward': is_forward,
            'raw_size_bytes': raw_size_bytes,
            'body_text': body_text,
            'body_html': body_html,
            'language': None,
            'has_attachments': has_attachments,
            'attachment_manifest': attachment_manifest,
            'processed': 0,
            'ingested_at': None,
            'updated_at': None,
            'content_hash': None,
            'summary': None,
            'keywords': None,
            'auto_topic': None,
            'manual_topic': None,
            'topic_confidence': None,
            'topic_version': None,
            'error_state': None,
            'direction': None,  # will be derived later relative to PRIMARY_MAILBOX
            'participants': participants,
            'participants_hash': None,
            'to_primary': PRIMARY_MAILBOX if PRIMARY_MAILBOX.lower() in to_addrs else None,
        }
        return record

    def _decode_part(self, part):
        charset = part.get_content_charset() or 'utf-8'
        payload = part.get_payload(decode=True)
        if not payload:
            return None
        try:
            return payload.decode(charset, errors='ignore')
        except Exception:
            # Attempt quoted-printable / base64 heuristics
            try:
                return quopri.decodestring(payload).decode('utf-8', errors='ignore')
            except Exception:
                try:
                    return base64.b64decode(payload).decode('utf-8', errors='ignore')
                except Exception:
                    return payload.decode('utf-8', errors='ignore')

    def _derive_thread_id(self, message_id, in_reply_to, references_ids):
        # Simple heuristic: first reference if chain exists, else in_reply_to, else self
        if references_ids:
            return references_ids[0]
        if in_reply_to:
            return in_reply_to
        return message_id
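
The header handling in `_parse_email` can be tried in isolation with the standard library only. The raw message below is made up for illustration; the calls are the same ones the connector uses (`decode_header`, `make_header`, `getaddresses`, `parsedate_to_datetime`), and the thread rule mirrors `_derive_thread_id`.

```python
from datetime import timezone
from email import message_from_bytes
from email.header import decode_header, make_header
from email.utils import getaddresses, parsedate_to_datetime

# Synthetic RFC 822 message (assumption: not taken from a real mailbox).
RAW = (
    b"Message-ID: <reply-1@example.com>\r\n"
    b"In-Reply-To: <root@example.com>\r\n"
    b"References: <root@example.com> <mid@example.com>\r\n"
    b"From: Alice Example <Alice@Example.com>\r\n"
    b"To: support@example.com, Bob <bob@example.com>\r\n"
    b"Subject: =?utf-8?q?Re=3A_caf=C3=A9_order?=\r\n"
    b"Date: Tue, 02 Jan 2024 10:30:00 +0200\r\n"
    b"\r\n"
    b"Body text.\r\n"
)

msg = message_from_bytes(RAW)

# Encoded-word subject -> readable string
subject = str(make_header(decode_header(msg["Subject"])))

# Address headers -> lowercased bare addresses
from_addr = getaddresses([msg["From"]])[0][1].lower()
to_addrs = [addr.lower() for _, addr in getaddresses([msg["To"]]) if addr]

# Date header -> ISO timestamp in UTC
date_utc = parsedate_to_datetime(msg["Date"]).astimezone(timezone.utc).isoformat()

# Thread root: first reference, else In-Reply-To, else the message itself
references = msg["References"].split()
thread_id = references[0] if references else (msg["In-Reply-To"] or msg["Message-ID"])
```

Here the reply resolves to the thread root `<root@example.com>`, and the `+0200` send time is shifted to UTC before storage.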


In [23]:
# Normalizer Layer

from bs4 import BeautifulSoup
import re, html
from typing import Dict, Any

class EmailNormalizer:
    """Email normalization (minimal scope).

    Current responsibilities:
      - Strip HTML tags (retain plain text)
      - Trim quoted reply blocks (heuristic) to keep new content concise
      - Remove common signatures / disclaimers / footers
      - Normalize whitespace (collapse multiple blank lines / spaces)

    Explicitly NOT handled here (deferred to other stages):
      - Hash computation (content_hash, participants_hash)
      - Direction classification (inbound/outbound/cc_only)
      - Summaries or keyword extraction
      - Language detection
      - Any LLM or model-based enrichment

    Design Goal: Keep this idempotent and inexpensive so it can run early
    and safely be re-run without altering semantic content.
    """

    # Simple regex patterns for disclaimers / signatures (extend as needed)
    DISCLAIMER_PATTERNS = [
        r"^confidentiality notice.*$",
        r"^this email and any attachments.*$",
        r"^please consider the environment.*$",
    ]
    SIGNATURE_SEPARATORS = [
        r"^--\s*$",              # standard signature delimiter
        r"^__+$",                 # line of underscores
        r"^cheers,?$",            # informal closings
        r"^regards,?$",
        r"^best regards,?$",
        r"^thanks,?$",
        r"^thank you,?$",
    ]

    QUOTED_BLOCK_PATTERNS = [
        r"^>.*$",                 # classic quoted lines
        r"^on .* wrote:$",        # common reply intro
        r"^from: .*",             # forwarded headers
        r"^sent: .*",
        r"^to: .*",
        r"^subject: .*",
    ]

    def normalize_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        body = record.get('body_text') or record.get('body_html') or ''
        cleaned = self.clean_body(body, original_html=record.get('body_html'))
        trimmed = self.trim_quoted(cleaned)
        stripped = self.strip_disclaimers_and_signatures(trimmed)
        collapsed = self.normalize_whitespace(stripped)

        record['body_text'] = collapsed or None
        # We are not retaining HTML; ensure body_html is None for downstream consistency
        record['body_html'] = None
        # Leave summary / keywords untouched (remain None)
        return record

    # --- Cleaning Helpers ---

    def clean_body(self, body: str, original_html: str | None = None) -> str:
        if not body:
            return ''
        # Prefer original HTML for fidelity if present
        text_source = original_html if (original_html and '<html' in original_html.lower()) else body
        soup = BeautifulSoup(text_source, 'html.parser')
        # Remove script/style
        for tag in soup(['script', 'style']):
            tag.decompose()
        text = soup.get_text('\n')
        text = html.unescape(text)
        return text

    def trim_quoted(self, text: str) -> str:
        if not text:
            return text
        lines = text.splitlines()
        trimmed = []
        for line in lines:
            if any(re.match(pat, line.strip().lower()) for pat in self.QUOTED_BLOCK_PATTERNS):
                break  # drop everything from first quoted indicator onward
            trimmed.append(line)
        return '\n'.join(trimmed) if trimmed else text

    def strip_disclaimers_and_signatures(self, text: str) -> str:
        if not text:
            return text
        lines = text.splitlines()
        cleaned_lines = []
        for line in lines:
            lstrip = line.strip().lower()
            if any(re.match(pat, lstrip) for pat in self.SIGNATURE_SEPARATORS):
                break  # cut remainder after signature separator
            if any(re.match(pat, lstrip) for pat in self.DISCLAIMER_PATTERNS):
                continue  # skip disclaimer line entirely
            cleaned_lines.append(line)
        return '\n'.join(cleaned_lines).strip()

    def normalize_whitespace(self, text: str) -> str:
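        # Fold CRLF / bare CR into a single LF (replacing "\r" alone would double every CRLF line break)
        text = re.sub(r"\r\n?", "\n", text)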
        text = re.sub(r"\n{3,}", "\n\n", text)
        text = re.sub(r"[ \t]+", " ", text)
        return text.strip()
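
To see the trimming and whitespace steps in action, here is a simplified standalone sketch. It uses only a subset of the quoted-block patterns and, unlike the class method, returns an empty string when the very first line is quoted; the function names are illustrative. Running it twice on its own output shows the idempotence the design goal asks for.

```python
import re

QUOTE_MARKERS = [r"^>.*$", r"^on .* wrote:$"]  # subset of QUOTED_BLOCK_PATTERNS


def trim_quoted(text: str) -> str:
    """Keep lines up to (not including) the first quoted-reply indicator."""
    kept = []
    for line in text.splitlines():
        probe = line.strip().lower()
        if any(re.match(p, probe) for p in QUOTE_MARKERS):
            break
        kept.append(line)
    return "\n".join(kept)


def collapse_whitespace(text: str) -> str:
    """Normalize line endings, cap blank-line runs, squeeze spaces and tabs."""
    text = re.sub(r"\r\n?", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    return text.strip()


raw = "Hi  Bob,\r\n\r\n\r\n\r\nSee   below.\r\n\r\nOn Tue, Alice wrote:\r\n> old text"
once = collapse_whitespace(trim_quoted(raw))
twice = collapse_whitespace(trim_quoted(once))
```

The second pass leaves the text unchanged, so the step is safe to re-run on already normalized bodies.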

In [27]:
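# Chunking, embedding, and storing
import json
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings, ChatOllama
from pymilvus import connections, Collection

class EmailProcessor:
    """Chunk normalized records, embed each chunk, and persist to Milvus + SQLite.

    Assumes records follow the canonical schema (message_id, body_text, ...) and that
    the Milvus collection and the SQLite `emails` table already exist with matching columns.
    """

    def __init__(self, milvus_collection, sqlite_conn):
        # Use the configured chunk parameters rather than hard-coded values
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=EMAIL_CHUNK_SIZE, chunk_overlap=EMAIL_CHUNK_OVERLAP
        )
        self.embeddings_model = OllamaEmbeddings(model=EMBEDDING_MODEL, base_url=OLLAMA_HOST)
        self.milvus_collection = milvus_collection
        self.sqlite_conn = sqlite_conn

    def process(self, records):
        for record in records:  # named `record` to avoid shadowing the `email` module
            body = record.get('body_text')
            # Records without a body (e.g. parse-error fallbacks) produce no chunks
            chunks = self.text_splitter.split_text(body) if body else []
            for chunk in chunks:
                embedding = self.embeddings_model.embed_query(chunk)
                self._store_in_milvus(record['message_id'], embedding)
            self._store_in_sqlite(record)

    def _store_in_milvus(self, email_id, embedding):
        # Column-ordered insert: [ids], [vectors]
        self.milvus_collection.insert([[email_id], [embedding]])

    def _store_in_sqlite(self, record):
        cursor = self.sqlite_conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO emails (id, subject, sender, recipient, date, body) VALUES (?, ?, ?, ?, ?, ?)",
            (record['message_id'], record['subject'], record['from_addr'],
             json.dumps(record['to_addrs']),  # list -> JSON array, as in the Field Reference
             record['date_utc'], record['body_text'])
        )
        self.sqlite_conn.commit()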
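
The configuration cell describes `EMAIL_CHUNK_SIZE` and `EMAIL_CHUNK_OVERLAP` as soft character targets for sentence accumulation. A minimal sketch of that idea, independent of the splitter used above, might look as follows. The function name `chunk_sentences` and the regex-based sentence split are illustrative assumptions, and the overlap is a plain character tail that may start mid-word.

```python
import re
from typing import List


def chunk_sentences(text: str, chunk_size: int = 600, overlap: int = 60) -> List[str]:
    """Greedy sentence packing; `chunk_size` is a soft target, not a hard cap."""
    # Naive split: whitespace that follows ., ! or ? ends a sentence.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > chunk_size:
            chunks.append(current)
            # Seed the next chunk with the tail of the previous one for context.
            tail = current[-overlap:] if overlap > 0 else ""
            current = f"{tail} {sentence}".strip()
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


demo_chunks = chunk_sentences("One. Two. Three. Four.", chunk_size=10, overlap=0)
```

A single sentence longer than `chunk_size` is emitted as its own oversized chunk rather than being cut, which is what makes the limit "soft".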


In [None]:
# Orchestration / Normalizer Usage Demo
from datetime import datetime, timedelta
import hashlib, json

def _participants_hash(participants: list[str]) -> str | None:
    if not participants:
        return None
    norm = sorted({(p or '').lower() for p in participants if p})
    return hashlib.sha256('\n'.join(norm).encode('utf-8')).hexdigest()

def _content_hash(record: dict) -> str:
    payload = {
        'v': CONTENT_HASH_RECIPE_VERSION,
        'subject': record.get('subject') or '',
        'body_text': record.get('body_text') or '',
        'date_utc': record.get('date_utc') or '',
        'participants_hash': record.get('participants_hash') or '',
    }
    return hashlib.sha256(json.dumps(payload, sort_keys=True, separators=(',',':')).encode('utf-8')).hexdigest()

def _direction(record: dict) -> str:
    primary = (PRIMARY_MAILBOX or '').lower()
    from_addr = (record.get('from_addr') or '').lower()
    to_addrs = [a.lower() for a in (record.get('to_addrs') or [])]
    cc_addrs = [a.lower() for a in (record.get('cc_addrs') or [])]
    if from_addr == primary:
        return 'outbound'
    in_to = primary in to_addrs
    in_cc = primary in cc_addrs
    if in_to and from_addr != primary:
        return 'inbound'
    if in_cc and not in_to and from_addr != primary:
        return 'cc_only'
    return 'other'

def run_normalization_demo(fetch_limit: int = 3, since_days: int = EMAIL_SYNC_SINCE_DAYS):
    normalizer = EmailNormalizer()
    if EMAIL_ENABLED:
        connector = IMAPConnector(IMAP_HOST, IMAP_USERNAME, IMAP_PASSWORD, IMAP_MAILBOX, batch_limit=fetch_limit)
        since_date = datetime.now(timezone.utc) - timedelta(days=since_days)  # timezone-aware; utcnow() is deprecated
        raw_records = connector.fetch_emails(since_date)
    else:
        # Synthetic sample records (simulate shape pre-normalization)
        raw_records = [
            {
                'message_id': f'<demo-{i}@example.com>',
                'thread_id': None,
                'subject': f'Demo Email {i}',
                'from_addr': 'alice@example.com' if i % 2 == 0 else PRIMARY_MAILBOX,
                'to_addrs': [PRIMARY_MAILBOX] if i % 2 == 0 else ['bob@example.com'],
                'cc_addrs': ['carol@example.com'] if i == 2 else [],
                'date_utc': datetime.now(timezone.utc).isoformat(),
                'received_utc': None,
                'in_reply_to': None,
                'references_ids': [],
                'is_reply': 0,
                'is_forward': 0,
                'raw_size_bytes': None,
                'body_text': "Hello Bob,\n\nPlease see below.\n\nOn Tue wrote:\n> older quoted line",  # contains quoted pattern
                'body_html': None,
                'language': None,
                'has_attachments': 0,
                'attachment_manifest': [],
                'processed': 0,
                'ingested_at': None,
                'updated_at': None,
                'content_hash': None,
                'summary': None,
                'keywords': None,
                'auto_topic': None,
                'manual_topic': None,
                'topic_confidence': None,
                'topic_version': None,
                'error_state': None,
                'direction': None,
                'participants': [],
                'participants_hash': None,
                'to_primary': None,
            }
            for i in range(fetch_limit)
        ]
        for r in raw_records:
            # Fill participants baseline like connector would
            participants = sorted({p for p in ([r.get('from_addr')] if r.get('from_addr') else []) + r.get('to_addrs', []) + r.get('cc_addrs', [])})
            r['participants'] = participants
            r['to_primary'] = PRIMARY_MAILBOX if PRIMARY_MAILBOX.lower() in [a.lower() for a in r.get('to_addrs', [])] else None

    print(f"Raw records: {len(raw_records)}")

    normalized = []
    for rec in raw_records:
        before_len = len(rec.get('body_text') or '' )
        rec = normalizer.normalize_record(rec)
        after_len = len(rec.get('body_text') or '' )
        rec['participants_hash'] = _participants_hash(rec.get('participants'))
        rec['content_hash'] = _content_hash(rec)
        rec['direction'] = _direction(rec)
        normalized.append(rec)
        print(f"message_id={rec.get('message_id')} body_len {before_len}->{after_len} direction={rec['direction']}")

    # Show sample of final normalized shape (first record)
    if normalized:
        sample_keys = ['message_id','subject','direction','participants_hash','content_hash','body_text']
        def _clip(value, limit=80):
            # Truncate long strings for display only; other values pass through unchanged
            return value[:limit] + '…' if isinstance(value, str) and len(value) > limit else value
        preview = {k: _clip(normalized[0].get(k)) for k in sample_keys}
        print("Sample normalized record subset:")
        print(preview)
    return normalized

# Run demo (will use synthetic data unless EMAIL_ENABLED=True)
_ = run_normalization_demo()
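
The content hash is what makes re-ingestion idempotent. The sketch below shows one way this could be enforced at the persistence layer, assuming a uniqueness constraint on `content_hash`. The table `emails_demo` and the `ingest` helper are hypothetical (the notebook does not define its schema), and the hash follows the same recipe as `_content_hash` above.

```python
import hashlib
import json
import sqlite3
from typing import Dict, List


def content_hash(record: Dict, version: int = 1) -> str:
    """Canonical JSON of selected fields, then SHA-256 (same recipe as the demo)."""
    payload = {
        "v": version,
        "subject": record.get("subject") or "",
        "body_text": record.get("body_text") or "",
        "date_utc": record.get("date_utc") or "",
        "participants_hash": record.get("participants_hash") or "",
    }
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


conn = sqlite3.connect(":memory:")
# Hypothetical table; the primary key on content_hash rejects duplicates.
conn.execute(
    "CREATE TABLE emails_demo (content_hash TEXT PRIMARY KEY, message_id TEXT, subject TEXT)"
)


def ingest(records: List[Dict]) -> int:
    """Insert unseen records and return how many rows were actually added."""
    before = conn.execute("SELECT COUNT(*) FROM emails_demo").fetchone()[0]
    for rec in records:
        conn.execute(
            "INSERT OR IGNORE INTO emails_demo (content_hash, message_id, subject) VALUES (?, ?, ?)",
            (content_hash(rec), rec.get("message_id"), rec.get("subject")),
        )
    conn.commit()
    after = conn.execute("SELECT COUNT(*) FROM emails_demo").fetchone()[0]
    return after - before


batch = [
    {"message_id": "<a@example.com>", "subject": "Hello", "body_text": "Hi",
     "date_utc": "2024-01-01T00:00:00+00:00"},
    {"message_id": "<b@example.com>", "subject": "Invoice", "body_text": "Attached",
     "date_utc": "2024-01-02T00:00:00+00:00"},
]
first_run = ingest(batch)
second_run = ingest(batch)
```

The first cycle adds both rows and the second adds none, so a fetch window that overlaps the previous one does not create duplicates.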