# EmailSearchAI — Single‑File RAG over Kaggle Email Threads
_Lightweight notebook: ingest ▶︎ chunk ▶︎ embed ▶︎ hybrid search+rerank ▶︎ grounded answer._

> Structured similarly to your sample notebook sections: ## Text Processing, ## Chunking & Embeddings, ## Generate and Store Embeddings using OpenAI and ChromaDB, ## Semantic Search with Cache, ## Re-Ranking with a Cross Encoder, ## 6. Retrieval Augmented Generation

## 1) Environment Setup

In [13]:
# !pip uninstall -y transformers sentence-transformers torch huggingface_hub
# !pip install torch==2.1.0 transformers==4.36.0 huggingface_hub==0.19.4
# !pip install sentence-transformers==2.2.2

In [21]:
# If running fresh, uncomment installs (kept inline to stay single-file)
# %pip install -q python-dotenv pandas orjson beautifulsoup4 rapidfuzz rank-bm25 chromadb faiss-cpu sentence-transformers openai langchain-text-splitters tiktoken rich

import os, json, time, hashlib, sqlite3, math, textwrap
import pandas as pd
from bs4 import BeautifulSoup
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from IPython.display import display, Markdown

# Read the OpenAI key from the environment only. Never commit a literal key as
# the fallback value: the key that used to be hard-coded on this line is exposed
# in the notebook history and must be revoked and rotated.
# An empty string means "no key configured".
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")


## 2) Config

In [None]:
# --- Filesystem layout (adjust to your checkout) ---
RAW_DETAILS = "dataset/CSV/email_thread_details.csv"      # per-message CSV from the Kaggle dataset
RAW_SUMMARIES = "dataset/CSV/email_thread_summaries.csv"  # per-thread summaries; optional, used for eval
CURATED_MSGS = "./data/curated/emails.jsonl"
CURATED_SUMS = "./data/curated/thread_summaries.jsonl"
DERIVED_CHUNKS = "./data/derived/chunks.jsonl"
INDEX_DIR = "./.chroma_emailsearch"

# --- Model selection ---
USE_OPENAI_EMBED = False  # flip to True to embed with OpenAI instead of the local model
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # fast local default
CROSS_ENCODER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"

# --- Retrieval sizes ---
TOP_K = 12          # vector hits kept per query
TOP_K_RERANK = 5    # candidates kept after cross-encoder re-ranking
BM25_TOP = 60       # keyword hits kept per query

# --- Query cache ---
CACHE_PATH = ".cache_emailsearch.sqlite"
CACHE_TTL = 6 * 60 * 60  # seconds (six hours)

# Make sure the output folders for curated and derived JSONL files exist.
for _output_dir in ("data/curated", "data/derived"):
    os.makedirs(_output_dir, exist_ok=True)


## 3) Ingestion — CSV ▶︎ Curated JSONL

In [33]:
def html_to_text(x: str) -> str:
    """Return the visible text of an HTML (or plain-text) string.

    Anything that is not a ``str`` (for example a missing cell) yields "".
    Text fragments are joined with a single space and stripped.
    """
    if isinstance(x, str):
        soup = BeautifulSoup(x, "html.parser")
        return soup.get_text(" ", strip=True)
    return ""

def normalize_list(x):
    """Coerce a list-like cell value into a Python list.

    Accepted inputs, in the order they are tried:

    * a ``list`` -> returned unchanged;
    * a non-string or blank string -> ``[]``;
    * a JSON document -> the list itself, ``[]`` for JSON ``null``, otherwise
      the scalar wrapped as ``[str(value)]``;
    * a Python-literal list such as ``"['a@x.com', 'b@y.com']"`` -> that list;
    * anything else -> the string split on ``;`` or ``,`` with blanks dropped.
    """
    import ast
    import re

    if isinstance(x, list):
        return x
    if not isinstance(x, str) or not x.strip():
        return []

    try:
        parsed = json.loads(x)
    except (ValueError, RecursionError):
        # Not JSON (json.JSONDecodeError is a ValueError); fall through.
        pass
    else:
        if isinstance(parsed, list):
            return parsed
        # JSON null carries no recipients; do not turn it into ["None"].
        return [] if parsed is None else [str(parsed)]

    text = x.strip()
    if text.startswith("[") and text.endswith("]"):
        # Single-quoted lists are valid Python literals but not valid JSON;
        # without this step they would be split into "['a'" / "'b']" fragments.
        try:
            literal = ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            literal = None
        if isinstance(literal, list):
            return literal

    return [part.strip() for part in re.split(r"[;,]", x) if part.strip()]

def write_jsonl(path, rows):
    """Write *rows* to *path* as JSON Lines (UTF-8, one object per line).

    The file is truncated first; non-ASCII characters are written verbatim
    rather than as ``\\u`` escapes.
    """
    with open(path, "w", encoding="utf-8") as out:
        out.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)

def ingest_kaggle(details_path=RAW_DETAILS, summaries_path=RAW_SUMMARIES):
    """Convert the raw thread CSVs into curated JSONL files.

    Each row of *details_path* becomes one message record written to
    ``CURATED_MSGS``. If *summaries_path* exists, its ``thread_id`` and
    ``summary`` columns are written to ``CURATED_SUMS``.

    Raises:
        FileNotFoundError: if *details_path* does not exist.
    """
    # Read every cell as text and keep blank cells as "" instead of NaN.
    # Otherwise missing values turn into the string "nan" (ids, dates) or a
    # float NaN that json.dumps emits as a non-standard `NaN` token, and
    # integer-looking ids in a column with gaps get float formatting ("12.0").
    df = pd.read_csv(details_path, dtype=str, keep_default_na=False)
    has_cc = "cc" in df.columns

    rows = []
    for i, r in df.iterrows():
        thread_id = str(r.get("thread_id", ""))
        rows.append({
            # Fall back to a synthetic id when the CSV provides none.
            "message_id": str(r.get("message_id", "")) or f"{thread_id}-{i}",
            "thread_id": thread_id,
            "date": str(r.get("timestamp", "")),
            "from": r.get("from", ""),
            "to": normalize_list(r.get("to", "")),
            "cc": normalize_list(r.get("cc", "")) if has_cc else [],
            "subject": r.get("subject", ""),
            "body_text": html_to_text(r.get("body", "")),
            "body_html": None,
            "attachments": [],
            "tags": [],
            # Provenance: source file name and row label.
            "path": f"email_thread_details.csv:{i}",
        })
    write_jsonl(CURATED_MSGS, rows)
    display(Markdown(f"**Wrote {len(rows)} messages →** `{CURATED_MSGS}`"))

    # Summaries are optional; skip quietly when the path is unset or absent.
    if summaries_path and os.path.exists(summaries_path):
        ds = pd.read_csv(summaries_path, dtype=str, keep_default_na=False)
        sums = [
            {"thread_id": str(sr.get("thread_id", "")), "summary": sr.get("summary", "")}
            for _, sr in ds.iterrows()
        ]
        write_jsonl(CURATED_SUMS, sums)
        display(Markdown(f"**Wrote {len(sums)} summaries →** `{CURATED_SUMS}`"))

# Run once after placing the Kaggle CSVs at the RAW_DETAILS / RAW_SUMMARIES
# paths (dataset/CSV/ by default); raises FileNotFoundError if the details CSV is missing.
ingest_kaggle()


FileNotFoundError: [Errno 2] No such file or directory: './dataset/CSV/email_thread_details.csv'

## 4) Chunking — Message, Thread‑Window, or Semantic

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

def iter_jsonl(path):
    """Lazily yield one parsed JSON value per non-blank line of *path* (UTF-8)."""
    with open(path, "r", encoding="utf-8") as src:
        for raw in src:
            if not raw.strip():
                # Blank or whitespace-only lines are skipped.
                continue
            yield json.loads(raw)

def windowed_chunks(text, max_toks=900, overlap=120):
    """Split *text* into overlapping pieces and return them as a list.

    *max_toks* is passed to the splitter as ``chunk_size`` and *overlap* as
    ``chunk_overlap``. Split points are tried in this order: blank line,
    newline, then ". ".

    NOTE(review): no length function is supplied, so the limits are presumably
    measured in characters rather than tokens — confirm against the splitter docs.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=max_toks,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". "],
    ).split_text(text)

def chunk_messages(curated_path=CURATED_MSGS, out_path=DERIVED_CHUNKS, mode="thread_window"):
    """Split curated messages into retrieval chunks and write them as JSONL.

    Args:
        curated_path: JSONL of message records (as written by the ingest step).
        out_path: destination JSONL; each record has ``id``, ``thread_id``,
            ``subject``, ``date`` and ``text``.
        mode: ``"message"`` chunks every message on its own. Any other value
            (default ``"thread_window"``) joins all messages of a thread and
            chunks the joined text.

    Messages are ordered by ``(thread_id, date)``. The date is compared as a
    string, so ordering inside a thread is only chronological if the stored
    dates sort lexicographically.
    """
    from itertools import groupby

    def render(m):
        # Header lines keep subject/sender/date searchable inside the chunk text.
        return f"SUBJECT: {m['subject']}\nFROM: {m['from']}\nDATE: {m['date']}\n\n{m['body_text']}"

    msgs = sorted(iter_jsonl(curated_path), key=lambda r: (r["thread_id"], r.get("date", "")))

    chunks = []
    if mode == "message":
        for m in msgs:
            for j, piece in enumerate(windowed_chunks(render(m), 900, 120)):
                chunks.append({
                    # A long message yields several pieces; the piece index keeps
                    # ids unique (previously every piece reused the bare message id).
                    "id": f"{m['message_id']}-{j}",
                    "thread_id": m["thread_id"],
                    "subject": m["subject"],
                    "date": m["date"],
                    "text": piece,
                })
    else:  # thread_window
        # msgs is sorted by thread_id first, so each groupby group is one thread.
        for tid, group in groupby(msgs, key=lambda r: r["thread_id"]):
            thread = list(group)
            first = thread[0]
            # Date span: first message's date … last non-empty date in the thread.
            d0 = d1 = first.get("date", "")
            for m in thread:
                d1 = m.get("date", "") or d1
            text = "\n\n---\n\n".join(render(m) for m in thread)
            for j, p in enumerate(windowed_chunks(text, 1000, 140)):
                chunks.append({
                    # Thread id + short content hash + piece index.
                    "id": f"{tid}-{hashlib.md5((p[:200]).encode()).hexdigest()[:8]}-{j}",
                    "thread_id": tid,
                    # The thread is labelled with its first message's subject.
                    "subject": first["subject"][:180],
                    "date": f"{d0}…{d1}",
                    "text": p,
                })

    write_jsonl(out_path, chunks)
    display(Markdown(f"**Chunks written: {len(chunks)} →** `{out_path}`"))

# Reads CURATED_MSGS (written by ingest_kaggle) and writes DERIVED_CHUNKS.
chunk_messages(mode="thread_window")  # run after ingest


## 5) Embeddings & Index (Chroma)

In [None]:
from sentence_transformers import SentenceTransformer
from chromadb import PersistentClient

class Embedder:
    """Text embedder backed by a local SentenceTransformer or the OpenAI API.

    Attributes:
        kind: "st" for the local model, "openai" for the hosted API.
        model_name: the model identifier actually used for encoding.

    NOTE(review): the default ``EMBED_MODEL`` is a sentence-transformers name;
    when ``use_openai`` is True, pass an OpenAI embedding model name explicitly.
    """

    def __init__(self, model_name=EMBED_MODEL, use_openai=USE_OPENAI_EMBED):
        self.kind = "openai" if use_openai else "st"
        if self.kind == "st":
            self.model_name = model_name
            self.model = SentenceTransformer(model_name)
        else:
            from openai import OpenAI
            self.client = OpenAI(api_key=OPENAI_API_KEY)
            # Drop a leading "org/" prefix when there is one. Indexing with [-1]
            # keeps a bare name such as "text-embedding-3-small" intact, where
            # [1] raised IndexError.
            self.model_name = model_name.split("/", 1)[-1]

    def encode(self, texts: List[str]):
        """Return one embedding (a list of floats) per input text."""
        if len(texts) == 0:
            # Nothing to embed; avoid a pointless model or API call.
            return []
        if self.kind == "st":
            # Unit-normalized vectors, returned as plain Python lists.
            return self.model.encode(texts, normalize_embeddings=True, show_progress_bar=False).tolist()
        resp = self.client.embeddings.create(model=self.model_name, input=texts)
        return [e.embedding for e in resp.data]

def build_index(chunks_path=DERIVED_CHUNKS, index_dir=INDEX_DIR):
    """(Re)build the persistent "emails" Chroma collection from a chunks JSONL.

    Any existing "emails" collection under *index_dir* is dropped first. Chunks
    are embedded and added in batches; every chunk field except ``text`` is
    stored as metadata and ``text`` as the document.

    Raises:
        FileNotFoundError: if *chunks_path* does not exist.
    """
    batch_size = 256

    emb = Embedder()
    client = PersistentClient(path=index_dir)
    try:
        client.delete_collection("emails")
    except Exception:
        # Deliberate best effort: on a first run there is nothing to delete.
        pass
    col = client.create_collection("emails", metadata={"hnsw:space": "cosine"})

    def add_batch(batch_ids, batch_texts, batch_metas):
        # Embed and store one batch; shared by the full-batch and tail paths.
        col.add(
            ids=batch_ids,
            documents=batch_texts,
            embeddings=emb.encode(batch_texts),
            metadatas=batch_metas,
        )

    ids, texts, metas = [], [], []
    for i, r in enumerate(iter_jsonl(chunks_path)):
        # Fall back to a positional id when the chunk has none.
        ids.append(r["id"] if r.get("id") else f"c{i}")
        texts.append(r["text"])
        metas.append({k: v for k, v in r.items() if k != "text"})
        if len(ids) >= batch_size:
            add_batch(ids, texts, metas)
            ids, texts, metas = [], [], []
    if ids:
        add_batch(ids, texts, metas)
    # The message previously carried a stray apostrophe after the path.
    display(Markdown("**Index built in** `{}`".format(index_dir)))

# Reads DERIVED_CHUNKS; raises FileNotFoundError if chunking has not produced it yet.
build_index()  # run after chunking


.gitattributes: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

data_config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

model.onnx:   0%|          | 0.00/90.4M [00:00<?, ?B/s]

model_O1.onnx:   0%|          | 0.00/90.4M [00:00<?, ?B/s]

model_O2.onnx:   0%|          | 0.00/90.3M [00:00<?, ?B/s]

model_O3.onnx:   0%|          | 0.00/90.3M [00:00<?, ?B/s]

model_O4.onnx:   0%|          | 0.00/45.2M [00:00<?, ?B/s]

model_qint8_arm64.onnx:   0%|          | 0.00/23.0M [00:00<?, ?B/s]

model_qint8_avx512.onnx:   0%|          | 0.00/23.0M [00:00<?, ?B/s]

model_qint8_avx512_vnni.onnx:   0%|          | 0.00/23.0M [00:00<?, ?B/s]

model_quint8_avx2.onnx:   0%|          | 0.00/23.0M [00:00<?, ?B/s]

openvino_model.bin:   0%|          | 0.00/90.3M [00:00<?, ?B/s]

openvino_model.xml: 0.00B [00:00, ?B/s]

openvino_model_qint8_quantized.bin:   0%|          | 0.00/22.9M [00:00<?, ?B/s]

openvino_model_qint8_quantized.xml: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

train_script.py: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

FileNotFoundError: [Errno 2] No such file or directory: 'data/derived/chunks.jsonl'

## 6) Search — Hybrid (Vector ∪ BM25) + TTL Cache

In [None]:
from rank_bm25 import BM25Okapi

def ensure_cache(path=CACHE_PATH):
    """Create the ``cache`` table in the SQLite file at *path* if it is missing.

    Columns: ``k`` (text primary key), ``v`` (payload), ``t`` (integer timestamp).
    """
    conn = sqlite3.connect(path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS cache (k TEXT PRIMARY KEY, v BLOB, t INT)")
        conn.commit()
    finally:
        # Close even when the statement fails so the file handle is not leaked.
        conn.close()

def cache_get(k, path=CACHE_PATH):
    """Return the cached object stored under key *k*, or None.

    None is returned when the key is absent or when the entry is older than
    ``CACHE_TTL`` seconds. Expired rows are left in place; a later write under
    the same key replaces them.
    """
    conn = sqlite3.connect(path)
    try:
        row = conn.execute("SELECT v,t FROM cache WHERE k=?", (k,)).fetchone()
    finally:
        # Close even when the query fails so the file handle is not leaked.
        conn.close()
    if row is None:
        return None
    v, t = row
    if time.time() - t > CACHE_TTL:
        return None
    return json.loads(v)

def cache_put(k, obj, path=CACHE_PATH):
    """Store *obj* (JSON-serialized) under key *k*, stamped with the current time.

    An existing row with the same key is replaced.
    """
    # Serialize before opening the connection so a non-serializable object
    # fails without touching the database.
    payload = json.dumps(obj)
    conn = sqlite3.connect(path)
    try:
        conn.execute("REPLACE INTO cache(k,v,t) VALUES (?,?,?)", (k, payload, int(time.time())))
        conn.commit()
    finally:
        # Close even when the write fails so the file handle is not leaked.
        conn.close()

def qhash(q: str) -> str:
    """Return the SHA-256 hex digest of *q* after stripping and lower-casing it."""
    normalized = q.strip().lower()
    digest = hashlib.sha256(normalized.encode())
    return digest.hexdigest()

# Lazily-built BM25 state: the index and the chunk texts it was built from.
_bm25 = None
_bm25_docs = None
def _load_bm25(chunks_path=DERIVED_CHUNKS):
    """Build the module-level BM25 index from the ``text`` of every chunk.

    Texts are tokenized by whitespace splitting. The index is stored in
    ``_bm25`` and the raw texts, in file order, in ``_bm25_docs``.
    """
    global _bm25, _bm25_docs
    corpus = []
    token_lists = []
    for record in iter_jsonl(chunks_path):
        body = record["text"]
        corpus.append(body)
        token_lists.append(body.split())
    _bm25 = BM25Okapi(token_lists)
    _bm25_docs = corpus

# Query-side embedder, created on first use so the model is loaded only once.
_query_embedder = None

def vector_search(query: str, top_k=TOP_K):
    """Return up to *top_k* nearest chunks for *query* from the "emails" collection.

    Each hit is a dict with ``text`` plus the chunk's stored metadata.

    The query is embedded with the same ``Embedder`` configuration used when
    the index was built. Passing raw ``query_texts`` would let the collection
    embed the query with its own embedding function, which need not match the
    stored vectors (for example when ``USE_OPENAI_EMBED`` is True).
    """
    global _query_embedder
    if _query_embedder is None:
        _query_embedder = Embedder()

    client = PersistentClient(path=INDEX_DIR)
    col = client.get_collection("emails")
    res = col.query(
        query_embeddings=_query_embedder.encode([query]),
        n_results=top_k*2,  # over-fetch, then trim to top_k below
        include=["documents", "metadatas"],
    )
    # Results are nested per query; there is exactly one query here.
    out = [
        {"text": doc, **(meta or {})}
        for doc, meta in zip(res["documents"][0], res["metadatas"][0])
    ]
    return out[:top_k]

# Parsed chunk records aligned with the BM25 index, loaded once instead of per query.
_bm25_chunks = None

def bm25_search(query: str, top_bm25=BM25_TOP):
    """Return the *top_bm25* best BM25 matches for *query*.

    Each hit is a fresh dict with ``text`` first, followed by the chunk's other
    fields. The query is tokenized by whitespace splitting, the same way the
    corpus is.
    """
    global _bm25_chunks
    if _bm25 is None:
        _load_bm25()
        # The index was just (re)built; reload the records it is aligned with.
        _bm25_chunks = None
    if _bm25_chunks is None:
        # Previously the whole chunk file was re-read and re-parsed on every query.
        _bm25_chunks = list(iter_jsonl(DERIVED_CHUNKS))

    scores = _bm25.get_scores(query.split())
    # Highest score first; ties keep file order because the sort is stable.
    idxs = sorted(range(len(scores)), key=lambda i: -scores[i])[:top_bm25]
    outs = []
    for i in idxs:
        r = _bm25_chunks[i]
        # Copy, so callers that annotate hits never touch the cached records.
        outs.append({"text": r["text"], **{k: v for k, v in r.items() if k != "text"}})
    return outs

def hybrid_search(query: str, top_k=TOP_K):
    """Return the de-duplicated union of vector and BM25 hits for *query*.

    Vector hits come first, then BM25 hits not already present. Results are
    cached in SQLite and reused until the cache entry expires.

    NOTE(review): the cache is not invalidated when the index or chunk file is
    rebuilt; stale hits can be served until the TTL passes.
    """
    # top_k is part of the key: a result computed for one top_k must not be
    # served for another.
    key = qhash(f"hybrid:{top_k}:{query}")
    ensure_cache()
    hit = cache_get(key)
    if hit is not None:
        # An empty list is a valid cached result, not a miss.
        return hit

    v = vector_search(query, top_k=top_k)
    b = bm25_search(query, top_bm25=BM25_TOP)

    # Union by (id, first 64 characters of text) signature.
    seen = set()
    merged = []
    for lst in (v, b):
        for c in lst:
            sig = (c.get("id"), c["text"][:64])
            if sig in seen:
                continue
            seen.add(sig)
            merged.append(c)
    cache_put(key, merged)
    return merged


## 7) Re‑ranking (Cross‑Encoder)

In [None]:
from sentence_transformers import CrossEncoder

# Cached cross-encoder and the model name it was loaded from.
_reranker = None
_reranker_model = None
def get_reranker(model=CROSS_ENCODER_MODEL):
    """Return a cached ``CrossEncoder`` for *model*, loading it on first use.

    The cache holds one model. Asking for a different *model* loads that one
    and replaces the cached instance; previously the argument was ignored
    after the first call.
    """
    global _reranker, _reranker_model
    if _reranker is None or _reranker_model != model:
        _reranker = CrossEncoder(model)
        _reranker_model = model
    return _reranker

def rerank(query: str, candidates: List[Dict[str,Any]], top_k=TOP_K_RERANK):
    """Score *candidates* against *query* with the cross-encoder; return the best *top_k*.

    Returns new dicts, each a copy of a candidate with a float ``rerank_score``
    added, ordered from highest to lowest score. The input dicts are not
    modified. An empty candidate list returns ``[]`` without loading the model.
    """
    if not candidates:
        # Nothing to score; also avoids handing the model an empty batch.
        return []
    ce = get_reranker()
    scores = ce.predict([(query, c["text"]) for c in candidates])
    scored = [{**c, "rerank_score": float(s)} for s, c in zip(scores, candidates)]
    # Stable sort: candidates with equal scores keep their incoming order.
    scored.sort(key=lambda c: c["rerank_score"], reverse=True)
    return scored[:top_k]


## 8) Generation — Grounded Answer (Quotes + Provenance)

In [None]:
    def make_prompt(query: str, chunks: List[Dict[str,Any]]) -> str:
        """Build the grounded-answer prompt: rules, the user question, then the context.

        Every chunk becomes a numbered section (starting at 1) showing its
        thread id, date, subject and text. Missing thread id or date is shown
        as "?", a missing subject as an empty string.
        """
        sections = []
        for number, chunk in enumerate(chunks, start=1):
            sections.append(
                f"[CHUNK {number}]\n"
                f"THREAD: {chunk.get('thread_id','?')} | DATE: {chunk.get('date','?')}\n"
                f"SUBJECT: {chunk.get('subject','')}\n"
                f"TEXT:\n{chunk['text']}"
            )
        context = "\n\n".join(sections)
        # The prompt intentionally starts with a newline.
        instructions = (
            "\n"
            "Answer the user strictly from the CONTEXT.\n"
            "Rules:\n"
            "- Short answer first.\n"
            "- Then Evidence: bullet list; each bullet quotes a short line (≤20 words) and includes (thread_id · date).\n"
            "- If context is insufficient or conflicting, say so explicitly.\n"
            "- Do not add external knowledge.\n"
            "\n"
        )
        return f"{instructions}USER QUESTION: {query}\n\nCONTEXT:\n{context}\n"

    def generate_answer_openai(query, chunks, model="gpt-4o-mini", temperature=0.2):
        """Ask an OpenAI chat model to answer *query* using only *chunks* as context.

        Args:
            query: the user question.
            chunks: chunk dicts that make up the prompt context.
            model: chat model name (defaults to the previously hard-coded one).
            temperature: sampling temperature (defaults to the previous value).

        Returns:
            The model's answer text, or "" if the response carried no text.
        """
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        prompt = make_prompt(query, chunks)
        resp = client.chat.completions.create(
            model=model,
            messages=[
                {"role":"system","content":"You are EmailSearchAI, a cautious, citation-first assistant."},
                {"role":"user","content": prompt}
            ],
            temperature=temperature
        )
        # Normalize a missing message body to "" so callers always get a string.
        return resp.choices[0].message.content or ""


## 9) Run a Query End‑to‑End

In [None]:
def answer(query: str, top_k=TOP_K, top_k_rerank=TOP_K_RERANK, show_top=3):
    """Run the full pipeline for *query*, display the results, return the ranked chunks.

    Steps: hybrid search, cross-encoder re-ranking, a preview of the first
    *show_top* chunks, then an LLM answer when ``OPENAI_API_KEY`` is set.

    NOTE(review): only the *show_top* previewed chunks are sent to the LLM, not
    all *top_k_rerank* re-ranked chunks — confirm this is intended.
    """
    candidates = hybrid_search(query, top_k=top_k)
    ranked = rerank(query, candidates, top_k=top_k_rerank)
    shown = ranked[:show_top]

    display(Markdown("**Top candidates (after re‑rank):**"))
    for position, chunk in enumerate(shown, start=1):
        heading = f"**{position}. THREAD {chunk.get('thread_id','?')} — {chunk.get('date','')}**  \n"
        # Subject is cut to 200 characters and the quoted text to 500.
        excerpt = f"*{chunk.get('subject','')[:200]}*\n\n> {chunk['text'][:500]}..."
        display(Markdown(heading + excerpt))

    if OPENAI_API_KEY:
        ans = generate_answer_openai(query, shown)
        display(Markdown("### Final Answer"))
        display(Markdown(ans))
    else:
        display(Markdown("⚠️ Set OPENAI_API_KEY to enable final LLM answer."))
    return ranked

# Example:
# answer("What decision was reached about the Q3 forecast and who approved it?")


## 10) (Optional) Quick Multi‑Query Eval

In [None]:
def run_queries(queries: List[str]):
    """Answer each query in turn; return a dict mapping query text to its ranked chunks.

    A markdown heading is displayed before each query is answered.
    """
    outcome = {}
    for question in queries:
        display(Markdown(f"## Query: **{question}**"))
        outcome[question] = answer(question)
    return outcome

# SAMPLE QUERIES (edit to match your corpus topics)
SAMPLE_QUERIES = [
    "Summarize the main decision and the final deadline in the budget thread.",
    "Who proposed the chosen approach for the data migration and when?",
    "List follow-up action items assigned to Finance in November threads.",
]
# Runs the whole pipeline per query; needs the "emails" collection created by
# build_index(), otherwise the vector search fails with a NotFoundError.
run_queries(SAMPLE_QUERIES)


## Query: **Summarize the main decision and the final deadline in the budget thread.**

NotFoundError: Collection [emails] does not exists