In [None]:
# Install the notebook's dependencies into the current environment.
# NOTE(review): no versions are pinned, so a later re-run may resolve to different releases.
# Web/API stack (pyngrok is installed here but not used by the cells shown in this notebook).
!pip -q install fastapi uvicorn nest_asyncio pyngrok --upgrade
# PyTorch wheels from the CUDA 12.1 index.
!pip -q install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# PDF parsing, graph, LLM client and embedding libraries.
!pip -q install pymupdf networkx openai sentence-transformers spacy
# Small English spaCy pipeline (not referenced by the cells shown in this notebook).
!python -m spacy download en_core_web_sm -q

In [None]:
import os
from getpass import getpass

# Never store a real key in a notebook cell: cell sources and outputs get shared and committed.
# An OPENAI_API_KEY already present in the environment is kept as-is; otherwise ask for one
# interactively. Leaving the prompt blank keeps the variable unset, which makes the rest of the
# notebook fall back to its local mock instead of sending a placeholder key to the API.
if not os.environ.get("OPENAI_API_KEY"):
    _entered_key = getpass("OpenAI API key (leave blank to use the offline mock): ").strip()
    if _entered_key:
        os.environ["OPENAI_API_KEY"] = _entered_key
    del _entered_key

In [None]:
import os, shutil, sys, json, pickle, math, time, threading
from pathlib import Path

# Every path below is resolved against the directory the kernel was started in.
BASE = Path.cwd()

# Directory that receives the generated artifacts; created on first run.
STORE = BASE / "store"
STORE.mkdir(exist_ok=True)

# Sample PDF, expected to sit directly inside BASE.
PDF_NAME = "gecu101.pdf"
PDF_PATH = BASE / PDF_NAME

# Echo the resolved locations so a wrong working directory is obvious immediately.
for _label, _location in (
    ("Working dir:", BASE),
    ("Artifacts dir:", STORE),
    ("Expected PDF path:", PDF_PATH),
):
    print(_label, _location)

In [None]:
import fitz
import torch
import torch.nn.functional as F
import networkx as nx
from typing import List, Tuple, Dict, Any
from sentence_transformers import SentenceTransformer

# Global config
# Chunk size used by text_to_chunks. Despite the name it counts whitespace-separated
# words, not model tokens.
CHUNK_TOKENS = 500  # ~ words granularity (simple split)
# Sentence-embedding model identifier handed to SentenceTransformer below.
EMB_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Embedding model (torch-backed), instantiated once at cell execution and shared by
# every function that embeds chunks or queries.
embedder = SentenceTransformer(EMB_MODEL)

def pdf_to_text(path: Path) -> str:
    """Extract the text of every page of a PDF and join the pages with single spaces.

    Args:
        path: Location of the PDF file.

    Returns:
        The concatenated page texts as one string.
    """
    # The context manager closes the document (and its file handle) even when
    # text extraction raises part-way through.
    with fitz.open(str(path)) as doc:
        return " ".join(page.get_text() for page in doc)

def text_to_chunks(text: str, tokens_per_chunk: int = CHUNK_TOKENS) -> List[str]:
    """Split ``text`` into consecutive chunks of at most ``tokens_per_chunk`` words.

    "Tokens" here are whitespace-separated words; the original whitespace is
    normalised to single spaces inside each chunk.

    Args:
        text: The text to split.
        tokens_per_chunk: Maximum number of words per chunk; must be positive.

    Returns:
        The chunks in document order. Empty or whitespace-only text gives ``[]``.

    Raises:
        ValueError: If ``tokens_per_chunk`` is not a positive integer.
    """
    # Without this check a negative size silently produced no chunks at all and
    # zero failed with an unrelated-looking range() error.
    if tokens_per_chunk <= 0:
        raise ValueError(f"tokens_per_chunk must be a positive integer, got {tokens_per_chunk!r}")
    words = text.split()
    return [
        " ".join(words[start:start + tokens_per_chunk])
        for start in range(0, len(words), tokens_per_chunk)
    ]

def embed_texts(texts: List[str]) -> torch.Tensor:
    """Encode ``texts`` with the module-level ``embedder`` and return the result as a tensor.

    The result is expected to be one row per input text, i.e. shape (N, D).
    """
    embeddings = embedder.encode(texts, convert_to_tensor=True)
    return embeddings

# --------- GPT-nano client (triples extraction) ----------
import os
from openai import OpenAI

def get_openai_client():
    """Build an OpenAI client from the ``OPENAI_API_KEY`` environment variable.

    Returns:
        An ``OpenAI`` client when the variable holds a non-empty value; otherwise
        ``None`` after printing a warning that the local mock will be used.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key:
        return OpenAI(api_key=api_key)
    print("⚠️ OPENAI_API_KEY not set. Triple extraction and answering will use a local mock.")
    return None

def _heuristic_triples(text: str, max_lines: int = 10) -> List[Tuple[str, str, str]]:
    """Offline fallback: turn passive 'X is <pred> by Y' lines into (Y, <pred>, X) triples."""
    triples: List[Tuple[str, str, str]] = []
    candidate_lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    # Only the first few non-empty lines are inspected; this is a rough stand-in, not a parser.
    for line in candidate_lines[:max_lines]:
        # e.g. "ice is melted by heat." -> ("heat", "melted", "ice")
        subject, is_sep, remainder = line.partition(" is ")
        predicate, by_sep, agent = remainder.partition(" by ")
        # " by " must come after " is "; otherwise the line does not fit the pattern.
        if not (is_sep and by_sep):
            continue
        triple = (agent.strip().strip("."), predicate.strip(), subject.strip().strip("."))
        if all(triple):
            triples.append(triple)
    return triples


def _parse_llm_triples(out: Any) -> List[Tuple[str, str, str]]:
    """Parse model output: prefer a JSON array of {"h","r","t"} objects, else '(h, r, t)' lines."""
    import json
    import re

    # The API can return no content at all (e.g. a refusal); treat that as "no triples".
    if not out:
        return []

    match = re.search(r"\[.*\]", out, flags=re.S)
    if match is not None:
        try:
            arr = json.loads(match.group(0))
        except ValueError:  # json.JSONDecodeError is a ValueError
            arr = None
        if isinstance(arr, list):
            triples: List[Tuple[str, str, str]] = []
            for item in arr:
                # Skip malformed entries individually instead of discarding the whole array.
                if not isinstance(item, dict):
                    continue
                fields = [item.get(key, "") for key in ("h", "r", "t")]
                if not all(isinstance(field, str) for field in fields):
                    continue
                h, r, t = (field.strip() for field in fields)
                if h and r and t:
                    triples.append((h, r, t))
            return triples

    # Line-based parsing: look for (h, r, t)
    triples = []
    for line in out.splitlines():
        if "(" in line and "," in line and ")" in line:
            parts = [p.strip() for p in line.strip().strip("()").split(",")]
            if len(parts) == 3:
                triples.append((parts[0], parts[1], parts[2]))
    return triples


def extract_triples_llm(text: str, client: Any) -> List[Tuple[str, str, str]]:
    """Extract (head, relation, tail) knowledge triples from ``text``.

    Args:
        text: Passage to extract facts from.
        client: An OpenAI client, or ``None`` to use the rough offline heuristic.

    Returns:
        A list of ``(head, relation, tail)`` string tuples; possibly empty.

    Raises:
        Whatever the OpenAI client raises for a failed request (not swallowed, so
        authentication or quota problems stay visible).
    """
    if client is None:
        return _heuristic_triples(text)

    prompt = f"""
You are an information extraction system. From the TEXT, extract knowledge triples as JSON:
[{{"h":"head","r":"relation","t":"tail"}}, ...]
Only include pedagogically relevant scientific facts and processes.

TEXT:
{text}
"""

    resp = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role":"user","content":prompt}],
        temperature=0.2,
        max_tokens=500
    )
    return _parse_llm_triples(resp.choices[0].message.content)

def build_kg_from_chunks(chunks: List[str]) -> Tuple[nx.MultiDiGraph, Dict[str, set]]:
    """
    Extract triples from every chunk and assemble them into a knowledge graph.

    Each triple (head, relation, tail) becomes one directed edge head -> tail that
    carries 'relation' and the originating 'chunk_id'; both endpoints are added as
    nodes with type="entity".

    Returns: (graph, mapping from node name to the set of chunk ids mentioning it)
    """
    graph = nx.MultiDiGraph()
    node_to_chunks: Dict[str, set] = {}
    llm_client = get_openai_client()

    for chunk_id, chunk_text in enumerate(chunks):
        for head, relation, tail in extract_triples_llm(chunk_text, llm_client):
            head = head.strip()
            relation = relation.strip()
            tail = tail.strip()
            # Drop triples with any empty component.
            if not all((head, relation, tail)):
                continue
            for entity in (head, tail):
                graph.add_node(entity, type="entity")
            graph.add_edge(head, tail, relation=relation, chunk_id=chunk_id)
            for entity in (head, tail):
                node_to_chunks.setdefault(entity, set()).add(chunk_id)

    return graph, node_to_chunks

In [None]:
import json, pickle, torch, networkx as nx
from pathlib import Path

# Path of the JSON file that records which subjects have been ingested.
REG = STORE / "subjects.json"

def _save_subject_registry(registry: dict):
    """Serialize ``registry`` as 2-space-indented JSON and overwrite the file at ``REG``."""
    serialized = json.dumps(registry, indent=2)
    REG.write_text(serialized)

def _load_subject_registry() -> dict:
    """Read the subject registry from ``REG``; return an empty dict when the file is absent."""
    if not REG.exists():
        return {}
    raw = REG.read_text()
    return json.loads(raw)

def ingest_subject(subject_name: str, pdf_paths: list, tokens_per_chunk: int = CHUNK_TOKENS):
    """
    Build per-subject artifacts by concatenating all provided PDFs for that subject.

    Writes ``docs.pt`` (chunk embeddings), ``chunks.pkl``, ``kg.pkl`` and
    ``node_to_chunks.pkl`` into ``STORE / subject_name`` and records the subject in
    the registry file.

    Args:
        subject_name: Name of the subject; used as a directory name under ``STORE``.
        pdf_paths: Paths of the PDFs that make up the subject's corpus.
        tokens_per_chunk: Words per chunk, forwarded to ``text_to_chunks``.

    Raises:
        ValueError: If ``subject_name`` is not a single plain path component, or if
            no text could be extracted from the PDFs.
        FileNotFoundError: If any of ``pdf_paths`` does not exist.
    """
    # subject_name becomes a directory under STORE and can arrive via the HTTP API,
    # so refuse anything that could point outside STORE.
    if subject_name in ("", ".", "..") or Path(subject_name).name != subject_name:
        raise ValueError(f"Invalid subject name: {subject_name!r}")

    # Validate every input up front (with a real exception rather than an assert,
    # which is stripped under `python -O`) before doing any expensive work.
    sources = [Path(p) for p in pdf_paths]
    missing = [str(p) for p in sources if not p.exists()]
    if missing:
        raise FileNotFoundError(f"Missing PDF: {', '.join(missing)}")

    # Concatenate all PDFs
    full_text = "\n\n".join(pdf_to_text(p) for p in sources)

    # Chunk + embed + KG
    chunks = text_to_chunks(full_text, tokens_per_chunk=tokens_per_chunk)
    if not chunks:
        # An empty corpus would produce artifacts that retrieval cannot use.
        raise ValueError(f"No text could be extracted for subject '{subject_name}'.")
    doc_embs = embed_texts(chunks)
    kg, node_to_chunks = build_kg_from_chunks(chunks)

    # Save (each file handle is closed as soon as its artifact is written)
    subject_dir = STORE / subject_name
    subject_dir.mkdir(exist_ok=True)
    torch.save(doc_embs, subject_dir / "docs.pt")
    artifacts = (
        ("chunks.pkl", chunks),
        ("kg.pkl", kg),
        ("node_to_chunks.pkl", node_to_chunks),
    )
    for filename, artifact in artifacts:
        with open(subject_dir / filename, "wb") as fh:
            pickle.dump(artifact, fh)

    # Update registry
    reg = _load_subject_registry()
    reg[subject_name] = {
        "pdfs": [str(p) for p in sources],
        "chunks": len(chunks)
    }
    _save_subject_registry(reg)

    print(f"✅ Ingested subject='{subject_name}' with {len(chunks)} chunks across {len(pdf_paths)} PDF(s).")

In [None]:
# Map each subject name to the PDF file(s) that make up its corpus.
# The paths are relative, so they resolve against the kernel's working directory.
subjects = {
    "science": ["gecu101.pdf"],                     # the sample PDF (same file name as PDF_NAME)
    "math":    ["gegp101.pdf"],
    "history": ["gees101.pdf"]
}

# Build and persist the artifacts for every subject; ingest_subject fails if a listed PDF is missing.
# NOTE(review): this repeats chunking, embedding and triple extraction on every execution of the cell.
for subj, pdfs in subjects.items():
    ingest_subject(subj, pdfs)

In [None]:
#@title Subject-aware retrieval (load on demand + routing)
import json, pickle, torch, torch.nn.functional as F
from sentence_transformers import SentenceTransformer

# In-memory caches, filled on demand by _load_subject.
SUBJECT_CACHE = {}  # subject -> dict(chunks, embs, kg, node_to_chunks)
SUBJECT_CENTROIDS = {}  # subject -> mean of that subject's chunk embeddings (used for routing)

def _load_subject(subj: str):
    """Load a subject's persisted artifacts into memory, caching them for later calls.

    Args:
        subj: Subject name; its artifacts are read from ``STORE / subj``.

    Returns:
        A dict with keys ``"chunks"``, ``"embs"``, ``"kg"`` and ``"node_to_chunks"``.
        As a side effect the subject's centroid (mean embedding) is stored in
        ``SUBJECT_CENTROIDS``.

    Raises:
        FileNotFoundError: If the subject has no artifact directory.
    """
    if subj in SUBJECT_CACHE:
        return SUBJECT_CACHE[subj]

    subject_dir = STORE / subj
    if not subject_dir.exists():
        raise FileNotFoundError(f"Unknown subject '{subj}'. Did you ingest it?")

    def _unpickle(filename: str):
        # SECURITY: pickle.load executes arbitrary code from the file; only load
        # artifacts this notebook wrote itself.
        with open(subject_dir / filename, "rb") as fh:
            return pickle.load(fh)

    chunks = _unpickle("chunks.pkl")
    embs = torch.load(subject_dir / "docs.pt")
    kg = _unpickle("kg.pkl")
    node_to_chunks = _unpickle("node_to_chunks.pkl")

    # centroid for routing; computed before touching the caches so a failure here
    # cannot leave a cached subject without a centroid
    with torch.no_grad():
        centroid = embs.mean(dim=0)

    SUBJECT_CACHE[subj] = {
        "chunks": chunks,
        "embs": embs,
        "kg": kg,
        "node_to_chunks": node_to_chunks,
    }
    SUBJECT_CENTROIDS[subj] = centroid
    return SUBJECT_CACHE[subj]

def list_subjects():
    """Return the names of all subjects recorded in the registry, as a list."""
    registry = _load_subject_registry()
    return [name for name in registry.keys()]

def vector_search_subject(subj: str, query: str, top_k: int = 5):
    """Return the chunks of ``subj`` most similar to ``query`` by cosine similarity.

    Returns:
        Up to ``top_k`` tuples ``(chunk_index, chunk_text, similarity)``, in the
        order produced by ``torch.topk``.
    """
    subject_data = _load_subject(subj)
    passages = subject_data["chunks"]
    doc_embs = subject_data["embs"]

    query_emb = embedder.encode([query], convert_to_tensor=True)
    scores = F.cosine_similarity(query_emb, doc_embs)

    # Never ask topk for more entries than there are chunks.
    k = min(top_k, scores.numel())
    best = torch.topk(scores, k=k)

    hits = []
    for position in best.indices:
        chunk_id = int(position)
        hits.append((chunk_id, passages[chunk_id], float(scores[chunk_id])))
    return hits

def kg_expand_subject(subj: str, query: str, depth: int = 1, n_passages: int = 6):
    """Retrieve passages reachable through the knowledge graph from the best-matching chunk.

    The chunk most similar to ``query`` seeds the search: every entity extracted from
    it is expanded along outgoing graph edges up to ``depth`` hops, and all chunks
    mentioning any reached entity become candidates.

    Args:
        subj: Subject to search.
        query: Natural-language query.
        depth: Maximum number of hops from each seed entity.
        n_passages: Maximum number of passages to return.

    Returns:
        Up to ``n_passages`` tuples ``(chunk_index, chunk_text)``, most similar to the
        query first. Empty when the seed chunk produced no graph entities.
    """
    ctx = _load_subject(subj)
    chunks, embs, kg, node_to_chunks = ctx["chunks"], ctx["embs"], ctx["kg"], ctx["node_to_chunks"]

    q_emb = embedder.encode([query], convert_to_tensor=True)
    sims = F.cosine_similarity(q_emb, embs)
    seed_chunk = int(torch.argmax(sims))

    seed_entities = [node for node, cids in node_to_chunks.items() if seed_chunk in cids]

    expanded_nodes = set()
    for node in seed_entities:
        if node in kg:
            expanded_nodes |= set(nx.single_source_shortest_path_length(kg, node, cutoff=depth).keys())

    candidate_chunks = set()
    for node in expanded_nodes:
        candidate_chunks |= node_to_chunks.get(node, set())

    # Rank candidates by similarity to the query instead of truncating the set in
    # arbitrary iteration order; the chunk id breaks ties so the result is stable.
    scores = sims.tolist()
    ranked = sorted(candidate_chunks, key=lambda cid: (-scores[cid], cid))
    # max(..., 0) keeps a negative n_passages from turning into a "drop the last k" slice.
    selected_idx = ranked[:max(n_passages, 0)]
    return [(i, chunks[i]) for i in selected_idx]

def dual_retrieve_subject(subj: str, query: str, top_k: int = 5, depth: int = 1, n_passages: int = 6):
    """Run both retrievers on one subject and return ``(vector_hits, kg_hits)``."""
    vector_hits = vector_search_subject(subj, query, top_k=top_k)
    kg_hits = kg_expand_subject(subj, query, depth=depth, n_passages=n_passages)
    return vector_hits, kg_hits

def route_subject(query: str) -> str | None:
    """Pick the subject whose centroid embedding is closest to the query.

    Args:
        query: Natural-language query to route.

    Returns:
        The name of the best-matching subject, or ``None`` when no subject is
        available. On an exact tie the subject encountered first wins.
    """
    # Load every registered subject that has no centroid yet. Checking only for an
    # empty centroid table is not enough: once a single subject has been loaded
    # (e.g. by a pinned query) the others would never be considered for routing.
    for name in list_subjects():
        if name not in SUBJECT_CENTROIDS:
            _load_subject(name)

    if not SUBJECT_CENTROIDS:
        return None

    q_emb = embedder.encode([query], convert_to_tensor=True)
    best_subj, best_sim = None, float("-inf")
    for subj, centroid in SUBJECT_CENTROIDS.items():
        sim = F.cosine_similarity(q_emb, centroid.unsqueeze(0)).item()
        if sim > best_sim:
            best_sim, best_subj = sim, subj
    return best_subj

def dual_retrieve_across_all(query: str, top_k: int = 5, depth: int = 1, n_passages: int = 6):
    """Search each subject; return the result from the subject whose top-1 similarity is highest.

    Returns:
        ``(subject, vector_hits, kg_hits)`` for the winning subject, or
        ``(None, [], [])`` when there is nothing to search.
    """
    subject_names = list_subjects()
    if not subject_names:
        return None, [], []

    # The query embedding does not depend on the subject, so encode it once.
    q_emb = embedder.encode([query], convert_to_tensor=True)

    candidates = []
    for subj in subject_names:
        embs = _load_subject(subj)["embs"]
        sims = F.cosine_similarity(q_emb, embs)
        if sims.numel() == 0:
            # A subject without any chunk has no best match to compare.
            continue
        candidates.append((float(sims.max()), subj))

    if not candidates:
        return None, [], []

    # Highest similarity wins; ties fall back to comparing subject names.
    _, best_subj = max(candidates)
    v, k = dual_retrieve_subject(best_subj, query, top_k=top_k, depth=depth, n_passages=n_passages)
    return best_subj, v, k

In [None]:
import os
from openai import OpenAI

def call_gpt_nano(prompt: str) -> str:
    """Send ``prompt`` to the chat model and return the reply text.

    Without ``OPENAI_API_KEY`` in the environment no request is made; the function
    returns a mock answer that echoes the first 800 characters of the prompt.

    Args:
        prompt: Full prompt, sent as a single user message.

    Returns:
        The model's reply, or ``""`` when the API returns no message content.
    """
    key = os.getenv("OPENAI_API_KEY")
    if not key:
        # Fallback: simple echo with truncation (for offline demo)
        return "MOCK_ANSWER:\n" + prompt[:800]
    client = OpenAI(api_key=key)
    resp = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role":"user","content":prompt}],
        temperature=0.4,
        max_tokens=600
    )
    # message.content can be None; keep the declared `str` return type.
    return resp.choices[0].message.content or ""

def build_prompt(question: str, vec_ctx, kg_ctx):
    """Assemble the tutor prompt from the question and the two retrieved contexts.

    ``vec_ctx`` holds ``(chunk_id, text, score)`` tuples and ``kg_ctx`` holds
    ``(chunk_id, text)`` tuples; each passage is tagged ``[vec:<id>]`` or
    ``[kg:<id>]`` and passages are separated by blank lines.
    """
    vec_text = "\n\n".join(f"[vec:{chunk_id}] {passage}" for chunk_id, passage, _score in vec_ctx)
    kg_text = "\n\n".join(f"[kg:{chunk_id}] {passage}" for chunk_id, passage in kg_ctx)
    return f"""You are an expert tutor. Cite the provided contexts when relevant.
Question: {question}

Similarity-based context:
{vec_text}

KG-expanded context:
{kg_text}

Answer step-by-step, and end with a brief recap + 2 practice questions.
"""

In [None]:
#@title FastAPI app (multi-subject)
import nest_asyncio, uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
import threading

# Application object; the route decorators further down register their handlers on it.
app = FastAPI(title="KG-RAG (Torch) API — Multi-subject")

class AskReq(BaseModel):
    # Request body of POST /ask.
    q: str                           # the question; used for routing, retrieval and the prompt
    top_k: int = 5                   # number of similarity-search hits to retrieve
    depth: int = 1                   # hop limit for the knowledge-graph expansion
    n_passages: int = 6              # cap on the number of KG-expanded passages
    subject: str | None = None       # optional: pin to a subject
    cross_subject: bool = False      # if True, the subject is auto-routed even when `subject` is given

class IngestReq(BaseModel):
    # Request body of POST /ingest_subject.
    subject: str            # subject name, passed to ingest_subject
    pdf_paths: list[str]    # PDF paths passed to ingest_subject; read by the server process

@app.get("/subjects")
def subjects():
    # GET /subjects: report the names of all ingested subjects.
    # NOTE: defining this function rebinds the notebook-level name `subjects`, which the
    # ingestion cell uses for its subject -> PDFs dict; re-running that loop afterwards
    # requires re-running the dict definition first.
    names = list_subjects()
    return {"subjects": names}

@app.post("/ingest_subject")
def ingest_api(req: IngestReq):
    """Ingest (or re-ingest) a subject from the given PDFs and refresh its in-memory copy."""
    ingest_subject(req.subject, req.pdf_paths)

    # The artifacts on disk just changed: drop whatever was cached for this subject
    # and reload it, so /ask neither serves stale chunks nor misses a new subject
    # when routing by centroid.
    SUBJECT_CACHE.pop(req.subject, None)
    SUBJECT_CENTROIDS.pop(req.subject, None)
    _load_subject(req.subject)

    return {"status": "ok", "subject": req.subject}

@app.post("/ask")
def ask(req: AskReq):
    """Answer a question using vector and knowledge-graph context from one subject.

    The subject is the pinned ``req.subject`` unless ``req.cross_subject`` is set or
    no subject was given, in which case it is chosen by ``route_subject``. Problems
    with subject selection are reported as ``{"error": ...}``.
    """
    # 1) choose subject
    subj = req.subject
    if req.cross_subject or not subj:
        # auto-route across all if requested or no subject given
        routed = route_subject(req.q)
        if routed is None:
            return {"error": "No subjects available. Please ingest first."}
        subj = routed
    elif subj not in list_subjects():
        # Report an unknown pinned subject the same way as "no subjects" instead of
        # letting the loader's exception surface as an HTTP 500.
        return {"error": f"Unknown subject '{subj}'. Please ingest it first."}

    # 2) retrieve within chosen subject
    vec_ctx, kg_ctx = dual_retrieve_subject(subj, req.q, top_k=req.top_k, depth=req.depth, n_passages=req.n_passages)

    # 3) build prompt & answer
    prompt = build_prompt(req.q, vec_ctx, kg_ctx)
    answer = call_gpt_nano(prompt)

    return {
        "subject": subj,
        "question": req.q,
        "vector_ctx": [{"chunk_id": i, "score": score, "text": t} for i, t, score in vec_ctx],
        "kg_ctx": [{"chunk_id": i, "text": t} for i, t in kg_ctx],
        "answer": answer
    }

def run_server():
    """Serve ``app`` with uvicorn on port 8000; used as the target of the server thread."""
    # Patch asyncio before uvicorn starts its loop — presumably needed because the
    # notebook kernel already runs an event loop (TODO confirm).
    nest_asyncio.apply()
    # NOTE(review): host "0.0.0.0" listens on all network interfaces, not only on localhost.
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning")

import socket
import time

# Start the server only once: re-running this cell while the previous thread is alive
# would try to bind port 8000 a second time and fail inside the thread.
# NOTE: an already-running server keeps serving the `app` object it was started with;
# restart the kernel to pick up changed routes.
if not ("server_thread" in globals() and server_thread.is_alive()):
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

# Wait until the port accepts connections so that the request cells below do not
# race the server start-up under "Run All".
_deadline = time.monotonic() + 10.0
while time.monotonic() < _deadline:
    try:
        with socket.create_connection(("127.0.0.1", 8000), timeout=0.5):
            break
    except OSError:
        time.sleep(0.1)
else:
    raise RuntimeError("FastAPI server did not start listening on port 8000 within 10 seconds")

print("FastAPI started on http://127.0.0.1:8000")

In [None]:
# Smoke test: list the ingested subjects through the running API (GET /subjects).
# NOTE(review): no timeout is passed, so this call waits indefinitely if the server does not answer.
import requests
requests.get("http://127.0.0.1:8000/subjects").json()

In [None]:
# Ask within a pinned subject: "subject" is given, so no auto-routing takes place.
requests.post("http://127.0.0.1:8000/ask",
              json={"q":"Explain eclipses", "subject":"science"}).json()

In [None]:
# Ask across all subjects (auto-routing): the server picks the subject via route_subject.
requests.post("http://127.0.0.1:8000/ask",
              json={"q":"Derive area of a triangle", "cross_subject": True}).json()