# Hudi + Lance Demo: Intelligent Recruitment Platform
**(Hybrid Search + Analytics on the Lakehouse)**

### Flow:
1. Load real job postings from Hugging Face, embed them, and ingest them into a Hudi table
2. User "Uploads" a Resume (Vector Search)
3. Apply Business Rules (Hybrid Search: Vector + SQL Filters)
4. Show Executive Dashboard (Analytics on the same data)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datasets import load_dataset
import shutil
import os
import pandas as pd
import matplotlib.pyplot as plt
from sentence_transformers import SentenceTransformer

## Configuration

In [None]:
# Demo-wide settings read by the ingestion and demo functions below.
CONFIG = dict(
    table_path="/tmp/hudi_recruiting_lake",  # filesystem location of the Hudi table
    table_name="job_market",  # passed to the writer as hoodie.table.name
    embedding_model="all-MiniLM-L6-v2",  # SentenceTransformer model name
    clean_start=True,  # delete an existing table directory before writing
)

## 1. Spark Setup

In [None]:
def create_spark():
    """Return the SparkSession for this demo, creating it if none exists.

    The session is named "Recruiting-Lakehouse" and is configured with the
    Kryo serializer, the Hudi SQL extension and catalog, and console
    progress output turned off.
    """
    settings = (
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"),
        ("spark.ui.showConsoleProgress", "false"),
    )

    builder = SparkSession.builder.appName("Recruiting-Lakehouse")
    for key, value in settings:
        builder = builder.config(key, value)
    return builder.getOrCreate()

## 2. Load Dataset from HuggingFace

In [None]:
def load_job_data():
    """Fetch the data-science job-description dataset and shape it into rows.

    Returns:
        A list of dicts, one per posting in the "train" split, with the keys
        ``job_id``, ``company``, ``title``, ``job_description`` and
        ``text_for_vector`` (title and description joined by one space).
    """
    print("Loading job descriptions from HuggingFace...")
    postings = load_dataset("nathansutton/data-science-job-descriptions", split="train")

    # job_id is derived from the posting's position in the split, zero-padded.
    data = [
        {
            "job_id": f"job_{idx:04d}",
            "company": posting["company"],
            "title": posting["title"],
            "job_description": posting["job_description"],
            "text_for_vector": f"{posting['title']} {posting['job_description']}",
        }
        for idx, posting in enumerate(postings)
    ]

    companies = {entry["company"] for entry in data}
    print(f"\u2713 Loaded {len(data)} job postings from {len(companies)} companies.")
    return data

## 3. Ingestion (The "Lakehouse" Foundation)

In [None]:
def ingest_data(spark, data):
    """Embed the job postings and write them to the Hudi table.

    Args:
        spark: Active SparkSession used to build and write the DataFrame.
        data: List of job dicts, each carrying the keys named in the schema
            below except ``embedding``. The dicts are left unmodified.

    Returns:
        The SentenceTransformer model used to compute the embeddings, so the
        caller can encode queries with the same model.
    """
    # 1. Embed Descriptions
    model = SentenceTransformer(CONFIG["embedding_model"])
    embeddings = model.encode([r["text_for_vector"] for r in data], show_progress_bar=True)

    # Build fresh rows instead of adding an "embedding" key to the caller's
    # dicts, so the input list can be reused or re-ingested unchanged.
    rows = [
        {**row, "embedding": vector.tolist()}
        for row, vector in zip(data, embeddings)
    ]

    # 2. Define Schema
    schema = StructType([
        StructField("job_id", StringType(), False),
        StructField("company", StringType(), False),
        StructField("title", StringType(), False),
        StructField("job_description", StringType(), False),
        StructField("text_for_vector", StringType(), False),
        StructField("embedding", ArrayType(FloatType()), False),
    ])

    # 3. Write to Hudi (Lance Format)
    if CONFIG["clean_start"] and os.path.exists(CONFIG["table_path"]):
        shutil.rmtree(CONFIG["table_path"])

    df = spark.createDataFrame(rows, schema=schema)

    hudi_options = {
        "hoodie.table.name": CONFIG["table_name"],
        "hoodie.datasource.write.recordkey.field": "job_id",
        "hoodie.datasource.write.partitionpath.field": "company",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.table.base.file.format": "lance",
        "hoodie.write.record.merge.custom.implementation.classes": "org.apache.hudi.DefaultSparkRecordMerger"
    }

    # NOTE(review): the save mode is "overwrite" whether or not clean_start is
    # set, so clean_start only controls the directory removal above - confirm
    # that is the intended meaning of the flag.
    df.write.format("hudi").options(**hudi_options).mode("overwrite").save(CONFIG["table_path"])
    print(f"\u2713 Ingested {len(data)} jobs into the Lakehouse.")
    return model

## 4. The Demo: Resume Matching

In [None]:
def demo_resume_matching(spark, model):
    """Run the resume-matching demo: a pure vector search, then a filtered one.

    Args:
        spark: Active SparkSession; used to register the query vector as the
            temp view ``query_input`` and to run the search SQL.
        model: Embedding model exposing ``encode``; should be the model that
            produced the table's ``embedding`` column.

    Prints the matches for both scenarios and returns nothing.
    """
    print("\n" + "="*50)
    print("DEMO PART 1: The 'Smart' Candidate Match")
    print("="*50)

    # Simulate a Resume Upload
    resume_text = """
    EXPERIENCE:
    - 5 years building Machine Learning models using Python and Scikit-Learn.
    - Deployed Large Language Models (LLMs) to production.
    - Strong background in backend engineering and API design.
    """
    # Emoji outside the BMP must be written as \UXXXXXXXX: a "\ud83d\udcc4"
    # pair yields two lone surrogates in a Python str, and printing those to a
    # UTF-8 stream raises UnicodeEncodeError.
    print(f"\U0001F4C4 User Resume Uploaded: \n{resume_text.strip()}\n")

    # Vectorize Resume
    resume_vector = model.encode([resume_text])[0].tolist()

    # Register Query Vector
    spark.createDataFrame([(resume_vector,)], ["q_vec"]).createOrReplaceTempView("query_input")

    # --- SCENARIO A: Pure Vector Search ---
    print("\U0001F50E Executing Vector Search (Semantic Match)...")
    matches = spark.sql(f"""
        SELECT title, company, (1 - _distance) as score
        FROM hudi_vector_search(
            '{CONFIG['table_path']}', 'embedding', (SELECT q_vec FROM query_input), 5, 'cosine'
        )
    """).collect()

    print("\nTop Matches for your Resume:")
    for row in matches:
        print(f"  \u2022 {row.title} at {row.company} \u2014 Score: {row.score:.2f}")

    # --- SCENARIO B: Hybrid Search (The Business Requirement) ---
    print("\n\u26a0\ufe0f  User Feedback: 'I specifically want to work at Reddit.'")
    print("\U0001F50E Executing Hybrid Search (Vector + SQL Filters)...")

    # The company filter is applied to the rows returned by the inner vector
    # search (presumably the 50 nearest neighbours - confirm the meaning of
    # the fourth argument), so postings outside that set are never considered.
    hybrid_query = f"""
        SELECT * FROM (
            SELECT title, company, (1 - _distance) as score
            FROM hudi_vector_search(
                '{CONFIG['table_path']}', 'embedding', (SELECT q_vec FROM query_input), 50, 'cosine'
            )
        )
        WHERE company = 'Reddit'
        ORDER BY score DESC
        LIMIT 5
    """
    hybrid_matches = spark.sql(hybrid_query).collect()

    print("\nTop HYBRID Matches (Reddit only):")
    if not hybrid_matches:
        print("  (No matches found with these constraints)")
    for row in hybrid_matches:
        print(f"  \u2022 {row.title} at {row.company} \u2014 Score: {row.score:.2f}")

## 5. The Demo: Analytics Dashboard

In [None]:
def demo_analytics_dashboard(spark):
    """Render two bar charts aggregated from the Hudi job table.

    Loads the table at CONFIG["table_path"], registers it as the temp view
    ``jobs_table``, counts postings per company and per title (top 15 each),
    and draws both as horizontal bar charts side by side.

    Args:
        spark: Active SparkSession used to read the table and run the SQL.
    """
    divider = "="*50
    print("\n" + divider)
    print("DEMO PART 2: The Executive Dashboard")
    print("Value: The SAME data matches resumes AND powers BI.")
    print(divider)

    jobs = spark.read.format("hudi").load(CONFIG["table_path"])
    jobs.createOrReplaceTempView("jobs_table")

    # 1. Hiring Activity by Company
    print("Generating 'Hiring Activity' Chart...")
    company_query = """
        SELECT company, count(*) as job_count
        FROM jobs_table
        GROUP BY company
        ORDER BY job_count DESC
        LIMIT 15
    """
    company_df = spark.sql(company_query).toPandas()

    # 2. Most Common Job Titles
    print("Generating 'Top Roles' Chart...")
    title_query = """
        SELECT title, count(*) as title_count
        FROM jobs_table
        GROUP BY title
        ORDER BY title_count DESC
        LIMIT 15
    """
    title_df = spark.sql(title_query).toPandas()

    # PLOTTING: one panel per aggregate, left to right.
    _, axes = plt.subplots(1, 2, figsize=(15, 5))
    panels = (
        (company_df["company"], company_df["job_count"], "green",
         "Hiring Activity: Postings by Company"),
        (title_df["title"], title_df["title_count"], "skyblue",
         "Most Common Data Science Roles"),
    )
    for ax, (labels, counts, bar_color, heading) in zip(axes, panels):
        ax.barh(labels, counts, color=bar_color)
        ax.set_title(heading)
        ax.set_xlabel("Number of Postings")
        # Rows arrive sorted descending; flip the axis so the largest bar is on top.
        ax.invert_yaxis()

    plt.tight_layout()
    plt.show()

    print("\u2713 Dashboard generated from Hudi table.")
    print("  (In a real app, this would be a live Streamlit/Tableau view)")

## Run the Demo

In [None]:
# Create (or reuse) the Hudi-enabled Spark session shared by the cells below.
spark = create_spark()

In [None]:
# Load the postings, then embed and write them to the table.
# ingest_data returns the embedding model so the search demo can reuse it.
jobs_data = load_job_data()
model = ingest_data(spark, jobs_data)

In [None]:
# Part 1: vector and hybrid search, using the model returned by ingestion.
demo_resume_matching(spark, model)

In [None]:
# Part 2: aggregate charts read from the same table path.
demo_analytics_dashboard(spark)

In [None]:
# Shut down the Spark session once the demo is finished.
spark.stop()