In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler,Tokenizer,HashingTF,IDF,Word2Vec,StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator,TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, ClusteringEvaluator

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import IntegerType, DoubleType
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import re
from pyspark.ml.feature import StopWordsRemover

In [2]:
spark = (SparkSession.builder
    .appName("JobMarket ML")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.cores", "8")
    .config("spark.default.parallelism", "8")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

25/04/26 17:31:13 WARN Utils: Your hostname, Amey resolves to a loopback address: 127.0.0.1; using 192.168.1.153 instead (on interface en0)
25/04/26 17:31:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 17:31:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data=spark.read.parquet("hdfs://localhost:9000/user/hadoop/processed/whole_merged_cleaned_jobs_data_parquet")

                                                                                

In [4]:
print(f"Total records: {data.count()}")

Total records: 1294346


In [6]:
def null_summary(df, name):
    print(f"\nNull summary for: {name}")
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [7]:
null_summary(data,"Whole data null values")


Null summary for: Whole data null values


                                                                                

+--------+-------------------+---------+-------+------------+----------+-----------+--------------+---------------+---------+--------+----------+-----------+
|job_link|last_processed_time|job_title|company|job_location|first_seen|search_city|search_country|search_position|job_level|job_type|job_skills|job_summary|
+--------+-------------------+---------+-------+------------+----------+-----------+--------------+---------------+---------+--------+----------+-----------+
|       0|                  0|        0|      0|           0|         0|          0|             0|              0|        0|       0|         0|          0|
+--------+-------------------+---------+-------+------------+----------+-----------+--------------+---------------+---------+--------+----------+-----------+



In [8]:
print("Schema:")
data.printSchema()

Schema:
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: date (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- job_summary: string (nullable = true)



In [9]:
data = data.withColumn('skill_count',F.size(F.split(F.col('job_skills'),',')).cast('double'))

In [10]:
tokenizer_summary=Tokenizer(inputCol="job_summary",outputCol="summary_tokens")
remover_summary=StopWordsRemover(inputCol="summary_tokens",outputCol="summary_words")

hashing_summary=HashingTF(inputCol="summary_words",outputCol="summary_tf",numFeatures=10000)
idf_summary=IDF(inputCol="summary_tf", outputCol="summary_tfidf")

In [11]:
tokenizer_skills=Tokenizer(inputCol="job_skills", outputCol="skills_tokens")
remover_skills= StopWordsRemover(inputCol="skills_tokens", outputCol="skills_words")
hashing_skills=HashingTF(inputCol="skills_words", outputCol="skills_tf", numFeatures=5000)
idf_skills=IDF(inputCol="skills_tf", outputCol="skills_tfidf")

In [12]:
indexer_location = StringIndexer(inputCol="job_location",outputCol="loc_index",handleInvalid="keep")

In [13]:
encoder_location = OneHotEncoder(inputCol="loc_index",outputCol="loc_vec")

In [14]:
assembler=VectorAssembler(inputCols=["summary_tfidf","skills_tfidf","loc_vec"],outputCol="features")

## Random Forest Classifier

In [15]:
target_indexer=StringIndexer(inputCol="job_level",outputCol="label",handleInvalid="keep")
clf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=1)

In [16]:
pipeline_clf = Pipeline(stages=[
    tokenizer_summary,remover_summary,hashing_summary,idf_summary,
    tokenizer_skills,remover_skills,hashing_skills,idf_skills,
    indexer_location,encoder_location,
    assembler,
    target_indexer,
    clf
])

In [17]:
train_clf,test_clf=data.randomSplit([0.8, 0.2],seed=1)

In [18]:
paramGrid_clf = (ParamGridBuilder().addGrid(clf.numTrees, [47,98])
      .addGrid(clf.maxDepth,  [5,13]).build())

In [19]:
evaluator_clf = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="f1")

In [20]:
cv_clf = CrossValidator(
    estimator=pipeline_clf,
    estimatorParamMaps=paramGrid_clf,
    evaluator=evaluator_clf,
    numFolds=2,
    parallelism=4
)

In [21]:
cv_model_clf = cv_clf.fit(train_clf)

25/04/26 17:31:32 WARN BlockManager: Block rdd_23_0 already exists on this machine; not re-adding it
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.2 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.2 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.2 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.2 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.3 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.3 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.3 KiB
25/04/26 17:31:42 WARN DAGScheduler: Broadcasting large task binary with size 1666.3 KiB
25/04/26 17:31:47 WARN DAGScheduler: Broadcasting large task binary with size 1820.3 KiB
25/04/26 17:31:47 WARN DAGScheduler: Broadcasting large task binary with size 1820.3 KiB
25/04/26 

In [22]:
preds = cv_model_clf.transform(test_clf)

In [23]:
evaluator_prec = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_prec.evaluate(preds)

25/04/26 17:42:25 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

In [24]:
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(preds)

evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(preds)
f1_score = evaluator_clf.evaluate(preds)

25/04/26 17:42:45 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/04/26 17:43:02 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/04/26 17:43:19 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

In [25]:
print(f"Classification Accuracy : {accuracy:.4f}")
print(f"Classification Precision): {precision:.4f}")
print(f"Classification Recall : {recall:.4f}")
print(f"Classification F1 : {f1_score:.4f}")

Classification Accuracy : 0.8931
Classification Precision): 0.9046
Classification Recall : 0.8931
Classification F1 : 0.8466


In [26]:
cv_model_clf.bestModel.write().overwrite().save(
    "hdfs://localhost:9000/user/hadoop/models/job_level_classifier"
)

                                                                                

In [27]:
preds.select("job_link","prediction","label") \
    .write.mode("overwrite") \
    .option("header",True) \
    .csv("hdfs://localhost:9000/user/hadoop/output/classification_results")

25/04/26 17:43:53 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
                                                                                

## Random Forest Regressor

In [28]:
reg = RandomForestRegressor(labelCol="skill_count", featuresCol="features", seed=1)

In [29]:
pipeline_reg = Pipeline(stages=[
    tokenizer_summary,remover_summary, hashing_summary, idf_summary,
    tokenizer_skills,   remover_skills,   hashing_skills,   idf_skills,
    indexer_location, encoder_location,
    assembler, reg
])

In [30]:
train_reg, test_reg = data.randomSplit([0.8, 0.2], seed=1)

In [31]:
paramGrid_reg = (ParamGridBuilder().addGrid(reg.numTrees,[33,75]).addGrid(reg.maxDepth,[3,9]).build())

In [32]:
evaluator_reg=RegressionEvaluator(labelCol="skill_count", predictionCol="prediction", metricName="rmse")

In [33]:
tvs_reg=TrainValidationSplit(estimator=pipeline_reg,estimatorParamMaps=paramGrid_reg,evaluator=evaluator_reg,trainRatio=0.8)

In [34]:
model_reg=tvs_reg.fit(train_reg)

25/04/26 17:44:38 WARN DAGScheduler: Broadcasting large task binary with size 1765.3 KiB
25/04/26 17:44:39 WARN DAGScheduler: Broadcasting large task binary with size 1765.4 KiB
25/04/26 17:44:47 WARN DAGScheduler: Broadcasting large task binary with size 1920.0 KiB
25/04/26 17:45:07 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/04/26 17:45:30 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
25/04/26 17:45:39 WARN DAGScheduler: Broadcasting large task binary with size 5.2 MiB
25/04/26 17:46:05 WARN DAGScheduler: Broadcasting large task binary with size 1753.1 KiB
25/04/26 17:46:18 WARN DAGScheduler: Broadcasting large task binary with size 1765.3 KiB
25/04/26 17:46:18 WARN DAGScheduler: Broadcasting large task binary with size 1765.4 KiB
25/04/26 17:46:28 WARN DAGScheduler: Broadcasting large task binary with size 1920.0 KiB
25/04/26 17:46:48 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/04/26 17:47:09 WARN DAGSchedul

In [37]:
preds_reg=model_reg.transform(test_reg)

In [38]:
evaluator_mse = RegressionEvaluator(
    labelCol="skill_count", predictionCol="prediction", metricName="mse"
)

In [39]:
mse = evaluator_mse.evaluate(preds_reg)

25/04/26 18:08:40 WARN DAGScheduler: Broadcasting large task binary with size 1802.0 KiB
                                                                                

In [40]:
evaluator_mae = RegressionEvaluator(
    labelCol="skill_count", predictionCol="prediction", metricName="mae"
)
mae = evaluator_mae.evaluate(preds_reg)

25/04/26 18:09:23 WARN DAGScheduler: Broadcasting large task binary with size 1802.0 KiB
                                                                                

In [41]:
evaluator_r2 = RegressionEvaluator(
    labelCol="skill_count", predictionCol="prediction", metricName="r2"
)
r2 = evaluator_r2.evaluate(preds_reg)

25/04/26 18:09:45 WARN DAGScheduler: Broadcasting large task binary with size 1802.0 KiB
                                                                                

In [42]:
rmse=evaluator_reg.evaluate(preds_reg)

25/04/26 18:10:08 WARN DAGScheduler: Broadcasting large task binary with size 1802.0 KiB
                                                                                

In [43]:
print(f"Regression MSE (skill_count): {mse:.4f}")
print(f"Regression MAE (skill_count): {mae:.4f}")
print(f"Regression R2  (skill_count): {r2:.4f}")
print(f"Regression RMSE (skill_count): {rmse:.4f}")

Regression MSE (skill_count): 84.1528
Regression MAE (skill_count): 6.2672
Regression R2  (skill_count): 0.3464
Regression RMSE (skill_count): 9.1735


In [44]:
model_reg.bestModel.write().overwrite().save("hdfs://localhost:9000/user/hadoop/models/skill_count_regressor")

                                                                                

In [45]:
preds_reg.select("job_link","prediction","skill_count").write.mode("overwrite").option("header",True).csv("hdfs://localhost:9000/user/hadoop/output/regression_results")

25/04/26 18:10:54 WARN DAGScheduler: Broadcasting large task binary with size 1998.4 KiB
                                                                                

## Kmeans

In [46]:
kmeans=KMeans(featuresCol="features",predictionCol="cluster",seed=1)

In [47]:
pipeline_km = Pipeline(stages=[
    tokenizer_summary,remover_summary,hashing_summary,idf_summary,
    tokenizer_skills,remover_skills,hashing_skills,idf_skills,
    indexer_location,encoder_location,
    assembler, kmeans
])

In [48]:
paramGrid_km = (ParamGridBuilder()
    .addGrid(kmeans.k, [5, 10, 15])
    .build()
)

In [49]:
evaluator_km = ClusteringEvaluator(
    featuresCol="features", predictionCol="cluster",metricName="silhouette", distanceMeasure="squaredEuclidean")

In [50]:
tvs_km = TrainValidationSplit(
    estimator=pipeline_km,estimatorParamMaps=paramGrid_km,evaluator=evaluator_km,
    trainRatio=0.8)

In [51]:
model_km = tvs_km.fit(data)

25/04/26 18:11:29 WARN DAGScheduler: Broadcasting large task binary with size 1800.6 KiB
25/04/26 18:11:40 WARN DAGScheduler: Broadcasting large task binary with size 1790.6 KiB
25/04/26 18:11:50 WARN DAGScheduler: Broadcasting large task binary with size 1791.3 KiB
25/04/26 18:11:50 WARN DAGScheduler: Broadcasting large task binary with size 1792.7 KiB
25/04/26 18:11:50 WARN DAGScheduler: Broadcasting large task binary with size 1793.0 KiB
25/04/26 18:11:50 WARN DAGScheduler: Broadcasting large task binary with size 1793.0 KiB
25/04/26 18:11:51 WARN DAGScheduler: Broadcasting large task binary with size 1793.4 KiB
25/04/26 18:11:51 WARN DAGScheduler: Broadcasting large task binary with size 1792.8 KiB
25/04/26 18:11:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/04/26 18:11:51 WARN DAGScheduler: Broadcasting large task binary with size 1809.9 KiB
25/04/26 18:12:03 WARN DAGScheduler: Broadcasting large task binary with size 1809.8 KiB
25/

In [52]:
preds_km=model_km.transform(data)

In [53]:
silhouette = evaluator_km.evaluate(preds_km)

25/04/26 18:22:00 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
25/04/26 18:22:37 WARN DAGScheduler: Broadcasting large task binary with size 1360.0 KiB
                                                                                

In [54]:
print(f"Clustering Silhouette Score: {silhouette:.4f}")

Clustering Silhouette Score: 0.7855


In [55]:
model_km.bestModel.write().overwrite().save(
    "hdfs://localhost:9000/user/hadoop/models/job_clustering_model")

In [56]:
preds_km.select("job_link","cluster").write.mode("overwrite").option("header",True).csv("hdfs://localhost:9000/user/hadoop/output/clustering_results")

25/04/26 23:03:46 WARN DAGScheduler: Broadcasting large task binary with size 1536.6 KiB
                                                                                

In [57]:
spark.stop()