<a href="https://colab.research.google.com/github/TarunKumar3103/AISummarizer/blob/main/RAG_DemoHCI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# I tried a couple previous installs but they didnt really work due to compatibility issues so these are what I went with in the end
%pip -q install --upgrade pip setuptools wheel
%pip -q install "numpy==2.0.1" "tqdm>=4.67" jedi==0.18.2
%pip -q install "torch>=2.2" \
                "sentence-transformers==2.7.0" \
                "chromadb==0.4.24" \
                "trafilatura==1.7.0" \
                "readability-lxml==0.8.1" \
                "beautifulsoup4==4.12.3" \
                "orjson==3.10.7"


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.8 MB[0m [31m6.1 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m24.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m17.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ipython 7.34.0 requires jedi>=0.16, which is not installed.[0m[31m
[0m  Installing build 

In [2]:
import os
os.environ["CHROMA_TELEMETRY_ENABLED"] = "false"

import numpy as np
if not hasattr(np, "float_"): np.float_ = np.float64
for alias, target in [("int", int), ("float", float), ("complex", complex), ("bool", bool), ("object", object)]:
    if not hasattr(np, alias): setattr(np, alias, target)

from pathlib import Path
BASE  = Path("/content/web_rag_demo"); BASE.mkdir(parents=True, exist_ok=True)
RAW   = BASE / "data" / "raw";   RAW.mkdir(parents=True, exist_ok=True)
CLEAN = BASE / "data" / "clean"; CLEAN.mkdir(parents=True, exist_ok=True)
INDEX = BASE / "index" / "chroma"; INDEX.mkdir(parents=True, exist_ok=True)

import re, hashlib, time, json
from datetime import datetime
from urllib.parse import urlparse
from typing import List, Dict, Any
from collections import defaultdict

import requests, trafilatura
from bs4 import BeautifulSoup
from tqdm import tqdm

from sentence_transformers import SentenceTransformer, CrossEncoder
import chromadb
from chromadb.config import Settings

import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
EMB_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
embed_model = SentenceTransformer(EMB_MODEL_NAME, device=device)
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", device=device)

chroma_client = chromadb.PersistentClient(path=str(INDEX), settings=Settings(allow_reset=False))
collection = chroma_client.get_or_create_collection("web_rag_demo")

HEADINGS_LEVELS = {"H1","H2","H3"}
MAX_TOKENS_PER_CHUNK = 320
OVERLAP_SENTENCES = 1
REQUEST_TIMEOUT = 12
STOP_SECTIONS = {"contents","see also","references","external links","notes","further reading"}

def stable_hash(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:16]

def est_tokens(text: str) -> int:
    return max(1, int(len(text)/4))

def save_jsonl(lines, path: Path):
    with path.open("wb") as f:
        for obj in lines:
            f.write(json.dumps(obj, ensure_ascii=False).encode("utf-8"))
            f.write(b"\n")

def fetch_html(url: str) -> str:
    tries, last_exc = 2, None
    for _ in range(tries):
        try:
            r = requests.get(url, timeout=REQUEST_TIMEOUT, headers={"User-Agent":"Mozilla/5.0 (RAG demo)"})
            r.raise_for_status(); return r.text
        except Exception as e:
            last_exc = e; time.sleep(1.0)
    raise last_exc

def extract_main_content(html: str, url: str):
    main_text = trafilatura.extract(html)
    soup = BeautifulSoup(html, "lxml")
    title = (soup.title.string.strip() if soup.title and soup.title.string else "") or ""
    byline, date_str = "", ""
    meta_author = soup.find("meta", attrs={"name":"author"}) or soup.find("meta", attrs={"property":"article:author"})
    if meta_author and meta_author.get("content"): byline = meta_author["content"].strip()
    meta_date = (soup.find("meta", attrs={"property":"article:published_time"})
                 or soup.find("meta", attrs={"name":"date"})
                 or soup.find("meta", attrs={"itemprop":"datePublished"}))
    if meta_date and meta_date.get("content"): date_str = meta_date["content"].strip()
    return title, byline, date_str, (main_text or ""), soup

def html_headings_blocks(soup: BeautifulSoup):
    blocks, current_heading, current_level, current_text_buf = [], None, None, []
    def flush():
        if current_heading is not None:
            blocks.append((current_level, current_heading.strip(), "\n".join(current_text_buf).strip()))
    content_root = soup.body or soup
    for node in content_root.find_all(True, recursive=True):
        name = node.name.upper() if node.name else ""
        if name in HEADINGS_LEVELS:
            flush(); current_heading = node.get_text(separator=" ", strip=True) or ""
            current_level = name; current_text_buf = []
        else:
            if current_heading is not None and name not in {"SCRIPT","STYLE","NOSCRIPT"}:
                txt = node.get_text(separator=" ", strip=True)
                if txt: current_text_buf.append(txt)
    flush(); return blocks

SENT_SPLIT_RE = re.compile(r'(?<=[.!?])\s+')
def sentences(text: str):
    parts = SENT_SPLIT_RE.split(text.strip())
    return [s.strip() for s in parts if s.strip()]

def semantic_chunks_from_section(section_text: str,
                                 max_tokens=MAX_TOKENS_PER_CHUNK,
                                 overlap=OVERLAP_SENTENCES):
    sents = sentences(section_text)
    if not sents: return [], []
    sent_emb = embed_model.encode(sents, normalize_embeddings=True, batch_size=256)
    adj_cos = [float(np.dot(sent_emb[i], sent_emb[i+1])) for i in range(len(sents)-1)]
    chunks, boundaries, i = [], [], 0
    while i < len(sents):
        start, token_count, best_split, best_drop = i, 0, None, -1.0
        while i < len(sents) and token_count < max_tokens:
            token_count += est_tokens(sents[i])
            if i < len(sents)-1:
                drop = 1.0 - adj_cos[i]
                if drop > best_drop: best_drop, best_split = drop, i
            i += 1
        end = len(sents) if i >= len(sents) else (best_split + 1 if best_split and best_split+1 > start else i)
        chunk_sents = sents[start:end]
        stickiness = float(np.mean(adj_cos[start:end-1])) if end-start > 1 else 1.0
        chunks.append({"sentences": chunk_sents, "stickiness": stickiness})
        if end < len(sents):
            prev_last = embed_model.encode([chunk_sents[-1]], normalize_embeddings=True)[0]
            next_first = embed_model.encode([sents[end]], normalize_embeddings=True)[0]
            boundaries.append(1.0 - float(np.dot(prev_last, next_first)))
        i = max(end - overlap, end)
    for ch in chunks:
        ch["text"] = " ".join(ch["sentences"])
        ch["tokens_est"] = est_tokens(ch["text"])
    return chunks, boundaries

def ingest_url(url: str):
    domain, doc_id = urlparse(url).netloc, stable_hash(url)
    html = fetch_html(url); (RAW / f"{doc_id}.html").write_text(html, encoding="utf-8")
    title, byline, date_str, main_text, soup = extract_main_content(html, url)
    blocks = html_headings_blocks(soup)
    if not blocks:
        blocks = [("H1", title or domain, main_text or soup.get_text(" ", strip=True))]

    jsonl_lines, section_rank, doc_text_cursor = [], 0, 0
    for level, heading, body in blocks:
        section_rank += 1
        section_text = (body or "").strip()
        if not section_text:
            continue
        if (heading or "").strip().lower() in STOP_SECTIONS:
            continue
        if len(section_text) < 200:
            continue

        chunks, boundaries = semantic_chunks_from_section(section_text)
        for idx, ch in enumerate(chunks):
            text, tokens_est, stickiness = ch["text"], ch["tokens_est"], ch["stickiness"]
            boundary_score = float(boundaries[idx]) if idx < len(boundaries) else -1.0
            start, end = doc_text_cursor, doc_text_cursor + len(text)
            doc_text_cursor = end
            chunk_id = f"{doc_id}_{section_rank}_{idx}"
            jsonl_lines.append({
                "doc_id": doc_id, "chunk_id": chunk_id, "source_type": "web",
                "title": title or "", "authors": [byline] if byline else [],
                "published_at": date_str or "", "url_or_path": url, "site_domain": domain,
                "section_path": [heading] if heading else [level], "section_rank": section_rank,
                "heading_text": heading or "", "char_start": start, "char_end": end,
                "text": text, "tokens_est": tokens_est,
                "boundary_score": boundary_score, "stickiness_score": stickiness,
            })
    save_jsonl(jsonl_lines, CLEAN / f"{doc_id}.jsonl")
    return doc_id

def ingest_many(urls: List[str]):
    out = []
    for u in tqdm(urls, desc="Ingesting"):
        try: out.append(ingest_url(u))
        except Exception as e: print(f"[WARN] {u}: {e}")
    return out

def _to_primitive(v: Any):
    if isinstance(v, (str, int, float, bool)): return v
    if v is None: return ""
    if isinstance(v, (list, tuple)):
        try: return " / ".join(map(str, v))
        except Exception: return str(v)
    return str(v)

def index_doc_jsonl(jsonl_path: Path):
    with jsonl_path.open("r", encoding="utf-8") as f:
        records = [json.loads(line) for line in f]
    if not records: return

    texts = [r["text"] for r in records]
    vecs = embed_model.encode(texts, normalize_embeddings=True, batch_size=256)

    ids = [r["chunk_id"] for r in records]
    metas = []
    for r in records:
        metas.append({
            "doc_id": _to_primitive(r.get("doc_id")),
            "url": _to_primitive(r.get("url_or_path")),
            "site_domain": _to_primitive(r.get("site_domain")),
            "title": _to_primitive(r.get("title")),
            "section_path": _to_primitive(r.get("section_path")),  # stringify list
            "section_rank": int(r.get("section_rank", 0)),
            "heading": _to_primitive(r.get("heading_text")),
            "published_at": _to_primitive(r.get("published_at")),
            "tokens_est": int(r.get("tokens_est", 0)),
            "boundary_score": float(r.get("boundary_score")) if r.get("boundary_score") is not None else -1.0,
            "stickiness_score": float(r.get("stickiness_score", 0.0)),
        })

    collection.add(
        ids=ids,
        documents=texts,
        metadatas=metas,
        embeddings=[v.tolist() for v in vecs]
    )

def build_index():
    files = sorted(CLEAN.glob("*.jsonl"))
    print(f"Indexing {len(files)} documents …")
    for fp in tqdm(files, desc="Indexing"):
        index_doc_jsonl(fp)

def dense_search(query: str, k: int = 24) -> List[Dict]:
    q_emb = embed_model.encode([query], normalize_embeddings=True).tolist()
    res = collection.query(query_embeddings=q_emb, n_results=k)
    hits = []
    if not res or not res.get("documents") or not res["documents"] or not res["documents"][0]: return hits
    for doc, meta in zip(res["documents"][0], res["metadatas"][0]):
        hits.append({"text": doc, "meta": meta})
    return hits

def group_by_section(hits):
    groups = defaultdict(list)
    for h in hits:
        key = (h["meta"]["doc_id"], h["meta"]["section_rank"], h["meta"]["heading"])
        groups[key].append(h)
    return groups

def rank_sections_then_paras(query: str, k_dense=24, top_sections=4, paras_per_section=2):
    hits = dense_search(query, k=k_dense)
    if not hits: return []
    q_emb = embed_model.encode([query], normalize_embeddings=True)[0]
    for h in hits:
        d_emb = embed_model.encode([h["text"]], normalize_embeddings=True)[0]
        h["sim"] = float(np.dot(q_emb, d_emb))
    groups = group_by_section(hits)
    if not groups: return []
    ranked_sections = []
    for key, items in groups.items():
        sims = [it["sim"] for it in items]
        score = 0.4*len(items) + 0.6*(float(np.mean(sims)) if sims else 0.0)
        ranked_sections.append((key, score, items))
    if not ranked_sections: return []
    ranked_sections.sort(key=lambda x: x[1], reverse=True)
    ranked_sections = ranked_sections[:max(1, top_sections)]
    chosen = []
    for key, _, items in ranked_sections:
        items.sort(key=lambda x: x["sim"], reverse=True)
        chosen.extend(items[:max(1, paras_per_section)])
    return chosen

def rerank(query: str, candidates, final_k=6):
    if not candidates: return []
    pairs = [(query, c["text"]) for c in candidates]
    scores = reranker.predict(pairs) if pairs else []
    for c, s in zip(candidates, scores): c["ce_score"] = float(s)
    candidates.sort(key=lambda x: x.get("ce_score", 0.0), reverse=True)
    return candidates[:final_k] if candidates else []

def judge_filter(candidates, threshold=0.4, min_keep=3):
    if not candidates: return []
    kept = [c for c in candidates if c.get("ce_score", 0.0) >= threshold]
    if len(kept) < min_keep: kept = candidates[:min_keep]
    return kept

def _section_to_str(val):
    if isinstance(val, list): return " / ".join(val)
    if isinstance(val, str):  return val
    return ""

def select_context(query: str,
                   k_dense=24, top_sections=4, paras_per_section=2,
                   final_k=6, judge_threshold=0.4):
    candidates = rank_sections_then_paras(query, k_dense, top_sections, paras_per_section)
    if not candidates:
        basic = dense_search(query, k=max(final_k, 6))
        if not basic: return []
        candidates = basic

    filtered = []
    for c in candidates:
        sec = (c["meta"].get("heading") or "").strip().lower()
        if sec in STOP_SECTIONS:
            continue
        if len(c["text"]) < 200:
            continue
        filtered.append(c)
    if not filtered:
        filtered = candidates

    seen, deduped = set(), []
    for c in filtered:
        m = c["meta"]; key = (m.get("doc_id"), m.get("section_rank"), c["text"][:160])
        if key in seen: continue
        seen.add(key); deduped.append(c)

    reranked = rerank(query, deduped, final_k=max(final_k, 6)) or deduped[:final_k]
    kept = judge_filter(reranked, threshold=judge_threshold, min_keep=min(3, len(reranked)))

    out = []
    for i, c in enumerate(kept, 1):
        m = c["meta"]
        out.append({
            "S": i,
            "url": m.get("url"),
            "title": m.get("title"),
            "section": _section_to_str(m.get("section_path")),
            "heading": m.get("heading"),
            "ce_score": round(c.get("ce_score", 0.0), 3),
            "text": c["text"]
        })
    return out

print("Setup complete. Index path:", INDEX)
print("Chunks currently in index:", collection.count(), " (will grow after ingestion)")


  if not hasattr(np, alias): setattr(np, alias, target)
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


Setup complete. Index path: /content/web_rag_demo/index/chroma
Chunks currently in index: 0  (will grow after ingestion)


In [3]:
URLS = [
    "https://en.wikipedia.org/wiki/Artificial_intelligence",
    "https://en.wikipedia.org/wiki/Machine_learning",
    "https://en.wikipedia.org/wiki/Natural_language_processing",
]

_ = ingest_many(URLS)

build_index()
print("Chunks in index:", collection.count())

query = "In one paragraph, what is NLP and name two common NLP tasks?"
ctx = select_context(query, k_dense=24, top_sections=4, paras_per_section=2, final_k=6, judge_threshold=0.4)

print(f"\nSelected {len(ctx)} chunks:\n")
for c in ctx:
    print(f"[S{c['S']}] {c['title']} — {c['heading'] or c['section']}  (CE={c['ce_score']})")
    print("URL:", c["url"])
    print(c["text"][:320].replace('\n',' '), "...\n")


Ingesting: 100%|██████████| 3/3 [11:07<00:00, 222.39s/it]


Indexing 3 documents …


Indexing:   0%|          | 0/3 [00:00<?, ?it/s]ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event CollectionAddEvent: capture() takes 1 positional argument but 3 were given
Indexing: 100%|██████████| 3/3 [04:38<00:00, 92.82s/it]
ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event CollectionQueryEvent: capture() takes 1 positional argument but 3 were given


Chunks in index: 2002

Selected 3 chunks:

[S1] Natural language processing - Wikipedia — Relational semantics (semantics of individual sentences)  (CE=0.316)
URL: https://en.wikipedia.org/wiki/Natural_language_processing
Semantic parsing Given a piece of text (typically a sentence), produce a formal representation of its semantics, either as a graph (e.g., in AMR parsing ) or in accordance with a logical formalism (e.g., in DRT parsing ). This challenge typically includes aspects of several more elementary NLP tasks from semantics (e.g ...

[S2] Natural language processing - Wikipedia — Natural language processing  (CE=-0.269)
URL: https://en.wikipedia.org/wiki/Natural_language_processing
Semantic parsing Given a piece of text (typically a sentence), produce a formal representation of its semantics, either as a graph (e.g., in AMR parsing ) or in accordance with a logical formalism (e.g., in DRT parsing ). This challenge typically includes aspects of several more elementary NLP tasks 