In [2]:
#!/usr/bin/env python3
"""
NexResume Pipeline (Refactored):
- Robust PDF parsing (pypdf -> pdfminer.six fallback)
- Structured LLM outputs via Pydantic (no more JSON guessing)
- Deterministic scoring (optional, based on JD weights)
- Chunking for long resumes to avoid token cutoffs
- Concurrency + retries + logging
- CSV summary aggregation

Usage:
  python nexresume_refactor.py --job path/to/job.yml --resumes path/to/resumes_folder [--workers 4]

Requires:
  pip install langchain-openai langchain pydantic backoff pypdf pdfminer.six pyyaml numpy
  export OPENAI_API_KEY=...
"""

from __future__ import annotations

import os
import re
import json
import yaml
import csv
import math
import time
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# LangChain (modern imports)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
import backoff
import numpy as np
from langchain_openai import OpenAIEmbeddings

In [None]:
#!/usr/bin/env python3
"""
NexResume Pipeline (Refactored):
- Robust PDF parsing (pypdf -> pdfminer.six fallback)
- Structured LLM outputs via Pydantic (no more JSON guessing)
- Deterministic scoring (optional, based on JD weights)
- Chunking for long resumes to avoid token cutoffs
- Concurrency + retries + logging
- CSV summary aggregation

Usage:
  python nexresume_refactor.py --job path/to/job.yml --resumes path/to/resumes_folder [--workers 4]

Requires:
  pip install langchain-openai langchain pydantic backoff pypdf pdfminer.six pyyaml numpy sentence-transformers torch --upgrade
  export OPENROUTER_API_KEY=...   (optional: OPENROUTER_BASE, MODEL_NAME)
"""

from __future__ import annotations

import os
import re
import json
import yaml
import csv
import math
import time
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# LangChain (modern imports)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
import backoff
import numpy as np
from langchain_community.embeddings import SentenceTransformerEmbeddings

# ========== Logging ==========
# Root logger setup: INFO and above, each record prefixed with time and level.
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)

# ========== IO helpers ==========

def load_yaml_job_description(path: str) -> Dict[str, Any]:
    """Parse the job-description YAML file at ``path``.

    Returns the parsed document, or an empty dict when ``yaml.safe_load``
    yields a falsy value (for example an empty file).
    """
    with open(path, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)
    return parsed if parsed else {}


def _read_pdf_pypdf(file_path: str) -> str:
    try:
        from pypdf import PdfReader
    except Exception:  # pragma: no cover
        return ""
    try:
        text = []
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                try:
                    text.append(page.extract_text() or "")
                except Exception:
                    text.append("")
        return "\n".join(text)
    except Exception as e:
        logging.warning(f"pypdf failed on {file_path}: {e}")
        return ""


def _read_pdf_pdfminer(file_path: str) -> str:
    try:
        # pdfminer.six
        from pdfminer.high_level import extract_text
        return extract_text(file_path) or ""
    except Exception as e:  # pragma: no cover
        logging.warning(f"pdfminer failed on {file_path}: {e}")
        return ""


def load_resume_text(file_path: str) -> str:
    """Return the text of a ``.txt`` or ``.pdf`` resume, or "" on failure.

    PDFs are read with pypdf first and pdfminer as a fallback when pypdf
    yields only whitespace. Extension matching is case-insensitive. Every
    failure mode is logged as a warning rather than raised.
    """
    path_str = str(file_path)
    lowered = path_str.lower()

    if lowered.endswith('.pdf'):
        extracted = _read_pdf_pypdf(path_str)
        if not extracted.strip():
            extracted = _read_pdf_pdfminer(path_str)
        if not extracted.strip():
            logging.warning(f"Could not extract text from PDF {path_str}")
        return extracted

    if not lowered.endswith('.txt'):
        logging.warning(f"Unsupported resume format: {path_str}")
        return ""

    try:
        with open(path_str, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as exc:
        logging.warning(f"Could not read TXT {path_str}: {exc}")
        return ""


def safe_candidate_name_from_file(file_name: str) -> str:
    """Derive a filesystem-safe candidate id from a resume file name.

    Takes the name without its final extension and replaces every character
    other than ASCII letters, digits, ``_``, ``.`` and ``-`` with ``_``.
    """
    unsafe_chars = re.compile(r"[^A-Za-z0-9_.-]")
    return unsafe_chars.sub("_", Path(file_name).stem)

# ========== Chunking ==========

def chunk_text(text: str, max_chars: int = 6000, overlap: int = 300) -> List[str]:
    """Split ``text`` into overlapping character windows.

    Simple char-based chunking to keep prompts within context. Adjust
    ``max_chars`` based on your target model context window.

    Args:
        text: Input text; ``None`` is treated as "".
        max_chars: Maximum characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks. Clamped to
            ``[0, max_chars - 1]`` so the window always advances.

    Returns:
        A list of chunks. Text that fits in one window is returned as a
        single-element list (``[""]`` for empty input).

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    text = text or ""
    if len(text) <= max_chars:
        return [text]

    # An overlap >= max_chars would give a zero/negative step and never finish.
    overlap = min(max(0, overlap), max_chars - 1)
    step = max_chars - overlap

    chunks = []
    start = 0
    while True:
        chunks.append(text[start:start + max_chars])
        # Stop once the window reaches the end; another step would only emit
        # a tail that is already fully contained in this chunk.
        if start + max_chars >= len(text):
            break
        start += step
    return chunks

# ========== Pydantic schema for structured output ==========

class MatchReport(BaseModel):
    """Structured result the LLM returns for one resume (or resume chunk).

    ``make_llm`` binds this model via ``with_structured_output``, so replies
    are expected to arrive as ``MatchReport`` instances.
    """
    # Skill lists; each defaults to empty when the field is omitted.
    matched_required_skills: List[str] = []
    missing_required_skills: List[str] = []
    matched_optional_skills: List[str] = []
    # Free-text verdicts. Downstream scoring/merging treats a value that starts
    # with "true" or "yes" (case-insensitive) as a positive match.
    education_match: str
    experience_match: str
    keywords_matched: List[str] = []
    soft_skills_match: List[str] = []
    resume_summary: str
    # LLM-reported match score, constrained to [0, 1].
    match_score: float = Field(ge=0, le=1)
    city_tier_match: bool
    longest_tenure_months: int
    # LLM-reported overall score, constrained to [0, 100].
    final_score: int = Field(ge=0, le=100)

# ========== Prompt ==========

# Two-message chat template: a system instruction plus a user message.
# Template variables: {job_description} (the JD rendered as YAML) and
# {resume_text} (one resume chunk).
PROMPT = ChatPromptTemplate.from_messages([
    ("system", "You are a precise resume matching assistant. "
               "Compare the job description and resume content to fill the schema exactly."),
    ("user",
     "Job Description (YAML):\n{job_description}\n\n"
     "Resume chunk:\n{resume_text}\n\n"
     "Return ONLY JSON compatible with the provided schema (no markdown, no prose).")
])

# ========== Embeddings (pre-filter) ==========

def _cosine(a: np.ndarray, b: np.ndarray) -> float:
    if a is None or b is None:
        return -1.0
    na = np.linalg.norm(a); nb = np.linalg.norm(b)
    if na == 0 or nb == 0:
        return -1.0
    return float(np.dot(a, b) / (na * nb))

# Lazily-created, module-wide embedding model (see get_embedder).
_embedder: Optional[SentenceTransformerEmbeddings] = None

def get_embedder() -> SentenceTransformerEmbeddings:
    """Return the shared embedding model, creating it on first use."""
    global _embedder
    if _embedder is not None:
        return _embedder
    _embedder = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    return _embedder

def embed_text(text: str) -> np.ndarray:
    """Embed ``text`` (``None``/empty is embedded as "") as a float32 array."""
    vector = get_embedder().embed_query("" if not text else text)
    return np.array(vector, dtype=np.float32)


def prefilter_resumes(jd: Dict[str, Any], resume_paths: List[Path], texts: List[str], topk: Optional[int] = None, topk_frac: float = 0.4) -> List[Tuple[Path, float]]:
    """Rank resumes by embedding similarity to the JD and return the top subset.

    Args:
        jd: Parsed job description; serialized to YAML before embedding.
        resume_paths: Resume files, index-aligned with ``texts``.
        texts: Extracted resume texts.
        topk: Absolute number of resumes to keep. If ``None``, keep
            ``ceil(len(resume_paths) * topk_frac)``.
        topk_frac: Fraction of resumes to keep when ``topk`` is ``None``.

    Returns:
        ``(path, similarity)`` pairs, best first. At least one pair is
        returned when there is at least one resume; an empty list otherwise.
        Blank texts and texts whose embedding fails score -1.0 and rank last.
    """
    n = len(resume_paths)
    if n == 0:
        # Nothing to rank: avoid loading the embedding model at all.
        return []

    jd_vec = embed_text(yaml.dump(jd, sort_keys=False))

    sims: List[Tuple[int, float]] = []
    for i, t in enumerate(texts):
        if not (t or "").strip():
            # Unreadable/empty resumes must not take a top-k slot.
            sims.append((i, -1.0))
            continue
        try:
            sims.append((i, _cosine(jd_vec, embed_text(t))))
        except Exception as e:
            # Best-effort ranking: keep going, but leave a trace of the failure.
            logging.warning(f"Embedding failed for resume #{i}: {e}")
            sims.append((i, -1.0))
    sims.sort(key=lambda x: x[1], reverse=True)

    k = int(topk) if topk is not None else int(np.ceil(n * float(topk_frac)))
    k = max(1, min(n, k))

    return [(resume_paths[i], score) for i, score in sims[:k]]

# ========== LLM client ==========
# SECURITY: credentials come from the environment only. An API key used to be
# hard-coded here as the fallback value; treat that key as compromised and
# rotate it.
key = os.getenv("OPENROUTER_API_KEY")
base = os.getenv("OPENROUTER_BASE", "https://openrouter.ai/api/v1")
# NOTE(review): this module-level `model` is shadowed by make_llm's `model`
# parameter and is not read by make_llm — confirm whether MODEL_NAME should
# feed the default.
model = os.getenv("MODEL_NAME", "nvidia/nemotron-nano-9b-v2:free")


def make_llm(model: str = "gpt-4o-mini", temperature: float = 0.6):
    """Build a chat model bound to the ``MatchReport`` structured-output schema.

    Args:
        model: Model identifier passed to the chat endpoint.
        temperature: Sampling temperature.

    Returns:
        The ``ChatOpenAI`` client wrapped with
        ``with_structured_output(MatchReport)``.

    Raises:
        RuntimeError: If ``OPENROUTER_API_KEY`` was not set when this module
            was loaded.
    """
    if not key:
        raise RuntimeError(
            "OPENROUTER_API_KEY is not set; export it before running the pipeline."
        )
    llm = ChatOpenAI(model=model, temperature=temperature, api_key=key, base_url=base)
    return llm.with_structured_output(MatchReport)

# ========== Retry wrapper ==========

@backoff.on_exception(backoff.expo, Exception, max_time=90)
def call_llm_structured(structured_llm, jd_dict: Dict[str, Any], resume_text: str) -> MatchReport:
    """Run the matching prompt for one resume chunk, with retries.

    Any exception triggers an exponential-backoff retry for up to 90 seconds;
    the last exception is re-raised once the time budget is spent.

    Args:
        structured_llm: Runnable returned by ``make_llm``.
        jd_dict: Parsed job description; rendered to YAML for the prompt.
        resume_text: Resume text (or one chunk of it).

    Returns:
        Whatever ``structured_llm.invoke`` returns for the formatted messages.
    """
    # IMPORTANT: for structured outputs, invoke with messages. `PROMPT.format`
    # would flatten the template into a single string and send the system
    # instruction as part of one human message, so use `format_messages`.
    messages = PROMPT.format_messages(
        job_description=yaml.dump(jd_dict, sort_keys=False),
        resume_text=resume_text,
    )
    return structured_llm.invoke(messages)

# ========== Deterministic scoring (optional) ==========

def normalize_skill(s: str) -> str:
    """Canonicalize a skill name: trim, lowercase, then expand known aliases.

    ``None`` and empty input normalize to "".
    """
    canonical = (s or "").strip().lower()
    alias_pairs = (
        ("js", "javascript"),
        ("nodejs", "node.js"),
        ("py", "python"),
        ("torch", "pytorch"),
    )
    for short_form, full_form in alias_pairs:
        if canonical == short_form:
            return full_form
    return canonical


def compute_deterministic_score(jd: Dict[str, Any], parsed: Dict[str, Any]) -> Tuple[int, Dict[str, float]]:
    """Compute a reproducible final score from components.

    JD may include optional weights:
      weights: {required:0.45, optional:0.2, experience:0.15, education:0.1, location:0.1}
    Fallback defaults used if missing.

    Args:
        jd: Parsed job description (``None`` is treated as empty). Reads
            ``weights``, ``required_skills`` and ``optional_skills``.
        parsed: Merged report dict (``None`` is treated as empty). Reads the
            matched skill lists, ``experience_match``, ``education_match``
            and ``city_tier_match``.

    Returns:
        ``(score, components)`` where ``score`` is an int clamped to 0-100 and
        ``components`` holds the individual coverage/fit values.
    """
    jd = jd or {}
    parsed = parsed or {}

    weights = {
        "required": 0.45,
        "optional": 0.20,
        "experience": 0.15,
        "education": 0.10,
        "location": 0.10,
    }
    jd_weights = jd.get("weights") or {}
    weights.update({k: float(v) for k, v in jd_weights.items() if k in weights})

    def _skill_set(values) -> set:
        # Sets, so duplicated or aliased JD entries ("js" + "javascript")
        # count once and full coverage can actually reach 1.0.
        normalized = (normalize_skill(v) for v in (values or []))
        return {name for name in normalized if name}

    req = _skill_set(jd.get("required_skills"))
    opt = _skill_set(jd.get("optional_skills"))
    matched_req = _skill_set(parsed.get("matched_required_skills"))
    matched_opt = _skill_set(parsed.get("matched_optional_skills"))

    req_cov = len(matched_req & req) / len(req) if req else 0.0
    opt_cov = len(matched_opt & opt) / len(opt) if opt else 0.0

    # naive booleans from strings: "true..."/"yes..." is a full fit, anything
    # else gets half credit.
    exp_fit = 1.0 if str(parsed.get("experience_match", "")).lower().startswith(("true", "yes")) else 0.5
    edu_fit = 1.0 if str(parsed.get("education_match", "")).lower().startswith(("true", "yes")) else 0.5
    loc_fit = 1.0 if bool(parsed.get("city_tier_match")) else 0.0

    score = (
        weights["required"] * req_cov +
        weights["optional"] * opt_cov +
        weights["experience"] * exp_fit +
        weights["education"] * edu_fit +
        weights["location"] * loc_fit
    )
    # JD-supplied weights are not required to sum to 1; keep the result on the
    # same 0-100 scale as the LLM's final_score.
    final = max(0, min(100, int(round(score * 100))))
    return final, {
        "req_cov": req_cov,
        "opt_cov": opt_cov,
        "exp_fit": exp_fit,
        "edu_fit": edu_fit,
        "loc_fit": loc_fit,
    }

# ========== Per-resume processing ==========

def merge_chunk_reports(reports: List["MatchReport"]) -> Dict[str, Any]:
    """Merge multiple chunk-level reports into one resume-level report.

    Strategy: union for lists, max for tenure and scores, any True for
    booleans, longest summary. A required skill is reported as missing only
    if no chunk matched it (compared case-insensitively). For the education /
    experience notes, the last positive note (starting with "true"/"yes")
    wins; if no chunk is positive, the first non-empty note is kept.

    Args:
        reports: Chunk-level reports; each must provide ``model_dump()``.

    Returns:
        The merged report as a plain dict with sorted lists, or ``{}`` when
        ``reports`` is empty.
    """
    if not reports:
        return {}

    list_fields = ("matched_required_skills", "missing_required_skills",
                   "matched_optional_skills", "keywords_matched", "soft_skills_match")
    note_fields = ("education_match", "experience_match")

    out: Dict[str, Any] = {
        "matched_required_skills": set(),
        "missing_required_skills": set(),
        "matched_optional_skills": set(),
        "education_match": "",
        "experience_match": "",
        "keywords_matched": set(),
        "soft_skills_match": set(),
        "resume_summary": "",
        "match_score": 0.0,
        "city_tier_match": False,
        "longest_tenure_months": 0,
        "final_score": 0,
    }
    fallback_notes = {field: "" for field in note_fields}

    for r in reports:
        data = r.model_dump()
        for field in list_fields:
            out[field].update(data.get(field) or [])
        summary = data.get("resume_summary") or ""
        if len(summary) > len(out["resume_summary"]):
            out["resume_summary"] = summary
        out["match_score"] = max(out["match_score"], float(data.get("match_score") or 0))
        out["city_tier_match"] = out["city_tier_match"] or bool(data.get("city_tier_match"))
        out["longest_tenure_months"] = max(out["longest_tenure_months"], int(data.get("longest_tenure_months") or 0))
        out["final_score"] = max(out["final_score"], int(data.get("final_score") or 0))
        for field in note_fields:
            note = str(data.get(field) or "")
            if note.lower().startswith(("true", "yes")):
                # keep the most "positive" education/experience note if any
                out[field] = note
            elif note and not fallback_notes[field]:
                fallback_notes[field] = note

    # Do not lose the verdict entirely when no chunk was positive.
    for field in note_fields:
        if not out[field]:
            out[field] = fallback_notes[field]

    # A skill absent from one chunk but found in another is not missing.
    matched_keys = {s.strip().casefold() for s in out["matched_required_skills"]}
    out["missing_required_skills"] = {
        s for s in out["missing_required_skills"]
        if s.strip().casefold() not in matched_keys
    }

    # convert sets back to lists
    for field in list_fields:
        out[field] = sorted(out[field])
    return out


def process_one_resume(jd: Dict[str, Any], resume_path: Path, structured_llm) -> Optional[Dict[str, Any]]:
    """Score a single resume against the job description.

    The resume is loaded, split into chunks, each chunk is sent to the LLM,
    and the chunk reports are merged. A deterministic score and its
    components are attached alongside the LLM's own scores.

    Returns:
        The report dict, or ``None`` when the resume has no text or every
        chunk-level LLM call failed.
    """
    resume_body = load_resume_text(str(resume_path))
    if not resume_body.strip():
        logging.warning(f"Empty/unsupported resume: {resume_path.name}; skipping.")
        return None

    # One LLM call per chunk; a failed chunk is logged and skipped.
    per_chunk: List[MatchReport] = []
    for piece in chunk_text(resume_body):
        try:
            per_chunk.append(call_llm_structured(structured_llm, jd, piece))
        except Exception as e:
            logging.error(f"LLM error on {resume_path.name} chunk: {e}")

    if not per_chunk:
        logging.warning(f"No LLM outputs for {resume_path.name}")
        return None

    merged = merge_chunk_reports(per_chunk)

    # Deterministic score is reported next to, not instead of, the LLM score.
    det_score, components = compute_deterministic_score(jd, merged)
    merged.update(final_score_deterministic=det_score, scoring_components=components)

    return {
        "candidate_name": safe_candidate_name_from_file(resume_path.name),
        "job_title": jd.get("Job_Title") or jd.get("job_title"),
        **merged,
    }

# ========== Batch processing ==========

def process_all(job_description_file: str, resumes_folder: str, workers: int = 4, model: str = "gpt-4o-mini", topk: Optional[int] = None, topk_frac: float = 0.4) -> List[Dict[str, Any]]:
    """Run the full pipeline over a folder of resumes.

    Steps: load the JD, pre-filter resumes by embedding similarity, score the
    selected resumes concurrently, write one JSON report per candidate and a
    ``summary.csv`` into ``./reports``.

    Args:
        job_description_file: Path to the job-description YAML.
        resumes_folder: Folder containing ``.pdf`` / ``.txt`` resumes.
        workers: Maximum number of resumes processed concurrently (min 1).
        model: Model identifier forwarded to ``make_llm``.
        topk: Absolute number of resumes to pass to the LLM after pre-filter.
        topk_frac: Fraction of resumes to pass when ``topk`` is ``None``.

    Returns:
        The list of per-candidate report dicts (empty if nothing succeeded).
    """
    reports_dir = Path("reports")
    reports_dir.mkdir(exist_ok=True)

    jd = load_yaml_job_description(job_description_file)

    # Sorted so runs are reproducible regardless of directory listing order.
    resume_files = sorted(
        p for p in Path(resumes_folder).iterdir()
        if p.name.lower().endswith((".pdf", ".txt"))
    )

    reports: List[Dict[str, Any]] = []
    if not resume_files:
        # Avoid building the LLM client and embedding model for no work.
        logging.warning(f"No .pdf/.txt resumes found in {resumes_folder}")
        return reports

    structured_llm = make_llm(model=model)

    # --- Load texts once for the embedding pre-filter ---
    resume_texts = [load_resume_text(str(p)) for p in resume_files]

    # --- Pre-filter via embeddings ---
    ranked = prefilter_resumes(jd, resume_files, resume_texts, topk=topk, topk_frac=topk_frac)
    selected_files = [p for p, _ in ranked]
    logging.info(f"Pre-filter selected {len(selected_files)}/{len(resume_files)} resumes via embeddings")

    # Concurrency: LLM calls are I/O-bound, so threads overlap the waiting.
    with ThreadPoolExecutor(max_workers=max(1, int(workers))) as ex:
        futs = {ex.submit(process_one_resume, jd, p, structured_llm): p for p in selected_files}
        for fut in as_completed(futs):
            p = futs[fut]
            try:
                rep = fut.result()
                if rep:
                    reports.append(rep)
                    # write per-candidate JSON immediately
                    out_path = reports_dir / f"{rep['candidate_name']}_report.json"
                    with open(out_path, 'w', encoding='utf-8') as f:
                        json.dump(rep, f, indent=2, ensure_ascii=False)
            except Exception as e:
                logging.error(f"Failed {p.name}: {e}")

    # CSV summary
    if reports:
        summary_fields = [
            "candidate_name", "job_title", "final_score", "final_score_deterministic",
            "match_score", "longest_tenure_months", "city_tier_match",
            "missing_required_skills", "matched_required_skills"
        ]
        csv_path = reports_dir / "summary.csv"
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=summary_fields)
            w.writeheader()
            for r in reports:
                row = {k: r.get(k) for k in summary_fields}
                # lists -> semicolon string for CSV
                for k in ("missing_required_skills", "matched_required_skills"):
                    v = row.get(k)
                    if isinstance(v, list):
                        row[k] = "; ".join(v)
                w.writerow(row)
        logging.info(f"Wrote {csv_path}")
    return reports

# ========== CLI ==========

# The argparse CLI below is disabled (presumably because this runs as a
# notebook cell); the whole block is kept commented for reference. The
# `import argparse` line used to be left uncommented and indented, which made
# Python parse it as unreachable code at the end of process_all().
#
# if __name__ == "__main__":
#     import argparse
#
#     parser = argparse.ArgumentParser(description="Run NexResume matching pipeline (refactored)")
#     parser.add_argument('--job', type=str, required=True, help="Path to job_description.yml")
#     parser.add_argument('--resumes', type=str, required=True, help="Path to folder containing resumes (.pdf/.txt)")
#     parser.add_argument('--workers', type=int, default=4, help="Max concurrent resumes")
#     parser.add_argument('--model', type=str, default="gpt-4o-mini", help="OpenAI model name")
#     parser.add_argument('--topk', type=int, default=None, help="Absolute number of resumes to pass to LLM after prefilter")
#     parser.add_argument('--topk-frac', type=float, default=0.4, help="Fraction of resumes to pass if --topk is not set")
#     args = parser.parse_args()

# Hard-coded inputs for the interactive run.
job_description_file = r"C:\Users\Lenovo\resume_matcher\jd.yaml"
resumes_folder = r"C:\Users\Lenovo\resume_matcher\resumes"
topk = 2
Path("reports").mkdir(exist_ok=True)

# perf_counter is monotonic, so the elapsed time cannot be skewed by clock changes.
t0 = time.perf_counter()
reports = process_all(job_description_file=job_description_file, resumes_folder=resumes_folder, topk=topk)
logging.info(f"Done processing resumes in {time.perf_counter() - t0:.1f}s")


  _embedder = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
2025-09-20 12:26:44,331 INFO PyTorch version 2.7.1 available.
2025-09-20 12:26:45,642 INFO Use pytorch device_name: cpu
2025-09-20 12:26:45,643 INFO Load pretrained SentenceTransformer: all-MiniLM-L6-v2
2025-09-20 12:26:50,492 INFO Pre-filter selected 1/1 resumes via embeddings
2025-09-20 12:26:52,810 INFO HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
2025-09-20 12:26:57,672 INFO Wrote reports\summary.csv
2025-09-20 12:26:57,673 INFO Done processing resumes in 19.6s


In [None]:
#!/usr/bin/env python3
"""
NexResume Pipeline (Simplified, No Chunking):
- Robust PDF parsing (pypdf -> pdfminer.six fallback)
- LLM replies parsed as JSON from the raw completion (the Pydantic model documents the expected schema)
- Rubric-based final_score produced by the LLM from the weights given in the prompt
- Concurrency + retries + logging
- CSV summary aggregation

Usage:
  python nexresume_refactor.py --job path/to/job.yml --resumes path/to/resumes_folder [--workers 4]

Requires:
  pip install langchain-openai langchain pydantic backoff pypdf pdfminer.six pyyaml numpy sentence-transformers torch --upgrade
  export OPENROUTER_API_KEY=...   (optional: OPENROUTER_BASE)
"""

from __future__ import annotations

import os
import re
import json
import yaml
import csv
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# LangChain (modern imports)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
import backoff
import numpy as np
from langchain_community.embeddings import SentenceTransformerEmbeddings

# ========== Logging ==========
# Root logger setup: INFO and above, each record prefixed with time and level.
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)

# ========== IO helpers ==========

def load_yaml_job_description(path: str) -> Dict[str, Any]:
    """Parse the job-description YAML file at ``path``.

    Returns the parsed document, or an empty dict when ``yaml.safe_load``
    yields a falsy value (for example an empty file).
    """
    with open(path, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)
    return parsed if parsed else {}


def _read_pdf_pypdf(file_path: str) -> str:
    try:
        from pypdf import PdfReader
    except Exception:
        return ""
    try:
        text = []
        with open(file_path, "rb") as f:
            reader = PdfReader(f)
            for page in reader.pages:
                try:
                    text.append(page.extract_text() or "")
                except Exception:
                    text.append("")
        return "\n".join(text)
    except Exception as e:
        logging.warning(f"pypdf failed on {file_path}: {e}")
        return ""


def _read_pdf_pdfminer(file_path: str) -> str:
    try:
        from pdfminer.high_level import extract_text
        return extract_text(file_path) or ""
    except Exception as e:
        logging.warning(f"pdfminer failed on {file_path}: {e}")
        return ""


def load_resume_text(file_path: str) -> str:
    """Return the text of a ``.txt`` or ``.pdf`` resume, or "" on failure.

    PDFs are read with pypdf first and pdfminer as a fallback when pypdf
    yields only whitespace. Extension matching is case-insensitive. Every
    failure mode is logged as a warning rather than raised.
    """
    path_str = str(file_path)
    lowered = path_str.lower()

    if lowered.endswith('.pdf'):
        extracted = _read_pdf_pypdf(path_str)
        if not extracted.strip():
            extracted = _read_pdf_pdfminer(path_str)
        if not extracted.strip():
            logging.warning(f"Could not extract text from PDF {path_str}")
        return extracted

    if not lowered.endswith('.txt'):
        logging.warning(f"Unsupported resume format: {path_str}")
        return ""

    try:
        with open(path_str, 'r', encoding='utf-8') as handle:
            return handle.read()
    except Exception as exc:
        logging.warning(f"Could not read TXT {path_str}: {exc}")
        return ""


def safe_candidate_name_from_file(file_name: str) -> str:
    """Derive a filesystem-safe candidate id from a resume file name.

    Takes the name without its final extension and replaces every character
    other than ASCII letters, digits, ``_``, ``.`` and ``-`` with ``_``.
    """
    unsafe_chars = re.compile(r"[^A-Za-z0-9_.-]")
    return unsafe_chars.sub("_", Path(file_name).stem)

# ========== Pydantic schema for structured output ==========

class MatchReport(BaseModel):
    """Schema of the JSON object the prompt asks the LLM to produce.

    NOTE(review): in the code visible in this cell the model is neither bound
    to the LLM nor used to validate parsed replies — confirm whether that is
    intended.
    """
    # Skill lists; each defaults to empty when the field is omitted.
    matched_required_skills: List[str] = []
    missing_required_skills: List[str] = []
    matched_optional_skills: List[str] = []
    education_match: str
    experience_match: str
    keywords_matched: List[str] = []
    soft_skills_match: List[str] = []
    resume_summary: str
    # Constrained to [0, 1].
    match_score: float = Field(ge=0, le=1)
    city_tier_match: bool
    longest_tenure_months: int
    # Constrained to [0, 100].
    final_score: int = Field(ge=0, le=100)
    # Optional enrichment fields; all default to None when not reported.
    detected_city: Optional[str] = None
    detected_city_tier: Optional[int] = None
    max_job_gap_months: Optional[int] = None
    # Constrained to [0, 1] when present.
    stability_score: Optional[float] = Field(default=None, ge=0, le=1)

# ========== Prompt ==========

# Two-message chat template.
# - system: sets the recruiter persona and the strict-JSON-only requirement.
# - user: carries the inputs ({job_description}, {resume_text}), the weighted
#   rubric for final_score (the listed weights total 100%), and the list of
#   output keys with their constraints.
PROMPT = ChatPromptTemplate.from_messages([
    (
    "system",
    "You are an expert technical recruiter and data scientist. "
    "Your job is to read a job description (JD) and a resume, then return a STRICT JSON object matching the schema. "
    "Be precise, consistent, and terse. If the information is not present, return a sensible null/empty value rather than guessing. "
    "NEVER add commentary, markdown, or keys not in the schema."
),
(
    "user",
    "<OBJECTIVE>\n"
    "Evaluate the resume against the JD and produce high-quality, schema-valid JSON capturing skills, education, experience fit, city-tier & gaps, longest tenure, and a calibrated final_score.\n"
    "\n"
    "<INPUTS>\n"
    "Job Description (YAML): {job_description}\n"
    "Resume : {resume_text}\n"
    "\n"
     "<RUBRIC FOR final_score (100-point scale)>\n"
    "Weightage:\n"
    "- required skills coverage: 40%\n"
    "- optional skills coverage: 15%\n"
    "- experience fit (years/recency/scope): 15%\n"
    "- education fit: 10%\n"
    "- location fit: 5% (true if city_tier meets JD or is unspecified)\n"
    "- stability: 10% (longest_tenure_months; full credit at 48 months; scale proportionally)\n"
    "- diversity by city tier: 5% bonus (Tier-3 > Tier-2 > Tier-1; score 100 for T3, 60 for T2, 0 for T1)\n"
    "\n"
    "<SCHEMA AND CONSTRAINTS>\n"
    "You must return a single JSON object with the following keys and constraints:\n"
    "- matched_required_skills: string[]\n"
    "- missing_required_skills: string[]\n"
    "- matched_optional_skills: string[]\n"
    "- education_match: string\n"
    "- experience_match: string\n"
    "- keywords_matched: string[]\n"
    "- soft_skills_match: string[]\n"
    "- resume_summary: string\n"
    "- match_score: number in [0,1]\n"
    "- city_tier_match: boolean\n"
    "- longest_tenure_months: integer >= 0\n"
    "- final_score: integer in [0,100]\n"
    "- detected_city: string|null\n"
    "- detected_city_tier: 1|2|3|null\n"
    "- max_job_gap_months: integer|null\n"
    "- stability_score: number in [0,1]|null\n"
    "\n"
    "<ROBUSTNESS & STYLE>\n"
    "- Keep outputs concise; arrays deduplicated and normalized to lowercase where appropriate.\n"
    "- Never include markdown or commentary—only the JSON object.\n"
    "\n"
    "<OUTPUT> Return ONLY the JSON object."
)
])

# ========== Embeddings (pre-filter) ==========

def _cosine(a: np.ndarray, b: np.ndarray) -> float:
    if a is None or b is None:
        return -1.0
    na = np.linalg.norm(a); nb = np.linalg.norm(b)
    if na == 0 or nb == 0:
        return -1.0
    return float(np.dot(a, b) / (na * nb))

# Lazily-created, module-wide embedding model (see get_embedder).
_embedder: Optional[SentenceTransformerEmbeddings] = None

def get_embedder() -> SentenceTransformerEmbeddings:
    """Return the shared embedding model, creating it on first use."""
    global _embedder
    if _embedder is not None:
        return _embedder
    _embedder = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    return _embedder

def embed_text(text: str) -> np.ndarray:
    """Embed ``text`` (``None``/empty is embedded as "") as a float32 array."""
    vector = get_embedder().embed_query("" if not text else text)
    return np.array(vector, dtype=np.float32)

def prefilter_resumes(jd: Dict[str, Any], resume_paths: List[Path], texts: List[str], topk: Optional[int] = None, topk_frac: float = 0.4) -> List[Tuple[Path, float]]:
    """Rank resumes by embedding similarity to the JD and return the top subset.

    Args:
        jd: Parsed job description; serialized to YAML before embedding.
        resume_paths: Resume files, index-aligned with ``texts``.
        texts: Extracted resume texts.
        topk: Absolute number of resumes to keep. If ``None``, keep
            ``ceil(len(resume_paths) * topk_frac)``.
        topk_frac: Fraction of resumes to keep when ``topk`` is ``None``.

    Returns:
        ``(path, similarity)`` pairs, best first. At least one pair is
        returned when there is at least one resume; an empty list otherwise.
        Blank texts and texts whose embedding fails score -1.0 and rank last.
    """
    n = len(resume_paths)
    if n == 0:
        # Nothing to rank: avoid loading the embedding model at all.
        return []

    jd_vec = embed_text(yaml.dump(jd, sort_keys=False))

    sims: List[Tuple[int, float]] = []
    for i, t in enumerate(texts):
        if not (t or "").strip():
            # Unreadable/empty resumes must not take a top-k slot.
            sims.append((i, -1.0))
            continue
        try:
            sims.append((i, _cosine(jd_vec, embed_text(t))))
        except Exception as e:
            # Best-effort ranking: keep going, but leave a trace of the failure.
            logging.warning(f"Embedding failed for resume #{i}: {e}")
            sims.append((i, -1.0))
    sims.sort(key=lambda x: x[1], reverse=True)

    k = int(topk) if topk is not None else int(np.ceil(n * float(topk_frac)))
    k = max(1, min(n, k))

    return [(resume_paths[i], score) for i, score in sims[:k]]

# ========== LLM client ==========
# Connection settings for the OpenRouter-hosted chat endpoint.
key = os.environ.get("OPENROUTER_API_KEY")
base = os.environ.get("OPENROUTER_BASE", "https://openrouter.ai/api/v1")
model_name = "qwen/qwen2.5-vl-72b-instruct:free"

def make_llm(model: str = "gpt-4o-mini", temperature: float = 0.6):
    """Build the plain (unstructured) chat client.

    NOTE(review): the ``model`` argument is accepted but not used; the client
    is always created with the module-level ``model_name``. Confirm whether
    callers' ``model`` should take effect.
    """
    client_options = {
        "model": model_name,
        "temperature": temperature,
        "api_key": key,
        "base_url": base,
    }
    return ChatOpenAI(**client_options)

# ========== Retry wrapper ==========

@backoff.on_exception(backoff.expo, Exception, max_time=90)
def call_llm_structured(structured_llm, jd_dict: Dict[str, Any], resume_text: str) -> Any:
    """Run the matching prompt for one resume, with retries.

    Any exception triggers an exponential-backoff retry for up to 90 seconds;
    the last exception is re-raised once the time budget is spent.

    Args:
        structured_llm: Chat model returned by ``make_llm``.
        jd_dict: Parsed job description; rendered to YAML for the prompt.
        resume_text: Full resume text.

    Returns:
        Whatever ``structured_llm.invoke`` returns; the caller reads its
        ``.content`` attribute as the raw JSON text.
    """
    # `PROMPT.format` would flatten the template into a single string and send
    # the system instruction inside one human message; `format_messages`
    # keeps the system and user roles separate.
    messages = PROMPT.format_messages(
        job_description=yaml.dump(jd_dict, sort_keys=False),
        resume_text=resume_text,
    )
    return structured_llm.invoke(messages)

# ========== Per-resume processing ==========

def clean_text_v2(text):
    """Isolate the JSON payload in a raw LLM reply.

    Removes Markdown code fences, drops any prose before the first ``{`` or
    ``[``, and, when the remainder starts with a complete JSON value, drops
    whatever follows that value (e.g. a closing remark from the model).

    Args:
        text: Raw reply text; ``None``/empty yields "".

    Returns:
        The cleaned string. If no complete JSON value can be decoded, the
        text from the first bracket onward is returned unchanged so the
        caller's ``json.loads`` reports the actual parse error.
    """
    if not text:
        return ""
    text = re.sub(r"```(?:json)?\n?|```", "", text, flags=re.IGNORECASE).strip()
    json_start = re.search(r'[\{\[]', text)
    if not json_start:
        return text
    text = text[json_start.start():].strip()
    try:
        # raw_decode tolerates trailing data and reports where the value ends.
        _, end = json.JSONDecoder().raw_decode(text)
    except ValueError:
        return text
    return text[:end]

def process_one_resume(jd: Dict[str, Any], resume_path: Path, structured_llm) -> Optional[Dict[str, Any]]:
    """Score a single resume against the job description.

    Sends the whole resume text to the LLM in one call, cleans the reply and
    parses it as JSON.

    Returns:
        The report dict (``candidate_name``, ``job_title`` plus the LLM's
        fields), or ``None`` when the resume has no text, the LLM call or JSON
        parsing fails, or the reply is not a JSON object.
    """
    text = load_resume_text(str(resume_path))
    if not text.strip():
        logging.warning(f"Empty/unsupported resume: {resume_path.name}; skipping.")
        return None

    try:
        r = call_llm_structured(structured_llm, jd, text)
        cleaned_result = clean_text_v2(r.content)
        parsed = json.loads(cleaned_result)
    except Exception as e:
        logging.error(f"LLM error on {resume_path.name}: {e}")
        return None

    # A top-level JSON array/scalar cannot be unpacked into the report.
    if not isinstance(parsed, dict):
        logging.error(
            f"LLM error on {resume_path.name}: expected a JSON object, got {type(parsed).__name__}"
        )
        return None

    # Identity fields come from the file and the JD; never let keys invented
    # by the LLM overwrite them.
    reserved = ("candidate_name", "job_title")
    report = {
        "candidate_name": safe_candidate_name_from_file(resume_path.name),
        "job_title": jd.get("Job_Title") or jd.get("job_title"),
        **{k: v for k, v in parsed.items() if k not in reserved},
    }
    return report

# ========== Batch processing ==========

def process_all(job_description_file: str, resumes_folder: str, workers: int = 4, model: str = "gpt-4o-mini", topk: Optional[int] = None, topk_frac: float = 0.4) -> None:
    """Run the pipeline over a folder of resumes and write reports to ./reports.

    Steps: load the JD, pre-filter resumes by embedding similarity, process
    the selected resumes concurrently, write one JSON file per candidate and,
    if any report was produced, a ``summary.csv``.

    Args:
        job_description_file: Path to the job-description YAML.
        resumes_folder: Folder scanned for ``.pdf`` / ``.txt`` files.
        workers: Maximum number of concurrent resume jobs (at least 1).
        model: Forwarded to ``make_llm``.
        topk: Absolute number of resumes to keep after the pre-filter.
        topk_frac: Fraction of resumes to keep when ``topk`` is ``None``.
    """
    reports_dir = Path("reports"); reports_dir.mkdir(exist_ok=True)

    jd = load_yaml_job_description(job_description_file)
    structured_llm = make_llm(model=model)

    # Case-insensitive extension filter; order follows os.listdir.
    resume_files = [Path(resumes_folder) / fn for fn in os.listdir(resumes_folder)
                    if fn.lower().endswith((".pdf", ".txt"))]

    reports: List[Dict[str, Any]] = []

    # Texts are loaded here for the embedding pre-filter only;
    # process_one_resume is handed the path, not the text.
    resume_texts = [load_resume_text(str(p)) for p in resume_files]

    ranked = prefilter_resumes(jd, resume_files, resume_texts, topk=topk, topk_frac=topk_frac)
    selected_files = [p for p, _ in ranked]
    logging.info(f"Pre-filter selected {len(selected_files)}/{len(resume_files)} resumes via embeddings")

    # One task per selected resume; results are handled as they complete.
    with ThreadPoolExecutor(max_workers=max(1, int(workers))) as ex:
        futs = {ex.submit(process_one_resume, jd, p, structured_llm): p for p in selected_files}
        for fut in as_completed(futs):
            p = futs[fut]
            try:
                rep = fut.result()
                if rep:
                    reports.append(rep)
                    # Per-candidate JSON is written as soon as its report is ready.
                    out_path = Path("reports") / f"{rep['candidate_name']}_report.json"
                    with open(out_path, 'w', encoding='utf-8') as f:
                        json.dump(rep, f, indent=2, ensure_ascii=False)
            except Exception as e:
                # Covers both a failed worker and a failed JSON write.
                logging.error(f"Failed {p.name}: {e}")

    if reports:
        summary_fields = [
            "candidate_name", "job_title", "final_score",
            "match_score", "longest_tenure_months", "city_tier_match",
            "missing_required_skills", "matched_required_skills"
        ]
        csv_path = Path("reports") / "summary.csv"
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=summary_fields)
            w.writeheader()
            for r in reports:
                # Fields absent from a report become None in the row.
                row = {k: r.get(k) for k in summary_fields}
                # Skill lists are flattened to "a; b; c" for the CSV cell.
                for k in ("missing_required_skills", "matched_required_skills"):
                    v = row.get(k)
                    if isinstance(v, list):
                        row[k] = "; ".join(v)
                w.writerow(row)
        logging.info(f"Wrote {csv_path}")
