In [2]:
import re, json, argparse, yaml
from pathlib import Path
from typing import Dict, List, Set, Optional

# ---------- tiny normalizers ----------
def _norm(s):
    """Return *s* trimmed, lowercased, with each whitespace run collapsed to one space.

    Falsy input (``None``, ``""``) is treated as the empty string.
    """
    trimmed = (s or "").strip().lower()
    return re.sub(r"\s+", " ", trimmed)


def _norm_email(s):
    """Return *s* trimmed and lowercased; falsy input becomes ``""``."""
    value = s or ""
    return value.strip().lower()


def _norm_skills(xs):
    """Return the set of normalized, non-empty skills found in *xs*.

    Each entry goes through ``_norm``; entries that normalize to the empty
    string are dropped. A falsy *xs* (``None``, ``[]``) yields an empty set.
    """
    cleaned = set()
    for raw in (xs or []):
        skill = _norm(raw)
        if skill:
            cleaned.add(skill)
    return cleaned

# ---------- load config ONCE ----------
def load_cfg(config_path: str) -> Dict:
    """Load a YAML configuration file, validate it, and resolve its paths.

    Relative paths in the file are resolved against the directory that
    contains the config file, not against the current working directory.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        A dict with two entries:

        * ``"paths"``: ``input_dir``, ``output_dir`` and ``ground_truth`` as
          resolved ``Path`` objects.
        * ``"options"``: ``print_each`` (bool) and ``exts`` (set of lowercase
          file suffixes, each starting with a dot).

    Raises:
        ValueError: If the file does not contain a YAML mapping or a required
            key is missing.
    """
    cfg_file = Path(config_path)
    base = cfg_file.parent

    # Load YAML configuration safely
    cfg = yaml.safe_load(cfg_file.read_text(encoding="utf-8"))

    # An empty file loads as None and a top-level list/scalar is not usable
    # either; fail with a clear message instead of a TypeError further down.
    if not isinstance(cfg, dict):
        raise ValueError(f"Config file '{cfg_file}' must contain a YAML mapping")

    # Explicit raise rather than `assert`, so validation survives `python -O`.
    for k in ("input_dir", "output_dir", "ground_truth_path"):
        if k not in cfg:
            raise ValueError(f"Missing '{k}' in config.yaml")

    # Resolve all paths relative to the config file location
    paths = {
        "input_dir":    (base / cfg["input_dir"]).resolve(),          # Directory with resume files
        "output_dir":   (base / cfg["output_dir"]).resolve(),         # Output directory for results
        "ground_truth": (base / cfg["ground_truth_path"]).resolve(),  # Ground truth JSON file
    }

    # Extensions are compared against `Path.suffix.lower()` by the pipeline, so
    # store them lowercase and with a leading dot. A lone string is treated as
    # a single extension rather than being split into characters.
    raw_exts = cfg.get("exts")
    if raw_exts is None:
        raw_exts = [".pdf", ".docx"]
    elif isinstance(raw_exts, str):
        raw_exts = [raw_exts]
    exts = set()
    for ext in raw_exts:
        ext = str(ext).strip().lower()
        if ext:
            exts.add(ext if ext.startswith(".") else "." + ext)

    # Extract optional settings with defaults
    options = {
        "print_each": bool(cfg.get("print_each_resume", False)),  # Print status for each file
        "exts": exts,                                             # Supported file extensions
    }

    return {"paths": paths, "options": options}

In [3]:
# ---------- Class 1: simple extractor (name/email/skills) ----------
class ResumeExtractor:
    """Rule-based resume parser for extracting name, email, and skills from PDF/DOCX files."""

    # Spelling variants that are folded into a single canonical skill label.
    _SKILL_ALIASES = {
        "node.js": "nodejs",
        "react.js": "react",
        "postgresql": "postgres",
    }

    def __init__(self, cfg: Dict):
        """Initialize with a predefined skill lexicon.

        Args:
            cfg: Pipeline configuration. Currently not read by the extractor.
        """
        # Skill lexicon: programming languages, frameworks, databases,
        # cloud platforms, data science tools, ML libraries, DevOps tools
        self.SKILL_LEXICON = sorted({
            "python","java","c#",".net","javascript","typescript","nodejs","react","angular",
            "sql","mysql","postgres","postgresql","mssql","mongodb","docker","kubernetes",
            "aws","azure","gcp","pandas","numpy","scikit-learn","pyspark","spark","hadoop",
            "airflow","mlflow","tensorflow","pytorch","nlp","machine learning","deep learning",
            "lstm","xgboost","lightgbm","catboost","git","jenkins","linux","bash","fastapi",
            "flask","django","opencv","transformers","express","node.js","react.js","rest",
            "rest api","rest apis","microservices",
        })

    def _read_text(self, file_path: str) -> str:
        """Extract plain text from a .docx or .pdf file.

        Args:
            file_path: Path of the file to read.

        Returns:
            The extracted text, or ``""`` when the suffix is unsupported or
            reading fails (missing library, corrupted file). Failures are
            logged as warnings instead of being raised.
        """
        import logging  # Local import keeps this class self-contained

        log = logging.getLogger(__name__)
        p = Path(file_path)
        suf = p.suffix.lower()

        # Handle Word documents
        if suf == ".docx":
            try:
                from docx import Document  # Lazy import to avoid dependency issues
                return "\n".join(para.text for para in Document(str(p)).paragraphs)
            except Exception as exc:
                # Best effort: keep the batch going, but leave a trace of why
                # this file produced no text.
                log.warning("Could not read %s: %s", p, exc)
                return ""

        # Handle PDF documents
        if suf == ".pdf":
            try:
                import pdfplumber  # Lazy import to avoid dependency issues
                with pdfplumber.open(str(p)) as pdf:
                    # extract_text() may return None; treat that as an empty page
                    return "\n".join(page.extract_text() or "" for page in pdf.pages)
            except Exception as exc:
                log.warning("Could not read %s: %s", p, exc)
                return ""

        return ""  # Unsupported file type

    def _extract_email(self, text: str) -> str:
        """Return the first email-like substring of *text*, or ``""`` if none."""
        # username@domain.tld format
        m = re.search(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", text or "")
        return m.group(0) if m else ""

    def _extract_name(self, text: str) -> str:
        """Return the first line of *text* that looks like a person's name.

        A line qualifies when it is 3-80 characters long, has 2-5
        whitespace-separated words, and consists only of Latin letters
        (including accented ones), spaces, hyphens, apostrophes and periods.
        Returns ``""`` when no line qualifies.
        """
        for line in (text or "").splitlines():
            line = line.strip()
            if 3 <= len(line) <= 80 and 2 <= len(line.split()) <= 5:
                if re.fullmatch(r"[A-Za-zÀ-ÖØ-öø-ÿ.'\- ]+", line):
                    return line  # Return first match (assume name appears early)
        return ""

    @staticmethod
    def _skill_pattern(label: str):
        """Compile a regex matching *label* as a whole term.

        The term must not touch a word character on either side, so "java"
        does not match inside "javascript" while "python," and "numpy." do
        match. Spaces inside a multi-word label match any whitespace run,
        including line breaks.
        """
        body = r"\s+".join(re.escape(tok) for tok in label.split())
        return re.compile(r"(?<!\w)" + body + r"(?!\w)")

    def _extract_skills(self, text: str) -> List[str]:
        """Return the sorted, de-duplicated lexicon skills mentioned in *text*.

        Matching is case-insensitive and boundary-aware: a skill counts when
        it is not directly adjacent to a letter, digit or underscore, so
        skills followed by punctuation ("Python, SQL.") are detected. Known
        spelling variants are folded to one canonical label.
        """
        haystack = (text or "").lower()
        found = set()

        for skill in self.SKILL_LEXICON:
            label = " ".join(skill.lower().split())
            if not label:
                continue  # An empty pattern would match everywhere
            if self._skill_pattern(label).search(haystack):
                # Standardize common skill variants
                found.add(self._SKILL_ALIASES.get(label, label))

        return sorted(found)

    def parse_one(self, file_path: str) -> Dict:
        """Parse a single resume file.

        Returns:
            A dict with keys ``"name"`` (str), ``"email"`` (str) and
            ``"skills"`` (sorted list of str).
        """
        txt = self._read_text(file_path)
        return {
            "name": self._extract_name(txt),
            "email": self._extract_email(txt),
            "skills": self._extract_skills(txt),
        }

In [4]:
# ---------- Class 2: evaluator + writer (adds `missing_block`, clean metrics) ----------
class ResumeEvaluator:
    """Evaluates extraction results against ground truth and writes output files."""

    # Spelling variants folded to one label before skill sets are compared.
    # These are the same variants the extractor canonicalizes, so a ground
    # truth entry such as "postgresql" can match an extracted "postgres".
    _SKILL_ALIASES = {
        "node.js": "nodejs",
        "react.js": "react",
        "postgresql": "postgres",
    }

    def __init__(self, cfg: Dict):
        """Store config, create the output directory and load the ground truth.

        Args:
            cfg: Dict with ``"paths"`` (``output_dir`` and ``ground_truth`` as
                ``Path`` objects) and ``"options"`` (``print_each`` flag).
        """
        self.paths = cfg["paths"]
        self.options = cfg["options"]

        # Create output directory if it doesn't exist
        self.paths["output_dir"].mkdir(parents=True, exist_ok=True)

        # Load ground truth JSON file (filename -> expected data mapping)
        self.gt: Dict[str, Dict] = json.loads(self.paths["ground_truth"].read_text(encoding="utf-8"))

        # Name/email accuracy counters
        self.n_name = self.n_email = self.ok_name = self.ok_email = 0
        # True positive, false positive, false negative counts for skills
        self.tp = self.fp = self.fn = 0

    def _missing_block(self, extracted: Dict, gt: Dict) -> tuple:
        """Compare extracted data with ground truth.

        Args:
            extracted: Extractor output with ``name``, ``email``, ``skills``.
            gt: Ground-truth entry with the same keys.

        Returns:
            A 3-tuple ``(miss, exs, gts)``:

            * ``miss``: dict holding the expected value of each field that did
              not match; ``"skills"`` lists the ground-truth labels that were
              not extracted.
            * ``exs``: set of canonical extracted skills.
            * ``gts``: set of canonical ground-truth skills.
        """
        miss = {}

        # Check if name matches (normalized comparison)
        if _norm(extracted.get("name")) != _norm(gt.get("name")):
            miss["name"] = gt.get("name")

        # Check if email matches (normalized comparison)
        if _norm_email(extracted.get("email")) != _norm_email(gt.get("email")):
            miss["email"] = gt.get("email")

        # Fold spelling variants on both sides so the comparison is like-for-like
        alias = self._SKILL_ALIASES
        exs = {alias.get(s, s) for s in _norm_skills(extracted.get("skills"))}
        gt_labels = _norm_skills(gt.get("skills"))
        gts = {alias.get(s, s) for s in gt_labels}

        # Report misses using the ground truth's own labels
        lack = sorted(s for s in gt_labels if alias.get(s, s) not in exs)
        if lack:
            miss["skills"] = lack

        return miss, exs, gts

    def _update_metrics(self, exs: Set[str], gts: Set[str], name_ok: bool, email_ok: bool):
        """Fold one resume's comparison results into the running counters.

        Args:
            exs: Extracted skills.
            gts: Ground-truth skills.
            name_ok: Whether the name matched.
            email_ok: Whether the email matched.
        """
        # Update name/email counters
        self.n_name += 1
        self.n_email += 1
        if name_ok:
            self.ok_name += 1
        if email_ok:
            self.ok_email += 1

        # Skill counts are accumulated across files (micro-averaging)
        self.tp += len(exs & gts)  # Correctly identified skills
        self.fp += len(exs - gts)  # Extracted but not in ground truth
        self.fn += len(gts - exs)  # In ground truth but not extracted

    def process_one(self, file_path: str, extracted: Dict) -> bool:
        """Compare one resume with ground truth, update metrics, write output.

        Args:
            file_path: Path of the resume; only its file name is used to look
                up the ground truth and to name the output file.
            extracted: Extractor output for that resume.

        Returns:
            ``False`` when the file has no ground-truth entry (nothing is
            counted or written), ``True`` otherwise.
        """
        fname = Path(file_path).name
        gt = self.gt.get(fname)

        # Skip files not in ground truth (can't evaluate without expected results)
        if gt is None:
            return False

        # Compare extracted data with ground truth
        miss, exs, gts = self._missing_block(extracted, gt)

        # A field matched exactly when _missing_block did not flag it
        self._update_metrics(exs, gts, "name" not in miss, "email" not in miss)

        # Prepare output object with extracted data and diagnostic info
        out_obj = {
            "name": extracted.get("name", ""),
            "email": extracted.get("email", ""),
            "skills": extracted.get("skills", []),
            "missing_block": miss,  # Shows what was missed (for debugging)
        }

        # Print individual results if requested
        if self.options["print_each"]:
            print(json.dumps({fname: out_obj}, ensure_ascii=False, indent=2))

        # Write individual result file (<stem>.json in output directory)
        output_file = self.paths["output_dir"] / (Path(fname).stem + ".json")
        output_file.write_text(
            json.dumps(out_obj, ensure_ascii=False, indent=2), encoding="utf-8")

        return True

    def finalize(self) -> Dict:
        """Return the final metrics, each ratio rounded to 4 decimals.

        Ratios whose denominator is zero are reported as ``0.0``.
        """
        # Precision, recall, F1 for skills (micro-averaged)
        prec = self.tp / (self.tp + self.fp) if (self.tp + self.fp) else 0.0
        rec = self.tp / (self.tp + self.fn) if (self.tp + self.fn) else 0.0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) else 0.0

        return {
            "name_accuracy": round(self.ok_name / self.n_name, 4) if self.n_name else 0.0,
            "email_accuracy": round(self.ok_email / self.n_email, 4) if self.n_email else 0.0,
            "skills_precision_micro": round(prec, 4),  # Precision: TP/(TP+FP)
            "skills_recall_micro": round(rec, 4),      # Recall: TP/(TP+FN)
            "skills_f1_micro": round(f1, 4),           # F1: harmonic mean of precision/recall
            "num_evaluated": self.n_name,              # Files that had a ground-truth entry
        }

In [5]:
# ---------- Pipeline: single place for file enumeration + run ----------
class ResumePipeline:
    """Main pipeline that orchestrates resume processing from input to evaluation."""

    def __init__(self, config_path: str):
        """Load the config and create the extractor and evaluator.

        Args:
            config_path: Path to the YAML config file.
        """
        self.cfg = load_cfg(config_path)  # Load configuration once
        self.extractor = ResumeExtractor(self.cfg)  # Create resume parser
        self.evaluator = ResumeEvaluator(self.cfg)  # Create evaluator/writer

    def _iter_files(self):
        """Return the resume files to process, sorted by path.

        Only regular files directly inside the configured input directory
        whose lower-cased suffix is in the configured extension set are
        returned; a directory that happens to be named like ``foo.pdf`` is
        skipped.
        """
        input_dir = Path(self.cfg["paths"]["input_dir"])
        exts = self.cfg["options"]["exts"]
        return [
            p for p in sorted(input_dir.glob("*"))
            if p.is_file() and p.suffix.lower() in exts
        ]

    def run(self):
        """Execute the complete pipeline: extract -> evaluate -> report results."""
        # Process each resume file
        for p in self._iter_files():
            extracted = self.extractor.parse_one(str(p))   # Extract data from resume
            self.evaluator.process_one(str(p), extracted)  # Evaluate and write results

        # Print final evaluation metrics as JSON
        print(json.dumps(self.evaluator.finalize(), indent=2))

In [6]:
# ---------- Notebook-safe main ----------
def main(argv=None):
    """Parse command-line options and run the resume pipeline.

    Args:
        argv: Argument list to parse. ``None`` lets argparse read the
            process's own command line.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="config.yaml", help="Path to YAML config file")

    # parse_known_args returns unrecognized arguments separately instead of
    # erroring out on them; they are discarded here.
    known, _unknown = parser.parse_known_args(argv)

    pipeline = ResumePipeline(known.config)
    pipeline.run()

def run_from_notebook(config_path="config.yaml"):
    """Run the pipeline from a notebook cell.

    Args:
        config_path: Path to the YAML config file.
    """
    # Hand main() an explicit argument list so it does not parse sys.argv
    cli_args = ["--config", config_path]
    main(cli_args)

# Script entry point: when this file is executed directly (rather than
# imported), run the pipeline with options taken from the command line.
if __name__ == "__main__":
    main()

{
  "resume_alice.pdf": {
    "name": "Alice Johnson",
    "email": "alice.johnson@example.com",
    "skills": [
      "aws",
      "docker",
      "git",
      "machine learning",
      "pandas",
      "python",
      "scikit-learn",
      "sql"
    ],
    "missing_block": {
      "skills": [
        "numpy"
      ]
    }
  }
}
{
  "resume_brian.pdf": {
    "name": "Brian Lee",
    "email": "brian.lee@example.com",
    "skills": [
      "aws",
      "docker",
      "git",
      "java",
      "kubernetes",
      "microservices",
      "rest",
      "rest apis"
    ],
    "missing_block": {
      "skills": [
        "linux",
        "postgresql",
        "spring boot"
      ]
    }
  }
}
{
  "resume_carmen.pdf": {
    "name": "Carmen Diaz",
    "email": "carmen.diaz@example.com",
    "skills": [
      "azure",
      "git",
      "react"
    ],
    "missing_block": {
      "skills": [
        "express",
        "javascript",
        "mongodb",
        "nodejs",
        "typescript"
     