# Spark Cleaning + Modeling (Benchmark Notebook)

This notebook mirrors the non-spark cleaning/modeling workflow using straightforward PySpark APIs.

## Included benchmarking controls
- Fixed split and random seed (`randomSplit(..., seed=42)`)
- Same evaluation metrics (R2, MAE, RMSE)
- End-to-end runtime timing
- Lightweight CPU/memory snapshots before and after major stages

In [1]:
import os, sys, glob, subprocess, time
import pandas as pd

try:
    import psutil
except ImportError:
    psutil = None

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


def resource_snapshot(tag: str):
    if psutil is None:
        print(f"[{tag}] psutil not installed (pip install psutil for CPU/memory snapshots)")
        return
    cpu_pct = psutil.cpu_percent(interval=0.3)
    mem_pct = psutil.virtual_memory().percent
    print(f"[{tag}] CPU%: {cpu_pct:.1f} | Memory%: {mem_pct:.1f}")

# 1) Remove stale PySpark gateway env (can trigger insecure gateway error)
for k in ["PYSPARK_GATEWAY_PORT", "PYSPARK_GATEWAY_SECRET"]:
    os.environ.pop(k, None)

# 2) Set JAVA_HOME dynamically (Temurin JDK)
jdk_candidates = sorted(glob.glob(r"C:\Program Files\Eclipse Adoptium\jdk-*"))
if not jdk_candidates:
    raise RuntimeError("No JDK found under C:\\Program Files\\Eclipse Adoptium\\jdk-*")

os.environ["JAVA_HOME"] = jdk_candidates[-1]
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + ";" + os.environ["PATH"]

# 3) Ensure Spark uses this notebook's Python
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# 4) Verify Java
print(subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT).decode())
print("Python executable:", sys.executable)
print("JAVA_HOME:", os.environ["JAVA_HOME"])

openjdk version "17.0.18" 2026-01-20
OpenJDK Runtime Environment Temurin-17.0.18+8 (build 17.0.18+8)
OpenJDK 64-Bit Server VM Temurin-17.0.18+8 (build 17.0.18+8, mixed mode, sharing)

Python executable: C:\Users\Alex\Desktop\BigData\Job-Market-Analyser\Notebooks\.venv\Scripts\python.exe
JAVA_HOME: C:\Program Files\Eclipse Adoptium\jdk-17.0.18.8-hotspot


In [2]:
RANDOM_STATE = 42
TEST_SIZE = 0.2

employee_path = r"C:\Users\Alex\Downloads\Employee_dataset.csv"
salary_path = r"C:\Users\Alex\Downloads\Employee_salaries.csv"

assert os.path.exists(employee_path), f"Missing file: {employee_path}"
assert os.path.exists(salary_path), f"Missing file: {salary_path}"

run_start = time.perf_counter()
resource_snapshot("start")

spark = (
    SparkSession.builder
    .appName("spark_cleaning_model_benchmark")
    .master("local[*]")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "16")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

[start] CPU%: 17.5 | Memory%: 32.5


In [3]:
# ---- Load + Standardize ----
emp = (
    spark.read.option("header", True).option("inferSchema", True).csv(employee_path)
    .withColumnRenamed("jobId", "job_id")
    .withColumnRenamed("companyId", "company_id")
    .withColumnRenamed("jobRole", "job_role")
    .withColumnRenamed("Industry", "industry")
    .withColumnRenamed("yearsExperience", "years_experience")
    .withColumnRenamed("distanceFromCBD", "distance_from_cbd")
)

sal = (
    spark.read.option("header", True).option("inferSchema", True).csv(salary_path)
    .withColumnRenamed("jobId", "job_id")
    .withColumnRenamed("salaryInThousands", "salary_in_thousands")
)

print("Loaded rows:", emp.count(), sal.count())

Loaded rows: 1000000 1000000


In [4]:
# ---- Cleaning (Spark equivalent of non-spark logic) ----

# ID parsing from JOBxxxx / COMPxxxx
emp = emp.withColumn("job_id", F.regexp_extract(F.col("job_id").cast("string"), r"(\d+)", 1).cast(T.LongType()))
emp = emp.withColumn("company_id", F.regexp_extract(F.col("company_id").cast("string"), r"(\d+)", 1).cast(T.LongType()))

sal = sal.withColumn("job_id", F.regexp_extract(F.col("job_id").cast("string"), r"(\d+)", 1).cast(T.LongType()))
sal = sal.withColumn("salary_in_thousands", F.col("salary_in_thousands").cast(T.DoubleType()))

# Normalize text fields
emp = emp.withColumn("job_role", F.upper(F.trim(F.col("job_role"))))
emp = emp.withColumn("industry", F.upper(F.trim(F.col("industry"))))

# Fill education/major missing with NONE
emp = emp.fillna({"education": "NONE", "major": "NONE"})

# Cast numeric columns
emp = emp.withColumn("years_experience", F.col("years_experience").cast(T.DoubleType()))
emp = emp.withColumn("distance_from_cbd", F.col("distance_from_cbd").cast(T.DoubleType()))

# Drop critical missing rows
emp = emp.dropna(subset=["job_id", "company_id", "job_role", "industry", "years_experience", "distance_from_cbd"])
sal = sal.dropna(subset=["job_id", "salary_in_thousands"])

# Remove known invalid role from EDA
emp = emp.filter(F.col("job_role") != "SCAMMER")

# Range filters
emp = emp.filter((F.col("years_experience") >= 0) & (F.col("years_experience") <= 50))
emp = emp.filter((F.col("distance_from_cbd") >= 0) & (F.col("distance_from_cbd") <= 100))
sal = sal.filter((F.col("salary_in_thousands") > 0) & (F.col("salary_in_thousands") <= 500))

# Merge + de-dup
sdf = emp.join(sal, on="job_id", how="inner").dropDuplicates().dropDuplicates(["job_id"])

print("Cleaned + merged rows:", sdf.count())
resource_snapshot("post_clean")

Cleaned + merged rows: 999465
[post_clean] CPU%: 18.0 | Memory%: 34.0


In [5]:
# ---- Feature Engineering (Spark) ----

def map_from_dict(mapping: dict):
    return F.create_map([F.lit(x) for kv in mapping.items() for x in kv])

edu_map = {"NONE": 0, "HIGH_SCHOOL": 1, "BACHELORS": 2, "MASTERS": 3, "DOCTORAL": 4}
role_map = {"JANITOR": 0, "JUNIOR": 1, "SENIOR": 2, "MANAGER": 3, "VICE_PRESIDENT": 4, "CTO": 5, "CFO": 5, "CEO": 6}
industry_map = {"EDUCATION": 1, "SERVICE": 1, "AUTO": 2, "HEALTH": 3, "WEB": 4, "FINANCE": 5, "OIL": 5}
major_map = {"NONE": 0, "LITERATURE": 1, "BIOLOGY": 2, "CHEMISTRY": 3, "PHYSICS": 4, "COMPSCI": 5, "MATH": 6, "BUSINESS": 7, "ENGINEERING": 8}

sdf = sdf.withColumn("education_level", map_from_dict(edu_map)[F.col("education")].cast(T.DoubleType()))
sdf = sdf.withColumn("job_role_rank", map_from_dict(role_map)[F.col("job_role")].cast(T.DoubleType()))
sdf = sdf.withColumn("industry_score", map_from_dict(industry_map)[F.col("industry")].cast(T.DoubleType()))
sdf = sdf.withColumn("major_score", map_from_dict(major_map)[F.col("major")].cast(T.DoubleType()))

sdf = sdf.withColumn(
    "handcrafted_score",
    (F.col("education_level") + F.col("job_role_rank") + F.col("industry_score") + F.col("major_score")).cast(T.DoubleType())
)
sdf = sdf.withColumn("exp_sq", (F.col("years_experience") * F.col("years_experience")).cast(T.DoubleType()))

feature_cols = [
    "years_experience",
    "distance_from_cbd",
    "education_level",
    "job_role_rank",
    "industry_score",
    "major_score",
    "handcrafted_score",
    "exp_sq",
]

target_col = "salary_in_thousands"

sdf = sdf.dropna(subset=feature_cols + [target_col])
print("Model rows:", sdf.count())

Model rows: 999464


In [6]:
# ---- Split + Train + Evaluate (same metrics) ----
train_sdf, test_sdf = sdf.randomSplit([1 - TEST_SIZE, TEST_SIZE], seed=RANDOM_STATE)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_vec = assembler.transform(train_sdf).select("features", F.col(target_col).alias("label"))
test_vec = assembler.transform(test_sdf).select("features", F.col(target_col).alias("label"))

r2_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
mae_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
rmse_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

rows = []

# Linear Regression
t0 = time.perf_counter()
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_vec)
lr_pred = lr_model.transform(test_vec)
lr_time = time.perf_counter() - t0
rows.append({
    "workflow": "spark",
    "model": "LinearRegression",
    "runtime_seconds": lr_time,
    "R2": float(r2_eval.evaluate(lr_pred)),
    "MAE": float(mae_eval.evaluate(lr_pred)),
    "RMSE": float(rmse_eval.evaluate(lr_pred)),
})

# Random Forest (kept moderate for laptop stability)
t0 = time.perf_counter()
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=30, maxDepth=8, seed=RANDOM_STATE)
rf_model = rf.fit(train_vec)
rf_pred = rf_model.transform(test_vec)
rf_time = time.perf_counter() - t0
rows.append({
    "workflow": "spark",
    "model": "RandomForest",
    "runtime_seconds": rf_time,
    "R2": float(r2_eval.evaluate(rf_pred)),
    "MAE": float(mae_eval.evaluate(rf_pred)),
    "RMSE": float(rmse_eval.evaluate(rf_pred)),
})

spark_results = pd.DataFrame(rows).sort_values("RMSE").reset_index(drop=True)
spark_results

Unnamed: 0,workflow,model,runtime_seconds,R2,MAE,RMSE
0,spark,LinearRegression,7.060047,0.741258,15.913671,19.702405
1,spark,RandomForest,8.559929,0.728262,16.21092,20.191133


In [7]:
# Small prediction sample from best spark model
best_name = spark_results.iloc[0]["model"]
best_pred_df = rf_pred if best_name == "RandomForest" else lr_pred

sample = best_pred_df.select(F.col("label").alias("actual_salary_k"), F.col("prediction").alias("predicted_salary_k")).limit(15)
sample.toPandas()

Unnamed: 0,actual_salary_k,predicted_salary_k
0,100.0,102.617274
1,174.0,159.753291
2,130.0,143.509845
3,110.0,121.903034
4,100.0,107.627587
5,76.0,77.908098
6,100.0,119.378546
7,131.0,158.421987
8,102.0,110.457632
9,173.0,151.570753


In [8]:
run_total = time.perf_counter() - run_start
resource_snapshot("end")
print(f"Total notebook runtime: {run_total:.2f} seconds")

spark.stop()

[end] CPU%: 13.0 | Memory%: 35.0
Total notebook runtime: 42.35 seconds
