# Regression, Classification, and Topic Insights

# Load Dataset

In [2]:

from pyspark.sql import SparkSession
import pandas as pd
from collections import Counter
import seaborn as sns
import matplotlib.pyplot as plt



# Start (or reuse) the Spark session used throughout the notebook.
spark = SparkSession.builder.appName("JobPostingsAnalysis").getOrCreate()

file_path = "lightcast_job_postings.csv"

# CSV reader settings: first row is the header, column types are inferred,
# records may span several lines, and '"' is the escape character.
csv_read_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "escape": "\"",
}

df = spark.read.options(**csv_read_options).csv(file_path)



                                                                                

# Multiple Regression

In [4]:
from pyspark.sql.functions import col, lower, when

# Create AI Role flag with VERY specific patterns.
# Every pattern is applied to the lower-cased text of the listed columns; a
# posting is flagged (IS_AI_ROLE = 1) as soon as any single check matches.

# Multi-word AI/ML phrases, delimited by word boundaries.
AI_PHRASE_PATTERN = r'\b(machine learning|deep learning|artificial intelligence|generative ai|neural network|computer vision|data scientist)\b'
# "ai" as its own token: preceded by whitespace or start of string, and
# followed by whitespace, end of string, "/" or "-".
AI_TOKEN_PATTERN = r'(\s|^)ai(\s|$|/|-)'
# "ml" followed by one of a fixed set of role nouns.
ML_ROLE_PATTERN = r'\bml\s+(engineer|scientist|developer|analyst)\b'
# "nlp" as a whole word.
NLP_PATTERN = r'\bnlp\b'

title_cols = ["TITLE_CLEAN", "TITLE_RAW"]
title_and_occupation_cols = title_cols + ["LOT_V6_SPECIALIZED_OCCUPATION_NAME"]

# (pattern, columns it is checked against), in evaluation order.
ai_role_rules = [
    (AI_PHRASE_PATTERN, title_and_occupation_cols),
    (AI_TOKEN_PATTERN, title_and_occupation_cols),
    (ML_ROLE_PATTERN, title_cols),
    (NLP_PATTERN, title_cols),
]

# Fold all individual checks into a single OR-ed boolean column expression.
is_ai_condition = None
for rule_pattern, rule_columns in ai_role_rules:
    for rule_column in rule_columns:
        rule_match = lower(col(rule_column)).rlike(rule_pattern)
        if is_ai_condition is None:
            is_ai_condition = rule_match
        else:
            is_ai_condition = is_ai_condition | rule_match

df = df.withColumn("IS_AI_ROLE", when(is_ai_condition, 1).otherwise(0))

# Verify: list flagged rows first.
ai_flag_preview = df.select(
    "TITLE_CLEAN",
    "TITLE_RAW",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
    "IS_AI_ROLE",
).orderBy(col("IS_AI_ROLE").desc())

ai_flag_preview.show(5)




[Stage 186:>                                                        (0 + 1) / 1]

+--------------------+--------------------+----------------------------------+----------+
|         TITLE_CLEAN|           TITLE_RAW|LOT_V6_SPECIALIZED_OCCUPATION_NAME|IS_AI_ROLE|
+--------------------+--------------------+----------------------------------+----------+
|sr bi analyst dat...|Sr BI Analyst/Dat...|                      Data Analyst|         1|
|ai ml governance ...|AI/ML Governance ...|                      Data Analyst|         1|
|data engineering ...|Data Engineering ...|                      Data Analyst|         1|
|data engineering ...|Data Engineering ...|                      Data Analyst|         1|
|data engineering ...|Data Engineering ...|                      Data Analyst|         1|
+--------------------+--------------------+----------------------------------+----------+
only showing top 5 rows



                                                                                

In [5]:
# Missing Value Treatment
from pyspark.sql import Window
from pyspark.sql.functions import col, when, isnan, count, expr, median
from pyspark.sql import functions as F

# Impute missing SALARY values with progressively coarser medians:
#   1. median salary of the posting's EMPLOYMENT_TYPE
#   2. median salary of the posting's EMPLOYMENT_TYPE_NAME
#   3. overall median salary
# NOTE(review): SALARY is later used as the regression label, so imputed rows
# contribute synthetic targets to training and evaluation - confirm intended.

# Calculate overall median salary.
# approxQuantile returns an empty list when the column has no non-null values,
# so guard the lookup instead of indexing blindly.
salary_quantiles = df.approxQuantile("SALARY", [0.5], 0.01)
overall_median_salary = salary_quantiles[0] if salary_quantiles else None
# The original (misspelled) name is kept as an alias for any cell still using it.
overall_median_salarly = overall_median_salary

# Per-group medians; groups whose SALARY values are all null get a null median.
median_by_employment_type = df.groupBy("EMPLOYMENT_TYPE").agg(
    expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type")
)
median_by_employment_type_name = df.groupBy("EMPLOYMENT_TYPE_NAME").agg(
    expr("percentile_approx(SALARY, 0.5)").alias("median_salary_emp_type_name")
)

# Join median values back to the original dataframe (left joins keep every posting).
df_salary_imputed = (
    df.join(median_by_employment_type, on="EMPLOYMENT_TYPE", how="left")
      .join(median_by_employment_type_name, on="EMPLOYMENT_TYPE_NAME", how="left")
)

# Replace missing SALARY values: coalesce picks the first non-null candidate,
# which is exactly the fallback order listed at the top of this cell.
df_salary_imputed = df_salary_imputed.withColumn(
    "SALARY",
    F.coalesce(
        col("SALARY"),
        col("median_salary_emp_type"),
        col("median_salary_emp_type_name"),
        F.lit(overall_median_salary),
    ),
)

                                                                                

In [None]:
from pyspark.sql.functions import col, when, trim, regexp_replace, coalesce, lit
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Keep only rows that have a SALARY value to use as the regression label.
regression_df = df_salary_imputed.dropna(subset=["SALARY"])

# Impute numeric columns with median values.
# All medians are computed in one approxQuantile call (one pass instead of one
# per column). A column with no non-null values comes back as an empty list,
# so such columns are skipped rather than indexed (which would raise IndexError).
# NOTE(review): each column is imputed independently, so an imputed
# MAX_YEARS_EXPERIENCE can end up below MIN_YEARS_EXPERIENCE - confirm acceptable.
numeric_impute_cols = ["MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "DURATION"]
median_lists = df_salary_imputed.approxQuantile(numeric_impute_cols, [0.5], 0.01)
numeric_medians = {
    col_name: quantiles[0]
    for col_name, quantiles in zip(numeric_impute_cols, median_lists)
    if quantiles
}
if numeric_medians:
    regression_df = regression_df.na.fill(numeric_medians)

# Cast duration to integer
regression_df = regression_df.withColumn("DURATION", col("DURATION").cast(IntegerType()))

# Cast boolean columns to integers (0/1); values that are null after the cast become 0.
for flag_col in ("IS_INTERNSHIP", "COMPANY_IS_STAFFING"):
    regression_df = regression_df.withColumn(
        flag_col, coalesce(col(flag_col).cast(IntegerType()), lit(0))
    )


                                                                                

In [8]:
# Clean Remote Type Name: collapse the raw labels into
# Remote / Hybrid / On Premise / Undefined. Nulls are treated as "On Premise";
# any other value is passed through unchanged.
regression_df = regression_df.withColumn(
    "REMOTE_TYPE_NAME",
    when(col("REMOTE_TYPE_NAME") == "Remote", "Remote")
    .when(col("REMOTE_TYPE_NAME") == "[None]", "Undefined")
    .when(col("REMOTE_TYPE_NAME") == "Not Remote", "On Premise")
    .when(col("REMOTE_TYPE_NAME") == "Hybrid Remote", "Hybrid")
    .when(col("REMOTE_TYPE_NAME").isNull(), "On Premise")
    .otherwise(col("REMOTE_TYPE_NAME"))
)

# Clean Employment Type Name: Flexible / Parttime / Fulltime. Nulls are treated
# as "Fulltime"; any other value is passed through unchanged.
# The part-time label contains a non-ASCII comparison sign that may arrive
# either correctly decoded (1 character) or as mojibake (3 characters)
# depending on how the CSV was encoded, so it is matched with a regex that
# tolerates both instead of a byte-exact literal.
PART_TIME_LABEL_PATTERN = r"^Part-time \(.{1,3} 32 hours\)$"
regression_df = regression_df.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME").rlike(PART_TIME_LABEL_PATTERN), "Parttime")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Fulltime")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Fulltime")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)

# Clean Education Levels: strip square brackets, newlines and double quotes,
# trim surrounding whitespace, then label missing values explicitly.
regression_df = regression_df.withColumn(
    "EDUCATION_LEVELS_NAME",
    trim(regexp_replace(col("EDUCATION_LEVELS_NAME"), r"[\[\]\n\"]", ""))
)
regression_df = regression_df.fillna({"EDUCATION_LEVELS_NAME": "No Education Listed"})

# Keep only the label and the columns used as model features.
regression_df = regression_df.select(
    "SALARY", "MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE",
    "EDUCATION_LEVELS_NAME", "EMPLOYMENT_TYPE_NAME", "REMOTE_TYPE_NAME",
    "DURATION", "IS_INTERNSHIP", "COMPANY_IS_STAFFING", "IS_AI_ROLE"
)

regression_df.show(5, truncate=False)


[Stage 216:>                                                        (0 + 1) / 1]

+--------+--------------------+--------------------+-----------------------------------+--------------------+----------------+--------+-------------+-------------------+----------+
|SALARY  |MIN_YEARS_EXPERIENCE|MAX_YEARS_EXPERIENCE|EDUCATION_LEVELS_NAME              |EMPLOYMENT_TYPE_NAME|REMOTE_TYPE_NAME|DURATION|IS_INTERNSHIP|COMPANY_IS_STAFFING|IS_AI_ROLE|
+--------+--------------------+--------------------+-----------------------------------+--------------------+----------------+--------+-------------+-------------------+----------+
|92500.0 |5                   |3                   |No Education Listed                |Flexible            |Undefined       |15      |0            |0                  |0         |
|100000.0|5                   |3                   |No Education Listed                |Flexible            |Undefined       |18      |1            |0                  |0         |
|222000.0|10                  |3                   |Bachelor's degree,  Master's degree|Flexibl

                                                                                

In [9]:
# Oversample AI-role postings (with replacement) so that both classes of
# IS_AI_ROLE are roughly the same size.
# NOTE(review): this runs before the train/test split, so copies of the same
# AI posting can land in both splits - consider splitting first and
# oversampling only the training portion.
ai_df = regression_df.filter(col("IS_AI_ROLE") == 1)
non_ai_df = regression_df.filter(col("IS_AI_ROLE") == 0)

# Count both classes with a single Spark job instead of three count() actions.
class_counts = {
    row["IS_AI_ROLE"]: row["count"]
    for row in regression_df.groupBy("IS_AI_ROLE").count().collect()
}
ai_count = class_counts.get(1, 0)
non_ai_count = class_counts.get(0, 0)

# Only rebalance when both classes are present: with no AI rows there is
# nothing to oversample, and with no non-AI rows the ratio would be 0, which
# would sample zero AI rows and leave regression_df empty.
if ai_count > 0 and non_ai_count > 0:
    ratio = non_ai_count / ai_count
    ai_df_balanced = ai_df.sample(withReplacement=True, fraction=ratio, seed=42)
    regression_df = non_ai_df.union(ai_df_balanced)


                                                                                

In [10]:
# Columns that are string-indexed and then one-hot encoded.
categorical_cols = [
    "EDUCATION_LEVELS_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "REMOTE_TYPE_NAME",
    "IS_INTERNSHIP",
    "COMPANY_IS_STAFFING",
]
# Columns fed to the assembler as-is.
numeric_feature_cols = ["MIN_YEARS_EXPERIENCE", "MAX_YEARS_EXPERIENCE", "DURATION", "IS_AI_ROLE"]

# One indexer (<col> -> <col>_idx) and one encoder (<col>_idx -> <col>_vec)
# per categorical column.
indexers = []
encoders = []
for cat_col in categorical_cols:
    indexers.append(
        StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_idx", handleInvalid="skip")
    )
    encoders.append(
        OneHotEncoder(inputCol=f"{cat_col}_idx", outputCol=f"{cat_col}_vec")
    )

encoded_feature_cols = [f"{cat_col}_vec" for cat_col in categorical_cols]

# Concatenate numeric and encoded columns into a single "features" vector.
assembler = VectorAssembler(
    inputCols=numeric_feature_cols + encoded_feature_cols,
    outputCol="features",
)

# All indexers run first, then all encoders, then the assembler.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler])
feature_pipeline_model = pipeline.fit(regression_df)
regression_data = feature_pipeline_model.transform(regression_df)



                                                                                

In [None]:
# Split Data
# Cache the assembled feature frame first: the count() actions below and the
# later model fit would otherwise each recompute the whole lineage (CSV read,
# joins, feature pipeline) from scratch.
regression_data = regression_data.cache()

# 80/20 train/test split with a fixed seed.
regression_train, regression_test = regression_data.randomSplit([0.8, 0.2], seed=42)

# (row count, column count) for the full, train and test frames.
print((regression_data.count(), len(regression_data.columns)))
print((regression_train.count(), len(regression_train.columns)))
print((regression_test.count(), len(regression_test.columns)))

                                                                                

(5039, 21)


                                                                                

(4070, 21)


[Stage 1264:>                                                       (0 + 1) / 1]

(969, 21)


                                                                                

In [65]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit a multiple linear regression of SALARY on the assembled feature vector.
lr = LinearRegression(featuresCol="features", labelCol="SALARY")
lr_model = lr.fit(regression_train)

# Score the held-out test split.
predictions = lr_model.transform(regression_test)

# Evaluate the same predictions under three metrics; the metric is overridden
# per call through the param map, leaving the evaluator itself unchanged.
evaluator = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction")
rmse, r2, mae = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("rmse", "r2", "mae")
)

print(f"RMSE: {rmse}")
print(f"R²: {r2}")
print(f"MAE: {mae}")



25/10/09 21:29:25 WARN Instrumentation: [64cbbb93] regParam is zero, which might cause numerical instability and overfitting.
25/10/09 21:29:34 WARN Instrumentation: [64cbbb93] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
[Stage 1311:>                                                       (0 + 1) / 1]

RMSE: 22058.821158120867
R²: 0.1915025455889091
MAE: 15535.593012196921


                                                                                

In [66]:
# Compare actual vs. predicted salary on the test predictions, per IS_AI_ROLE value.
summary_aggregates = [
    F.count("*").alias("job_count"),
    F.avg("SALARY").alias("avg_actual_salary"),
    F.avg("prediction").alias("avg_predicted_salary"),
]

salary_by_ai_flag = (
    predictions
    .select("SALARY", "prediction", "IS_AI_ROLE")
    .groupBy("IS_AI_ROLE")
    .agg(*summary_aggregates)
    .orderBy("IS_AI_ROLE")
)

salary_by_ai_flag.show()

[Stage 1318:>                                                       (0 + 1) / 1]

+----------+---------+------------------+--------------------+
|IS_AI_ROLE|job_count| avg_actual_salary|avg_predicted_salary|
+----------+---------+------------------+--------------------+
|         0|      955|113290.49214659687|  111576.33820922484|
|         1|       14|111894.64285714286|  117446.94495596984|
+----------+---------+------------------+--------------------+



                                                                                