In [None]:
!pip -q install "openai>=1.40.0" "chromadb>=0.5.4" gradio pypdf python-dotenv

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/67.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.9/19.9 MB[0m [31m67.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m323.5/323.5 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m278.2/278.2 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m103.3/103.3 kB[0m [31m7.2 MB/s[0m eta [36m0:0

In [3]:
import os, json, uuid, traceback
from typing import List, Dict, Any
from string import Template

import gradio as gr
from pypdf import PdfReader
import chromadb
from chromadb import PersistentClient
from openai import OpenAI

from google.colab import userdata


In [4]:
# @title Config + API key
# Name of the Chroma collection that index_cvs writes to and search_candidates queries.
COLLECTION_NAME = "cvs_simple_kb"
# OpenAI model ids used for embeddings and for chat completions respectively.
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL  = "gpt-4o-mini"
CHROMA_DIR  = "/content/chroma_store"   # change to a Drive path if you mount Drive

# The API key is read from the Colab secret named OPENAI_API_KEY; nothing here prompts for it.
# NOTE(review): behavior when the secret is missing depends on google.colab.userdata — confirm.
client = OpenAI(api_key= userdata.get('OPENAI_API_KEY'))
# On-disk Chroma client rooted at CHROMA_DIR; the collection is created on first use.
chroma_client: PersistentClient = chromadb.PersistentClient(path=CHROMA_DIR)
collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)


In [5]:
# @title Helpers

def read_pdf_text(path: str) -> str:
    """Extract the text of every page of the PDF at ``path``.

    Pages for which extraction yields nothing contribute an empty string,
    and page texts are joined with a single newline.
    """
    pdf_pages = PdfReader(path).pages
    return "\n".join((page.extract_text() or "") for page in pdf_pages)

def read_any_text(path: str) -> str:
    """Return the text content of ``path``.

    Files whose name ends in ``.pdf`` (case-insensitive) go through
    ``read_pdf_text``; anything else is read as UTF-8 with undecodable
    bytes dropped.
    """
    is_pdf = path.lower().endswith(".pdf")
    if not is_pdf:
        with open(path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()
    return read_pdf_text(path)

def embed_texts(texts: List[str]) -> List[List[float]]:
    """Embed ``texts`` with ``EMBED_MODEL`` and return one vector per text.

    Requests are sent in slices of 64 texts; an empty input makes no
    request and returns an empty list.
    """
    batch_size = 64
    vectors = []
    start = 0
    while start < len(texts):
        response = client.embeddings.create(
            model=EMBED_MODEL,
            input=texts[start:start + batch_size],
        )
        vectors.extend(item.embedding for item in response.data)
        start += batch_size
    return vectors

def _normalize_file_list(files):
    """Gradio File(file_count='multiple', type='filepath') may return list of paths or file objs."""
    if not files:
        return []
    if isinstance(files, (str, os.PathLike)):
        return [str(files)]
    paths = []
    for f in files:
        p = getattr(f, "name", None) or (f if isinstance(f, str) else None)
        if p:
            paths.append(p)
    return paths

def _md_ok(msg):   return f"✅ **{msg}**"
def _md_warn(msg): return f"⚠️ **{msg}**"
def _md_err(msg):  return f"❌ **{msg}**"


In [6]:
# @title LLM: chunk + metadata

# Prompt sent to the chat model to split one CV into chunks and extract candidate metadata.
# string.Template is used (not str.format / f-strings) so the literal JSON braces in the
# schema need no escaping; `$cv_text` is the only placeholder and is filled by
# llm_chunk_and_metadata via .substitute().
CHUNK_META_PROMPT = Template("""You are helping HR index a candidate CV.
Return STRICT JSON with this schema:
{
  "candidate": {
    "name": string|null,
    "email": string|null,
    "phone": string|null,
    "location": string|null,
    "years_experience": number|null,
    "seniority": "Junior"|"Mid"|"Senior"|null,
    "role": string|null,
    "skills": string[]
  },
  "chunks": [
    {
      "text": string,
      "section": string|null
    }
  ]
}

Rules:
- Split into 4–20 coherent chunks (by section/paragraph). Each chunk <= 800 chars.
- Do not invent facts; if unknown, use null or [].
- skills should be concise lowercase tokens.
- Output ONLY JSON.

CV TEXT:
\"\"\"$cv_text\"\"\"""")

def llm_chunk_and_metadata(cv_text: str) -> Dict[str, Any]:
    """Ask the chat model to chunk a CV and extract candidate metadata.

    Args:
        cv_text: Raw text of one CV. Only the first 12000 characters are sent.

    Returns:
        A dict with two keys:
        ``"candidate"`` – always a dict (all-null profile when the model gave none);
        ``"chunks"`` – list of dicts that each carry a non-empty string ``"text"``.
        The list can be empty when the model returned valid JSON without usable
        chunks; callers are expected to handle that.

    When the reply cannot be parsed into a JSON object, a minimal structure
    holding the first 800 characters of the CV as a single chunk is returned.
    """
    def _empty_candidate() -> Dict[str, Any]:
        return {
            "name": None, "email": None, "phone": None, "location": None,
            "years_experience": None, "seniority": None, "role": None, "skills": []
        }

    # keep prompt length reasonable
    msg = CHUNK_META_PROMPT.substitute(cv_text=cv_text[:12000])
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=[{"role":"user","content": msg}],
        temperature=0
    )
    raw = (resp.choices[0].message.content or "").strip()

    # Models often wrap JSON in ```json fences or add a short preamble; keep only
    # the outermost {...} span so such replies still parse.
    start, end = raw.find("{"), raw.rfind("}")
    if start != -1 and end > start:
        raw = raw[start:end + 1]

    try:
        data = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError
        data = None

    if not isinstance(data, dict):
        # Fallback minimal structure if the LLM returned invalid or non-object JSON
        data = {
            "candidate": _empty_candidate(),
            "chunks": [{"text": cv_text[:800], "section": "full_text"}]
        }

    # sanitize: tolerate "chunks": null and entries that are not {"text": "..."} objects
    raw_chunks = data.get("chunks")
    if not isinstance(raw_chunks, list):
        raw_chunks = []
    data["chunks"] = [
        c for c in raw_chunks
        if isinstance(c, dict) and isinstance(c.get("text"), str) and c["text"].strip()
    ]
    if not isinstance(data.get("candidate"), dict):
        data["candidate"] = _empty_candidate()
    return data


In [7]:
import json, os, uuid, traceback

def _coerce_metadata(md: dict) -> dict:
    """Chroma 0.5 metadata must be scalar/None. Convert lists/dicts safely."""
    out = {}
    for k, v in md.items():
        if isinstance(v, list):
            out[k] = ", ".join(str(x) for x in v) if v else None
        elif isinstance(v, (dict, set, tuple)):
            out[k] = json.dumps(list(v) if not isinstance(v, dict) else v, ensure_ascii=False)
        elif isinstance(v, (str, int, float, bool)) or v is None:
            out[k] = v
        else:
            out[k] = str(v)
    return out

def index_cvs(files, default_tags: str = "cv") -> str:
    """Index the uploaded CV files into the Chroma ``collection``.

    Args:
        files: Value of the Gradio File component; normalized to a list of
            paths by ``_normalize_file_list``.
        default_tags: Comma-separated tags stored on every chunk.

    Returns:
        A Markdown report with one status line per file. Errors are never
        raised to the caller: per-file and top-level failures are rendered
        into the report together with their traceback.
    """
    try:
        file_paths = _normalize_file_list(files)
        if not file_paths:
            return _md_warn("No files received. Ensure File component is `type=\"filepath\"` and files are selected.")

        tags_input = [t.strip() for t in (default_tags or "").split(",") if t.strip()]
        total_chunks, lines = 0, [f"### Index report ({len(file_paths)} file(s))"]

        for path in file_paths:
            # Each file is handled in its own try so one bad CV does not abort the batch.
            try:
                file_name = os.path.basename(path)
                text = read_any_text(path)
                if not text.strip():
                    lines.append(_md_warn(f"{file_name}: extracted empty text (scanned PDF without OCR?). Skipping."))
                    continue

                parsed = llm_chunk_and_metadata(text)
                cand   = parsed.get("candidate", {}) or {}
                chunks = [c for c in parsed.get("chunks", []) if c.get("text")]
                if not chunks:
                    lines.append(_md_warn(f"{file_name}: LLM returned 0 chunks. Skipping."))
                    continue

                # A fresh UUID per file: re-uploading the same CV creates a new candidate.
                candidate_id = str(uuid.uuid4())
                docs = [c["text"][:2000] for c in chunks]
                embs = embed_texts(docs)
                ids  = [f"{candidate_id}-c{i}" for i in range(len(docs))]

                # Build raw metadata, then coerce types for Chroma
                # (candidate-level fields are repeated on every chunk of the CV).
                raw_mds = [{
                    "candidate_id": candidate_id,
                    "file_name": file_name,
                    "section": c.get("section"),
                    "name": cand.get("name"),
                    "email": cand.get("email"),
                    "phone": cand.get("phone"),
                    "location": cand.get("location"),
                    "years_experience": cand.get("years_experience"),
                    "seniority": cand.get("seniority"),
                    "role": cand.get("role"),
                    "skills": cand.get("skills") or [],   # list → will be coerced
                    "tags": tags_input,                   # list → will be coerced
                    "doc_type": "cv",
                } for c in chunks]

                # After coercion `skills` and `tags` are comma-joined strings (or None when empty).
                metadatas = [_coerce_metadata(md) for md in raw_mds]

                collection.add(ids=ids, documents=docs, metadatas=metadatas, embeddings=embs)
                total_chunks += len(docs)
                lines.append(_md_ok(f"{file_name}: indexed {len(docs)} chunk(s). Candidate: {cand.get('name') or 'Unknown'}"))

            except Exception as e:
                lines.append(_md_err(f"{os.path.basename(path)}: {e}"))
                lines.append("```text\n" + traceback.format_exc() + "\n```")

        if total_chunks == 0:
            lines.append(_md_warn("No chunks were added. Check errors above (API key, PDF text, etc.)."))
        return "\n\n".join(lines)

    except Exception as e:
        return _md_err(f"Top-level error: {e}") + "\n\n```text\n" + traceback.format_exc() + "\n```"


In [8]:
# @title Search (JD match)

def search_candidates(jd_text: str,
                      top_k_candidates: int = 10,
                      seniority: str = "", location: str = "", role_hint: str = "") -> List[Dict[str, Any]]:
    """Rank indexed candidates against a job description using the Chroma collection.

    Args:
        jd_text: Job description text. Blank or ``None`` returns ``[]`` without
            calling the embeddings API.
        top_k_candidates: Maximum number of candidates to return.
        seniority, location, role_hint: Optional exact-match metadata filters;
            blank/``None`` disables a filter. ``seniority`` and ``role_hint`` are
            title-cased before matching (note this turns e.g. "ML Engineer" into
            "Ml Engineer").

    Returns:
        One dict per candidate, best match first, with keys ``Score`` (higher is
        better, ``1 / (1 + best_distance)``), ``Name``, ``Email``, ``Phone``,
        ``Location``, ``YearsExp``, ``Seniority``, ``Role``, ``Skills`` (string)
        and ``Snippets`` (up to two chunk excerpts).
    """
    if not (jd_text or "").strip():
        return []

    seniority = (seniority or "").strip()
    location  = (location or "").strip()
    role_hint = (role_hint or "").strip()

    # Chroma accepts only one top-level key in `where`; several conditions must be
    # wrapped in an explicit $and.
    conditions: List[Dict[str, Any]] = [{"doc_type": "cv"}]
    if seniority: conditions.append({"seniority": seniority.title()})
    if location:  conditions.append({"location": location})
    if role_hint: conditions.append({"role": role_hint.title()})
    where = conditions[0] if len(conditions) == 1 else {"$and": conditions}

    qvec = embed_texts([jd_text])[0]
    res = collection.query(
        query_embeddings=[qvec],
        n_results=200,          # pull wide, then group chunks by candidate
        where=where,
        include=["documents","metadatas","distances"]
    )
    docs  = (res.get("documents") or [[]])[0]
    metas = (res.get("metadatas") or [[]])[0]
    dists = (res.get("distances") or [[]])[0]

    by_cand: Dict[str, Dict[str, Any]] = {}
    for doc, md, dist in zip(docs, metas, dists):
        md = md or {}
        doc = doc or ""
        cid = md.get("candidate_id", "unknown")
        if cid not in by_cand:
            # Skills are stored as a comma-joined string by _coerce_metadata; joining
            # that string again would split it into single characters.
            skills = md.get("skills") or ""
            if not isinstance(skills, str):
                skills = ", ".join(str(s) for s in skills)
            by_cand[cid] = {
                "Score": float("inf"),
                "Name": md.get("name"), "Email": md.get("email"), "Phone": md.get("phone"),
                "Location": md.get("location"), "YearsExp": md.get("years_experience"),
                "Seniority": md.get("seniority"), "Role": md.get("role"),
                "Skills": skills,
                "Snippets": []
            }
        entry = by_cand[cid]
        entry["Score"] = min(entry["Score"], float(dist))   # best (smallest) distance
        if len(entry["Snippets"]) < 2:
            entry["Snippets"].append(doc[:220] + ("..." if len(doc) > 220 else ""))

    rows = sorted(by_cand.values(), key=lambda r: r["Score"])   # lower distance first
    for r in rows:
        r["Score"] = round(1.0/(1.0 + r["Score"]), 4)   # convert to similarity-ish
    # int(): the Gradio slider may deliver a float, which cannot be used as a slice bound.
    return rows[:int(top_k_candidates)]


In [9]:
# --- Q&A: dedupe by candidate and flatten output ---

# System prompt for the Q&A call: restricts the model to the retrieved CONTEXT
# and tells it to admit when the answer is not there.
RAG_SYSTEM = (
    "You are an HR assistant. Answer ONLY from the provided CONTEXT.\n"
    "If the answer is not present, say you don't know. Be concise."
)

def qa_over_cvs_dedup(question: str, top_k_chunks: int = 6,
                      seniority: str = "", location: str = "", role_hint: str = "", keyword: str = ""):
    """Answer a question from indexed CV chunks and list the contributing candidates.

    Retrieval runs against the Chroma ``collection`` that ``index_cvs`` in this
    section populates (the previous body called ``.reshape`` on a list and read a
    ``faiss_store`` that does not exist here).

    Args:
        question: Natural-language question. Blank/``None`` short-circuits.
        top_k_chunks: Maximum number of chunks placed in the LLM context.
        seniority, location, role_hint: Optional exact-match metadata filters.
        keyword: Optional case-insensitive substring every kept chunk must contain.

    Returns:
        ``(answer, rows)`` where ``rows`` is a list of 7-item lists:
        ``[rank, candidate, role, seniority, location, score, snippet]``, one per
        candidate, best first. ``score`` is ``1 / (1 + distance)`` (higher is better).
    """
    if not (question or "").strip():
        return "Please enter a question.", []

    top_k_chunks = max(1, int(top_k_chunks))   # slider values may arrive as floats
    needle = (keyword or "").strip().lower()

    # Chroma needs an explicit $and when more than one condition is given.
    conditions = [{"doc_type": "cv"}]
    for field, wanted in (("seniority", seniority), ("location", location), ("role", role_hint)):
        wanted = (wanted or "").strip()
        if wanted:
            conditions.append({field: wanted})
    where = conditions[0] if len(conditions) == 1 else {"$and": conditions}

    # 1) Retrieve (overfetch so the keyword filter still leaves enough chunks)
    qvec = [float(x) for x in embed_texts([question])[0]]
    res = collection.query(
        query_embeddings=[qvec],
        n_results=top_k_chunks * 6,
        where=where,
        include=["documents", "metadatas", "distances"],
    )
    ids   = (res.get("ids") or [[]])[0]
    docs  = (res.get("documents") or [[]])[0]
    metas = (res.get("metadatas") or [[]])[0]
    dists = (res.get("distances") or [[]])[0]

    # 2) Build context (up to top_k_chunks chunks) and aggregate per candidate
    ctx_parts = []
    per_candidate = {}  # candidate_id -> {candidate, role, seniority, location, score, snippet}
    for _id, doc, md, dist in zip(ids, docs, metas, dists):
        md = md or {}
        doc = doc or ""
        if needle and needle not in doc.lower():
            continue

        if len(ctx_parts) < top_k_chunks:
            label = f"{md.get('name') or md.get('file_name')} | {md.get('section')}"
            ctx_parts.append(f"[{len(ctx_parts)+1} | {label}]\n{doc}")

        # keep the best-scoring chunk of each candidate for the table
        similarity = 1.0 / (1.0 + float(dist))
        cid = md.get("candidate_id") or _id
        current = per_candidate.get(cid)
        if current is None or similarity > current["score"]:
            per_candidate[cid] = {
                "candidate": md.get("name") or md.get("file_name"),
                "role": md.get("role") or "",
                "seniority": md.get("seniority") or "",
                "location": md.get("location") or "",
                "score": similarity,
                "snippet": doc[:220] + ("..." if len(doc) > 220 else ""),
            }

    if not ctx_parts:
        return "No relevant information found.", []

    # 3) Ask LLM with grounded context
    ctx = "\n\n".join(ctx_parts)
    messages = [
        {"role":"system","content": RAG_SYSTEM},
        {"role":"user","content": f"QUESTION: {question}\n\nCONTEXT:\n{ctx}"}
    ]
    resp = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=0)
    answer = resp.choices[0].message.content

    # 4) Build a clean, flat table (no objects)
    ranked = sorted(per_candidate.values(), key=lambda c: c["score"], reverse=True)
    rows = [
        [i, c["candidate"], c["role"], c["seniority"], c["location"], round(c["score"], 4), c["snippet"]]
        for i, c in enumerate(ranked, start=1)
    ]
    return answer, rows


In [None]:
# @title UI
# Column order of the JD-match table; also drives the row flattening below.
_UI_SEARCH_HEADERS = ["Score","Name","Email","Phone","Location","YearsExp","Seniority","Role","Skills","Snippets"]
# qa_over_cvs_dedup returns seven cells per row (the last one is the snippet),
# and its score column is a higher-is-better value, not a distance.
_UI_CITES_HEADERS = ["#","candidate","role","seniority","location","score","snippet"]


def _ui_search(jd, topk, sen, loc, role):
    """Run search_candidates and flatten its list of dicts for gr.Dataframe.

    gr.Dataframe expects rows as lists of scalars, so each result dict is laid
    out in _UI_SEARCH_HEADERS order, list values are joined with " | " and
    None becomes an empty cell.
    """
    results = search_candidates(jd or "", int(topk), sen or "", loc or "", role or "")
    table = []
    for result in results or []:
        cells = []
        for header in _UI_SEARCH_HEADERS:
            value = result.get(header)
            if isinstance(value, (list, tuple)):
                value = " | ".join(str(v) for v in value)
            cells.append("" if value is None else value)
        table.append(cells)
    return table


with gr.Blocks(title="Simple HR CV Search — LLM Chunk + Metadata") as app:
    gr.Markdown("### Simple HR RAG: Upload CVs → LLM chunks & metadata → Search/JD → Q&A")

    with gr.Tabs():
        with gr.TabItem("Ingest"):
            up = gr.File(file_count="multiple", type="filepath", label="Upload CVs (PDF/TXT)")
            tags = gr.Textbox(value="cv", label="Default tags (optional)")
            go = gr.Button("Index", variant="primary")
            out = gr.Markdown()
            go.click(index_cvs, inputs=[up, tags], outputs=out)

        with gr.TabItem("Search (JD match)"):
            jd   = gr.Textbox(lines=6, label="Job Description")
            with gr.Row():
                sen = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc = gr.Textbox(label="Location")
                role = gr.Textbox(label="Role")
            topk = gr.Slider(1, 30, value=10, step=1, label="Top-K candidates")
            btn  = gr.Button("Search", variant="primary")
            tbl  = gr.Dataframe(
                wrap=True,
                headers=_UI_SEARCH_HEADERS,
                interactive=False
            )
            # Go through the flattener: the raw search result is a list of dicts.
            btn.click(_ui_search, inputs=[jd, topk, sen, loc, role], outputs=tbl)

        with gr.TabItem("Q&A"):
            q = gr.Textbox(lines=2, label="Question", placeholder="e.g., Who has hands-on Airflow experience?")
            with gr.Row():
                topk_c = gr.Slider(1, 12, value=6, step=1, label="Top-K chunks")
                sen2 = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc2 = gr.Textbox(label="Location")
                role2 = gr.Textbox(label="Role")
            kw = gr.Textbox(label="Keyword prefilter (optional)", placeholder="e.g. Airflow")
            ask = gr.Button("Ask", variant="primary")
            ans = gr.Markdown()
            cites = gr.Dataframe(headers=_UI_CITES_HEADERS, wrap=True, interactive=False)
            ask.click(qa_over_cvs_dedup, inputs=[q, topk_c, sen2, loc2, role2, kw], outputs=[ans, cites])

# debug=True keeps this cell running (later cells will not execute until it is stopped);
# share=True publishes a public URL — anyone with the link can query the indexed CVs
# and spend the configured OpenAI key.
app.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://582ebe95d5879d8506.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


In [None]:
# ===============================
# 1) Install & Imports
# ===============================
!pip -q install "openai>=1.40.0" gradio pypdf python-dotenv faiss-cpu

import os, json, uuid, traceback, math, pickle
from typing import List, Dict, Any, Tuple
from string import Template
from dataclasses import dataclass, asdict

import gradio as gr
from pypdf import PdfReader
import faiss
import numpy as np
from openai import OpenAI

from google.colab import userdata

# ===============================
# 2) Config & Init
# ===============================
# OpenAI model ids used for embeddings and for chat completions respectively.
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL  = "gpt-4o-mini"

# Where we persist the FAISS index & store
STORE_DIR   = "/content/faiss_store"
INDEX_PATH  = os.path.join(STORE_DIR, "index.faiss")
MAP_PATH    = os.path.join(STORE_DIR, "store.json")   # JSON sidecar: {"row2id": [...], "store": {id: {document, metadata}}}
DIM         = 1536                                    # text-embedding-3-small = 1536 dims

# Create the persistence directory up front so FaissStore.save() can write into it.
os.makedirs(STORE_DIR, exist_ok=True)

# The API key is read from the Colab secret named OPENAI_API_KEY; nothing here prompts for it.
client = OpenAI(api_key= userdata.get('OPENAI_API_KEY'))

# ===============================
# 3) Helpers (IO, embeddings, UI)
# ===============================
def read_pdf_text(path: str) -> str:
    """Extract the text of every page of the PDF at ``path``.

    Pages for which extraction yields nothing contribute an empty string,
    and page texts are joined with a single newline.
    """
    pdf_pages = PdfReader(path).pages
    return "\n".join((page.extract_text() or "") for page in pdf_pages)

def read_any_text(path: str) -> str:
    """Return the text content of ``path``.

    Files whose name ends in ``.pdf`` (case-insensitive) go through
    ``read_pdf_text``; anything else is read as UTF-8 with undecodable bytes
    dropped. The file is opened in a ``with`` block so the handle is closed
    deterministically instead of being left to the garbage collector.
    """
    if path.lower().endswith(".pdf"):
        return read_pdf_text(path)
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        return f.read()

def embed_texts(texts: List[str]) -> np.ndarray:
    """Return L2-normalized embeddings (needed for cosine via inner product).

    Args:
        texts: Strings to embed; sent to the API in slices of 64.

    Returns:
        A 2-D float32 array with one unit-length row per input text. For an
        empty input no request is made and an empty ``(0, 0)`` array is
        returned (previously ``np.linalg.norm(..., axis=1)`` raised on the
        1-D empty array).
    """
    if not texts:
        return np.zeros((0, 0), dtype="float32")

    out = []
    for i in range(0, len(texts), 64):
        batch = texts[i:i+64]
        resp = client.embeddings.create(model=EMBED_MODEL, input=batch)
        out.extend(d.embedding for d in resp.data)
    arr = np.array(out, dtype="float32")
    # normalize to unit length for cosine via IP; the epsilon guards against a zero vector
    norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
    return arr / norms

def _normalize_file_list(files):
    if not files: return []
    if isinstance(files, (str, os.PathLike)): return [str(files)]
    paths = []
    for f in files:
        p = getattr(f, "name", None) or (f if isinstance(f, str) else None)
        if p: paths.append(p)
    return paths

def _md_ok(msg):   return f"✅ **{msg}**"
def _md_warn(msg): return f"⚠️ **{msg}**"
def _md_err(msg):  return f"❌ **{msg}**"
# ---- Put near your other helpers ----
TABLE_HEADERS = ["Score","Name","Email","Phone","Location","YearsExp","Seniority","Role","Skills","Snippets"]

def _rows_to_table(rows):
    """Flatten list-of-dicts into list-of-lists with only scalar values."""
    table = []
    for r in rows or []:
        table.append([
            r.get("Score", 0),
            r.get("Name") or "",
            r.get("Email") or "",
            r.get("Phone") or "",
            r.get("Location") or "",
            r.get("YearsExp") if r.get("YearsExp") is not None else "",
            r.get("Seniority") or "",
            r.get("Role") or "",
            r.get("Skills") or "",
            " | ".join(r.get("Snippets") or []),  # stringify list
        ])
    return table
def do_search_for_table(jd, topk, sen, loc, role):
    """UI adapter: run ``search_candidates`` and flatten the result for a table.

    ``None`` filter values coming from the widgets are replaced by "" before
    the search; the result dicts are converted by ``_rows_to_table``.
    """
    matches = search_candidates(
        jd_text=jd,
        top_k_candidates=topk,
        seniority=sen or "",
        location=loc or "",
        role_hint=role or "",
    )
    return _rows_to_table(matches)


# ---- Q&A table helpers ----
CITES_HEADERS = ["#", "Candidate", "Role", "Seniority", "Location", "Score"]

def _cites_to_table(cites):
    table = []
    for c in cites or []:
        # ensure only scalars go into the dataframe
        score = c.get("score", "")
        try:
            score = round(float(score), 4)
        except Exception:
            pass
        table.append([
            c.get("#", ""),
            c.get("candidate") or "",
            c.get("role") or "",
            c.get("seniority") or "",
            c.get("location") or "",
            score,
        ])
    return table

# ===============================
# 4) FAISS Index Manager
# ===============================
class FaissStore:
    """
    - FAISS index (IndexFlatIP) over L2-normalized vectors  => cosine similarity
    - Python dict sidecar: id -> {"document": str, "metadata": {...}}

    FAISS only knows row numbers, so ``row2id`` / ``id2row`` translate between
    FAISS rows and chunk ids, and ``store`` holds the payload for each id.
    Both parts are persisted together by ``save()`` and restored by the
    constructor when both files exist.
    """
    def __init__(self, dim: int, index_path: str, map_path: str):
        """Load an existing store from disk, or start an empty index of width ``dim``."""
        self.dim = dim
        self.index_path = index_path
        self.map_path = map_path

        self.id2row: Dict[str, int] = {}      # id -> row number in FAISS
        self.row2id: List[str] = []           # row -> id
        self.store: Dict[str, Dict[str, Any]] = {}  # id -> payload

        if os.path.exists(index_path) and os.path.exists(map_path):
            self._load()
        else:
            self.index = faiss.IndexFlatIP(dim)

    def add(self, ids: List[str], vectors: np.ndarray, documents: List[str], metadatas: List[Dict[str, Any]]):
        """Append vectors and their payloads.

        Raises:
            ValueError: if the four inputs differ in length or ``vectors`` is not
                a 2-D array of width ``self.dim``. Nothing is added in that case,
                so the index and the row mapping cannot drift apart.
        """
        vectors = np.ascontiguousarray(vectors, dtype="float32")  # layout FAISS expects
        if not (len(vectors) == len(ids) == len(documents) == len(metadatas)):
            raise ValueError(
                f"length mismatch: {len(vectors)} vectors, {len(ids)} ids, "
                f"{len(documents)} documents, {len(metadatas)} metadatas"
            )
        if not ids:
            return
        if vectors.ndim != 2 or vectors.shape[1] != self.dim:
            raise ValueError(f"expected vectors of shape (n, {self.dim}), got {vectors.shape}")

        # Add to FAISS
        self.index.add(vectors)
        # Update mappings
        start_row = len(self.row2id)
        for i, _id in enumerate(ids):
            row = start_row + i
            self.row2id.append(_id)
            self.id2row[_id] = row
            self.store[_id] = {"document": documents[i], "metadata": metadatas[i]}

    def search(self, query_vectors: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Return (scores, row_indices). Scores are inner product (cosine).

        Callers in this file skip row index -1, which marks an unfilled result slot.
        """
        scores, idxs = self.index.search(query_vectors, int(top_k))
        return scores, idxs

    def save(self):
        """Persist the FAISS index and the JSON sidecar.

        The sidecar is written to a temporary file and moved into place with
        ``os.replace`` so an interrupted write cannot leave a truncated JSON file.
        """
        faiss.write_index(self.index, self.index_path)
        tmp_path = self.map_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({
                "row2id": self.row2id,
                "store": self.store
            }, f, ensure_ascii=False)
        os.replace(tmp_path, self.map_path)

    def _load(self):
        """Restore index and sidecar from disk.

        Raises:
            RuntimeError: if the index and the sidecar disagree on the number of
                rows; searching such a store would map hits to the wrong chunks.
        """
        self.index = faiss.read_index(self.index_path)
        with open(self.map_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.row2id = data.get("row2id", [])
        self.store  = data.get("store", {})
        self.id2row = {i: r for r, i in enumerate(self.row2id)}
        if self.index.ntotal != len(self.row2id):
            raise RuntimeError(
                f"FAISS index has {self.index.ntotal} rows but sidecar lists {len(self.row2id)} ids; "
                f"delete {self.index_path} and {self.map_path} and re-index."
            )

def do_qa_for_table(q, topk, sen, loc, role, kw):
    """UI adapter: run the Q&A function and flatten its citations for a table.

    ``None`` filter values from the widgets are replaced by "" before the call.
    NOTE(review): ``qa_over_cvs`` is not defined in the part of the notebook
    visible here (only ``qa_over_cvs_dedup`` is) — confirm it exists and that
    it returns dict-shaped citations, which ``_cites_to_table`` requires.
    """
    filters = {
        "seniority": sen or "",
        "location": loc or "",
        "role_hint": role or "",
        "keyword": kw or "",
    }
    answer, cites = qa_over_cvs(question=q, top_k_chunks=topk, **filters)
    return answer, _cites_to_table(cites)


# global store shared by index_cvs, search_candidates and the Q&A function.
# Constructing it loads the persisted index when both INDEX_PATH and MAP_PATH
# exist; otherwise it starts with an empty IndexFlatIP of width DIM.
faiss_store = FaissStore(DIM, INDEX_PATH, MAP_PATH)

# ===============================
# 5) LLM: chunk + metadata (simple)
# ===============================
# Prompt sent to the chat model to split one CV into chunks and extract candidate metadata.
# string.Template is used so the literal JSON braces in the schema need no escaping;
# `$cv_text` is the only placeholder and is filled by llm_chunk_and_metadata via .substitute().
CHUNK_META_PROMPT = Template("""You are helping HR index a candidate CV.
Return STRICT JSON with this schema:
{
  "candidate": {
    "name": string|null,
    "email": string|null,
    "phone": string|null,
    "location": string|null,
    "years_experience": number|null,
    "seniority": "Junior"|"Mid"|"Senior"|null,
    "role": string|null,
    "skills": string[]
  },
  "chunks": [
    {
      "text": string,
      "section": string|null
    }
  ]
}

Rules:
- Split into 4–20 coherent chunks; each chunk <= 800 characters.
- Do not invent facts; if unknown, use null or [].
- skills must be short lowercase tokens.
- Output ONLY JSON.

CV TEXT:
\"\"\"$cv_text\"\"\"""")

import re, json
from string import Template

# keep the same CHUNK_META_PROMPT you already have
# CHUNK_META_PROMPT = Template(""" ... $cv_text ... """)

NAME_RE  = re.compile(r"(?im)^\s*([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z'’\-]+){0,3})\s*$")
EMAIL_RE = re.compile(r"[\w\.-]+@[\w\.-]+\.\w+")
PHONE_RE = re.compile(r"(\+?\d[\d\-\s\(\)]{7,})")

def _fallback_profile_from_text(t: str) -> dict:
    lines = [ln.strip() for ln in (t or "").splitlines() if ln.strip()]
    name = None
    for ln in lines[:8]:
        m = NAME_RE.match(ln)
        if m:
            name = m.group(1)
            break
    email = (EMAIL_RE.search(t) or [None])[0] if EMAIL_RE.search(t) else None
    phone = (PHONE_RE.search(t) or [None])[0] if PHONE_RE.search(t) else None
    return {
        "name": name, "email": email, "phone": phone,
        "location": None, "years_experience": None,
        "seniority": None, "role": None, "skills": []
    }

def _split_with_overlap(text: str, size: int = 800, overlap: int = 120) -> list:
    """Split ``text`` into chunk dicts of at most ``size`` chars that overlap by ``overlap``.

    A window is cut at its last paragraph break or sentence end when that keeps
    the chunk longer than ``overlap``. This guarantees the next window starts
    strictly after the current one, so the loop always terminates.
    """
    chunks = []
    start, total = 0, len(text)
    while start < total:
        end = min(total, start + size)
        window = text[start:end]
        para, sent = window.rfind("\n\n"), window.rfind(". ")
        cut = max(para, sent)
        if cut != -1 and len(window) > 200:
            # keep the period of a sentence end, or the whole blank line of a paragraph break
            cut_end = start + cut + (2 if cut == para else 1)
            if cut_end - start > overlap:
                end = cut_end
        piece = text[start:end].strip()
        if piece:
            chunks.append({"text": piece, "section": None})
        if end >= total:
            break
        # end - start > overlap holds here, so this always moves forward
        start = end - overlap
    return chunks


def llm_chunk_and_metadata(cv_text: str) -> dict:
    """Stricter JSON mode + single retry; if still invalid, fall back to heuristics and multi-split.

    Args:
        cv_text: Raw text of one CV. Only the first 12000 characters are sent
            to the model; the heuristic fallback works on the full text.

    Returns:
        ``{"candidate": dict, "chunks": [ {"text": ..., "section": ...}, ... ]}``.
        The LLM result is accepted when it is a JSON object with at least two
        dict-shaped chunks; a missing or malformed ``candidate`` is replaced by
        the regex-based profile.
    """
    prompt = CHUNK_META_PROMPT.substitute(cv_text=cv_text[:12000])
    def _ask():
        return client.chat.completions.create(
            model=CHAT_MODEL,
            messages=[{"role":"user","content": prompt}],
            temperature=0,
            response_format={ "type": "json_object" }  # <-- JSON mode (supported by gpt-4o-mini)
        )

    for _ in range(2):  # try twice
        resp = _ask()
        raw = resp.choices[0].message.content
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):   # None content or invalid JSON
            continue
        if not isinstance(data, dict) or not isinstance(data.get("chunks"), list):
            continue
        usable = [c for c in data["chunks"] if isinstance(c, dict)]
        # minimal sanity
        if len(usable) >= 2:
            data["chunks"] = usable
            if not isinstance(data.get("candidate"), dict):
                data["candidate"] = _fallback_profile_from_text(cv_text)
            return data

    # If we are here, LLM gave invalid JSON or too few chunks → heuristic fallback
    profile = _fallback_profile_from_text(cv_text)
    # simple multi-split fallback so you don't end up with a single chunk
    text = "\n".join(ln.strip() for ln in cv_text.splitlines())
    chunks = _split_with_overlap(text, size=800, overlap=120)

    return {
        "candidate": profile,
        "chunks": chunks if chunks else [{"text": cv_text[:800], "section": "full_text"}],
    }


# ===============================
# 6) Ingest (multi-file)
# ===============================
def index_cvs(files, default_tags: str = "cv") -> str:
    """Index the uploaded CV files into the global ``faiss_store``.

    Args:
        files: Value of the Gradio File component; normalized to a list of
            paths by ``_normalize_file_list``.
        default_tags: Comma-separated tags stored on every chunk.

    Returns:
        A Markdown report with one status line per file. Errors are never
        raised to the caller: per-file and top-level failures are rendered
        into the report together with their traceback. The store is saved to
        disk once at the end, and only if at least one chunk was added.
    """
    try:
        file_paths = _normalize_file_list(files)
        if not file_paths:
            return _md_warn("No files selected.")

        tags = [t.strip() for t in (default_tags or "").split(",") if t.strip()]
        total_chunks = 0
        lines = [f"### Index report ({len(file_paths)} file(s))"]

        for path in file_paths:
            # Each file is handled in its own try so one bad CV does not abort the batch.
            try:
                file_name = os.path.basename(path)
                text = read_any_text(path)
                if not text.strip():
                    lines.append(_md_warn(f"{file_name}: empty text (scanned PDF?). Skipping."))
                    continue

                parsed = llm_chunk_and_metadata(text)
                cand   = parsed.get("candidate", {}) or {}
                chunks = [c for c in parsed.get("chunks", []) if c.get("text")]
                if not chunks:
                    lines.append(_md_warn(f"{file_name}: LLM returned 0 chunks. Skipping."))
                    continue

                # A fresh UUID per file: re-uploading the same CV creates a new candidate.
                candidate_id = str(uuid.uuid4())
                docs   = [c["text"][:2000] for c in chunks]  # safety cap
                embs   = embed_texts(docs)
                ids    = [f"{candidate_id}-c{i}" for i in range(len(docs))]
                # Candidate-level fields are repeated on every chunk of the CV.
                metas  = [{
                    "candidate_id": candidate_id,
                    "file_name": file_name,
                    "section": c.get("section"),
                    "name": cand.get("name"),
                    "email": cand.get("email"),
                    "phone": cand.get("phone"),
                    "location": cand.get("location"),
                    "years_experience": cand.get("years_experience"),
                    "seniority": cand.get("seniority"),
                    "role": cand.get("role"),
                    "skills": ", ".join(cand.get("skills") or []),   # store as string in sidecar
                    "tags": ", ".join(tags),
                    "doc_type": "cv",
                } for c in chunks]

                # Add to FAISS store (no type restriction headaches)
                faiss_store.add(ids, embs, docs, metas)
                total_chunks += len(docs)
                lines.append(_md_ok(f"{file_name}: indexed {len(docs)} chunk(s). Candidate: {cand.get('name') or 'Unknown'}"))

            except Exception as e:
                lines.append(_md_err(f"{os.path.basename(path)}: {e}"))
                lines.append("```text\n" + traceback.format_exc() + "\n```")

        if total_chunks == 0:
            lines.append(_md_warn("No chunks were added. Check errors above (API key, PDF text, etc.)."))
        else:
            # Persist once per batch rather than once per file.
            faiss_store.save()
            lines.append(_md_ok(f"Saved index with total chunks: {len(faiss_store.row2id)}"))
        return "\n\n".join(lines)

    except Exception as e:
        return _md_err(f"Top-level error: {e}") + "\n\n```text\n" + traceback.format_exc() + "\n```"

# ===============================
# 7) Search (JD match) — group by candidate
# ===============================
def search_candidates(jd_text: str,
                      top_k_candidates: int = 10,
                      seniority: str = "", location: str = "", role_hint: str = "") -> List[Dict[str, Any]]:
    """Rank indexed candidates against a job description using the FAISS store.

    The 200 nearest chunks are fetched, filtered by exact metadata match on
    any non-empty ``seniority`` / ``location`` / ``role_hint``, and grouped by
    candidate. A candidate's ``Score`` is the best chunk score seen for it
    (starting from 0.0), rounded to four decimals; up to two chunk excerpts
    are kept as ``Snippets``. A blank job description returns ``[]``.
    """
    if not jd_text.strip():
        return []

    query = embed_texts([jd_text])[0].reshape(1, -1)
    all_scores, all_rows = faiss_store.search(query, top_k=200)   # pull wide

    # (metadata key, required value); an empty required value disables the filter
    required = (("seniority", seniority), ("location", location), ("role", role_hint))

    grouped: Dict[str, Dict[str, Any]] = {}
    for score, row in zip(all_scores[0], all_rows[0]):
        if row == -1:
            continue
        record = faiss_store.store[faiss_store.row2id[row]]
        meta = record["metadata"]

        if any(want and (meta.get(key) or "") != want for key, want in required):
            continue

        cand_key = meta.get("candidate_id", "unknown")
        entry = grouped.get(cand_key)
        if entry is None:
            entry = {
                "Score": 0.0,
                "Name": meta.get("name"),
                "Email": meta.get("email"),
                "Phone": meta.get("phone"),
                "Location": meta.get("location"),
                "YearsExp": meta.get("years_experience"),
                "Seniority": meta.get("seniority"),
                "Role": meta.get("role"),
                "Skills": meta.get("skills"),
                "Snippets": []
            }
            grouped[cand_key] = entry

        # use best (max) cosine
        entry["Score"] = max(entry["Score"], float(score))
        if len(entry["Snippets"]) < 2:
            body = record["document"]
            excerpt = body[:220] + ("..." if len(body) > 220 else "")
            entry["Snippets"].append(excerpt)

    ranked = sorted(grouped.values(), key=lambda item: item["Score"], reverse=True)
    for item in ranked:
        item["Score"] = round(item["Score"], 4)
    return ranked[:top_k_candidates]

# ===============================
# 8) Q&A (grounded) with optional keyword prefilter
# ===============================
# --- Q&A: dedupe by candidate and flatten output ---

# System prompt for the Q&A call: restricts the model to the retrieved CONTEXT
# and tells it to admit when the answer is not there.
RAG_SYSTEM = (
    "You are an HR assistant. Answer ONLY from the provided CONTEXT.\n"
    "If the answer is not present, say you don't know. Be concise."
)

def qa_over_cvs_dedup(question: str, top_k_chunks: int = 6,
                      seniority: str = "", location: str = "", role_hint: str = "", keyword: str = ""):
    if not question.strip():
        return "Please enter a question.", []

    # 1) Retrieve (overfetch) then filter
    qvec = embed_texts([question])[0].reshape(1, -1)
    scores, rows = faiss_store.search(qvec, top_k=top_k_chunks * 6)
    scores, rows = scores[0], rows[0]

    # 2) Build context (keep up to top_k_chunks unique chunks after filters)
    ctx_parts = []
    per_candidate = {}  # candidate_id -> {score, name, role, seniority, location, snippet}
    kept = 0
    for s, r in zip(scores, rows):
        if r == -1:
            continue
        _id = faiss_store.row2id[r]
        rec = faiss_store.store[_id]
        md  = rec["metadata"]
        doc = rec["document"]

        # filters
        if seniority and (md.get("seniority") or "") != seniority:   continue
        if location  and (md.get("location")  or "") != location:    continue
        if role_hint and (md.get("role")      or "") != role_hint:   continue
        if keyword   and (keyword.lower() not in (doc or "").lower()): continue

        # add to LLM context until limit
        if kept < top_k_chunks:
            label = f"{md.get('name') or md.get('file_name')} | {md.get('section')}"
            ctx_parts.append(f"[{kept+1} | {label}]\n{doc}")
            kept += 1

        # aggregate per candidate (best score + first good snippet)
        cid = md.get("candidate_id") or _id
        current = per_candidate.get(cid, {
            "candidate": md.get("name") or md.get("file_name"),
            "role": md.get("role") or "",
            "seniority": md.get("seniority") or "",
            "location": md.get("location") or "",
            "score": 0.0,
            "snippet": ""
        })
        if float(s) > current["score"]:
            current["score"] = float(s)
            # keep a short snippet for the table
            current["snippet"] = (doc[:220] + ("..." if len(doc) > 220 else ""))
        per_candidate[cid] = current

    if not ctx_parts:
        return "No relevant information found.", []

    # 3) Ask LLM with grounded context
    ctx = "\n\n".join(ctx_parts)
    messages = [
        {"role":"system","content": RAG_SYSTEM},
        {"role":"user","content": f"QUESTION: {question}\n\nCONTEXT:\n{ctx}"}
    ]
    resp = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=0)
    answer = resp.choices[0].message.content

    # 4) Build a clean, flat table (no objects)
    rows = []
    for i, (_, c) in enumerate(sorted(per_candidate.items(), key=lambda kv: kv[1]["score"], reverse=True), start=1):
        rows.append([
            i,
            c["candidate"],
            c["role"],
            c["seniority"],
            c["location"],
            round(c["score"], 4),
            c["snippet"]
        ])
    return answer, rows

# Column headers of the Q&A results table; order matches the rows built by qa_over_cvs_dedup.
QA_HEADERS = ["#", "Candidate", "Role", "Seniority", "Location", "Score", "Snippet"]
# ===============================
# 9) Gradio UI
# ===============================
# Gradio app with three tabs: Ingest (index CVs), Search (JD match) and Q&A.
# Components are laid out in the order they are created, so statement order matters here.
with gr.Blocks(title="HR CV Search (FAISS) — LLM Chunk + Metadata") as app:
    gr.Markdown("### HR CV RAG (FAISS): Upload CVs → LLM chunks & metadata → Search/JD → Q&A")

    with gr.Tabs():
        # --- Ingest tab: upload files and index them via index_cvs ---
        with gr.TabItem("Ingest"):
            up = gr.File(file_count="multiple", type="filepath", label="Upload CVs (PDF/TXT)")
            tags = gr.Textbox(value="cv", label="Default tags (optional)")  # saved in sidecar only
            go = gr.Button("Index", variant="primary")
            out = gr.Markdown()
            go.click(index_cvs, inputs=[up, tags], outputs=out)

        # --- Search tab: rank candidates against a job description ---
        with gr.TabItem("Search (JD match)"):
            jd   = gr.Textbox(lines=6, label="Job Description")
            with gr.Row():
                sen = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc = gr.Textbox(label="Location")
                role = gr.Textbox(label="Role")
            topk = gr.Slider(1, 30, value=10, step=1, label="Top-K candidates")
            btn  = gr.Button("Search", variant="primary")

            tbl  = gr.Dataframe(
                headers=TABLE_HEADERS,
                wrap=True,
                interactive=False,
                row_count=(0, "dynamic"),
                col_count=(len(TABLE_HEADERS), "fixed"),
            )

        # Wired outside the TabItem (still inside the Blocks context); fills `tbl` with flat rows.
        btn.click(do_search_for_table, inputs=[jd, topk, sen, loc, role], outputs=tbl)


        # --- Q&A tab: grounded answer plus a per-candidate table ---
        with gr.TabItem("Q&A"):
            q = gr.Textbox(lines=2, label="Question", placeholder="e.g., Who has hands-on RAG experience?")
            with gr.Row():
                topk_c = gr.Slider(1, 12, value=6, step=1, label="Top-K chunks (for context)")
                sen2 = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc2 = gr.Textbox(label="Location")
                role2 = gr.Textbox(label="Role")
            kw = gr.Textbox(label="Keyword prefilter (optional)", placeholder="e.g. Airflow")

            ask = gr.Button("Ask", variant="primary")
            ans = gr.Markdown()
            cites = gr.Dataframe(headers=QA_HEADERS, wrap=True, interactive=False,
                              row_count=(0, "dynamic"), col_count=(len(QA_HEADERS), "fixed"))

        # qa_over_cvs_dedup returns (answer, rows); they map onto `ans` and `cites` in that order.
        ask.click(qa_over_cvs_dedup,
                  inputs=[q, topk_c, sen2, loc2, role2, kw],
                  outputs=[ans, cites])



# share=True publishes the app on a public *.gradio.live URL; debug=True keeps the cell running.
# NOTE(review): the app serves candidate PII and spends the OpenAI key, and no auth is configured
# here; consider share=False or launch(auth=...) before sharing the link.
app.launch(share=True, debug=True)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://6263c0a9173d8e40ec.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7861 <> https://6263c0a9173d8e40ec.gradio.live




In [None]:
!pip -q install "openai>=1.40.0" gradio pypdf faiss-cpu


In [None]:
import os, json, uuid, traceback, math, pickle
from typing import List, Dict, Any, Tuple
from string import Template
from dataclasses import dataclass, asdict

import gradio as gr
from pypdf import PdfReader
import faiss
import numpy as np
from openai import OpenAI

from google.colab import userdata

In [None]:
# 2) Config & Init
# ===============================
# OpenAI model names used for embeddings and for chat completions.
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL  = "gpt-4o-mini"

# Where we persist the FAISS index & store
STORE_DIR   = "/content/faiss_store"
INDEX_PATH  = os.path.join(STORE_DIR, "index.faiss")
MAP_PATH    = os.path.join(STORE_DIR, "store.json")   # our sidecar: id -> {doc, metadata}
DIM         = 1536                                    # text-embedding-3-small = 1536 dims

# Create the store directory up front so saving the index never fails on a missing folder.
os.makedirs(STORE_DIR, exist_ok=True)

# The API key is read through google.colab's `userdata` (not hard-coded in the notebook).
client = OpenAI(api_key= userdata.get('OPENAI_API_KEY'))

In [None]:
def read_pdf_text(path: str) -> str:
    """Extract the text of every page of the PDF at ``path``.

    Pages for which extraction yields nothing contribute an empty string;
    page texts are joined with a newline.
    """
    pages = PdfReader(path).pages
    return "\n".join((page.extract_text() or "") for page in pages)

In [None]:
def read_any_text(path: str) -> str:
    """Return the text content of a CV file.

    Files whose name ends in ``.pdf`` (case-insensitive) go through
    ``read_pdf_text``; anything else is read as UTF-8 text with undecodable
    bytes dropped.
    """
    if path.lower().endswith(".pdf"):
        return read_pdf_text(path)
    # Context manager closes the handle deterministically instead of leaking it.
    with open(path, "r", encoding="utf-8", errors="ignore") as fh:
        return fh.read()

def embed_texts(texts: List[str], batch_size: int = 64) -> np.ndarray:
    """Return L2-normalized embeddings (needed for cosine via inner product).

    Args:
        texts: Strings to embed.
        batch_size: Number of texts sent per embeddings request (default 64).

    Returns:
        float32 array with one unit-length row per input text. For empty input an
        empty ``(0, 0)`` float32 array is returned and no API call is made.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    if not texts:
        # np.linalg.norm(axis=1) fails on the 1-D array an empty list produces.
        return np.zeros((0, 0), dtype="float32")

    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        resp = client.embeddings.create(model=EMBED_MODEL, input=batch)
        vectors.extend(d.embedding for d in resp.data)

    arr = np.array(vectors, dtype="float32")
    # normalize to unit length for cosine via IP; the epsilon guards against zero vectors
    norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
    return arr / norms

def _normalize_file_list(files):
    if not files: return []
    if isinstance(files, (str, os.PathLike)): return [str(files)]
    paths = []
    for f in files:
        p = getattr(f, "name", None) or (f if isinstance(f, str) else None)
        if p: paths.append(p)
    return paths

def _md_ok(msg):   return f"✅ **{msg}**"
def _md_warn(msg): return f"⚠️ **{msg}**"
def _md_err(msg):  return f"❌ **{msg}**"
# ---- Put near your other helpers ----
TABLE_HEADERS = ["Score","Name","Email","Phone","Location","YearsExp","Seniority","Role","Skills","Snippets"]

def _rows_to_table(rows):
    """Flatten list-of-dicts into list-of-lists with only scalar values."""
    table = []
    for r in rows or []:
        table.append([
            r.get("Score", 0),
            r.get("Name") or "",
            r.get("Email") or "",
            r.get("Phone") or "",
            r.get("Location") or "",
            r.get("YearsExp") if r.get("YearsExp") is not None else "",
            r.get("Seniority") or "",
            r.get("Role") or "",
            r.get("Skills") or "",
            " | ".join(r.get("Snippets") or []),  # stringify list
        ])
    return table
def do_search_for_table(jd, topk, sen, loc, role):
    """UI callback for the Search tab: run the JD match and return flat table rows.

    Args:
        jd: Job-description text (``None`` is treated as empty).
        topk: Number of candidates to return; slider values may arrive as floats,
            so the value is coerced to ``int`` before it is used as a slice bound.
        sen, loc, role: Optional filter values; ``None`` is treated as "no filter".

    Returns:
        List of rows (lists of scalars) as produced by ``_rows_to_table``.
    """
    rows = search_candidates(jd_text=jd or "",
                             top_k_candidates=int(topk),
                             seniority=sen or "",
                             location=loc or "",
                             role_hint=role or "")
    return _rows_to_table(rows)


# ---- Q&A table helpers ----
CITES_HEADERS = ["#", "Candidate", "Role", "Seniority", "Location", "Score"]

def _cites_to_table(cites):
    table = []
    for c in cites or []:
        # ensure only scalars go into the dataframe
        score = c.get("score", "")
        try:
            score = round(float(score), 4)
        except Exception:
            pass
        table.append([
            c.get("#", ""),
            c.get("candidate") or "",
            c.get("role") or "",
            c.get("seniority") or "",
            c.get("location") or "",
            score,
        ])
    return table

In [None]:
# ===============================
# 4) FAISS Index Manager
# ===============================
class FaissStore:
    """
    - FAISS index (IndexFlatIP) over L2-normalized vectors  => cosine similarity
    - Python dict sidecar: id -> {"document": str, "metadata": {...}}

    FAISS row numbers and the sidecar are kept aligned through ``row2id`` (row -> id)
    and ``id2row`` (id -> row). ``save`` persists both halves; the constructor reloads
    them when both files exist.
    """
    def __init__(self, dim: int, index_path: str, map_path: str):
        self.dim = dim
        self.index_path = index_path
        self.map_path = map_path

        self.id2row: Dict[str, int] = {}      # id -> row number in FAISS
        self.row2id: List[str] = []           # row -> id
        self.store: Dict[str, Dict[str, Any]] = {}  # id -> payload

        if os.path.exists(index_path) and os.path.exists(map_path):
            self._load()
        else:
            self.index = faiss.IndexFlatIP(dim)

    def add(self, ids: List[str], vectors: np.ndarray, documents: List[str], metadatas: List[Dict[str, Any]]):
        """Append vectors and their payloads.

        Raises:
            ValueError: if the inputs differ in length, ``vectors`` is not 2-D, or
                its width differs from ``self.dim``. Nothing is modified in that case.
        """
        # FAISS expects C-contiguous float32 input.
        vectors = np.ascontiguousarray(vectors, dtype="float32")
        if vectors.ndim != 2:
            raise ValueError(f"vectors must be 2-D, got shape {vectors.shape}")
        if not (vectors.shape[0] == len(ids) == len(documents) == len(metadatas)):
            raise ValueError("ids, vectors, documents and metadatas must have the same length")
        if not ids:
            return  # nothing to add
        if vectors.shape[1] != self.dim:
            raise ValueError(f"expected vectors of dimension {self.dim}, got {vectors.shape[1]}")

        # Add to FAISS first; the mappings are only touched once that succeeded.
        self.index.add(vectors)
        start_row = len(self.row2id)
        for offset, _id in enumerate(ids):
            self.row2id.append(_id)
            self.id2row[_id] = start_row + offset
            self.store[_id] = {"document": documents[offset], "metadata": metadatas[offset]}

    def search(self, query_vectors: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Return (scores, row_indices). Scores are inner product (cosine).

        ``top_k`` is coerced to ``int`` so int-like floats are accepted. Rows FAISS
        cannot fill are reported with index -1.
        """
        scores, idxs = self.index.search(query_vectors, int(top_k))
        return scores, idxs

    def save(self):
        """Persist index and sidecar; both are written to temp files and then swapped in."""
        tmp_index = self.index_path + ".tmp"
        tmp_map = self.map_path + ".tmp"
        faiss.write_index(self.index, tmp_index)
        with open(tmp_map, "w", encoding="utf-8") as f:
            json.dump({
                "row2id": self.row2id,
                "store": self.store
            }, f, ensure_ascii=False)
        # Replace only after both writes succeeded, so a crash mid-write cannot
        # leave a truncated file or a mismatched index/sidecar pair behind.
        os.replace(tmp_index, self.index_path)
        os.replace(tmp_map, self.map_path)

    def _load(self):
        """Load index and sidecar from disk and rebuild ``id2row``.

        Raises:
            ValueError: if the index row count and the sidecar row list disagree.
        """
        self.index = faiss.read_index(self.index_path)
        with open(self.map_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.row2id = data.get("row2id", [])
        self.store  = data.get("store", {})
        self.id2row = {i: r for r, i in enumerate(self.row2id)}
        if self.index.ntotal != len(self.row2id):
            # Search results are resolved through row2id; a mismatch would map hits to wrong CVs.
            raise ValueError(
                f"FAISS index has {self.index.ntotal} rows but sidecar lists {len(self.row2id)}; "
                f"delete or rebuild {self.index_path} and {self.map_path}"
            )

def do_qa_for_table(q, topk, sen, loc, role, kw):
    """UI callback: run the Q&A helper and flatten its citations into table rows.

    ``None`` filter values are forwarded as empty strings. Returns
    ``(answer, rows)`` where ``rows`` comes from ``_cites_to_table``.
    """
    # NOTE(review): no `qa_over_cvs` is defined in the cells visible here (only
    # `qa_over_cvs_dedup`, which already returns flat rows) - confirm this wrapper is
    # still used, otherwise calling it raises NameError.
    filters = {
        "seniority": sen or "",
        "location": loc or "",
        "role_hint": role or "",
        "keyword": kw or "",
    }
    answer, cites = qa_over_cvs(question=q, top_k_chunks=topk, **filters)
    table = _cites_to_table(cites)
    return answer, table


# global store: shared by indexing, search and Q&A; the constructor reloads a
# previously saved index when both INDEX_PATH and MAP_PATH exist, else starts empty
faiss_store = FaissStore(DIM, INDEX_PATH, MAP_PATH)


In [None]:

# ===============================
# 5) LLM: chunk + metadata (simple)
# ===============================
# Prompt that asks the chat model to return candidate metadata plus text chunks as JSON.
# `$cv_text` is the only placeholder; llm_chunk_and_metadata fills it via .substitute().
CHUNK_META_PROMPT = Template("""You are helping HR index a candidate CV.
Return STRICT JSON with this schema:
{
  "candidate": {
    "name": string|null,
    "email": string|null,
    "phone": string|null,
    "location": string|null,
    "years_experience": number|null,
    "seniority": "Junior"|"Mid"|"Senior"|null,
    "role": string|null,
    "skills": string[]
  },
  "chunks": [
    {
      "text": string,
      "section": string|null
    }
  ]
}

Rules:
- Split into 4–20 coherent chunks; each chunk <= 800 characters.
- Do not invent facts; if unknown, use null or [].
- skills must be short lowercase tokens.
- Output ONLY JSON.

CV TEXT:
\"\"\"$cv_text\"\"\"""")


import re, json
from string import Template

# keep the same CHUNK_META_PROMPT you already have
# CHUNK_META_PROMPT = Template(""" ... $cv_text ... """)

NAME_RE  = re.compile(r"(?im)^\s*([A-Z][A-Za-z]+(?:\s+[A-Z][A-Za-z'’\-]+){0,3})\s*$")
EMAIL_RE = re.compile(r"[\w\.-]+@[\w\.-]+\.\w+")
PHONE_RE = re.compile(r"(\+?\d[\d\-\s\(\)]{7,})")

def _fallback_profile_from_text(t: str) -> dict:
    lines = [ln.strip() for ln in (t or "").splitlines() if ln.strip()]
    name = None
    for ln in lines[:8]:
        m = NAME_RE.match(ln)
        if m:
            name = m.group(1)
            break
    email = (EMAIL_RE.search(t) or [None])[0] if EMAIL_RE.search(t) else None
    phone = (PHONE_RE.search(t) or [None])[0] if PHONE_RE.search(t) else None
    return {
        "name": name, "email": email, "phone": phone,
        "location": None, "years_experience": None,
        "seniority": None, "role": None, "skills": []
    }



def _split_text_with_overlap(text: str, size: int = 800, overlap: int = 120) -> list:
    """Split ``text`` into chunks of at most ``size`` chars, consecutive chunks sharing ``overlap`` chars.

    A window is cut back to its last paragraph break or sentence end, but only when
    that break lies beyond the overlap region; an earlier break would make the next
    window start at or before the current one and the loop would never terminate.
    """
    chunks = []
    total = len(text)
    start = 0
    while start < total:
        end = min(total, start + size)
        window = text[start:end]
        cut = max(window.rfind("\n\n"), window.rfind(". "))
        if cut > overlap and (end - start) > 200:
            end = start + cut + (2 if window[cut:cut + 2] == "\n\n" else 1)
        piece = text[start:end].strip()
        if piece:
            chunks.append({"text": piece, "section": None})
        if end >= total:
            break
        # Step back by `overlap`, but always move forward by at least one character.
        start = max(end - overlap, start + 1)
    return chunks


def llm_chunk_and_metadata(cv_text: str) -> dict:
    """Ask the chat model for candidate metadata and chunks, with a heuristic fallback.

    The model is queried in JSON mode up to two times. A reply is accepted when it
    parses to an object whose ``chunks`` is a list of at least two items. Otherwise
    the profile comes from ``_fallback_profile_from_text`` and the text is split
    into overlapping ~800-character chunks.

    Args:
        cv_text: Full CV text; only the first 12000 characters are sent to the model.

    Returns:
        Dict with ``candidate`` (profile dict) and ``chunks`` (list of
        ``{"text", "section"}`` dicts). API errors from the chat call propagate.
    """
    prompt = CHUNK_META_PROMPT.substitute(cv_text=cv_text[:12000])

    def _ask():
        return client.chat.completions.create(
            model=CHAT_MODEL,
            messages=[{"role":"user","content": prompt}],
            temperature=0,
            response_format={ "type": "json_object" }  # <-- JSON mode (supported by gpt-4o-mini)
        )

    for _ in range(2):  # try twice
        raw = _ask().choices[0].message.content
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):  # None content or malformed JSON
            continue
        # minimal sanity: an object carrying at least two chunks
        if isinstance(data, dict):
            llm_chunks = data.get("chunks")
            if isinstance(llm_chunks, list) and len(llm_chunks) >= 2:
                return data

    # If we are here, LLM gave invalid JSON or too few chunks -> heuristic fallback
    profile = _fallback_profile_from_text(cv_text)
    # simple multi-split fallback so you don't end up with a single chunk
    text = "\n".join(ln.strip() for ln in cv_text.splitlines())
    chunks = _split_text_with_overlap(text, size=800, overlap=120)

    return {
        "candidate": profile,
        "chunks": chunks if chunks else [{"text": cv_text[:800], "section": "full_text"}],
    }



In [None]:
# ===============================
# 6) Ingest (multi-file)
# ===============================
def index_cvs(files, default_tags: str = "cv") -> str:
    """Index uploaded CV files into the FAISS store and return a Markdown report.

    Each file is read, chunked by ``llm_chunk_and_metadata``, embedded and added
    under a fresh candidate id. A failure in one file is reported (with traceback)
    and does not stop the others. The store is saved once at the end, and only
    when at least one chunk was added.

    Args:
        files: Upload value accepted by ``_normalize_file_list``.
        default_tags: Comma-separated tags stored in every chunk's metadata.

    Returns:
        Markdown text with one status line per file.
    """
    try:
        paths = _normalize_file_list(files)
        if not paths:
            return _md_warn("No files selected.")

        tag_list = [part.strip() for part in (default_tags or "").split(",") if part.strip()]
        report = [f"### Index report ({len(paths)} file(s))"]
        added_total = 0

        for cv_path in paths:
            try:
                file_name = os.path.basename(cv_path)
                raw_text = read_any_text(cv_path)
                if not raw_text.strip():
                    report.append(_md_warn(f"{file_name}: empty text (scanned PDF?). Skipping."))
                    continue

                parsed = llm_chunk_and_metadata(raw_text)
                cand = parsed.get("candidate", {}) or {}
                usable = [piece for piece in parsed.get("chunks", []) if piece.get("text")]
                if not usable:
                    report.append(_md_warn(f"{file_name}: LLM returned 0 chunks. Skipping."))
                    continue

                candidate_id = str(uuid.uuid4())
                docs = [piece["text"][:2000] for piece in usable]  # safety cap
                embs = embed_texts(docs)
                ids = [f"{candidate_id}-c{n}" for n in range(len(docs))]

                # One metadata dict per chunk; only "section" differs between chunks.
                metas = []
                for piece in usable:
                    metas.append({
                        "candidate_id": candidate_id,
                        "file_name": file_name,
                        "section": piece.get("section"),
                        "name": cand.get("name"),
                        "email": cand.get("email"),
                        "phone": cand.get("phone"),
                        "location": cand.get("location"),
                        "years_experience": cand.get("years_experience"),
                        "seniority": cand.get("seniority"),
                        "role": cand.get("role"),
                        "skills": ", ".join(cand.get("skills") or []),   # store as string in sidecar
                        "tags": ", ".join(tag_list),
                        "doc_type": "cv",
                    })

                faiss_store.add(ids, embs, docs, metas)
                added_total += len(docs)
                report.append(_md_ok(f"{file_name}: indexed {len(docs)} chunk(s). Candidate: {cand.get('name') or 'Unknown'}"))

            except Exception as exc:
                # keep going with the remaining files; surface the failure in the report
                report.append(_md_err(f"{os.path.basename(cv_path)}: {exc}"))
                report.append("```text\n" + traceback.format_exc() + "\n```")

        if added_total != 0:
            faiss_store.save()
            report.append(_md_ok(f"Saved index with total chunks: {len(faiss_store.row2id)}"))
        else:
            report.append(_md_warn("No chunks were added. Check errors above (API key, PDF text, etc.)."))
        return "\n\n".join(report)

    except Exception as exc:
        return _md_err(f"Top-level error: {exc}") + "\n\n```text\n" + traceback.format_exc() + "\n```"

In [None]:
# ===============================
# 7) Search (JD match) — group by candidate
# ===============================
def search_candidates(jd_text: str,
                      top_k_candidates: int = 10,
                      seniority: str = "", location: str = "", role_hint: str = "",
                      fetch_k: int = 200) -> List[Dict[str, Any]]:
    """Rank indexed candidates against a job description.

    Chunks are retrieved by cosine similarity, filtered on metadata and grouped
    by ``candidate_id``; each candidate is scored by its best-matching chunk.

    Args:
        jd_text: Job-description text; blank/None returns ``[]``.
        top_k_candidates: Maximum number of candidates returned (int-like floats accepted).
        seniority, location, role_hint: Optional exact-match metadata filters
            (empty string disables the filter).
        fetch_k: Number of chunks pulled from FAISS before filtering and grouping.

    Returns:
        List of dicts with keys ``Score, Name, Email, Phone, Location, YearsExp,
        Seniority, Role, Skills, Snippets``, sorted by ``Score`` descending.
    """
    if not (jd_text or "").strip():
        return []

    # UI sliders can hand over floats; a float is not a valid slice bound.
    top_k_candidates = int(top_k_candidates)

    qvec = embed_texts([jd_text])[0].reshape(1, -1)
    scores, rows = faiss_store.search(qvec, top_k=int(fetch_k))   # pull wide
    scores, rows = scores[0], rows[0]

    # Gather hits, apply light metadata filters from sidecar
    by_cand: Dict[str, Dict[str, Any]] = {}
    for s, r in zip(scores, rows):
        if r == -1:  # FAISS pads missing results with -1
            continue
        _id = faiss_store.row2id[r]
        rec = faiss_store.store[_id]
        md  = rec["metadata"]

        if seniority and (md.get("seniority") or "") != seniority:
            continue
        if location and (md.get("location") or "") != location:
            continue
        if role_hint and (md.get("role") or "") != role_hint:
            continue

        score = float(s)
        cid = md.get("candidate_id", "unknown")
        entry = by_cand.get(cid)
        if entry is None:
            # Seed with the first hit's score so non-positive similarities are not reported as 0.
            entry = by_cand[cid] = {
                "Score": score,
                "Name": md.get("name"),
                "Email": md.get("email"),
                "Phone": md.get("phone"),
                "Location": md.get("location"),
                "YearsExp": md.get("years_experience"),
                "Seniority": md.get("seniority"),
                "Role": md.get("role"),
                "Skills": md.get("skills"),
                "Snippets": []
            }
        else:
            entry["Score"] = max(entry["Score"], score)  # use best (max) cosine
        if len(entry["Snippets"]) < 2:
            doc = rec["document"] or ""  # tolerate a missing document in the sidecar
            entry["Snippets"].append(doc[:220] + ("..." if len(doc) > 220 else ""))

    rows_out = sorted(by_cand.values(), key=lambda rec: rec["Score"], reverse=True)
    for rec in rows_out:
        rec["Score"] = round(rec["Score"], 4)
    return rows_out[:top_k_candidates]

# ===============================
# 8) Q&A (grounded) with optional keyword prefilter
# ===============================
# --- Q&A: dedupe by candidate and flatten output ---

# System prompt that keeps the chat model grounded in the retrieved CV chunks.
RAG_SYSTEM = (
    "You are an HR assistant. Answer ONLY from the provided CONTEXT.\n"
    "If the answer is not present, say you don't know. Be concise."
)

def qa_over_cvs_dedup(question: str, top_k_chunks: int = 6,
                      seniority: str = "", location: str = "", role_hint: str = "", keyword: str = ""):
    """Answer ``question`` from indexed CV chunks and list the matching candidates.

    Args:
        question: Free-text question; blank/None short-circuits with a prompt message.
        top_k_chunks: Maximum number of chunks placed in the LLM context. Accepts
            int-like floats (UI sliders) and is clamped to at least 1.
        seniority, location, role_hint: Optional exact-match metadata filters
            (empty string disables the filter).
        keyword: Optional case-insensitive substring a chunk must contain.

    Returns:
        ``(answer, table)`` where ``table`` is a list of flat rows in
        ``["#", "Candidate", "Role", "Seniority", "Location", "Score", "Snippet"]``
        order, one row per candidate, sorted by best chunk score (descending).
        Note the table covers every filtered hit of the over-fetch, not only the
        chunks that fitted into the LLM context.
    """
    if not (question or "").strip():
        return "Please enter a question.", []

    # UI sliders can hand over floats; FAISS needs an int k and the limit must be >= 1.
    top_k_chunks = max(1, int(top_k_chunks))
    keyword_lc = (keyword or "").strip().lower()

    # 1) Retrieve (overfetch x6 so that filters still leave enough chunks)
    qvec = embed_texts([question])[0].reshape(1, -1)
    scores, rows = faiss_store.search(qvec, top_k=top_k_chunks * 6)
    scores, rows = scores[0], rows[0]

    # 2) Build context (keep up to top_k_chunks chunks after filters)
    ctx_parts = []
    per_candidate = {}  # candidate_id -> {candidate, role, seniority, location, score, snippet}
    for s, r in zip(scores, rows):
        if r == -1:  # FAISS pads missing results with -1
            continue
        _id = faiss_store.row2id[r]
        rec = faiss_store.store[_id]
        md  = rec["metadata"]
        doc = rec["document"] or ""  # tolerate a missing document in the sidecar
        score = float(s)

        # filters
        if seniority and (md.get("seniority") or "") != seniority:
            continue
        if location and (md.get("location") or "") != location:
            continue
        if role_hint and (md.get("role") or "") != role_hint:
            continue
        if keyword_lc and keyword_lc not in doc.lower():
            continue

        # add to LLM context until limit
        if len(ctx_parts) < top_k_chunks:
            label = f"{md.get('name') or md.get('file_name')} | {md.get('section')}"
            ctx_parts.append(f"[{len(ctx_parts) + 1} | {label}]\n{doc}")

        # aggregate per candidate: seed with the first hit so non-positive
        # similarities are still reported with their real score and snippet
        snippet = doc[:220] + ("..." if len(doc) > 220 else "")
        cid = md.get("candidate_id") or _id
        entry = per_candidate.get(cid)
        if entry is None:
            per_candidate[cid] = {
                "candidate": md.get("name") or md.get("file_name"),
                "role": md.get("role") or "",
                "seniority": md.get("seniority") or "",
                "location": md.get("location") or "",
                "score": score,
                "snippet": snippet,
            }
        elif score > entry["score"]:
            entry["score"] = score
            entry["snippet"] = snippet

    if not ctx_parts:
        return "No relevant information found.", []

    # 3) Ask LLM with grounded context
    ctx = "\n\n".join(ctx_parts)
    messages = [
        {"role":"system","content": RAG_SYSTEM},
        {"role":"user","content": f"QUESTION: {question}\n\nCONTEXT:\n{ctx}"}
    ]
    resp = client.chat.completions.create(model=CHAT_MODEL, messages=messages, temperature=0)
    answer = resp.choices[0].message.content or ""

    # 4) Build a clean, flat table (scalars only, so the Dataframe can render it)
    ranked = sorted(per_candidate.values(), key=lambda c: c["score"], reverse=True)
    table = [
        [i, c["candidate"], c["role"], c["seniority"], c["location"], round(c["score"], 4), c["snippet"]]
        for i, c in enumerate(ranked, start=1)
    ]
    return answer, table

# Column headers of the Q&A results table; order matches the rows built by qa_over_cvs_dedup.
QA_HEADERS = ["#", "Candidate", "Role", "Seniority", "Location", "Score", "Snippet"]

In [None]:
# Gradio app with three tabs: Ingest (index CVs), Search (JD match) and Q&A.
# Components are laid out in the order they are created, so statement order matters here.
with gr.Blocks(title="HR CV Search (FAISS) — LLM Chunk + Metadata") as app:
    gr.Markdown("### HR CV RAG (FAISS): Upload CVs → LLM chunks & metadata → Search/JD → Q&A")

    with gr.Tabs():
        # --- Ingest tab: upload files and index them via index_cvs ---
        with gr.TabItem("Ingest"):
            up = gr.File(file_count="multiple", type="filepath", label="Upload CVs (PDF/TXT)")
            tags = gr.Textbox(value="cv", label="Default tags (optional)")  # saved in sidecar only
            go = gr.Button("Index", variant="primary")
            out = gr.Markdown()
            go.click(index_cvs, inputs=[up, tags], outputs=out)

        # --- Search tab: rank candidates against a job description ---
        with gr.TabItem("Search (JD match)"):
            jd   = gr.Textbox(lines=6, label="Job Description")
            with gr.Row():
                sen = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc = gr.Textbox(label="Location")
                role = gr.Textbox(label="Role")
            topk = gr.Slider(1, 30, value=10, step=1, label="Top-K candidates")
            btn  = gr.Button("Search", variant="primary")

            tbl  = gr.Dataframe(
                headers=TABLE_HEADERS,
                wrap=True,
                interactive=False,
                row_count=(0, "dynamic"),
                col_count=(len(TABLE_HEADERS), "fixed"),
            )

        # Wired outside the TabItem (still inside the Blocks context); fills `tbl` with flat rows.
        btn.click(do_search_for_table, inputs=[jd, topk, sen, loc, role], outputs=tbl)


        # --- Q&A tab: grounded answer plus a per-candidate table ---
        with gr.TabItem("Q&A"):
            q = gr.Textbox(lines=2, label="Question", placeholder="e.g., Who has hands-on RAG experience?")
            with gr.Row():
                topk_c = gr.Slider(1, 12, value=6, step=1, label="Top-K chunks (for context)")
                sen2 = gr.Dropdown(choices=["","Junior","Mid","Senior"], value="", label="Seniority")
                loc2 = gr.Textbox(label="Location")
                role2 = gr.Textbox(label="Role")
            kw = gr.Textbox(label="Keyword prefilter (optional)", placeholder="e.g. Airflow")

            ask = gr.Button("Ask", variant="primary")
            ans = gr.Markdown()
            cites = gr.Dataframe(headers=QA_HEADERS, wrap=True, interactive=False,
                              row_count=(0, "dynamic"), col_count=(len(QA_HEADERS), "fixed"))

        # qa_over_cvs_dedup returns (answer, rows); they map onto `ans` and `cites` in that order.
        ask.click(qa_over_cvs_dedup,
                  inputs=[q, topk_c, sen2, loc2, role2, kw],
                  outputs=[ans, cites])



# share=True publishes the app on a public *.gradio.live URL; debug=True keeps the cell running.
# NOTE(review): the app serves candidate PII and spends the OpenAI key, and no auth is configured
# here; consider share=False or launch(auth=...) before sharing the link.
app.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://0eaf5e98a4cb8ad253.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
