In [2]:
%pip install PyPDF2 --quiet


Note: you may need to restart the kernel to use updated packages.


In [3]:
import os, glob
print("Working directory:", os.getcwd())
print("PDFs found:", [os.path.basename(p) for p in glob.glob("docs/*.pdf")])


Working directory: C:\Users\niamh\docs
PDFs found: []


In [4]:
import re, json, math, textwrap
from pathlib import Path
from collections import Counter, defaultdict
from typing import List, Dict
import PyPDF2

DOCS_DIR = Path("docs")
MEMORY_FILE = Path("memory.json")

# ---------- PDF loading ----------
def extract_text_from_pdf(path: Path) -> str:
    text = []
    with open(path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        for page in reader.pages:
            t = page.extract_text() or ""
            text.append(t)
    return "\n".join(text)

def chunk_text(text: str, max_chars=1200):
    # sentence-ish split, then recombine to ~max_chars chunks
    sentences = re.split(r"(?<=[.!?])\s+|\n{2,}", text)
    chunks, cur = [], ""
    for s in sentences:
        s = s.strip()
        if not s:
            continue
        if len(cur) + len(s) + 1 <= max_chars:
            cur += (" " if cur else "") + s
        else:
            if cur: chunks.append(cur.strip())
            cur = s
    if cur: chunks.append(cur.strip())
    return chunks

def load_pdf_corpus() -> List[Dict]:
    docs = []
    for p in DOCS_DIR.glob("**/*.pdf"):
        try:
            raw = extract_text_from_pdf(p)
        except Exception as e:
            print(f"Warning: couldn't read {p.name}: {e}")
            continue
        for i, ch in enumerate(chunk_text(raw)):
            docs.append({"path": str(p), "chunk_id": i, "text": ch})
    return docs

# ---------- tiny text utils ----------
def clean(text: str) -> str:
    text = text.lower()
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

def tokenize(text: str):
    return [t for t in clean(text).split() if t]

# ---------- retrieval (simple TF-IDF-ish + cosine) ----------
def build_idf(chunks):
    df = Counter()
    seen = defaultdict(set)
    for idx, ch in enumerate(chunks):
        for w in set(tokenize(ch["text"])):
            seen[w].add(idx)
    N = len(chunks)
    idf = {w: math.log((N + 1) / (1 + len(idxs))) + 1 for w, idxs in seen.items()}
    return idf

def score_chunk(query_tokens, chunk_text, idf):
    words = tokenize(chunk_text)
    tf = Counter(words)
    num = 0.0; q_sum = 0.0; d_sum = 0.0
    vocab = set(query_tokens) | set(words)
    for w in vocab:
        qw = (1.0 if w in query_tokens else 0.0) * idf.get(w, 1.0)
        dw = tf[w] * idf.get(w, 1.0)
        num += qw * dw
        q_sum += qw * qw
        d_sum += dw * dw
    denom = (math.sqrt(q_sum) * math.sqrt(d_sum)) or 1e-9
    return num / denom

def retrieve(query, chunks, idf, k=3):
    q_tokens = tokenize(query)
    scored = [(score_chunk(q_tokens, ch["text"], idf), ch) for ch in chunks]
    scored.sort(key=lambda x: x[0], reverse=True)
    return [ch for score, ch in scored[:k] if score > 0]

# ---------- tiny summarizer ----------
def summarize(query, passages):
    q_set = set(tokenize(query))
    candidates = []
    for p in passages:
        # split to sentences; keep ones overlapping query terms
        for s in re.split(r"(?<=[.!?])\s+", p["text"]):
            toks = set(tokenize(s))
            overlap = len(q_set & toks)
            if overlap:
                candidates.append((overlap, s.strip(), p["path"]))
    candidates.sort(key=lambda x: x[0], reverse=True)
    if not candidates:
        return "I couldn't find anything relevant in your PDFs.", []
    picks = candidates[:4]
    body = " ".join(s for _, s, _ in picks)
    body = textwrap.shorten(body, width=800, placeholder=" ...")
    sources = sorted({src for _, _, src in picks})
    return body, sources

# ---------- memory ----------
def append_memory(query, answer, sources):
    entry = {"query": query, "answer": answer, "sources": sources}
    try:
        data = json.loads(MEMORY_FILE.read_text(encoding="utf-8"))
    except Exception:
        data = []
    data.append(entry)
    MEMORY_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8")

# ---------- index at module load ----------
if not DOCS_DIR.exists():
    raise SystemExit("Create a 'docs/' folder first and put your PDFs there.")

pdf_chunks = load_pdf_corpus()
if not pdf_chunks:
    print("No PDFs found in docs/. Add files and re-run this cell.")
else:
    idf = build_idf(pdf_chunks)
    print(f"Indexed {len({c['path'] for c in pdf_chunks})} PDFs, {len(pdf_chunks)} chunks.")

def ask(query: str, k: int = 3, remember: bool = True):
    if not pdf_chunks:
        print("No content indexed.")
        return
    hits = retrieve(query, pdf_chunks, idf, k=k)
    if not hits:
        print("Sorry, I found nothing relevant.")
        return
    result = summarize(query, hits)
    if isinstance(result, str):
        print(result); return
    answer, sources = result
    print("\nAnswer:\n" + textwrap.fill(answer, width=100))
    print("\nSources:")
    for s in sources:
        print(" -", s)
    if remember:
        append_memory(query, answer, sources)


SystemExit: Create a 'docs/' folder first and put your PDFs there.

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
