In [24]:
import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "vscode"
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col, regexp_replace, transform, isnan

spark = SparkSession.builder.appName("LightcastCleanedData").getOrCreate()

# 重新加载处理后的数据
df_cleaned = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").csv("data/lightcast_cleaned.csv")

# 查看数据结构和样本
df_cleaned.show()

25/04/20 06:04:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[Stage 593:>                                                        (0 + 1) / 1]

+--------------------+-----------------+----------------------+----------+----------+----------+--------+--------------------+--------------------+--------------------+-----------+-------------------+--------------------+--------------------+---------------+----------------+---------+--------------------+--------------------+-------------------+----------------+---------------------+-------------+-------------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+-------------+------+-----------+----------------+-------------------+---------+-----------+--------------------+--------------------+------------------+------+--------------------+-----+--------------------+-----+-------------+---------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+------+--------------------+------+--------------------+------+--------------------+------+-------

                                                                                

In [26]:
df_cleaned.select("ONET", "ONET_NAME", "ONET_2019", "ONET_2019_NAME").show(10, False)

+----------+------------------------------+----------+------------------------------+
|ONET      |ONET_NAME                     |ONET_2019 |ONET_2019_NAME                |
+----------+------------------------------+----------+------------------------------+
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|Business Intelligence Analysts|
|15-2051.01|Business Intelligence Analysts|15-2051.01|

In [8]:
from pyspark.sql.functions import col, when
# Create a new column EDU_MATCH, mark it as a match or not
df_compare = df_cleaned.withColumn(
    "EDU_MATCH",
    when(col("ONET") == col("ONET_2019"), "Match").otherwise("Mismatch")
)

df_compare.select("ONET", "ONET_2019", "EDU_MATCH").show(truncate=False)

# 统计不匹配的行数
unmatched_count = df_cleaned.filter(col("MIN_EDULEVELS") != col("EDUCATION_LEVELS")).count()
print(f"Not Match: {unmatched_count}")

+----------+----------+---------+
|ONET      |ONET_2019 |EDU_MATCH|
+----------+----------+---------+
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
|15-2051.01|15-2051.01|Match    |
+----------+----------+---------+
only showing top 20 rows



[Stage 17:>                                                         (0 + 1) / 1]

Not Match: 0


                                                                                

In [9]:
from pyspark.sql.functions import col, when
# Create a new column EDU_MATCH, mark it as a match or not
df_compare = df_cleaned.withColumn(
    "EDU_MATCH",
    when(col("ONET_NAME") == col("ONET_2019_NAME"), "Match").otherwise("Mismatch")
)

df_compare.select("ONET_NAME", "ONET_2019_NAME", "EDU_MATCH").show(truncate=False)

# 统计不匹配的行数
unmatched_count = df_cleaned.filter(col("MIN_EDULEVELS") != col("EDUCATION_LEVELS")).count()
print(f"Not Match: {unmatched_count}")

+------------------------------+------------------------------+---------+
|ONET_NAME                     |ONET_2019_NAME                |EDU_MATCH|
+------------------------------+------------------------------+---------+
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Business Intelligence Analysts|Match    |
|Business Intelligence Analysts|Busine

[Stage 21:>                                                         (0 + 1) / 1]

Not Match: 0


                                                                                

In [28]:
# ====== Step 1: Import Libraries ======
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score
import pandas as pd

# ====== Step 4: Select & Clean Relevant Fields ======
df_kmeans = df.select("TITLE", "STATE_NAME", "SOFTWARE_SKILLS_NAME", "ONET", "ONET_NAME").dropna()

# ====== Step 5: Indexing & One-Hot Encoding ======
indexers = [
    StringIndexer(inputCol="TITLE", outputCol="TITLE_IDX"),
    StringIndexer(inputCol="STATE_NAME", outputCol="STATE_IDX"),
    StringIndexer(inputCol="SOFTWARE_SKILLS_NAME", outputCol="SKILL_IDX")
]

encoder = OneHotEncoder(
    inputCols=["TITLE_IDX", "STATE_IDX", "SKILL_IDX"],
    outputCols=["TITLE_VEC", "STATE_VEC", "SKILL_VEC"]
)

# ====== Step 6: Assemble Features ======
assembler = VectorAssembler(
    inputCols=["TITLE_VEC", "STATE_VEC", "SKILL_VEC"],
    outputCol="features"
)

# ====== Step 7: Train KMeans Model ======
kmeans = KMeans(k=5, seed=42, featuresCol="features", predictionCol="cluster")
pipeline = Pipeline(stages=indexers + [encoder, assembler, kmeans])
model = pipeline.fit(df_kmeans)

# ====== Step 8: Predict Clusters ======
clustered = model.transform(df_kmeans)

# ====== Step 9: Collect for Evaluation ======
rows = clustered.select("ONET", "ONET_NAME", "cluster").collect()
df_clustered = pd.DataFrame([row.asDict() for row in rows])

# ====== Step 10: Evaluation (NMI & ARI) ======
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
true_labels = le.fit_transform(df_clustered["ONET"])
pred_labels = df_clustered["cluster"]

nmi = normalized_mutual_info_score(true_labels, pred_labels)
ari = adjusted_rand_score(true_labels, pred_labels)
print(f"\n✅ KMeans Clustering Evaluation:")
print(f"NMI: {nmi:.4f}")
print(f"ARI: {ari:.4f}")

# ====== Step 11: Show Dominant ONET_NAME per Cluster ======
print("\nDominant ONET_NAME per cluster:")
print(df_clustered.groupby("cluster")["ONET_NAME"].agg(lambda x: x.value_counts().index[0]))

25/04/20 06:10:09 WARN DAGScheduler: Broadcasting large task binary with size 7.1 MiB
25/04/20 06:10:11 WARN DAGScheduler: Broadcasting large task binary with size 7.1 MiB
25/04/20 06:10:11 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:14 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:14 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:15 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:16 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:16 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:17 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:17 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:18 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/04/20 06:10:19 WARN DAGScheduler: Broadcasting larg


✅ KMeans Clustering Evaluation:
NMI: 0.5791
ARI: 0.6408

Dominant ONET_NAME per cluster:
cluster
0    Business Intelligence Analysts
1    Business Intelligence Analysts
2                           15-2050
3    Business Intelligence Analysts
4    Business Intelligence Analysts
Name: ONET_NAME, dtype: object


In [30]:
# ====== Step 1: Import Libraries ======
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# ====== Step 4: Select Fields & Drop Nulls ======
df_lr = df.select(
    "DURATION", "EDUCATION_LEVELS", "EMPLOYMENT_TYPE", "SALARY",
    "REMOTE_TYPE", "SALARY_FROM", "SALARY_TO", "STATE_NAME"
).dropna()

# ====== Step 5: Type Conversion for Numeric Columns ======
df_lr = df_lr \
    .withColumn("DURATION", col("DURATION").cast(DoubleType())) \
    .withColumn("EDUCATION_LEVELS", col("EDUCATION_LEVELS").cast(DoubleType())) \
    .withColumn("SALARY", col("SALARY").cast(DoubleType())) \
    .withColumn("SALARY_FROM", col("SALARY_FROM").cast(DoubleType())) \
    .withColumn("SALARY_TO", col("SALARY_TO").cast(DoubleType()))

# ====== Step 6: Encode Categorical Features ======
indexers = [
    StringIndexer(inputCol="EMPLOYMENT_TYPE", outputCol="EMPLOYMENT_TYPE_IDX"),
    StringIndexer(inputCol="REMOTE_TYPE", outputCol="REMOTE_TYPE_IDX"),
    StringIndexer(inputCol="STATE_NAME", outputCol="STATE_IDX")
]

encoder = OneHotEncoder(
    inputCols=["EMPLOYMENT_TYPE_IDX", "REMOTE_TYPE_IDX", "STATE_IDX"],
    outputCols=["EMPLOYMENT_VEC", "REMOTE_VEC", "STATE_VEC"]
)

# ====== Step 7: Assemble Feature Vector ======
assembler = VectorAssembler(
    inputCols=["DURATION", "EDUCATION_LEVELS", "SALARY_FROM", "SALARY_TO",
               "EMPLOYMENT_VEC", "REMOTE_VEC", "STATE_VEC"],
    outputCol="features"
)

# ====== Step 8: Build Linear Regression Model ======
lr = LinearRegression(featuresCol="features", labelCol="SALARY")

# ====== Step 9: Pipeline & Train/Test Split ======
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
(train_data, test_data) = df_lr.randomSplit([0.8, 0.2], seed=42)

pipeline = Pipeline(stages=indexers + [encoder, assembler, lr])
model = pipeline.fit(train_data)

# ====== Step 10: Evaluate Model ======
predictions = model.transform(test_data)

evaluator_r2 = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="r2")
evaluator_rmse = RegressionEvaluator(labelCol="SALARY", predictionCol="prediction", metricName="rmse")

r2 = evaluator_r2.evaluate(predictions)
rmse = evaluator_rmse.evaluate(predictions)

print("\n✅ Linear Regression Evaluation:")
print(f"R² Score: {r2:.4f}")
print(f"RMSE: {rmse:.2f}")


25/04/20 06:11:37 WARN Instrumentation: [f4357b8e] regParam is zero, which might cause numerical instability and overfitting.
25/04/20 06:11:38 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
[Stage 644:>                                                      (0 + 16) / 16]


✅ Linear Regression Evaluation:
R² Score: 0.9994
RMSE: 1103.76


                                                                                