In [23]:
# Scratch cell: prints a fixed string; nothing below reads anything from it.
print("ashish is great")

ashish is great


In [36]:
# ==== Cell 1: Imports + Gemini setup (LangChain Google GenAI ONLY) ====

# core libs you asked for
from langchain.vectorstores import FAISS
from langchain.document_loaders import WebBaseLoader, PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# langchain google genai provider
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings

# env + key
import os
from getpass import getpass

# SECURITY: never store a literal API key in the notebook. The key is taken from the
# environment (GOOGLE_API_KEY first, then GEMINI_API_KEY); only if neither is set is it
# requested interactively, so it never ends up in the saved .ipynb or its outputs.
# NOTE(review): the key that used to be hardcoded on this line is in the file history and
# should be revoked/rotated.
API_KEY = (os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY") or "").strip()
if not API_KEY:
    try:
        API_KEY = getpass("Google Gemini API key: ").strip()
    except Exception:
        # No interactive stdin (e.g. headless execution): fall through to the error below.
        API_KEY = ""

if not API_KEY:
    raise RuntimeError("Set GOOGLE_API_KEY (or GEMINI_API_KEY) in the environment first.")

# set both names some libs look for
os.environ["GOOGLE_API_KEY"] = API_KEY
os.environ["GEMINI_API_KEY"] = API_KEY
# avoid ADC fallback
os.environ.pop("GOOGLE_APPLICATION_CREDENTIALS", None)

# model handles (LangChain Google GenAI ONLY)
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0, api_key=API_KEY)
embedding_model = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004", google_api_key=API_KEY)

# quick sanity check (no google.generativeai used): one tiny embedding call proves that the
# key and the embedding model are usable before any heavier cell runs.
try:
    _ = embedding_model.embed_query("hello world")
    print("[ok] embeddings (text-embedding-004) working")
except Exception as e:
    # Chain the original exception so the provider's traceback is not lost.
    raise RuntimeError(f"Embeddings init failed: {e}") from e

print("[ok] LLM (gemini-2.0-flash) ready")
print("[ok] Imports + key setup complete (no google.generativeai used)")


[ok] embeddings (text-embedding-004) working
[ok] LLM (gemini-2.0-flash) ready
[ok] Imports + key setup complete (no google.generativeai used)


In [None]:
# ==== Cell 2: State + Options + Prompts (no executions yet) ====
# Sets up:
#   • Test inputs (RESUME_FILE / RESUME_TEXT, JD_FILE / JD_TEXT)
#   • Run options/thresholds
#   • Flexible STATE dict + artifact paths
#   • Core prompts (resume detector + section classifier + extractors)
# Nothing heavy runs here; next cell will load, chunk, embed, and detect.

# stdlib
import json, re, uuid
from pathlib import Path
from typing import Optional, Dict, Any, List, TypedDict

# ---------- Test inputs (edit these) ----------
# Resume source: when RESUME_FILE is set, the loading cell reads it in preference to RESUME_TEXT.
RESUME_FILE: Optional[str] = "JALADI ASHISH RESUME_oracle.pdf"  # path to a resume file, e.g. "samples/alice_resume.pdf"
RESUME_TEXT: Optional[str] = None  # pasted resume text; only consulted when RESUME_FILE is empty

# Job-description source: same precedence (JD_FILE first, otherwise JD_TEXT).
JD_FILE: Optional[str] = None  # e.g., "samples/senior_de_jd.txt"
JD_TEXT: Optional[str] = """\
🏷 Job Title

Associate Software Engineer

📋 Role Summary

As an Associate Software Engineer, you will work closely under the guidance of senior engineers to design, develop, test, and maintain software applications. This role is ideal for early-career developers who are proactive, willing to learn, and ready to contribute to production code.

🔧 Responsibilities

Assist in the analysis, design, development, and implementation of software modules and features

Write clean, maintainable, and efficient code following best practices

Participate in code reviews and give/receive feedback to ensure code quality

Debug, troubleshoot, and fix defects in existing applications

Create and execute unit tests to validate code correctness

Contribute to system documentation including API specs, architecture diagrams, and user manuals

Collaborate with cross-functional teams (product, QA, UX) to gather requirements and support feature delivery

Adhere to software development lifecycle (SDLC) practices: planning, development, testing, deployment

Stay up to date with emerging technologies and propose improvements to the tech stack

🎯 Qualifications / Skills Required

Bachelor’s degree in Computer Science, Software Engineering, or a related field

0–2 years of software development experience (including internships, coop, or academic projects)

Proficiency in one or more programming languages (e.g. Python, Java, C++, JavaScript)

Good understanding of data structures, algorithms, and object-oriented programming

Experience with version control systems (e.g. Git)

Familiarity with RESTful APIs, web frameworks, or backend services

Basic knowledge of relational databases (SQL)

Strong analytical and problem-solving skills

Good communication and teamwork skills

Eagerness to learn, take feedback, and grow technically

🌟 Preferred / Nice-to-Have

Knowledge of unit testing frameworks, test automation

Exposure to cloud services or DevOps tooling (AWS, Docker, CI/CD pipelines)

Experience with frontend frameworks (React, Angular, Vue)

Understanding of NoSQL databases

Experience with Agile / Scrum methodologies
""".strip()

# ---------- Options / thresholds ----------
class RunOptions(TypedDict):
    """Tunable knobs for one pipeline run; DEFAULT_OPTIONS holds the stock values."""
    chunk_tokens: int  # target chunk size in tokens; the chunking cell multiplies it by 4 to get characters
    chunk_overlap: float  # overlap between consecutive chunks, as a fraction of the chunk size
    faiss_topk: int  # how many JD-similar resume chunks are requested from the FAISS index
    match_strong: int  # NOTE(review): presumably the 0-100 similarity cutoff for a "strong" match — confirm in the scoring cell
    match_partial: int  # NOTE(review): presumably the 0-100 cutoff for a "partial" match — confirm
    recency_months: int  # NOTE(review): looks like a recency window in months — confirm where it is consumed
    cap_if_must_have_missing: int  # NOTE(review): presumably a score ceiling when a required skill is missing — confirm
    seniority_penalty: int  # NOTE(review): presumably points deducted for a seniority mismatch — confirm
    degree_penalty: int  # NOTE(review): presumably points deducted for a degree mismatch — confirm
    strict_mode: bool  # NOTE(review): not read in the cells visible here — confirm its effect
    language: str  # language code for the run (DEFAULT_OPTIONS uses "en")

# Stock option values used when new_state() is called without an explicit `options` argument.
# Built through the TypedDict constructor, which yields a plain dict with these keys.
DEFAULT_OPTIONS: RunOptions = RunOptions(
    # chunking
    chunk_tokens=1000,
    chunk_overlap=0.18,
    # retrieval
    faiss_topk=12,
    # matching / scoring thresholds
    match_strong=85,
    match_partial=65,
    recency_months=30,
    cap_if_must_have_missing=69,
    seniority_penalty=5,
    degree_penalty=8,
    # behaviour switches
    strict_mode=True,
    language="en",
)

# ---------- STATE (flexible bag-of-facts) ----------
class PipelineState(TypedDict, total=False):
    """Shared, mutable record that every cell of the pipeline reads from and writes to.

    Declared with total=False, so every key is optional for type-checking purposes;
    new_state() nevertheless initialises all of them.
    """
    run_id: str  # short hex id of this run; also names the artifact directory
    options: RunOptions  # thresholds and knobs for this run
    flags: Dict[str, bool]  # e.g. "is_resume", written by the resume detector

    inputs: Dict[str, Optional[str]]  # the four user inputs: resume_file/resume_text/jd_file/jd_text
    raw: Dict[str, Optional[str]]  # loaded plain text under "resume_text" and "jd_text"

    provenance: Dict[str, Any]  # initialised as {"chunks": []}; NOTE(review): not written in the cells visible here
    chunks: List[Dict[str, Any]]  # one dict per resume chunk (id, page, text, start, end, tokens, label)
    faiss: Dict[str, Any]  # "index_path" of the saved index and the "topk_ids" of JD-relevant chunks

    contacts: Dict[str, Any]  # name, email, phone and a nested "links" dict
    high_level: Dict[str, Any]  # summary, location, years_experience

    education: List[Dict[str, Any]]  # extracted education entries
    timeline: List[Dict[str, Any]]  # extracted experience entries
    projects: List[Dict[str, Any]]  # extracted project entries
    skills: List[Dict[str, Any]]  # extracted skills (name, aliases, evidence, chunk_ids, frequency, ...)
    certs: List[str]
    awards: List[str]

    jd_snapshot: Dict[str, Any]  # title, required, preferred, responsibilities parsed from the JD
    canon: Dict[str, Any]  # skill_map and normalized skill lists
    jd_alignment: Dict[str, Any]  # "required" and "preferred" alignment lists

    coverage: Dict[str, Any]
    final: Dict[str, Any]  # final result payload (e.g. "score_100" on the not_resume early exit)

    artifacts: Dict[str, Any]  # "base_dir" plus a "paths" dict of artifact file locations
    audit: List[str]  # append-only breadcrumbs describing what each step did

def new_state(
    resume_file: Optional[str],
    resume_text: Optional[str],
    jd_file: Optional[str],
    jd_text: Optional[str],
    options: RunOptions = DEFAULT_OPTIONS,
) -> PipelineState:
    """Create a fresh pipeline state for one run.

    Args:
        resume_file: Path to a resume file, or None.
        resume_text: Raw resume text, or None.
        jd_file: Path to a job-description file, or None.
        jd_text: Raw job-description text, or None.
        options: Run options; defaults to DEFAULT_OPTIONS.

    Returns:
        A PipelineState with a new 12-character hex ``run_id``, every section
        initialised to its empty value, and artifact paths under ``./tmp/<run_id>``.
        No directory or file is created here.
    """
    run_id = uuid.uuid4().hex[:12]
    base_dir = Path(f"./tmp/{run_id}")
    paths = {
        "base_dir": str(base_dir),
        "chunks_json": str(base_dir / "chunks.json"),
        "entities_by_chunk_json": str(base_dir / "entities_by_chunk.json"),
        "merged_entities_json": str(base_dir / "merged_entities.json"),
        "canon_map_json": str(base_dir / "canon_map.json"),
        "coverage_json": str(base_dir / "coverage.json"),
        "final_json": str(base_dir / "final.json"),
        "faiss_dir": str(base_dir / "faiss"),
    }

    st: PipelineState = {
        "run_id": run_id,
        # Copy the options: storing the caller's dict (by default the module-level
        # DEFAULT_OPTIONS) by reference would let a tweak to one run's
        # state["options"] silently change the defaults of every later run.
        "options": RunOptions(**options),
        "flags": {"is_resume": True},  # will be set by detector in next cell

        "inputs": {
            "resume_file": resume_file,
            "resume_text": resume_text,
            "jd_file": jd_file,
            "jd_text": jd_text,
        },
        "raw": {"resume_text": None, "jd_text": None},

        "provenance": {"chunks": []},
        "chunks": [],
        "faiss": {"index_path": None, "topk_ids": []},

        "contacts": {"name": None, "email": None, "phone": None,
                     "links": {"linkedin": None, "github": None, "portfolio": None, "website": None}},
        "high_level": {"summary": None, "location": None, "years_experience": None},

        "education": [],
        "timeline": [],
        "projects": [],
        "skills": [],
        "certs": [],
        "awards": [],

        "jd_snapshot": {"title": None, "required": [], "preferred": [], "responsibilities": []},
        "canon": {"skill_map": {}, "normalized_skills": [], "normalized_required": [], "normalized_preferred": []},
        "jd_alignment": {"required": [], "preferred": []},

        "coverage": {},
        "final": {},

        "artifacts": {"base_dir": paths["base_dir"], "paths": paths},
        "audit": [],
    }
    return st

# Build the shared state for this run from the inputs configured above (default options).
STATE = new_state(
    resume_file=RESUME_FILE,
    resume_text=RESUME_TEXT,
    jd_file=JD_FILE,
    jd_text=JD_TEXT,
)
print(f"[ok] STATE initialized — run_id={STATE['run_id']} → artifacts: {STATE['artifacts']['base_dir']}")

# ---------- Prompts ----------
PROMPTS: Dict[str, str] = {}

# Helper to escape all braces except the variables we actually pass
def _escape_braces_keep_vars(template: str, keep_vars: List[str]) -> str:
    esc = template.replace("{", "{{").replace("}", "}}")
    for v in keep_vars:
        esc = esc.replace("{{" + v + "}}", "{" + v + "}")
    return esc

# Resume/CV detector prompt. The only placeholder is {resume_excerpt}; every other brace is
# literal text for the model.
# NOTE: literal braces are written single here. _escape_braces_keep_vars doubles every brace,
# so the previously pre-doubled "{{title, ...}}" was escaped twice and reached the model as
# "{{title, ...}}" instead of "{title, ...}".
PROMPTS["resume_detector"] = _escape_braces_keep_vars(r"""
You are a strict document classifier. Decide if the uploaded content is a *resume/CV*.
Positive resume signals (examples, not variables): {title, company, dates, location}, {degree, field, institution, year}.

Return **JSON only**:
{
  "label": "resume" | "close_to_resume" | "not_resume",
  "confidence": 0.0 to 1.0,
  "reasons": ["short bullets"],
  "quick_metadata": {
    "has_contact_like_section": true|false,
    "has_experience_like_section": true|false,
    "has_education_like_section": true|false,
    "has_skills_like_section": true|false,
    "pages_seen": <int>,
    "approx_length_chars": <int>
  }
}

Text to classify:
---
{resume_excerpt}
---
(Only the JSON object; no extra text.)
""".strip(), ["resume_excerpt"])

PROMPTS["section_classifier"] = _escape_braces_keep_vars(r"""
Label the resume chunk as one of:
  "contact" | "summary" | "education" | "experience" | "projects" | "skills" | "certifications" | "awards" | "other"

Return JSON only:
{ "label": "<one_of_the_labels>", "rationale": "<very_short_reason>" }

Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_text"])

PROMPTS["extract_experience"] = _escape_braces_keep_vars(r"""
Extract **verbatim** experience entries in this chunk.
Return JSON array of:
  { "title": "", "company": "", "location": null, "start": null, "end": null,
    "highlights": [], "evidence": "<short verbatim>", "chunk_id": "<id>" }

Chunk ID: {chunk_id}
Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_id", "chunk_text"])

PROMPTS["extract_skills"] = _escape_braces_keep_vars(r"""
List explicit skills/technologies/frameworks/clouds/databases.
Return JSON array of:
  { "name": "<as_written>", "aliases": [], "evidence": ["<short verbatim>"], "chunk_id": "<id>" }

Chunk ID: {chunk_id}
Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_id", "chunk_text"])

PROMPTS["extract_education"] = _escape_braces_keep_vars(r"""
Extract education facts in this chunk.
Return JSON array of:
  { "degree": "", "field": "", "institution": "", "start": null, "end": null,
    "evidence": "<short verbatim>", "chunk_id": "<id>" }

Chunk ID: {chunk_id}
Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_id", "chunk_text"])

PROMPTS["extract_projects"] = _escape_braces_keep_vars(r"""
Extract projects/initiatives if present.
Return JSON array of:
  { "name": "", "tech": [], "impact": null, "links": [], "chunk_id": "<id>" }

Chunk ID: {chunk_id}
Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_id", "chunk_text"])

PROMPTS["jd_snapshot"] = _escape_braces_keep_vars(r"""
From the JD, extract:
- title (short)
- required (skill tokens; lowercase)
- preferred (skill tokens; lowercase)
- responsibilities (short verbs/n-grams; lowercase)

Return JSON only:
{ "title": "", "required": [], "preferred": [], "responsibilities": [] }

JD:
---
{jd_text}
---
""".strip(), ["jd_text"])

print("[ok] Prompts ready (escaped correctly). Next: load→chunk→FAISS→detect in Cell 3.")


[ok] STATE initialized — run_id=00de0447b3d8 → artifacts: tmp\00de0447b3d8
[ok] Prompts ready (escaped correctly). Next: load→chunk→FAISS→detect in Cell 3.


In [39]:
# ==== Cell 3: Load → chunk → embed (FAISS) → detect resume/not_resume ====

from pathlib import Path
import json, re, zipfile, xml.etree.ElementTree as ET

# ---------- helpers: read files or raw text ----------
def read_pdf_pymupdf(path: str) -> str:
    """Load a PDF through PyMuPDFLoader and return all page texts joined by newlines.

    Documents whose ``page_content`` is empty or None contribute an empty string.
    """
    pages = PyMuPDFLoader(path).load()
    page_texts = [page.page_content or "" for page in pages]
    return "\n".join(page_texts)

def read_docx_quick(path: str) -> str:
    """Extract paragraph text from a .docx by reading ``word/document.xml`` directly.

    Each ``w:p`` paragraph becomes one output line built from the concatenated text
    of its ``w:t`` nodes; paragraphs that are empty after stripping are dropped.
    """
    ns = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
    with zipfile.ZipFile(path) as archive:
        root = ET.fromstring(archive.read("word/document.xml"))

    paragraph_texts = (
        "".join(node.text or "" for node in para.findall(".//w:t", ns)).strip()
        for para in root.findall(".//w:p", ns)
    )
    return "\n".join(text for text in paragraph_texts if text)

def read_text_file(path: str) -> str:
    """Return the file's contents decoded as UTF-8, silently dropping undecodable bytes."""
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        return handle.read()

def read_any(path_or_text: Optional[str]) -> str:
    """Return document text from either a file path or a raw text string.

    Args:
        path_or_text: A path to an existing file (.pdf, .docx, or anything else,
            which is read as UTF-8 text), or the document text itself.

    Returns:
        "" for None/empty input; the file's text when the argument names an existing
        path; otherwise the argument itself, treated as raw text.
    """
    if not path_or_text:
        return ""
    try:
        p = Path(path_or_text)
        found = p.exists()
    except (OSError, ValueError):
        # A pasted multi-KB document is not a valid file name: probing it can raise
        # (e.g. "File name too long" on POSIX, or an embedded NUL). That simply
        # means the argument is text, not a path.
        found = False
    if not found:
        return str(path_or_text)

    ext = p.suffix.lower()
    if ext == ".pdf":
        return read_pdf_pymupdf(str(p))
    if ext == ".docx":
        return read_docx_quick(str(p))
    return read_text_file(str(p))

# ---------- ensure artifacts dir ----------
base_dir = Path(STATE["artifacts"]["base_dir"])
base_dir.mkdir(parents=True, exist_ok=True)

# ---------- load resume & JD into STATE["raw"] ----------
def _load_input(kind: str) -> str:
    """Return the text for ``kind`` ("resume" or "jd").

    Order of preference: text already cached in STATE["raw"], then the configured
    file, then the configured raw text.

    Raises:
        FileNotFoundError: a file was configured but does not exist. Without this
            check read_any() would fall back to treating the path string itself as
            the document text, and the pipeline would go on to score a file name.
    """
    cached = STATE["raw"].get(f"{kind}_text")
    if cached:
        return cached
    file_path = STATE["inputs"].get(f"{kind}_file")
    if file_path:
        if not Path(file_path).is_file():
            raise FileNotFoundError(
                f"{kind}_file not found: {file_path!r}. Fix the path in Cell 2 or provide {kind}_text instead."
            )
        return read_any(file_path)
    return read_any(STATE["inputs"].get(f"{kind}_text"))

resume_text = _load_input("resume")
jd_text = _load_input("jd")

if not resume_text.strip():
    raise RuntimeError("No resume content found. Provide RESUME_FILE or RESUME_TEXT in Cell 2.")
if not jd_text.strip():
    raise RuntimeError("No JD content found. Provide JD_FILE or JD_TEXT in Cell 2.")

STATE["raw"]["resume_text"] = resume_text
STATE["raw"]["jd_text"] = jd_text

# ---------- chunking (char-based proxy for tokens) ----------
# The splitter works on characters, so the token budget is converted with a
# rough 4-characters-per-token rule; the overlap is a fraction of that size.
opts = STATE["options"]
chunk_chars = int(opts["chunk_tokens"] * 4)         # ≈ 4 chars/token
overlap_chars = int(chunk_chars * opts["chunk_overlap"])

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ". ", " ", ""],
    chunk_size=chunk_chars,
    chunk_overlap=overlap_chars,
)
chunks_texts: List[str] = splitter.split_text(resume_text)

# Replace the contents of the existing list in place (same list object is kept).
STATE["chunks"][:] = [
    {
        "id": f"c_{position:02d}",
        "page": None,        # optional page mapping later
        "text": piece,
        "start": None,
        "end": None,
        "tokens": None,      # we chunked by chars; token count optional
        "label": None,
    }
    for position, piece in enumerate(chunks_texts)
]

chunks_json_path = STATE["artifacts"]["paths"]["chunks_json"]
Path(chunks_json_path).write_text(
    json.dumps(STATE["chunks"], ensure_ascii=False, indent=2),
    encoding="utf-8",
)
print(f"[ok] Chunked resume into {len(STATE['chunks'])} chunks → {STATE['artifacts']['paths']['chunks_json']}")

# ---------- embeddings + FAISS (LangChain Google GenAI ONLY) ----------
# Parallel lists: chunk texts and the metadata (chunk id) stored next to each vector.
texts, metas = [], []
for chunk in STATE["chunks"]:
    texts.append(chunk["text"])
    metas.append({"id": chunk["id"]})

# use the embedding_model handle from Cell 1
vs = FAISS.from_texts(texts=texts, embedding=embedding_model, metadatas=metas)

# Persist the index under the run's artifact directory and remember where it lives.
faiss_dir = Path(STATE["artifacts"]["paths"]["faiss_dir"])
faiss_dir.mkdir(parents=True, exist_ok=True)
vs.save_local(str(faiss_dir))
STATE["faiss"]["index_path"] = str(faiss_dir)
print(f"[ok] FAISS index saved at {STATE['faiss']['index_path']}")

# JD-guided retrieval (optional prioritization): query with the first 4000 characters
# of the JD and never ask for more neighbours than there are chunks.
topk = max(1, int(opts["faiss_topk"]))
neighbour_count = min(topk, len(STATE["chunks"]))
docs = vs.similarity_search(jd_text[:4000], k=neighbour_count)

relevant_ids = []
for doc in docs:
    if doc.metadata and doc.metadata.get("id"):
        relevant_ids.append(doc.metadata.get("id"))
STATE["faiss"]["topk_ids"] = relevant_ids

print(f"[ok] Top-{len(STATE['faiss']['topk_ids'])} JD-relevant chunks: {STATE['faiss']['topk_ids']}")

# ---------- resume/not_resume detector ----------
def _json_loose(s: str) -> Dict[str, Any]:
    s = (s or "").strip()
    try:
        return json.loads(s)
    except Exception:
        m = re.search(r"\{.*\}", s, flags=re.S)
        if m:
            return json.loads(m.group(0))
        raise ValueError("Detector did not return valid JSON.")

# ChatPromptTemplate is imported here because this cell uses it and runs before Cell 4,
# the only other place that imports it. Without this line the cell only works when
# Cell 4 happened to run earlier in the same kernel, and fails on Restart & Run All.
from langchain_core.prompts import ChatPromptTemplate

# Build an excerpt (first 1–2 chunks or raw)
excerpt = "\n\n".join(c["text"] for c in STATE["chunks"][:2]) or resume_text[:3500]

detector_prompt = ChatPromptTemplate.from_template(PROMPTS["resume_detector"])
detector_chain = detector_prompt | llm
detector_raw = detector_chain.invoke({"resume_excerpt": excerpt})
detector_out = detector_raw.content if hasattr(detector_raw, "content") else str(detector_raw)

try:
    det = _json_loose(detector_out)
    if not isinstance(det, dict):
        # Valid JSON that is not an object (e.g. a list) has no "label" to read.
        raise ValueError("Detector JSON is not an object.")
except Exception:
    # Unparseable answer: stay permissive ("close_to_resume") with low confidence.
    det = {
        "label": "close_to_resume",
        "confidence": 0.3,
        "reasons": ["LLM parsing fallback"],
        "quick_metadata": {
            "has_contact_like_section": False,
            "has_experience_like_section": False,
            "has_education_like_section": False,
            "has_skills_like_section": False,
            "pages_seen": None,
            "approx_length_chars": len(resume_text),
        },
    }

label = str(det.get("label", "close_to_resume")).lower().strip()
if label not in {"resume", "close_to_resume", "not_resume"}:
    # An unexpected label used to leave is_resume False while the cell still printed
    # "proceed"; treat it as the permissive default so flag and message agree.
    STATE["audit"].append(f"resume_detector.unknown_label={label}")
    label = "close_to_resume"
STATE["flags"]["is_resume"] = label in {"resume", "close_to_resume"}
STATE["audit"].append(f"resume_detector.label={label}")
print(f"[ok] Detector label: {label} (confidence={det.get('confidence')})")

# ---------- early exit if NOT a resume ----------
if label == "not_resume":
    STATE["final"] = {
        "score_100": 0,
        "breakdown": {},
        "reasons": [],          # per your spec: no remarks
        "strong_matches": [],
        "skill_gaps": [],
        "risk_flags": []
    }
    with open(STATE["artifacts"]["paths"]["final_json"], "w", encoding="utf-8") as f:
        json.dump(STATE["final"], f, ensure_ascii=False, indent=2)
    print("[END] Classified as not_resume → score=0, no remarks. Stopping here for this run.")
else:
    print("[ok] Classified as resume/close_to_resume → proceed to extraction pipeline next.")


[ok] Chunked resume into 2 chunks → tmp\00de0447b3d8\chunks.json
[ok] FAISS index saved at tmp\00de0447b3d8\faiss
[ok] Top-2 JD-relevant chunks: ['c_01', 'c_00']
[ok] Detector label: resume (confidence=0.95)
[ok] Classified as resume/close_to_resume → proceed to extraction pipeline next.


In [41]:
# ==== Cell 4 (updated): JD snapshot → LLM chunk routing (no hardcoded sections) → per-extractor parsing ====
# What changed:
#  • Replaced the old "section_classifier" with an LLM **chunk_router** that decides which extractors to run
#    for each chunk (contacts / education / experience / projects / skills) based on semantics, not headings.
#  • Contacts are collected from chunks the router marks as contacts; regex still used for precision.
#  • Everything else (JD snapshot, extractors, provenance, light de-dup) stays aligned with our plan.

import re, json
from typing import Any, Dict, List, Optional
from langchain_core.prompts import ChatPromptTemplate

# ---------- helpers ----------
def _escape_braces_keep_vars(template: str, keep_vars: List[str]) -> str:
    esc = template.replace("{", "{{").replace("}", "}}")
    for v in keep_vars:
        esc = esc.replace("{{" + v + "}}", "{" + v + "}")
    return esc

def _json_loose(s: str) -> Any:
    s = (s or "").strip()
    try:
        return json.loads(s)
    except Exception:
        m = re.search(r"\{.*\}|\[.*\]", s, flags=re.S)
        if m:
            return json.loads(m.group(0))
        raise

def _norm_tokens(xs: List[str]) -> List[str]:
    out: List[str] = []
    for x in xs or []:
        s = (x or "").strip().lower()
        if s and s not in out:
            out.append(s)
    return out

# Safety: ensure we didn't early-exit as not_resume
# (a final score of exactly 0 together with is_resume=False is what the early exit leaves behind).
if STATE.get("final", {}).get("score_100") == 0:
    if not STATE["flags"].get("is_resume", True):
        raise RuntimeError("This run ended earlier as not_resume. Start a new run with a valid resume.")

# ---------- 1) JD snapshot (same as before) ----------
# Ask the LLM for a structured view of the JD (title / required / preferred /
# responsibilities) and store the normalised result in STATE["jd_snapshot"].
jd_prompt = ChatPromptTemplate.from_template(PROMPTS["jd_snapshot"])
jd_chain = jd_prompt | llm
jd_raw = jd_chain.invoke({"jd_text": STATE["raw"]["jd_text"]})

jd_parsed_ok = True
try:
    jd_obj = _json_loose(getattr(jd_raw, "content", str(jd_raw)))
except Exception as exc:
    jd_obj = None
    jd_parsed_ok = False
    STATE["audit"].append(f"jd_snapshot.parse_error: {exc}")

if not isinstance(jd_obj, dict):
    # Valid JSON that is not an object (e.g. a bare list) would crash on .get() below.
    if jd_parsed_ok:
        STATE["audit"].append("jd_snapshot.parse_error: JSON is not an object")
    jd_parsed_ok = False
    jd_obj = {"title": None, "required": [], "preferred": [], "responsibilities": []}

# str() guards against a non-string title coming back from the model.
STATE["jd_snapshot"]["title"] = str(jd_obj.get("title") or "").strip() or STATE["jd_snapshot"]["title"]
STATE["jd_snapshot"]["required"] = _norm_tokens(jd_obj.get("required", []))
STATE["jd_snapshot"]["preferred"] = _norm_tokens(jd_obj.get("preferred", []))
STATE["jd_snapshot"]["responsibilities"] = _norm_tokens(jd_obj.get("responsibilities", []))
# Only record success when the model's answer was actually usable.
STATE["audit"].append("jd_snapshot.ok" if jd_parsed_ok else "jd_snapshot.fallback_empty")
print("[ok] JD snapshot:", STATE["jd_snapshot"])

# ---------- 2) LLM router (semantic, not headings) ----------
# New router prompt: for a given chunk, tell which extractors apply (true/false) with confidence & notes.
PROMPTS["chunk_router"] = _escape_braces_keep_vars(r"""
You act as a semantic router for resume chunks. Do **not** rely on literal headings alone.
Infer the chunk’s purpose from content and phrasing (e.g., duties, achievements, dates, technologies, institutions).

For this chunk, decide which extractors should run (true/false):
- contacts: email/phone/links (LinkedIn/GitHub/portfolio/website), candidate name lines.
- education: degrees, fields, institutions, years.
- experience: role titles, companies, date ranges, locations, bullet achievements, impact/metrics.
- projects: named projects/initiatives with tech stacks and outcomes; internships count if project-oriented.
- skills: explicit lists or inline mentions of technologies, tools, clouds, databases, frameworks, languages.

Return JSON **only**:
{
  "apply": {
    "contacts": true|false,
    "education": true|false,
    "experience": true|false,
    "projects": true|false,
    "skills": true|false
  },
  "confidence": 0.0 to 1.0,
  "notes": "brief rationale"
}

Chunk:
---
{chunk_text}
---
""".strip(), ["chunk_text"])

router_chain = ChatPromptTemplate.from_template(PROMPTS["chunk_router"]) | llm

_ROUTE_KEYS = ("contacts", "education", "experience", "projects", "skills")

def _route_flag(value: Any) -> bool:
    """Interpret a router flag; strings such as "false"/"no" count as False."""
    if isinstance(value, str):
        return value.strip().lower() in {"true", "yes", "1"}
    return bool(value)

# For every chunk, ask the router which extractors apply and store the normalised
# decision on the chunk itself under "route".
router_stats = {key: 0 for key in _ROUTE_KEYS}
for ch in STATE["chunks"]:
    out = router_chain.invoke({"chunk_text": ch["text"]})
    try:
        r = _json_loose(getattr(out, "content", str(out)))
        apply = r.get("apply") if isinstance(r, dict) else None
    except Exception:
        apply = None
    if not isinstance(apply, dict):
        # Missing, unparseable or wrongly-typed answer: run no extractor for this chunk.
        apply = {}
    # normalize booleans
    for k in _ROUTE_KEYS:
        apply[k] = _route_flag(apply.get(k, False))
        if apply[k]:
            router_stats[k] += 1
    ch["route"] = apply

print("[ok] Router decisions (num chunks flagged):", router_stats)

# ---------- 3) Per-extractor LLM chains (same extractor prompts as Cell 2) ----------
# One prompt|llm chain per extractor, built in a fixed order and unpacked by name.
exp_chain, skill_chain, edu_chain, proj_chain = (
    ChatPromptTemplate.from_template(PROMPTS[prompt_key]) | llm
    for prompt_key in (
        "extract_experience",
        "extract_skills",
        "extract_education",
        "extract_projects",
    )
)

# Speed: prioritize FAISS top-K chunks (JD-relevant); still LLM decides which extractors apply.
use_only_topk = True
if use_only_topk:
    topk_ids = set(STATE["faiss"]["topk_ids"])
else:
    topk_ids = None

# ---------- 4) Contacts collection (regex over LLM-marked contact chunks) ----------
# E-mail address: local part, "@", domain, then a dot and at least two letters.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Phone number: optional "+" with a 1-3 digit prefix, then digit groups of 3-3-4 with
# optional space/hyphen separators and optional parentheses around the first group.
PHONE_RE = re.compile(r"(?:(?:\+\d{1,3}[\s\-]?)?(?:\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}))", re.I)
# LinkedIn profile URL; the http(s):// scheme and the "/in/" path are required.
LINKEDIN_RE = re.compile(r"(https?://(?:www\.)?linkedin\.com/in/[A-Za-z0-9\-_/]+)", re.I)
# GitHub URL; the http(s):// scheme is required.
GITHUB_RE = re.compile(r"(https?://(?:www\.)?github\.com/[A-Za-z0-9\-_/]+)", re.I)
# Any http(s) URL, running up to the next whitespace character.
GEN_URL_RE = re.compile(r"(https?://[^\s]+)", re.I)

def _apply_contacts_from_text(text: str):
    """Fill empty contact fields in STATE["contacts"] from the first matches in ``text``.

    Fields that already hold a value are never overwritten. The portfolio link is the
    first URL that is neither a LinkedIn nor a GitHub address.
    """
    contacts = STATE["contacts"]
    links = contacts["links"]

    # Scalar fields: first regex hit wins, and only if the field is still empty.
    for field, pattern in (("email", EMAIL_RE), ("phone", PHONE_RE)):
        hit = pattern.search(text)
        if hit and not contacts.get(field):
            contacts[field] = hit.group(0)

    # Well-known profile links.
    for field, pattern in (("linkedin", LINKEDIN_RE), ("github", GITHUB_RE)):
        hit = pattern.search(text)
        if hit and not links.get(field):
            links[field] = hit.group(1)

    # generic portfolio/website, avoid duplicating linkedin/github
    if not links.get("portfolio"):
        other_url = next(
            (
                url
                for url in GEN_URL_RE.findall(text)
                if "linkedin.com" not in url and "github.com" not in url
            ),
            None,
        )
        if other_url is not None:
            links["portfolio"] = other_url

# Gather the text of every chunk the router marked as contacts
# (respect topK speed gate if enabled), each piece prefixed with a newline.
contact_text = "".join(
    "\n" + ch["text"]
    for ch in STATE["chunks"]
    if ch.get("route", {}).get("contacts", False)
    and not (topk_ids and ch["id"] not in topk_ids)
)
# fallback: use first two chunks if router found nothing
if not contact_text.strip() and STATE["chunks"]:
    contact_text += "".join("\n" + ch["text"] for ch in STATE["chunks"][:2])
_apply_contacts_from_text(contact_text)

# Naive name guess: first plausible line in any contact-routed chunk
def _guess_name(chunks: List[Dict[str,Any]]) -> Optional[str]:
    for ch in chunks:
        if not ch.get("route", {}).get("contacts", False):
            continue
        for line in ch["text"].splitlines():
            s = line.strip()
            if not s:
                continue
            low = s.lower()
            if any(x in low for x in ["email", "phone", "linkedin", "github", "@", "http", "www."]):
                continue
            if 2 <= len(s.split()) <= 7 and 3 <= len(s) <= 60:
                return s
    return None

# Only guess a name when none has been recorded yet; a failed guess leaves the field as is.
if not STATE["contacts"].get("name"):
    guessed_name = _guess_name(STATE["chunks"])
    if guessed_name:
        STATE["contacts"]["name"] = guessed_name

# ---------- 5) Extraction loop driven by router.apply booleans ----------
# (entity kind, chain) pairs; the kind doubles as the router flag name and as the key
# under which results are stored for each chunk.
_EXTRACTORS = (
    ("experience", exp_chain),
    ("skills", skill_chain),
    ("education", edu_chain),
    ("projects", proj_chain),
)

entities_by_chunk: Dict[str, Dict[str, Any]] = {}
for ch in STATE["chunks"]:
    cid, txt, route = ch["id"], ch["text"], ch.get("route") or {}
    if topk_ids and cid not in topk_ids:
        continue  # skip non-topK chunks for speed

    found: Dict[str, Any] = {"experience": [], "skills": [], "education": [], "projects": []}
    entities_by_chunk[cid] = found

    for kind, chain in _EXTRACTORS:
        if not route.get(kind):
            continue
        out = chain.invoke({"chunk_id": cid, "chunk_text": txt})
        try:
            arr = _json_loose(getattr(out, "content", str(out)))
        except Exception as exc:
            # Still best-effort (the run continues), but the failure is now recorded
            # instead of being silently discarded.
            STATE["audit"].append(f"extract_{kind}.parse_error chunk={cid}: {exc}")
            continue
        if isinstance(arr, list):
            found[kind].extend(arr)
        else:
            STATE["audit"].append(f"extract_{kind}.non_list_output chunk={cid}")

# Persist raw entities
# (the artifact directory is (re)created first so this cell does not depend on Cell 3 having made it).
Path(STATE["artifacts"]["base_dir"]).mkdir(parents=True, exist_ok=True)
entities_json_path = Path(STATE["artifacts"]["paths"]["entities_by_chunk_json"])
entities_json_path.write_text(
    json.dumps(entities_by_chunk, ensure_ascii=False, indent=2),
    encoding="utf-8",
)

# ---------- 6) Flatten + light de-dup ----------
# Start from empty lists so that re-running this cell does not append a second copy of
# every entity (and inflate skill frequencies) on top of the previous run's results.
for _section in ("education", "timeline", "projects", "skills"):
    STATE[_section] = []

def _as_list(value: Any) -> List[Any]:
    """Return ``value`` as a list: None -> [], list/tuple -> list, anything else -> [value]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]

for cid, groups in entities_by_chunk.items():
    for it in groups.get("education", []):
        if not isinstance(it, dict):
            continue  # the model occasionally emits bare strings inside the array
        it.setdefault("chunk_id", cid)
        STATE["education"].append(it)

    for it in groups.get("experience", []):
        if not isinstance(it, dict):
            continue
        it.setdefault("chunk_id", cid)
        if "highlights" not in it or it["highlights"] is None:
            it["highlights"] = []
        STATE["timeline"].append(it)

    for it in groups.get("projects", []):
        if not isinstance(it, dict):
            continue
        it.setdefault("chunk_id", cid)
        if "tech" not in it or it["tech"] is None:
            it["tech"] = []
        if "links" not in it or it["links"] is None:
            it["links"] = []
        STATE["projects"].append(it)

    for it in groups.get("skills", []):
        if not isinstance(it, dict):
            continue
        nm = str(it.get("name") or "").strip().lower()
        if not nm:
            continue
        aliases = [
            a.strip().lower()
            for a in _as_list(it.get("aliases"))
            if isinstance(a, str) and a.strip()
        ]
        # Evidence must be a list: the de-dup step appends to it, which would fail
        # (or iterate per character) if the model returned a single string.
        ev = _as_list(it.get("evidence"))
        STATE["skills"].append({
            "name": nm,
            "aliases": aliases,
            "evidence": ev,
            "chunk_ids": [cid],
            "frequency": 1,
            "recent_use_years": None
        })

# Collapse duplicates by exact lowercased name
# The first record seen for a name is kept (and updated in place); later records with
# the same name bump its frequency and contribute any evidence/chunk ids/aliases it lacks.
skill_index: Dict[str, Dict[str, Any]] = {}
for record in STATE["skills"]:
    kept = skill_index.get(record["name"])
    if kept is None:
        skill_index[record["name"]] = record
        continue

    kept["frequency"] += 1
    kept["evidence"].extend(
        item for item in record["evidence"] if item and item not in kept["evidence"]
    ) if False else None
    for item in record["evidence"]:
        if item and item not in kept["evidence"]:
            kept["evidence"].append(item)
    for chunk_ref in record["chunk_ids"]:
        if chunk_ref not in kept["chunk_ids"]:
            kept["chunk_ids"].append(chunk_ref)
    for alias in record["aliases"]:
        if alias and alias not in kept["aliases"]:
            kept["aliases"].append(alias)

# Dicts keep insertion order, so this is the first-seen order of the unique names.
collapsed: List[Dict[str, Any]] = list(skill_index.values())
STATE["skills"] = collapsed

# Record completion in the audit trail and print a one-line summary of what was extracted.
STATE["audit"].append("router_extraction.ok")
print("[ok] Router-driven extraction complete.")

education_count = len(STATE["education"])
experience_count = len(STATE["timeline"])
project_count = len(STATE["projects"])
unique_skill_count = len(STATE["skills"])
print(
    "[ok] Counts → education:", education_count,
    "| experience:", experience_count,
    "| projects:", project_count,
    "| skills (uniq):", unique_skill_count,
)
print("[ok] Contacts:", STATE["contacts"])


[ok] JD snapshot: {'title': 'Data Analyst', 'required': ['analytical skills', 'problem-solving skills', 'sql', 'excel', 'python', 'r', 'data visualization', 'power bi', 'tableau', 'google data studio', 'statistics', 'data modeling', 'data cleaning', 'communication skills'], 'preferred': ['etl pipelines', 'data warehouses', 'bigquery', 'snowflake', 'redshift', 'machine learning', 'predictive modeling', 'apis', 'google analytics', 'crm systems'], 'responsibilities': ['collect data', 'clean data', 'validate data', 'perform data analysis', 'identify trends', 'identify patterns', 'identify anomalies', 'develop dashboards', 'develop reports', 'query data', 'manipulate data', 'data wrangling', 'data analysis', 'automation', 'translate data insights', 'support a/b testing', 'performance tracking', 'ad-hoc analysis', 'ensure data quality', 'ensure data accuracy', 'ensure data consistency']}
[ok] Router decisions (num chunks flagged): {'contacts': 1, 'education': 2, 'experience': 1, 'projects': 

In [42]:
# ==== Cell 5: Merge → JD-aware normalization → coverage features → final 0–100 score (+save) ====
# No new deps; only stdlib. Uses the STATE built by previous cells and `llm` handle if needed later.

from datetime import datetime
from difflib import SequenceMatcher
import math, json, re
from pathlib import Path

# ---------- helpers (stdlib only) ----------
def _clean_token(s: str) -> str:
    s = (s or "").strip().lower()
    # normalize common punctuation variants: node.js -> nodejs, c++ -> cpp, .net -> dotnet
    s = s.replace("+", "p").replace(".js", "js").replace(".net", "dotnet")
    s = re.sub(r"[^a-z0-9#]+", " ", s)
    return re.sub(r"\s+", " ", s).strip()

def _sim(a: str, b: str) -> int:
    # similarity 0..100 using difflib (no rapidfuzz to avoid extra deps)
    return int(round(100 * SequenceMatcher(None, a, b).ratio()))

def _best_match(token: str, candidates: list[str]) -> tuple[Optional[str], int]:
    """Pick the candidate most similar to ``token``.

    Both sides are normalized with ``_clean_token`` and scored with ``_sim``.
    Returns ``(candidate, score)`` where ``candidate`` is the entry exactly as
    it was passed in; ties go to the earliest candidate. With no candidates the
    result is ``(None, -1)``.
    """
    wanted = _clean_token(token)
    scored = [(_sim(wanted, _clean_token(cand)), cand) for cand in candidates]
    if not scored:
        return None, -1
    # max() keeps the first of equally scored entries, like a strict ">" scan.
    top_score, top_cand = max(scored, key=lambda pair: pair[0])
    return top_cand, top_score

_MONTHS = {
    "jan":1,"january":1,"feb":2,"february":2,"mar":3,"march":3,"apr":4,"april":4,
    "may":5,"jun":6,"june":6,"jul":7,"july":7,"aug":8,"august":8,"sep":9,"sept":9,"september":9,
    "oct":10,"october":10,"nov":11,"november":11,"dec":12,"december":12
}

def _parse_date(s: Optional[str]) -> Optional[datetime]:
    if not s: return None
    t = s.strip().lower()
    t = re.sub(r"[^\w\s\-\/]", " ", t)
    t = re.sub(r"\s+", " ", t)
    # current / present
    if any(x in t for x in ["present", "current", "now"]):
        return datetime.today()
    # YYYY-MM or MM/YYYY or Mon YYYY or YYYY
    m = re.search(r"(\d{4})[-/](\d{1,2})", t)
    if m:
        y, mo = int(m.group(1)), int(m.group(2))
        mo = 1 if mo < 1 or mo > 12 else mo
        return datetime(y, mo, 1)
    m = re.search(r"(\d{1,2})[-/](\d{4})", t)
    if m:
        mo, y = int(m.group(1)), int(m.group(2))
        mo = 1 if mo < 1 or mo > 12 else mo
        return datetime(y, mo, 1)
    m = re.search(r"([A-Za-z]{3,9})\s+(\d{4})", t)
    if m:
        mo = _MONTHS.get(m.group(1)[:3], 1)
        y  = int(m.group(2))
        return datetime(y, mo, 1)
    m = re.search(r"\b(20\d{2}|19\d{2})\b", t)
    if m:
        y = int(m.group(1))
        return datetime(y, 1, 1)
    return None

def _months_between(a: Optional[datetime], b: Optional[datetime]) -> int:
    if not a or not b: return 0
    months = (b.year - a.year) * 12 + (b.month - a.month)
    return max(0, months)

def _count_numbers(texts: list[str]) -> int:
    cnt = 0
    for t in texts:
        for num in re.findall(r"\b\d+(?:\.\d+)?%?|\b\d+[kKmMbB]?\b", t or ""):
            cnt += 1
    return cnt

def _unique(seq):
    out = []
    seen = set()
    for x in seq:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

# ---------- 1) Merge roles, compute experience years ----------
# Each role with a parseable start contributes the interval [start, end]; a
# missing or unparseable end is treated as "still ongoing" (today). Overlapping
# intervals are merged before counting so that concurrent roles (e.g. a
# part-time job held alongside an internship) are not counted twice.
now = datetime.today()
role_spans = []
for role in STATE["timeline"]:
    st = _parse_date(role.get("start"))
    if not st:
        continue  # no usable start date -> the role cannot be placed on the timeline
    en = _parse_date(role.get("end")) or now
    if en > st:
        role_spans.append((st, en))
role_spans.sort()

total_months = 0
span_start = span_end = None
for st, en in role_spans:
    if span_end is None:
        span_start, span_end = st, en
    elif st <= span_end:
        # Overlaps (or touches) the running span: extend it instead of adding a new one.
        span_end = max(span_end, en)
    else:
        total_months += _months_between(span_start, span_end)
        span_start, span_end = st, en
if span_end is not None:
    total_months += _months_between(span_start, span_end)

# None (rather than 0.0) signals that no experience could be inferred.
years_experience = round(total_months / 12.0, 2) if total_months > 0 else None
STATE["high_level"]["years_experience"] = years_experience

# ---------- 2) Skill normalization (build canon from JD req+pref) ----------
jd_req = STATE["jd_snapshot"].get("required", []) or []
jd_pref = STATE["jd_snapshot"].get("preferred", []) or []

# Normalize each JD list on its own so the required/preferred split stays exact
# even when de-duplication or empty tokens change the list lengths. Tokens that
# normalize to "" are dropped: two empty strings would otherwise score 100.
norm_required = _unique([c for c in (_clean_token(s) for s in jd_req if s) if c])
norm_preferred = _unique([c for c in (_clean_token(s) for s in jd_pref if s) if c])
# Canonical vocabulary: required first, then preferred, duplicates removed.
canon_pool = _unique(norm_required + norm_preferred)

# Every name and alias the resume lists, normalized the same way.
resume_skill_names = []
for s in STATE["skills"]:
    nm = s.get("name")
    if nm:
        resume_skill_names.append(nm)
    resume_skill_names.extend(s.get("aliases") or [])
resume_skill_names = _unique(
    [c for c in (_clean_token(x) for x in resume_skill_names if x) if c]
)

strong_hits = set()    # canonical JD skills matched at >= match_strong
partial_hits = set()   # canonical JD skills matched at >= match_partial (but below strong)
skill_map = {}         # resume skill -> {"canon": best JD skill, "score": similarity}

opts = STATE["options"]
for r in resume_skill_names:
    if not canon_pool:
        break
    best, sc = _best_match(r, canon_pool)
    if best is None:
        continue
    skill_map[r] = {"canon": best, "score": sc}
    if sc >= opts["match_strong"]:
        strong_hits.add(best)
    elif sc >= opts["match_partial"]:
        partial_hits.add(best)

STATE["canon"]["skill_map"] = skill_map
STATE["canon"]["normalized_skills"] = resume_skill_names
STATE["canon"]["normalized_required"] = norm_required
# A skill listed as both required and preferred is reported under required only.
_required_set = set(norm_required)
STATE["canon"]["normalized_preferred"] = [c for c in norm_preferred if c not in _required_set]

# ---------- 3) JD alignment (required/preferred present/partial/missing) ----------
def _align_list(targets: list[str]) -> list[dict]:
    """Classify each JD skill against the resume matches computed earlier.

    For every target, the cleaned name is looked up in the module-level
    ``strong_hits`` / ``partial_hits`` sets to get a status of
    ``"present_strong"``, ``"present_partial"`` or ``"missing"``. For targets
    that are not missing, ``STATE["skills"]`` is scanned for a skill whose name
    or alias fuzzy-matches the target at ``opts["match_partial"]`` or better;
    that skill's first evidence phrase and first chunk id are attached.

    Returns:
        One ``{"name", "status", "evidence", "chunk_id"}`` dict per target, in
        input order; ``name`` is the cleaned target.
    """
    aligned = []
    for target in targets:
        canon = _clean_token(target)
        if canon in strong_hits:
            status = "present_strong"
        elif canon in partial_hits:
            status = "present_partial"
        else:
            status = "missing"

        evidence = chunk_id = None
        if status != "missing":
            for skill in STATE["skills"]:
                labels = [skill.get("name", "")] + (skill.get("aliases") or [])
                # any() stops at the first label that clears the partial threshold.
                if any(_best_match(label, [canon])[1] >= opts["match_partial"] for label in labels):
                    if skill.get("evidence"):
                        evidence = skill["evidence"][0]
                    if skill.get("chunk_ids"):
                        chunk_id = skill["chunk_ids"][0]
                # Keep scanning only while no skill has supplied evidence or a chunk id.
                if evidence or chunk_id:
                    break

        aligned.append(
            {"name": canon, "status": status, "evidence": evidence, "chunk_id": chunk_id}
        )
    return aligned

# Align both JD skill lists against the resume (required first, then preferred).
for _bucket, _targets in (("required", jd_req), ("preferred", jd_pref)):
    STATE["jd_alignment"][_bucket] = _align_list(_targets)

# Required skills for which the resume showed no match at all.
missing_required = [
    entry["name"]
    for entry in STATE["jd_alignment"]["required"]
    if entry["status"] == "missing"
]

# ---------- 4) Responsibilities overlap ----------
jd_resps = STATE["jd_snapshot"].get("responsibilities", []) or []

# Gather every free-text achievement line: role highlights plus project impact
# statements. Entries are coerced to str (as the project impact already was)
# and empty ones are dropped, so a stray None/number from extraction cannot
# break the text helpers (_clean_token, _count_numbers) that consume this list.
all_highlights = []
for r in STATE["timeline"]:
    all_highlights.extend(str(h) for h in (r.get("highlights") or []) if h)
for p in STATE["projects"]:
    if p.get("impact"):
        all_highlights.append(str(p["impact"]))

# Normalize each highlight once instead of once per responsibility.
clean_highlights = [_clean_token(h) for h in all_highlights]
# A responsibility that normalizes to "" would be a substring of every
# highlight and always count as covered, so such entries are ignored.
resp_targets = [t for t in (_clean_token(x) for x in jd_resps) if t]

# A responsibility is covered when its normalized phrase appears verbatim
# inside at least one normalized highlight.
resp_hits = sum(1 for t in resp_targets if any(t in h for h in clean_highlights))
resp_cover = (resp_hits / len(resp_targets)) if resp_targets else 0.0

# ---------- 5) Evidence depth ----------
# Every numeric token found in the highlights counts as one evidence signal.
evidence_count = _count_numbers(all_highlights)
# Scale to 0..1: no numbers -> 0, ten or more numbers saturate at 1.0.
evidence_depth = evidence_count / 10.0
if evidence_depth > 1.0:
    evidence_depth = 1.0

# ---------- 6) Seniority (rough) ----------
# Infer the JD's required years from its text/title, e.g. "5+ years", "1 year",
# "3 yrs", "3-5 years" or "3 to 5 years".
jd_text_all = (STATE["raw"].get("jd_text") or "") + " " + (STATE["jd_snapshot"].get("title") or "")
# Only the first number is captured, so a range such as "3-5 years" yields its
# lower bound (the minimum asked for). Singular "year"/"yr" is accepted too.
m_years = re.search(
    r"(\d+)\s*\+?\s*(?:(?:[-\u2013]|to)\s*\d+\s*\+?\s*)?(?:years?|yrs?)\b",
    jd_text_all.lower(),
)
req_years = int(m_years.group(1)) if m_years else None

# NOTE(review): the fit stays at 1.0 (full credit) when either the candidate's
# experience or the JD requirement is unknown — confirm this is intended for
# resumes whose dates could not be parsed.
seniority_fit = 1.0
if years_experience is not None and req_years is not None:
    # simple ramp: fully fit if >= req_years, else linearly scale
    seniority_fit = max(0.0, min(1.0, years_experience / max(1, req_years)))

# ---------- 7) Education/cert presence ----------
# Half of the feature for any education entry that names a degree, the other
# half for any certification recorded in STATE.
has_degree = False
for edu in STATE["education"]:
    if edu.get("degree"):
        has_degree = True
        break
has_cert   = bool(STATE.get("certs"))
edu_cert_score = (0.5 if has_degree else 0.0) + (0.5 if has_cert else 0.0)

# ---------- 8) Compute coverage primitives ----------
# Collect the alignment statuses once per list, then tally them.
req_statuses = [item["status"] for item in STATE["jd_alignment"]["required"]]
pref_statuses = [item["status"] for item in STATE["jd_alignment"]["preferred"]]

req_total = len(jd_req)
req_strong = req_statuses.count("present_strong")
req_partial = req_statuses.count("present_partial")

pref_total = len(jd_pref)
pref_strong = pref_statuses.count("present_strong")
pref_partial = pref_statuses.count("present_partial")

required_coverage = {
    "total": req_total,
    "strong": req_strong,
    "partial": req_partial,
    "missing": len(missing_required),
}
preferred_coverage = {
    "total": pref_total,
    "strong": pref_strong,
    "partial": pref_partial,
}
STATE["coverage"] = {
    "required": required_coverage,
    "preferred": preferred_coverage,
    "responsibility_overlap": resp_cover,     # 0..1
    "seniority_fit": seniority_fit,           # 0..1
    "evidence_depth": evidence_depth,         # 0..1
    "edu_cert": edu_cert_score,               # 0..1 (in 0.5 steps)
}

# ---------- 9) Score (0–100) with caps/penalties ----------
# Maximum points per feature; the weights add up to 100.
rubric = {
    "must_have_coverage": 35,       # strong matches weighted higher than partial
    "preferred_alignment": 15,
    "experience_seniority_fit": 20,
    "responsibility_overlap": 10,
    "evidence_depth": 10,
    "education_cert_match": 10,
}

# Coverage ratios: a strong match counts 1.0, a partial match 0.5.
req_cov = (req_strong + 0.5 * req_partial) / req_total if req_total > 0 else 0.0
pref_cov = (pref_strong + 0.5 * pref_partial) / pref_total if pref_total > 0 else 0.0

# Weighted points per feature, added in rubric order.
must_pts = rubric["must_have_coverage"] * req_cov
pref_pts = rubric["preferred_alignment"] * pref_cov
exp_pts = rubric["experience_seniority_fit"] * seniority_fit
resp_pts = rubric["responsibility_overlap"] * resp_cover
evid_pts = rubric["evidence_depth"] * evidence_depth
edu_pts = rubric["education_cert_match"] * edu_cert_score
score = must_pts + pref_pts + exp_pts + resp_pts + evid_pts + edu_pts

# penalties / caps
opts = STATE["options"]
if missing_required:
    # Any missing must-have limits the score to the configured ceiling.
    score = min(score, float(opts["cap_if_must_have_missing"]))

# (Optional) degree penalty if JD explicitly asks for a degree and none found:
jd_lower = jd_text_all.lower()
jd_mentions_degree = "bachelor" in jd_lower or "degree" in jd_lower
if jd_mentions_degree and not has_degree:
    score = max(0.0, score - float(opts["degree_penalty"]))

# Clamp to 0..100 and round to a whole number.
clamped = min(100.0, score)
clamped = max(0.0, clamped)
score_100 = int(round(clamped))

# ---------- 10) Reasons & gaps (deterministic text) ----------
def _names_with(bucket, status):
    """Names of the aligned JD skills in ``bucket`` that carry ``status``."""
    return [x["name"] for x in STATE["jd_alignment"][bucket] if x["status"] == status]

def _preview(names, limit=6):
    """Comma-join the first ``limit`` names, adding an ellipsis if more exist."""
    return ", ".join(names[:limit]) + ("…" if len(names) > limit else "")

strong_list = _names_with("required", "present_strong")
partial_list = _names_with("required", "present_partial")
pref_strong_list = _names_with("preferred", "present_strong")

# Human-readable justification lines, in a fixed order.
reasons = []
if strong_list:
    reasons.append(f"Strong matches on required: {_preview(strong_list)}")
if partial_list:
    reasons.append(f"Partial matches on required: {_preview(partial_list)}")
if pref_strong_list:
    reasons.append(f"Preferred skills present: {_preview(pref_strong_list)}")
if years_experience is not None:
    reasons.append(
        f"Experience: {years_experience} yrs vs JD ~{req_years} yrs (fit={round(100*seniority_fit)}%)."
        if req_years
        else f"Experience: {years_experience} yrs (JD years not specified)."
    )
reasons.append(f"Responsibilities overlap: {int(round(100*resp_cover))}% ; Evidence signals: {evidence_count} metrics found.")

# Warnings a reviewer should look at before trusting the score.
risk_flags = []
if missing_required:
    risk_flags.append(f"Missing required: {_preview(missing_required)}")
if not STATE["contacts"].get("email"):
    risk_flags.append("No email detected.")
if years_experience is None:
    risk_flags.append("Could not infer experience years.")
if evidence_count == 0:
    risk_flags.append("No quantifiable impact statements detected.")

# ---------- 11) Persist final ----------
# Pair each rubric key with the 0..1 feature value it weights.
feature_values = (
    ("must_have_coverage", req_cov),
    ("preferred_alignment", pref_cov),
    ("experience_seniority_fit", seniority_fit),
    ("responsibility_overlap", resp_cover),
    ("evidence_depth", evidence_depth),
    ("education_cert_match", edu_cert_score),
)
STATE["final"] = {
    "score_100": score_100,
    # Points earned per feature, rounded to two decimals.
    "breakdown": {key: round(rubric[key] * value, 2) for key, value in feature_values},
    "reasons": reasons,
    "strong_matches": strong_list,
    "skill_gaps": missing_required,
    "risk_flags": risk_flags,
}

# Make sure the run's artifact directory exists, then write the report as JSON.
Path(STATE["artifacts"]["base_dir"]).mkdir(parents=True, exist_ok=True)
final_path = STATE["artifacts"]["paths"]["final_json"]
with open(final_path, "w", encoding="utf-8") as fh:
    json.dump(STATE["final"], fh, ensure_ascii=False, indent=2)

final = STATE["final"]
print(f"[ok] Final score: {final['score_100']}/100")
print("[ok] Breakdown:", final["breakdown"])
print("[ok] Reasons:", *final["reasons"], sep="\n  - ")
print("[ok] Risk flags:", *final["risk_flags"], sep="\n  - ")
print(f"[ok] Saved → {final_path}")


[ok] Final score: 20/100
[ok] Breakdown: {'must_have_coverage': 7.5, 'preferred_alignment': 2.25, 'experience_seniority_fit': 1.13, 'responsibility_overlap': 0.0, 'evidence_depth': 4.0, 'education_cert_match': 5.0}
[ok] Reasons:
  - Strong matches on required: sql, python, power bi
  - Experience: 0.17 yrs vs JD ~3 yrs (fit=6%).
  - Responsibilities overlap: 0% ; Evidence signals: 4 metrics found.
[ok] Risk flags:
  - Missing required: analytical skills, problem solving skills, excel, r, data visualization, tableau…
[ok] Saved → tmp\00de0447b3d8\final.json
