In [7]:
import io, uuid, pathlib, subprocess, json
import numpy as np
import fitz  # PyMuPDF
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from chromadb import Client
from chromadb.config import Settings

In [8]:
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
TOP_K = 5
MIN_SCORE = 0.5
OLLAMA_MODEL = "llama3"

In [9]:
emb = SentenceTransformer(EMBED_MODEL)
chroma = Client(Settings(anonymized_telemetry=False))

In [10]:
# pdf_reader.py
import io
import fitz  # PyMuPDF
from pypdf import PdfReader

def extract_pages(pdf_bytes: bytes) -> list[str]:
    # Try PyMuPDF first (usually best); fall back to pypdf
    try:
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        pages = [doc[i].get_text() or "" for i in range(len(doc))]
        doc.close()
        return pages
    except Exception:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        return [(p.extract_text() or "") for p in reader.pages]


In [12]:

import re
from typing import List, Dict, Any
_client = Client(Settings(anonymized_telemetry=False))
_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# In-memory cache so we can run exact-match before vector search
CHUNK_CACHE: dict[str, List[Dict[str, Any]]] = {}

BULLET_RE = re.compile(r'^\s*([\-•\*\u2022]|\d+[\.)])\s+')

def _split_paragraphs(text: str) -> List[str]:
    t = re.sub(r'\r\n?', '\n', text or '')
    # split on blank lines
    paras = [p.strip() for p in re.split(r'\n\s*\n', t) if p.strip()]
    return paras

def _chunk_paragraphs_with_bullets(page_text: str) -> List[str]:
    paras = _split_paragraphs(page_text)
    out = []
    i = 0
    while i < len(paras):
        cur = paras[i]
        # "Heading" heuristic: shortish line, no trailing punctuation (tweak if needed)
        is_heading = (len(cur) < 120) and (not cur.endswith(('.', '!', '?')))
        if is_heading and i + 1 < len(paras):
            j = i + 1
            bullets = []
            while j < len(paras) and BULLET_RE.match(paras[j]):
                bullets.append(paras[j])
                j += 1
            if bullets:
                # group heading + its bullet block
                out.append(cur + "\n" + "\n".join(bullets))
                i = j
                continue
        out.append(cur)
        i += 1
    return out

def build_index(doc_id: str, filename: str, pages: List[str]):
    col = _client.get_or_create_collection(name=f"doc_{doc_id}")
    ids, docs, metas = [], [], []
    cache_items: List[Dict[str, Any]] = []

    for page_no, page_text in enumerate(pages, start=1):
        for n, chunk in enumerate(_chunk_paragraphs_with_bullets(page_text)):
            cid = f"{doc_id}_{page_no}_{n}"
            ids.append(cid)
            docs.append(chunk)
            meta = {"page": page_no, "filename": filename, "cid": cid}
            metas.append(meta)
            cache_items.append({"text": chunk, "meta": meta})

    if docs:
        vecs = _model.encode(docs, convert_to_numpy=True, normalize_embeddings=True)
        col.add(ids=ids, documents=docs, metadatas=metas, embeddings=vecs.tolist())

    # keep paragraph/bullet chunks for exact-match pass
    CHUNK_CACHE[doc_id] = cache_items
    return col

In [13]:
"""# indexer.py
from chromadb import Client
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer

_client = Client(Settings(anonymized_telemetry=False))
_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # fast + good baseline

def build_index(doc_id: str, filename: str, pages: list[str]):
    col = _client.get_or_create_collection(name=f"doc_{doc_id}")

    ids, docs, metas, embeds = [], [], [], []
    for page_num, page_text in enumerate(pages, start=1):
        for j, chunk in enumerate(chunk_text(page_text)):
            ids.append(f"{doc_id}_{page_num}_{j}")
            docs.append(chunk)
            metas.append({"page": page_num, "filename": filename})
    if docs:
        embeds = _model.encode(docs, convert_to_numpy=True, normalize_embeddings=True)
        col.add(ids=ids, documents=docs, metadatas=metas, embeddings=embeds.tolist())
    return col
"""

'# indexer.py\nfrom chromadb import Client\nfrom chromadb.config import Settings\nfrom sentence_transformers import SentenceTransformer\n\n_client = Client(Settings(anonymized_telemetry=False))\n_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # fast + good baseline\n\ndef build_index(doc_id: str, filename: str, pages: list[str]):\n    col = _client.get_or_create_collection(name=f"doc_{doc_id}")\n\n    ids, docs, metas, embeds = [], [], [], []\n    for page_num, page_text in enumerate(pages, start=1):\n        for j, chunk in enumerate(chunk_text(page_text)):\n            ids.append(f"{doc_id}_{page_num}_{j}")\n            docs.append(chunk)\n            metas.append({"page": page_num, "filename": filename})\n    if docs:\n        embeds = _model.encode(docs, convert_to_numpy=True, normalize_embeddings=True)\n        col.add(ids=ids, documents=docs, metadatas=metas, embeddings=embeds.tolist())\n    return col\n'

In [14]:
# retriever.py
from typing import List, Dict, Any
from chromadb import Client
from chromadb.config import Settings
import numpy as np

_client = Client(Settings(anonymized_telemetry=False))

def retrieve(doc_id: str, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    col = _client.get_or_create_collection(name=f"doc_{doc_id}")
    if col.count() == 0:
        return []
    res = col.query(query_texts=[query], n_results=top_k,
                    include=["documents","metadatas","distances","embeddings"])
    items = []
    if res and res["documents"]:
        docs = res["documents"][0]
        metas = res["metadatas"][0]
        dists = res["distances"][0]
        for doc, meta, dist in zip(docs, metas, dists):
            # Chroma's distance depends on embedding_fn; with our manual embeddings,
            # it's cosine distance in [0,2]. Convert to similarity in [0,1] roughly:
            # sim = 1 - (dist / 2)
            sim = 1 - (float(dist) / 2.0)
            items.append({"text": doc, "meta": meta, "score": float(np.clip(sim, 0.0, 1.0))})
    return items


In [15]:
"""# qa.py
from typing import Dict, Any

MIN_SCORE = 0.40
TOP_K = 5

def answer(question: str, doc_id: str) -> Dict[str, Any]:
    hits = retrieve(doc_id, question, TOP_K)
    if not hits or hits[0]["score"] < MIN_SCORE:
        return {
            "answer": "I do not have an answer to that question because the PDF doesn’t contain it (or I can’t retrieve it reliably).",
            "sources": []
        }
    # Extractive baseline: return stitched text from top chunks
    context = "\n\n---\n\n".join([h["text"] for h in hits[:2]])  # keep it short for readability
    pages = sorted({h["meta"]["page"] for h in hits[:2]})
    return {
        "answer": context,  # replace later with LLM-generated summary over context, if you want
        "sources": [{"page": p} for p in pages],
        "scores": [round(h["score"],3) for h in hits[:2]]
    }
"""

MIN_SCORE = 0.45     # raise to reduce bloat; lower to refuse less
TOP_K = 5
OLLAMA_MODEL = "mistral"  # or "llama3"

def _ollama(prompt: str) -> str:
    try:
        p = run(["ollama", "run", OLLAMA_MODEL, prompt], stdout=PIPE, stderr=PIPE, text=True)
        return (p.stdout or "").strip()
    except FileNotFoundError:
        return ""  # if ollama isn’t available, we’ll return extractive text

def _exact_match_pass(doc_id: str, query: str) -> Dict[str, Any] | None:
    """
    If the query matches a heading inside a heading+bullets chunk,
    return ONLY the following 1–3 lines (the bullets) as the answer.
    """
    chunks = CHUNK_CACHE.get(doc_id, [])
    q = (query or "").strip().lower()
    if not q:
        return None

    for ch in chunks:
        text = ch["text"]
        if q in text.lower():
            # split into non-empty lines
            lines = [l for l in text.splitlines() if l.strip()]
            # find the matched line (likely the heading)
            try:
                idx = next(i for i, l in enumerate(lines) if q in l.lower())
            except StopIteration:
                continue
            # take next 1–3 lines as the answer
            following: List[str] = []
            for l in lines[idx+1:]:
                following.append(l)
                if len(following) >= 3:  # tweak if you want more
                    break
            if following:
                snippet = "\n".join(following)
            else:
                # fallback: return the chunk if no bullet follows
                snippet = text
            return {
                "answer": snippet,
                "sources": [{"page": ch["meta"]["page"]}],
                "scores": [1.0],
            }
    return None

def answer(question: str, doc_id: str, history: List[Dict[str,str]] | None = None) -> Dict[str, Any]:
    # 1) exact-match first – handles “heading → bullet” cases precisely
    em = _exact_match_pass(doc_id, question)
    if em:
        return em

    # 2) semantic retrieve (fallback)
    hits = retrieve(doc_id, question, TOP_K)
    if not hits or hits[0]["score"] < MIN_SCORE:
        return {
            "answer": "I cannot find that in the PDF.",
            "sources": [],
            "scores": []
        }

    # 3) keep a small, tight context (best 1–2 hits)
    top = hits[:2]
    context = "\n\n---\n\n".join([h["text"] for h in top])
    pages = sorted({h["meta"]["page"] for h in top})

    # 4) ask Ollama with strict grounding; if Ollama not available, return extractive text
    prompt = f"""
You are a PDF-grounded assistant.
Use ONLY the context from the PDF to answer.
If the answer is not in the context, say exactly:
"I cannot find that in the PDF."
Cite page numbers {pages}.

Context:
{context}

Question:
{question}
""".strip()

    llm_answer = _ollama(prompt)
    if not llm_answer:
        # local fallback: just return extractive context
        return {
            "answer": context,
            "sources": [{"page": p} for p in pages],
            "scores": [round(h["score"], 3) for h in top],
        }

    return {
        "answer": llm_answer,
        "sources": [{"page": p} for p in pages],
        "scores": [round(h["score"], 3) for h in top],
    }

In [16]:

# 6) Answer with PDF-only grounding
def answer(question: str, doc_id: str):
    hits = retrieve(doc_id, question, TOP_K)
    if not hits or hits[0]["score"] < MIN_SCORE:
        return {
            "answer": "I cannot find that in the PDF.",
            "sources": []
        }
    context = "\n\n---\n\n".join([h["text"] for h in hits])
    pages = sorted({h["meta"]["page"] for h in hits})
    prompt = f"""
You are a PDF-grounded assistant.
Use ONLY the context from the PDF below to answer the question.
If the answer is not in the context, say exactly:
"I cannot find that in the PDF."
Always cite page numbers {pages} in your answer.

Context:
{context}

Question:
{question}
"""
    llm_answer = call_ollama(prompt)
    return {
        "answer": llm_answer,
        "sources": [f"p.{p}" for p in pages]
    }


In [17]:
import re

def split_paragraphs(text: str) -> list[str]:
    # normalize newlines
    t = re.sub(r'\r\n?', '\n', text or '')
    # split on blank lines
    paras = [p.strip() for p in re.split(r'\n\s*\n', t) if p.strip()]
    return paras

BULLET_RE = re.compile(r'^\s*([\-•\*\u2022]|\d+[\.)])\s+')

def chunk_paragraphs_with_bullets(page_text: str) -> list[str]:
    paras = split_paragraphs(page_text)
    out = []
    i = 0
    while i < len(paras):
        cur = paras[i]
        # if this para looks like a heading (short, Title Case, ends without punctuation), group with the bullet block after it
        is_heading = (len(cur) < 120 and cur.endswith(('.', ':')) is False)
        if is_heading and i + 1 < len(paras):
            # collect consecutive bullet paragraphs
            j = i + 1
            bullets = []
            while j < len(paras) and BULLET_RE.match(paras[j]):
                bullets.append(paras[j])
                j += 1
            if bullets:
                out.append(cur + "\n" + "\n".join(bullets))
                i = j
                continue
        out.append(cur)
        i += 1
    return out


In [18]:
def exact_match_pass(chunks_with_meta, query: str):
    q = query.strip().lower()
    for ch in chunks_with_meta:
        text = ch["text"]
        if q and q in text.lower():
            # If it’s a heading+bullets chunk, extract only the first bullet/line after heading
            lines = [l for l in text.splitlines() if l.strip()]
            # find heading line index
            idx = next((k for k,l in enumerate(lines) if q in l.lower()), None)
            if idx is not None:
                # return the next non-heading line(s), capped
                following = []
                for l in lines[idx+1:]:
                    following.append(l)
                    # stop after 1–3 bullet/lines; tweak to taste
                    if len(following) >= 3: break
                snippet = "\n".join(following) if following else text
                return {"text": snippet, "meta": ch["meta"], "score": 1.0}
    return None


In [19]:
def answer_pdf_only(question, doc_id):
    # 0) load paragraph-level chunks for doc_id (you already have them in your collection or in-memory)
    chunks = load_doc_chunks(doc_id)  # each: {"text":..., "meta":{"page":...}}
    
    # 1) exact match first
    em = exact_match_pass(chunks, question)
    if em:
        return {
            "answer": em["text"],
            "sources": [f"p.{em['meta']['page']}"]
        }
    
    # 2) semantic retrieve + gating (your existing retrieve)
    hits = retrieve(doc_id, question, top_k=5)
    if not hits or hits[0]["score"] < MIN_SCORE:
        return {"answer": "I cannot find that in the PDF.", "sources": []}
    
    # 3) small context → Ollama with strict prompt (if you still want generated prose)
    context = "\n\n---\n\n".join([h["text"] for h in hits[:2]])
    pages = sorted({h["meta"]["page"] for h in hits[:2]})
    prompt = f"""
Use ONLY the context from the PDF to answer.
If missing, say: "I cannot find that in the PDF."
Cite pages {pages}.

Context:
{context}

Question:
{question}
"""
    llm_answer = call_ollama(prompt)
    return {"answer": llm_answer, "sources": [f"p.{p}" for p in pages]}


In [20]:
# app.py
import pathlib
import sys
import subprocess
import textwrap
"""
def load_pdf(path: str) -> bytes:
    return pathlib.Path(path).read_bytes()

def main():
    pdf_path = input("Path to PDF: ").strip()
    q = None
    raw = load_pdf(pdf_path)
    pages = extract_pages(raw)
    print(f"Pages extracted: {len(pages)}")

    doc_id = str(uuid.uuid4())
    build_index(doc_id, filename=pdf_path.split("/")[-1], pages=pages)
    print("Indexed. Ask questions (blank to exit).")

    while True:
        q = input("\nQ: ").strip()
        if not q:
            break
        resp = answer(q, doc_id)
        print("\nA:", resp["answer"])
        if resp["sources"]:
            print("Sources:", ", ".join([f"p.{s['page']}" for s in resp["sources"]]))

if __name__ == "__main__":
    main()
"""

USE_CQR = True            # Turn on conversational query reformulation
OLLAMA_MODEL = "mistral"  # or "llama3"

def load_pdf(path: str) -> bytes:
    return pathlib.Path(path).read_bytes()

# --------- Ollama helpers (local LLM) ----------
def _ollama_run(model: str, prompt: str) -> str:
    """Run a one-shot prompt against a local Ollama model."""
    try:
        res = subprocess.run(
            ["ollama", "run", model, prompt],
            capture_output=True, text=True, check=False
        )
        return res.stdout.strip()
    except FileNotFoundError:
        # Ollama not found or not on PATH
        return ""

def reformulate_with_ollama(history, new_q, model=OLLAMA_MODEL, turns=3) -> str:
    """
    Rewrite the user's new question into a standalone, contextful query
    using the last N user turns from history.
    """
    if not USE_CQR or not history:
        return new_q

    # Pull last N user turns
    prior_user_turns = [h["content"] for h in history if h["role"] == "user"][-turns:]
    if not prior_user_turns:
        return new_q

    prompt = textwrap.dedent(f"""
    Rewrite the user's question into a single, self-contained query.
    Use the brief conversation context to replace pronouns and vague references.
    Do NOT answer; only rewrite the question. Keep it concise and specific.

    Context:
    {chr(10).join(f"- {p}" for p in prior_user_turns)}

    New question:
    {new_q}

    Rewritten standalone question:
    """).strip()

    rewritten = _ollama_run(model, prompt).strip()
    # Fallback to original if Ollama isn't available / returns empty
    return rewritten if rewritten else new_q

# --------- Pretty helpers ----------
def _pretty_sources(sources):
    """
    Accepts either a list of dicts like {'page': 2} or a list of 'p.2' strings.
    Returns a pretty string like 'p.2, p.3'.
    """
    if not sources:
        return ""
    out = []
    for s in sources:
        if isinstance(s, dict) and "page" in s:
            out.append(f"p.{s['page']}")
        elif isinstance(s, str):
            out.append(s if s.startswith("p.") else f"p.{s}")
    return ", ".join(out)

def _print_help():
    print(textwrap.dedent("""
    Commands:
      :help         Show this help
      :new          Load a new PDF (re-index)
      :quit         Exit
      (blank line)  Also exits

    Just type your question to query the current PDF.
    """).strip())

def main():
    # ---- Load & index a PDF ----
    pdf_path = input("Path to PDF: ").strip()
    if not pdf_path:
        print("No file provided. Exiting.")
        sys.exit(0)

    raw = load_pdf(pdf_path)
    pages = extract_pages(raw)
    print(f"Pages extracted: {len(pages)}")

    doc_id = str(uuid.uuid4())
    build_index(doc_id, filename=pathlib.Path(pdf_path).name, pages=pages)
    print("Indexed. Ask questions (':help' for commands, blank to exit).")

    # ---- Conversational history ----
    # We keep a simple list of {'role': 'user'/'assistant', 'content': str}
    history = []

    while True:
        q = input("\nQ: ").strip()

        # Commands
        if q in ("", ":quit", ":q", ":exit"):
            print("Bye.")
            break
        if q == ":help":
            _print_help()
            continue
        if q == ":new":
            # Load a new PDF and re-index
            pdf_path = input("Path to NEW PDF: ").strip()
            if not pdf_path:
                print("No file provided. Keeping current document.")
                continue
            try:
                raw = load_pdf(pdf_path)
                pages = extract_pages(raw)
                print(f"Pages extracted: {len(pages)}")
                doc_id = str(uuid.uuid4())
                build_index(doc_id, filename=pathlib.Path(pdf_path).name, pages=pages)
                history.clear()
                print("Re-indexed. You can ask questions now.")
                continue
            except Exception as e:
                print(f"Failed to load/index new PDF: {e}")
                continue

        # ---- Conversational query reformulation (CQR) ----
        standalone_q = reformulate_with_ollama(history, q) if USE_CQR else q

        # ---- Ask your existing QA function ----
        # Try passing history if your answer() supports it;
        # otherwise fall back to the original signature.
        try:
            resp = answer(standalone_q, doc_id, history=history)  # type: ignore
        except TypeError:
            resp = answer(standalone_q, doc_id)

        # ---- Print answer + sources ----
        ans = resp.get("answer", "")
        print("\nA:", ans)

        sources = resp.get("sources", [])
        pretty_src = _pretty_sources(sources)
        if pretty_src:
            print("Sources:", pretty_src)

        # (Optional) show scores if present
        scores = resp.get("scores")
        if scores:
            print("Scores:", scores)

        # ---- Update history ----
        history.append({"role": "user", "content": q})
        history.append({"role": "assistant", "content": ans})

In [21]:
if __name__ == "__main__":
    main()

Pages extracted: 3
Indexed. Ask questions (':help' for commands, blank to exit).


NameError: name 'call_ollama' is not defined