# In [None]:
"""
ResumeFlow — Databricks HR Resume Extraction Pipeline (Documentation Version)

This script extracts structured candidate data from PDF resumes stored in a GitHub
repository using a Databricks LLM endpoint, and writes them into a Delta table
for analytics and dashboarding.

Instructions:
1. Fill in the placeholders below (Databricks endpoint, GitHub repo info, etc.)
2. Run inside a Databricks notebook or job.
3. Verify that the Delta table 'hr.resume.candidates_extracted_new' is created.
"""

# -------------------------------------------------------------
# Install dependencies (only needed once per cluster)
# -------------------------------------------------------------
# %pip install pdfminer.six pydantic openai requests pandas

# -------------------------------------------------------------
# Imports
# -------------------------------------------------------------
import json
import requests
import io
import os
import sys
import time
import pandas as pd
from datetime import datetime
from typing import Any, Dict, List
from pydantic import BaseModel, Field

try:
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import DoubleType
except ImportError:
    print("Running in local mode; Spark not available.")

from pdfminer.high_level import extract_text_to_fp

# -------------------------------------------------------------
# Databricks LLM Configuration
# -------------------------------------------------------------
# Serving endpoint settings; both values are placeholders to fill in before running.
GPT_MODEL_NAME = "<your-databricks-llm-endpoint-model-name>"
GPT_BASE_URL = "<your-databricks-workspace-url>/serving-endpoints"

# Resolve the API token: try the notebook context first, and if that raises for
# any reason (for example `dbutils` is not defined) read DATABRICKS_TOKEN from
# the environment instead.
try:
    DATABRICKS_TOKEN = (
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    )
    print("Using Databricks notebook context token.")
except Exception:
    DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")

# Fail fast: every later LLM call needs a token.
if not DATABRICKS_TOKEN:
    raise ValueError("No Databricks token found. Set DATABRICKS_TOKEN or run inside Databricks.")

from openai import OpenAI

# OpenAI-compatible client pointed at the serving endpoints URL, with a 60 second timeout.
client = OpenAI(
    base_url=GPT_BASE_URL,
    api_key=DATABRICKS_TOKEN,
    timeout=60.0,
)

# -------------------------------------------------------------
# GitHub Source Configuration
# -------------------------------------------------------------
# Placeholders identifying where the PDF resumes live on GitHub.
GITHUB_USER = "<your-github-username>"
GITHUB_REPO = "<your-github-repo>"
GITHUB_BRANCH = "<branch-name>"
GITHUB_FOLDER = "<folder-containing-pdfs>"

# Raw-content URL prefix for the folder. The trailing empty element yields the
# final "/" so a file name can be appended directly.
GITHUB_BASE_URL = "/".join(
    [
        "https://raw.githubusercontent.com",
        GITHUB_USER,
        GITHUB_REPO,
        GITHUB_BRANCH,
        GITHUB_FOLDER,
        "",
    ]
)

# -------------------------------------------------------------
# Data Schema
# -------------------------------------------------------------
class CandidateData(BaseModel):
    """Structured candidate record extracted from one resume.

    All fields are required. The declaration order matters: it is the order of
    ``TARGET_KEYS`` below.
    """

    # Identity and contact details.
    fullName: str = Field(description="Name of the applicant.")
    email: str = Field(description="Email address.")
    phoneNumber: str = Field(description="Phone number.")

    # Background.
    educationDegree: str = Field(description="Highest educational degree.")
    currentCompany: str = Field(description="Current employer or most recent.")
    experienceYears: float = Field(description="Years of experience (float).")

    # Free-text, comma-separated lists and classification.
    skillsText: str = Field(description="Comma-separated skills.")
    languagesText: str = Field(description="Comma-separated languages.")
    category: str = Field(description="Job category or field.")


# Field names of the model, in declaration order.
TARGET_KEYS = list(CandidateData.model_fields)

# -------------------------------------------------------------
# GitHub File Discovery
# -------------------------------------------------------------
def get_github_folder_contents(user: str, repo: str, branch: str, folder: str) -> List[str]:
    """Return the names of the PDF files directly inside a GitHub repository folder.

    Calls the GitHub "contents" REST endpoint for ``folder`` at ``branch`` and
    keeps only entries that are regular files whose name ends in ``.pdf``
    (case-insensitive). Directories, symlinks and submodules are skipped even
    if their name ends in ``.pdf``, because they cannot be downloaded as PDFs.

    Args:
        user: Owner of the repository.
        repo: Repository name.
        branch: Branch (or other ref) to list.
        folder: Path of the folder inside the repository.

    Returns:
        File names only (no path, no URL). An empty list is returned when the
        request fails, when the response is not a directory listing, or when
        the folder holds no PDF files; the reason is printed.
    """
    api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{folder}?ref={branch}"
    print(f"Fetching GitHub folder contents from: {api_url}")
    try:
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        contents = response.json()
    except Exception as e:
        # Best-effort discovery: report the failure and let the caller see "no files".
        print(f"Error fetching GitHub contents: {e}")
        return []

    if not isinstance(contents, list):
        # The endpoint answers with a single object instead of a list when the
        # path points at a file rather than a directory.
        print(
            "Error fetching GitHub contents: expected a directory listing, "
            f"got {type(contents).__name__}"
        )
        return []

    return [
        item["name"]
        for item in contents
        if isinstance(item, dict)
        # Entries without a "type" are treated as files to stay lenient.
        and item.get("type", "file") == "file"
        and str(item.get("name", "")).lower().endswith(".pdf")
    ]


# -------------------------------------------------------------
# PDF Text Extraction
# -------------------------------------------------------------
def pdf_to_text(file_url: str) -> str:
    """Fetch the PDF at ``file_url`` and return its extracted text, stripped.

    Any exception raised while downloading or extracting is printed and an
    empty string is returned instead.
    """
    try:
        download = requests.get(file_url, timeout=30)
        download.raise_for_status()

        # pdfminer reads from a binary stream and writes the text into a text buffer.
        text_sink = io.StringIO()
        extract_text_to_fp(io.BytesIO(download.content), text_sink)

        extracted = text_sink.getvalue()
        return extracted.strip()
    except Exception as e:
        print(f"PDF extraction error: {e}")
        return ""


# -------------------------------------------------------------
# LLM Extraction Function
# -------------------------------------------------------------
def _coerce_experience_years(value: Any) -> float:
    """Best-effort conversion of the model's ``experienceYears`` text to a float.

    Accepts a plain number, or a leading number followed by other text
    (``"5 years"``, ``"7+"``). Blank or unparseable input yields ``0.0``.
    """
    tokens = str(value).split()
    if not tokens:
        return 0.0
    try:
        return float(tokens[0].rstrip("+"))
    except ValueError:
        return 0.0


def analyze_resume_with_llm(
    filename: str, file_url: str, max_chars: int = 16000
) -> Dict[str, Any]:
    """Use the Databricks LLM to extract structured candidate data from a resume.

    The PDF is downloaded and converted to text, the text is sent to the chat
    endpoint with a request for ``key:value`` lines, and the reply is parsed
    into a ``CandidateData`` record.

    Args:
        filename: Name of the resume file; used only in diagnostic messages.
        file_url: URL the PDF is downloaded from.
        max_chars: Maximum number of characters of resume text sent to the model.

    Returns:
        The validated record as a dict with one entry per ``TARGET_KEYS`` name.
        Fields the model did not provide are ``""`` (``experienceYears`` is
        ``0.0``). An empty dict is returned when no text could be extracted,
        when the reply contains none of the requested fields, or when the LLM
        call or validation raises.
    """
    pdf_text = pdf_to_text(file_url)
    if not pdf_text:
        return {}

    pdf_text = pdf_text[:max_chars]

    system_prompt = (
        "You are an expert HR data extractor. Extract structured data from the provided resume text. "
        "Output each field in the format key:value, one per line. "
        # Derived at call time so experience calculations do not go stale.
        f"If a field is missing, leave it blank. The current year is {datetime.now().year}."
    )

    fields_list = "\n".join(TARGET_KEYS)
    user_prompt = f"Extract the following fields:\n{fields_list}\n\nResume Text:\n{pdf_text}"

    # Characters models commonly wrap keys in (bullets, markdown emphasis, quotes).
    key_decoration = " \t-*#>`\"'"
    canonical_keys = {key.lower(): key for key in TARGET_KEYS}

    try:
        completion = client.chat.completions.create(
            model=GPT_MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0.0,
        )
        raw_output = completion.choices[0].message.content or ""

        data: Dict[str, Any] = {}
        for line in raw_output.splitlines():
            raw_key, separator, raw_val = line.partition(":")
            if not separator:
                continue
            key = canonical_keys.get(raw_key.strip(key_decoration).lower())
            if key is not None:
                # A later line for the same key overrides an earlier one.
                data[key] = raw_val.strip().strip("*`").strip()

        if not data:
            print(f"No extractable fields in LLM response for: {filename}")
            return {}

        for key in TARGET_KEYS:
            data.setdefault(key, "")

        # Always coerce: the prompt asks for blank values when a field is
        # missing, and a blank string would fail validation of the float field
        # and discard the whole record.
        data["experienceYears"] = _coerce_experience_years(data.get("experienceYears", ""))

        return CandidateData(**data).model_dump()
    except Exception as e:
        print(f"Error during LLM extraction: {e}")
        return {}


# -------------------------------------------------------------
# Batch Processing Logic
# -------------------------------------------------------------
def _display_or_print(frame: Any) -> None:
    """Show a DataFrame with ``display`` when it exists, otherwise print it."""
    try:
        display(frame)
    except NameError:
        # `display` is not defined in every environment (e.g. a plain local run).
        print(frame)


def main(table_name: str = "hr.resume.candidates_extracted_new") -> None:
    """Run the resume extraction pipeline.

    Lists the PDF files in the configured GitHub folder, extracts one record
    per file with the LLM, tags each record with its source file and a load
    timestamp, and overwrites the Delta table ``table_name`` with the result.

    If writing to Delta fails for any reason (including Spark not being
    available), a warning is printed and the pandas DataFrame is shown instead,
    so the extracted data is still visible.

    Args:
        table_name: Fully qualified name of the Delta table to overwrite.
    """
    files = get_github_folder_contents(GITHUB_USER, GITHUB_REPO, GITHUB_BRANCH, GITHUB_FOLDER)
    if not files:
        print("No PDF files found.")
        return

    all_records = []
    for f in files:
        url = f"{GITHUB_BASE_URL}{f}"
        print(f"Processing: {f}")
        record = analyze_resume_with_llm(f, url)
        if record:
            # Lineage columns: which file the row came from and when it was loaded.
            record["sourceFile"] = f
            record["loadTs"] = datetime.utcnow().isoformat()
            all_records.append(record)

    if not all_records:
        print("No records extracted.")
        return

    df = pd.DataFrame(all_records)
    print(f"Extracted {len(df)} records.")

    try:
        spark_df = spark.createDataFrame(df)
        spark_df = spark_df.withColumn("experienceYears", F.col("experienceYears").cast(DoubleType()))
        spark_df.write.mode("overwrite").format("delta").saveAsTable(table_name)
        print(f"Data saved to Delta table: {table_name}")
        _display_or_print(spark_df)
    except Exception as e:
        # Also reached when `spark` is undefined; the fallback must not rely on
        # notebook-only helpers, or the warning path itself would crash.
        print(f"Warning: Could not save to Delta. ({e})")
        _display_or_print(df)


# -------------------------------------------------------------
# Run
# -------------------------------------------------------------
# Script entry point: run the pipeline only when this module's __name__ is
# "__main__" (i.e. it is executed directly), not when it is imported.
if __name__ == "__main__":
    main()
