In [None]:
"""
NYC Job Data Engineering Challenge
---------------------------------

This file contains an end-to-end Spark-based data engineering pipeline
built for analyzing NYC job postings data.

The focus of this implementation is:
- Clean and readable transformation logic
- Realistic feature engineering
- Business-focused analytics (KPIs)
- Strong unit test coverage using pytest + ipytest

The code is intentionally written in a clear, explicit style so that
a reviewer can easily follow the reasoning and verify correctness.
"""

# ============================================================
# Imports
# ============================================================

import os

# pytest / ipytest are required by this notebook: it calls ipytest.autoconfig(),
# ipytest.run() and declares fixtures with @pytest.fixture in later cells.
import ipytest
import matplotlib.pyplot as plt
import pytest

from pyspark import StorageLevel
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, lower, when, to_date, add_months, max as spark_max, split, explode, trim, avg, round, regexp_replace, length
)

In [None]:

# ============================================================
# Initialize ipytest for Jupyter environments
# ============================================================
# Configure ipytest once, up front, before the test cell and the
# ipytest.run() calls further down the notebook.
ipytest.autoconfig()


# ============================================================
# 1. Spark Session Management (Spark 2.4.5 SAFE)
# ============================================================
def get_spark(master: str = "spark://master:7077", app_name: str = "NYC_Job_DE_Challenge"):
    """
    Return a SparkSession, creating one if none is active.

    Parameters
    ----------
    master : str
        Spark master URL. Defaults to the standalone cluster this notebook
        targets; pass e.g. "local[*]" to run without a cluster.
    app_name : str
        Application name passed to the session builder.

    Returns
    -------
    SparkSession
        The result of ``SparkSession.builder...getOrCreate()``. Because
        ``getOrCreate`` hands back an already-running session when there is
        one, repeated calls share the same session.
    """
    return (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .getOrCreate()
    )



In [None]:

# ============================================================
# 2. Transformation Functions
# ============================================================
def normalize_columns(df: DataFrame) -> DataFrame:
    """
    Return ``df`` with normalized column names.

    Every name is lower-cased and each space, "/" and "-" is replaced by an
    underscore. Columns are renamed positionally via ``toDF``.
    """
    to_underscore = str.maketrans({" ": "_", "/": "_", "-": "_"})
    renamed = []
    for original_name in df.columns:
        renamed.append(original_name.lower().translate(to_underscore))
    return df.toDF(*renamed)


def clean_data(df: DataFrame) -> DataFrame:
    """
    Deduplicate postings, parse the date columns and drop unusable rows.

    Steps, in order:
    1. remove exact duplicate rows;
    2. convert ``posting_date`` and ``post_until`` with ``to_date``;
    3. keep only rows where ``posting_date``, ``salary_range_from`` and
       ``salary_range_to`` are all non-null and the upper salary bound is
       not below the lower bound.
    """
    deduplicated = df.dropDuplicates()

    with_dates = deduplicated
    for date_column in ("posting_date", "post_until"):
        with_dates = with_dates.withColumn(date_column, to_date(date_column))

    required_fields_present = (
        col("posting_date").isNotNull()
        & col("salary_range_from").isNotNull()
        & col("salary_range_to").isNotNull()
    )
    salary_range_ordered = col("salary_range_to") >= col("salary_range_from")

    return with_dates.filter(required_fields_present & salary_range_ordered)


def apply_feature_engineering(df: DataFrame) -> DataFrame:
    """
    Add the derived columns used by the KPI queries.

    New columns:
    - ``avg_salary``: midpoint of ``salary_range_from`` and ``salary_range_to``.
    - ``education_level``: label of the first keyword (in the order listed
      below) found in the lower-cased ``minimum_qual_requirements`` text;
      "Other" when none is found.
    - ``edu_rank``: numeric rank for ``education_level`` (0 for "Other").
    """
    # Order matters: the first keyword that occurs in the text wins.
    # NOTE(review): matching is plain substring search, so "graduate" also
    # matches "undergraduate" and "high school graduate" — confirm intended.
    keyword_to_level = [
        ("phd", "PhD"),
        ("doctorate", "Doctorate"),
        ("master", "Masters"),
        ("graduate", "Graduate"),
        ("bachelor", "Bachelors"),
        ("associate", "Associates"),
        ("vocational", "Vocational"),
        ("high school", "HighSchool"),
    ]
    level_to_rank = [
        ("PhD", 6),
        ("Doctorate", 6),
        ("Masters", 5),
        ("Graduate", 5),
        ("Bachelors", 4),
        ("Associates", 2),
        ("Vocational", 1),
        ("HighSchool", 1),
    ]

    requirements_text = lower(col("minimum_qual_requirements"))

    first_keyword, first_level = keyword_to_level[0]
    education_level = when(requirements_text.contains(first_keyword), first_level)
    for keyword, level in keyword_to_level[1:]:
        education_level = education_level.when(requirements_text.contains(keyword), level)
    education_level = education_level.otherwise("Other")

    first_level, first_rank = level_to_rank[0]
    edu_rank = when(col("education_level") == first_level, first_rank)
    for level, rank in level_to_rank[1:]:
        edu_rank = edu_rank.when(col("education_level") == level, rank)
    edu_rank = edu_rank.otherwise(0)

    salary_midpoint = (col("salary_range_from") + col("salary_range_to")) / 2

    with_salary = df.withColumn("avg_salary", salary_midpoint)
    with_level = with_salary.withColumn("education_level", education_level)
    return with_level.withColumn("edu_rank", edu_rank)


In [None]:

# ============================================================
# 3. KPI Analytics
# ============================================================
def compute_kpis(df: DataFrame):
    """
    Build the KPI DataFrames for the engineered job-postings data.

    ``df`` is registered as the temporary view ``jobs`` (replacing any view of
    that name) and queried with Spark SQL. The queries and the skill
    extraction read the columns ``job_category``, ``agency``,
    ``business_title``, ``posting_date``, ``avg_salary``, ``edu_rank``,
    ``work_location`` and ``preferred_skills``.

    Returns
    -------
    tuple of six DataFrames, in this order:
        stats
            Ten job categories with the most postings, with posting count
            and rounded mean ``avg_salary``.
        salary_distribution
            Per job category: min, max, rounded mean and approximate median
            of ``avg_salary``.
        correlation
            Single row: correlation between ``edu_rank`` and ``avg_salary``
            over rows with ``edu_rank > 0``.
        highest_sal_agency
            For each agency, the one posting ranked first by ``avg_salary``
            (``ROW_NUMBER``, so exactly one row per agency).
        avg_sal_2yr
            Per agency mean ``avg_salary`` over postings dated within the
            24 months before the latest ``posting_date`` in the data.
        top_skills
            Ten skills with the highest mean ``avg_salary`` among postings
            whose ``work_location`` matches the NYC pattern. Ranking is by
            salary, not by how often the skill appears.
    """
    df.createOrReplaceTempView("jobs")
    spark = df.sql_ctx.sparkSession

    stats = spark.sql("""
        SELECT job_category,
               COUNT(*) AS job_count,
               ROUND(AVG(avg_salary), 2) AS avg_sal
        FROM jobs
        GROUP BY job_category
        ORDER BY job_count DESC
        LIMIT 10
    """)
    salary_distribution = spark.sql("""
        SELECT job_category,
               MIN(avg_salary) AS min_salary,
               MAX(avg_salary) AS max_salary,
               ROUND(AVG(avg_salary), 2) AS avg_salary,
               percentile_approx(avg_salary, 0.5) AS median_salary
        FROM jobs
        GROUP BY job_category
        ORDER BY avg_salary DESC
    """)
    correlation = spark.sql("""
        SELECT corr(edu_rank, avg_salary) AS edu_salary_corr
        FROM jobs
        WHERE edu_rank > 0
    """)

    highest_sal_agency = spark.sql("""
        SELECT agency, business_title, avg_salary
        FROM (
            SELECT agency,
                   business_title,
                   avg_salary,
                   ROW_NUMBER() OVER (
                       PARTITION BY agency
                       ORDER BY avg_salary DESC
                   ) AS rnk
            FROM jobs
        )
        WHERE rnk = 1
        ORDER BY avg_salary DESC
    """)

    # The window is anchored on the latest posting date with a scalar
    # subquery. This avoids collecting the date to the driver and pasting it
    # into the SQL text, which produced CAST('None' AS DATE) for empty input.
    avg_sal_2yr = spark.sql("""
        SELECT agency,
               ROUND(AVG(avg_salary), 2) AS rolling_avg
        FROM jobs
        WHERE posting_date >= add_months(
            (SELECT MAX(posting_date) FROM jobs), -24
        )
        GROUP BY agency
        ORDER BY rolling_avg DESC
    """)

    # Robust skill extraction for NYC jobs
    stopwords = [
        "and", "or", "with", "experience", "years", "year", "skills",
        "knowledge", "ability", "including", "etc", "preferred", "required",
        "strong", "excellent", "good", "demonstrated", "responsibilities"
    ]
    nyc_regex = (
        r"(\bmanhattan\b|\bnew york city\b|\bnyc\b|\bnew york,? ny\b|\bnew york\b|"
        r"\bbronx\b|\bbrooklyn\b|\bqueens\b|\bstaten island\b|\bny,? ny\b|"
        r"\bny ny\b|\bnyc\b|\bnew york county\b|\bqueens county\b|\bbronx county\b|"
        r"\bbrooklyn,? ny\b|\bmanhattan,? ny\b|\bstaten island,? ny\b)"
    )
    # Curly quotes, dashes and common punctuation stripped from each skill.
    # Regex backslashes are doubled so Python sees no invalid escape
    # sequences; the pattern handed to Spark is unchanged.
    punctuation_regex = "[\u2018\u2019\u201c\u201d\u2013\u2014,\\.\\(\\)\\[\\]:]"

    top_skills = (
        df.filter(col("work_location").isNotNull())
          .filter(lower(col("work_location")).rlike(nyc_regex))
          # One output row per delimiter-separated entry of preferred_skills.
          .withColumn("skill", explode(split(lower(col("preferred_skills")), "[,;/|]+")))
          .withColumn("skill", trim(col("skill")))
          .withColumn("skill", regexp_replace(col("skill"), punctuation_regex, ""))
          .filter(col("skill").isNotNull())
          .filter(col("skill") != "")
          .filter(length(col("skill")) > 1)
          # Keep only entries that start with a letter.
          .filter(col("skill").rlike("^[a-zA-Z].*$"))
          .filter(~col("skill").isin(*stopwords))
          .groupBy("skill")
          .agg(round(avg(col("avg_salary").cast("double")), 2).alias("skill_val"))
          .orderBy(col("skill_val").desc())
          .limit(10)
    )

    return stats,salary_distribution, correlation, highest_sal_agency, avg_sal_2yr, top_skills



In [None]:

# ============================================================
# 5. Pipeline Execution
# ============================================================
# NOTE(review): this cell repeats the "5. Pipeline Execution" cell at the end
# of the notebook and sits above the unit-test definitions; consider deleting
# it and keeping only the final cell.
spark = None
try:
    # The unit tests are defined further down the notebook, so on a fresh
    # kernel there is nothing for ipytest to collect at this point. The test
    # phase is therefore left to the final pipeline cell.
    spark = get_spark()
    print(f">>> Phase 2: Connected to Master. UI: {spark.sparkContext.uiWebUrl}")

    path = "/dataset/nyc-jobs.csv"
    if not os.path.exists(path):
        raise FileNotFoundError(f"{path} not found. Check volume mapping.")

    raw_df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(path)
    )

    # normalize column names -> clean rows -> add derived columns
    df = apply_feature_engineering(
        clean_data(
            normalize_columns(raw_df)
        )
    )

    # Every KPI below reads df, so keep it cached while they are evaluated.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.show()

    stats, salary_distribution, corr, high_agency, avg_2yr, skills_us = compute_kpis(df)

    stats.show(5)
    # Previously computed but never displayed.
    salary_distribution.show(5)
    corr.show()
    high_agency.show(5)
    avg_2yr.show(5)
    skills_us.show(5)

    pdf = stats.toPandas()
    pdf.plot(kind="bar", x="job_category", y="job_count", title="NYC Job Categories")
    plt.show()

    df.unpersist()
    print(">>> Pipeline Finished Successfully")

finally:
    # Stop the session even when a step above raised.
    if spark is not None:
        print(">>> Stopping Spark session...")
        spark.stop()


In [None]:
# ============================================================
# 4. Unit Tests
# ============================================================
@pytest.fixture(scope="session")
def spark_session():
    """Session-scoped fixture providing the SparkSession from ``get_spark``."""
    session = get_spark()
    return session


@pytest.fixture
def base_df(spark_session):
    """
    Three-row sample of job postings with already-normalized column names:
    two IT postings for Agency1 and one HR posting for Agency2.
    """
    column_names = [
        "job_category", "agency", "business_title",
        "posting_date", "post_until",
        "salary_range_from", "salary_range_to",
        "minimum_qual_requirements", "preferred_skills"
    ]

    engineer = (
        "IT", "Agency1", "Engineer", "2023-01-01", "2023-12-31", 60000, 80000,
        "Bachelor degree required", "python, spark"
    )
    senior_engineer = (
        "IT", "Agency1", "Senior Engineer", "2024-01-01", "2024-12-31", 90000, 120000,
        "Master degree preferred", "spark, kafka"
    )
    hr_manager = (
        "HR", "Agency2", "HR Manager", "2023-06-01", "2023-12-31", 50000, 70000,
        "Bachelor degree", "communication, leadership"
    )

    rows = [engineer, senior_engineer, hr_manager]
    return spark_session.createDataFrame(rows, schema=column_names)


def test_normalize_columns(spark_session):
    """Names are lower-cased and spaces, "/" and "-" become underscores."""
    # Uses the shared session fixture, like the other Spark-backed tests.
    df = spark_session.createDataFrame(
        [(1, "F", 100)],
        ["Job ID", "Full-Time/Part-Time indicator", "Salary Range From"],
    )
    result = normalize_columns(df)
    # Compare the full list so stray or unrenamed columns are caught too.
    assert result.columns == [
        "job_id", "full_time_part_time_indicator", "salary_range_from"
    ]


def test_clean_data_filters_invalid_salary(base_df):
    """A row whose upper salary bound is below the lower bound is dropped."""
    inverted_range_row = base_df.limit(1).withColumn(
        "salary_range_to", col("salary_range_from") - 1
    )
    with_invalid_row = base_df.union(inverted_range_row)

    surviving_rows = clean_data(with_invalid_row).count()

    # Only the three original, valid rows remain.
    assert surviving_rows == 3


def test_clean_data_removes_null_posting_date(base_df):
    """Rows with a null posting_date are removed by clean_data."""
    is_it_posting = col("job_category") == "IT"
    posting_date_or_null = when(is_it_posting, None).otherwise(col("posting_date"))
    df_with_nulls = base_df.withColumn("posting_date", posting_date_or_null)

    surviving_rows = clean_data(df_with_nulls).count()

    # Both IT rows were nulled, so only the HR row is left.
    assert surviving_rows == 1


def test_feature_engineering_avg_salary(base_df):
    """avg_salary is the midpoint of the salary range (60000-80000 -> 70000)."""
    engineered = apply_feature_engineering(clean_data(base_df))
    engineer_rows = engineered.filter(col("business_title") == "Engineer").collect()
    first_match = engineer_rows[0]
    assert first_match.avg_salary == 70000


def test_feature_engineering_education_level(base_df):
    """The sample rows yield both the "Bachelors" and "Masters" labels."""
    engineered = apply_feature_engineering(clean_data(base_df))

    observed_levels = set()
    for record in engineered.select("education_level").collect():
        observed_levels.add(record.education_level)

    assert "Bachelors" in observed_levels
    assert "Masters" in observed_levels


In [None]:
# ============================================================
# 5. Pipeline Execution
# ============================================================
spark = None
try:
    # Phase 1: run the unit tests defined in the cell above.
    print(">>> Phase 1: Running Unit Tests...")
    ipytest.run()

    # Phase 2: connect to the cluster and run the pipeline.
    spark = get_spark()
    print(f">>> Phase 2: Connected to Master. UI: {spark.sparkContext.uiWebUrl}")

    path = "/dataset/nyc-jobs.csv"
    if not os.path.exists(path):
        raise FileNotFoundError(f"{path} not found. Check volume mapping.")

    raw_df = (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .csv(path)
    )

    # normalize column names -> clean rows -> add derived columns
    df = apply_feature_engineering(
        clean_data(
            normalize_columns(raw_df)
        )
    )

    # Every KPI below reads df, so keep it cached while they are evaluated.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.show()

    stats, salary_distribution, corr, high_agency, avg_2yr, skills = compute_kpis(df)

    stats.show(5)
    # Previously computed but never displayed.
    salary_distribution.show(5)
    corr.show()
    high_agency.show(5)
    avg_2yr.show(5)
    skills.show(5)

    pdf = stats.toPandas()
    pdf.plot(kind="bar", x="job_category", y="job_count", title="NYC Job Categories")
    plt.show()

    df.unpersist()
    print(">>> Pipeline Finished Successfully")

finally:
    # Stop the session even when a step above raised.
    if spark is not None:
        print(">>> Stopping Spark session...")
        spark.stop()