## Gold Layer: Business Insights, ML Products, and Reporting

The **Gold Layer** consumes the features produced by the Silver Layer and turns them into aggregated, business-facing tables for the project's use cases: skill demand, resume clustering, job-readiness scoring, market intelligence, and skill gap analysis. This is the **Reporting Layer**, where data is simplified for direct use in dashboards and final reports.

In [0]:
# Setup: Imports required for Analytics, ML, Joins, and Visualization
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import HashingTF, IDF, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt

### Use Case 1: Skill Demand Analytics

This section measures **market demand**. It uses **Spark SQL** with a `LATERAL VIEW` over the `EXPLODE` generator function to unpack each resume's `extracted_skills` array into one row per skill, counts how often each skill occurs, and stores the ten most frequent skills in the `gold_skill_demand` table.

In [0]:
# CELL 2: USE CASE 1: Skill Demand Analytics (Top 10 Skills)

silver_df = spark.table("silver_features")

# Runs Spark SQL to aggregate and count the frequency of all extracted skills
skill_demand_query = """
SELECT
  skill,
  COUNT(Resume_ID) AS resume_count
FROM silver_features
LATERAL VIEW EXPLODE(extracted_skills) exploded_table AS skill  
GROUP BY skill
ORDER BY resume_count DESC
LIMIT 10
"""
gold_demand_df = spark.sql(skill_demand_query)
gold_demand_df.write.format("delta").mode("overwrite").saveAsTable("gold_skill_demand")

print("✅ Gold table 'gold_skill_demand' created.")

✅ Gold table 'gold_skill_demand' created.
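
To make the explode-and-count step concrete outside Spark, here is a minimal pure-Python sketch of what the query computes. The sample rows and both helper functions are hypothetical. It also illustrates one caveat: `COUNT(Resume_ID)` after an explode counts a skill once per array element, so a skill repeated inside one resume's array is counted twice, whereas counting distinct resumes would not do so. Whether `extracted_skills` can contain duplicates depends on the Silver Layer, so treat that part as an assumption.

```python
from collections import Counter

# Hypothetical sample of (Resume_ID, extracted_skills) rows; not real project data.
rows = [
    (1, ["python", "sql"]),
    (2, ["python", "excel", "python"]),  # "python" deliberately repeated
    (3, ["excel"]),
]

def count_skill_mentions(rows):
    """Mimic LATERAL VIEW EXPLODE + COUNT: one count per array element."""
    return Counter(skill for _, skills in rows for skill in skills)

def count_resumes_per_skill(rows):
    """Mimic COUNT(DISTINCT Resume_ID): one count per resume listing the skill."""
    return Counter(skill for _, skills in rows for skill in set(skills))

mentions = count_skill_mentions(rows)
resumes = count_resumes_per_skill(rows)

print(mentions["python"], resumes["python"])  # mentions vs. distinct resumes
```

If duplicates are possible, replacing `COUNT(Resume_ID)` with `COUNT(DISTINCT Resume_ID)` in the query would report the number of resumes rather than the number of mentions.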


### Use Case 2: Resume Clustering (Unsupervised ML)

This segment uses **PySpark MLlib** for candidate segmentation. Each resume's skill array is converted into a 100-dimensional TF-IDF vector (`HashingTF` followed by `IDF`), and the **K-Means algorithm** with `k=3` groups the resumes by similarity. The output is stored in `gold_clustered_resumes`.

In [0]:
# CELL 3: USE CASE 2: K-Means Clustering for Candidate Segmentation

silver_df = spark.table("silver_features")

# Feature Vectorization (Converting text array into a numerical vector)
hashing_tf = HashingTF(inputCol="extracted_skills", outputCol="rawFeatures", numFeatures=100)
tf_df = hashing_tf.transform(silver_df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(tf_df)
feature_df = idf_model.transform(tf_df)

# K-Means Clustering
K = 3
kmeans = KMeans(featuresCol="features", k=K, seed=1)
model = kmeans.fit(feature_df)
clustered_df = model.transform(feature_df)

# Save the Gold Table with cluster IDs
gold_clustered_df = clustered_df.select(
    F.col("Resume_ID"),
    F.col("extracted_skills"),
    F.col("prediction").alias("Cluster_ID")
)
gold_clustered_df.write.format("delta").mode("overwrite").saveAsTable("gold_clustered_resumes")

print("✅ Gold table 'gold_clustered_resumes' created.")

✅ Gold table 'gold_clustered_resumes' created.
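
The vectorization step can be hard to picture, so the following is a rough pure-Python sketch of the idea behind `HashingTF` and `IDF`. It is an illustration under stated assumptions, not a reproduction of Spark's output: `zlib.crc32` stands in for Spark's MurmurHash3, so bucket indices will differ, and the helper names and sample resumes are hypothetical. The IDF weight uses the smoothed form documented for Spark MLlib, `log((m + 1) / (df + 1))`, where `m` is the number of resumes and `df` the number of resumes with a non-zero value in that bucket.

```python
import math
import zlib

NUM_FEATURES = 100  # same bucket count as numFeatures in the cell above

def hashed_tf(skills, num_features=NUM_FEATURES):
    """Map each skill to a bucket index and count occurrences per bucket."""
    vec = [0.0] * num_features
    for skill in skills:
        # crc32 is a stand-in hash for illustration only
        idx = zlib.crc32(skill.encode("utf-8")) % num_features
        vec[idx] += 1.0
    return vec

def idf_weights(tf_vectors):
    """Smoothed inverse document frequency per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    weights = []
    for j in range(len(tf_vectors[0])):
        df = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((m + 1) / (df + 1)))
    return weights

# Hypothetical skill arrays; every resume lists "python"
resumes = [["python", "sql"], ["python", "excel"], ["python"]]
tf = [hashed_tf(r) for r in resumes]
idf = idf_weights(tf)
tfidf = [[t * w for t, w in zip(vec, idf)] for vec in tf]

python_bucket = zlib.crc32(b"python") % NUM_FEATURES
print(idf[python_bucket])  # 0.0: a skill present in every resume carries no weight
```

Two consequences follow from this sketch. A skill that appears in every resume contributes nothing to the distance between resumes, so clusters are driven by the less common skills. And because distinct skills can hash to the same bucket, a small `numFeatures` may merge unrelated skills; a larger value is one way to reduce that risk.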


### Cluster Interpretation and Labeling

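K-Means identifies clusters only by generic IDs (0, 1, 2), so they need human-readable names before they appear on a dashboard. Based on the dominant skills in each cluster, we apply a manual **mapping** that assigns a name (e.g., 'Data Analyst') to each segment, creating the final `gold_labeled_segments` table. Because cluster IDs are arbitrary, the mapping should be re-checked whenever the clustering is re-run on different data or with different parameters.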
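
One way to find the dominant skills that the mapping relies on is to count skill frequencies within each cluster. The sketch below does this in plain Python on hypothetical rows shaped like `gold_clustered_resumes`; the helper name `top_skills_per_cluster` and the sample data are assumptions for illustration only.

```python
from collections import Counter, defaultdict

# Hypothetical (Cluster_ID, extracted_skills) rows; not real project data.
rows = [
    (0, ["python", "docker"]),
    (0, ["python", "java"]),
    (1, ["azure", "sql"]),
    (2, ["excel", "power bi"]),
    (2, ["excel", "sql"]),
]

def top_skills_per_cluster(rows, n=2):
    """Return the n most frequent skills for each cluster ID."""
    counts = defaultdict(Counter)
    for cluster_id, skills in rows:
        counts[cluster_id].update(skills)
    return {
        cid: [skill for skill, _ in counter.most_common(n)]
        for cid, counter in sorted(counts.items())
    }

dominant = top_skills_per_cluster(rows)
for cluster_id, skills in dominant.items():
    print(cluster_id, skills)
```

The printed lists are what a reviewer would read before choosing a label for each cluster ID.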

In [0]:
# CELL 4: CLUSTER INTERPRETATION: Labeling Segments

# --- IMPORTANT: Manually Define the Mapping based on previous analysis! ---
CLUSTER_MAPPING = {
    0: "General/Entry-Level",
    1: "ML Specialist",
    2: "Data Analyst" 
    # Adjust these names based on your actual cluster analysis
}

# Create a PySpark UDF to map the Cluster ID to the Name
map_func = F.udf(lambda x: CLUSTER_MAPPING.get(x, "Unlabeled"), T.StringType())

# Read the original cluster data and apply the new label
gold_clustered_df = spark.table("gold_clustered_resumes")
labeled_clusters_df = gold_clustered_df \
    .withColumn("Talent_Segment", map_func(F.col("Cluster_ID"))) \
    .select("Resume_ID", "extracted_skills", "Talent_Segment")

# Save the final consumable Gold Table
labeled_clusters_df.write.format("delta").mode("overwrite").saveAsTable("gold_labeled_segments")

print("✅ Gold table 'gold_labeled_segments' (Human Labels) created.")

✅ Gold table 'gold_labeled_segments' (Human Labels) created.


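### Use Case 3: Job-Readiness Scoring

This section assigns each resume a rule-based **Readiness Score** between 0 and 100: 5 points per extracted skill (capped at 50), plus 30 points for `python` and 20 points for `sql`. Resumes whose text is missing or shorter than 50 characters score 0. The output is stored in `gold_readiness_scores`.

In [0]: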
# CELL 5: USE CASE 3: Job-Readiness Scoring (ATS Logic)

silver_df = spark.table("silver_features")

# Scoring logic: rule-based score from 0 to 100, defined here so the cell is self-contained
def calculate_readiness_score(extracted_skills, raw_text):
    skills = extracted_skills or []  # treat a NULL skills array as empty
    score = min(len(skills) * 5, 50)  # 5 points per skill, capped at 50
    if 'python' in skills:
        score += 30  # bonus for Python
    if 'sql' in skills:
        score += 20  # bonus for SQL
    if raw_text is None or len(raw_text) < 50:
        score = 0  # too little resume text to score
    return min(int(score), 100)

score_udf = F.udf(calculate_readiness_score, T.IntegerType())

# Apply the scoring UDF 
gold_readiness_df = silver_df.withColumn(
    "Readiness_Score", 
    score_udf(F.col("extracted_skills"), F.col("Resume_Text"))
)

gold_readiness_df.write.format("delta").mode("overwrite").saveAsTable("gold_readiness_scores")

print("✅ Gold table 'gold_readiness_scores' created.")

✅ Gold table 'gold_readiness_scores' created.
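
A few worked examples may help when reading the scores. The function below is a standalone restatement of the scoring rules from the cell above, kept free of Spark so it can be run anywhere. The name `readiness_score` and the inputs are hypothetical, and treating a missing skills array as empty is an assumption about the intended behavior.

```python
def readiness_score(extracted_skills, raw_text):
    """Standalone restatement of the scoring rules; None skills count as empty."""
    skills = extracted_skills or []
    if raw_text is None or len(raw_text) < 50:
        return 0  # too little text to score
    score = min(len(skills) * 5, 50)  # 5 points per skill, capped at 50
    if "python" in skills:
        score += 30
    if "sql" in skills:
        score += 20
    return min(score, 100)

long_text = "x" * 60  # placeholder text that passes the 50-character check

print(readiness_score(["python", "sql", "excel"], long_text))  # 15 + 30 + 20 = 65
print(readiness_score(["excel"], long_text))                   # 5
print(readiness_score(["python", "sql"], "short"))             # 0: text under 50 characters
print(readiness_score(None, long_text))                        # 0: no skills extracted
```

Under these rules a resume reaches 100 only with at least ten extracted skills that include both `python` and `sql`, and any resume with short or missing text scores 0 regardless of its skills.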


### Use Case 4: Market Intelligence Report

This aggregation step **joins** the scores (`gold_readiness_scores`) with the labeled segments (`gold_labeled_segments`) on `Resume_ID` and calculates the **Average Readiness Score per Talent Segment**, together with the number of candidates in each segment. The result shows which segment of the talent pool scores highest under the readiness rules.

In [0]:
# CELL 6: USE CASE 4: Market Intelligence Report & Visualization

# Read the scores and the labeled cluster segments
scores_df = spark.table("gold_readiness_scores").select("Resume_ID", "Readiness_Score")
labeled_segments_df = spark.table("gold_labeled_segments").select("Resume_ID", "Talent_Segment")

# Join the scores and labeled segments
market_intelligence_df = scores_df.join(labeled_segments_df, on="Resume_ID", how="inner")

# Calculate the average score for each segment
avg_score_by_segment_df = market_intelligence_df.groupBy("Talent_Segment").agg(
    F.avg("Readiness_Score").alias("Avg_Readiness_Score"),
    F.count("Resume_ID").alias("Candidate_Count")
).orderBy(F.col("Avg_Readiness_Score").desc())

# Save the final Gold Table
avg_score_by_segment_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("gold_market_intelligence")

print("✅ Gold table 'gold_market_intelligence' created.")

# Display final results for dashboard creation
print("\n⭐ Final Dashboard Charts:")
display(spark.table("gold_skill_demand"))
display(spark.table("gold_labeled_segments").groupBy("Talent_Segment").count())
display(spark.table("gold_readiness_scores").select("Readiness_Score"))
display(avg_score_by_segment_df)

✅ Gold table 'gold_market_intelligence' created.

⭐ Final Dashboard Charts:


| skill | resume_count |
| --- | --- |
| excel | 28 |
| aws | 23 |
| python | 23 |
| sql | 19 |
| deep learning | 16 |
| machine learning | 12 |
| tensorflow | 11 |
| pytorch | 10 |
| azure | 10 |
| power bi | 9 |
power bi,9


| Talent_Segment | count |
| --- | --- |
| General/Entry-Level | 498 |
| Data Analyst | 33 |
| ML Specialist | 10 |


Readiness_Score
0
0
0
0
0
0
0
0
0
0


| Talent_Segment | Avg_Readiness_Score | Candidate_Count |
| --- | --- | --- |
| Data Analyst | 10.303030303030305 | 33 |
| ML Specialist | 4.5 | 10 |
| General/Entry-Level | 1.536144578313253 | 498 |
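
Because the segments differ greatly in size, the per-segment averages are easy to misread. As a small consistency check, the sketch below multiplies each reported average by its candidate count to recover the segment totals, then compares the candidate-weighted overall average with the plain mean of the three segment averages. The figures are copied from the output above; the variable names are hypothetical.

```python
# (Talent_Segment, Avg_Readiness_Score, Candidate_Count) copied from the output above
segments = [
    ("Data Analyst", 10.303030303030305, 33),
    ("ML Specialist", 4.5, 10),
    ("General/Entry-Level", 1.536144578313253, 498),
]

# Average x count recovers each segment's total score
totals = {name: round(avg * n) for name, avg, n in segments}
total_candidates = sum(n for _, _, n in segments)

# Weighted by candidates vs. unweighted mean of the three averages
overall_avg = sum(totals.values()) / total_candidates
naive_avg = sum(avg for _, avg, _ in segments) / len(segments)

print(totals)                 # {'Data Analyst': 340, 'ML Specialist': 45, 'General/Entry-Level': 765}
print(total_candidates)       # 541
print(round(overall_avg, 2))  # 2.13
print(round(naive_avg, 2))    # 5.45
```

The weighted figure is much lower than the unweighted one because the largest segment also has the lowest average, which is worth keeping in mind when presenting the segment averages side by side.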


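### Use Case 5: Skill Gap Analysis

This section compares skill frequencies between the low-readiness cluster (Cluster 0) and the higher-readiness clusters (Clusters 1 and 2). For each skill, the **Gap Score** is the count in the high clusters minus the count in the low cluster, so positive values mark skills that are common among higher-readiness resumes but rare in the low-readiness group. The result is stored in `gold_skill_gap_report`.

In [0]:
# CELL 12: USE CASE 5: SKILL GAP ANALYSIS: Comparative Skill Frequency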

# Read the gold table containing the clustered resumes (includes extracted_skills and Cluster_ID)
clustered_df = spark.table("gold_clustered_resumes")

# --- Define Target Clusters (Based on your results: 0 is low, 1 and 2 are high) ---
LOW_CLUSTER_ID = 0 
HIGH_CLUSTERS_IDS = [1, 2]

# 1. Filter and Explode skills for the LOW-READINESS cluster (Cluster 0)
low_readiness_skills = clustered_df.filter(F.col("Cluster_ID") == LOW_CLUSTER_ID) \
    .withColumn("skill", F.explode("extracted_skills")) \
    .groupBy("skill").agg(F.count("*").alias("count_low"))

# 2. Filter and Explode skills for the HIGH-READINESS clusters (Cluster 1 or 2)
high_readiness_skills = clustered_df.filter(F.col("Cluster_ID").isin(HIGH_CLUSTERS_IDS)) \
    .withColumn("skill", F.explode("extracted_skills")) \
    .groupBy("skill").agg(F.count("*").alias("count_high"))

# 3. Join the two frequency tables to compare side-by-side
gap_analysis_df = low_readiness_skills.join(
    high_readiness_skills, 
    on="skill", 
    how="full_outer" # Use full outer join to include skills unique to either group
).fillna(0) # Replace null counts (where a skill is missing in one group) with 0

# 4. Calculate the Gap Score: (Count in High Cluster) - (Count in Low Cluster)
gap_analysis_df = gap_analysis_df.withColumn(
    "Gap_Score", 
    F.col("count_high") - F.col("count_low")
).orderBy(F.col("Gap_Score").desc())

# 5. Save and display the final report
gap_analysis_df.write.format("delta").mode("overwrite").saveAsTable("gold_skill_gap_report")

print("✅ Skill Gap Analysis Report (gold_skill_gap_report) created.")

# Show the 10 rows with the largest gap score (positive values are skills under-represented in the low cluster)
print("\n⭐ Top Skills Missing in Low-Readiness Segment (Skill Gap):")
gap_analysis_df.select("skill", "count_high", "count_low", "Gap_Score").show(10, truncate=False)

✅ Skill Gap Analysis Report (gold_skill_gap_report) created.

⭐ Top Skills Missing in Low-Readiness Segment (Skill Gap):
+------------+----------+---------+---------+
|skill       |count_high|count_low|Gap_Score|
+------------+----------+---------+---------+
|excel       |28        |0        |28       |
|azure       |10        |0        |10       |
|sql         |14        |5        |9        |
|power bi    |9         |0        |9        |
|tableau     |5         |0        |5        |
|r           |0         |1        |-1       |
|java        |0         |3        |-3       |
|kubernetes  |0         |4        |-4       |
|docker      |0         |5        |-5       |
|scikit-learn|0         |7        |-7       |
+------------+----------+---------+---------+
only showing top 10 rows
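
The Gap Score above is a difference of raw counts, and the two groups are very different in size. As a hedged illustration, the sketch below re-expresses a few rows as a difference in the share of resumes listing each skill. It assumes that Cluster 0 corresponds to the 498 "General/Entry-Level" resumes and Clusters 1 and 2 to the remaining 10 + 33 resumes, as reported in the Use Case 4 output, and that each skill appears at most once per resume. The helper `share_gap` is hypothetical.

```python
# Cluster sizes taken from the segment counts in the Use Case 4 output (assumed mapping)
LOW_SIZE = 498        # Cluster 0
HIGH_SIZE = 10 + 33   # Clusters 1 and 2

# (skill, count_high, count_low) copied from the report output above
report = [
    ("excel", 28, 0),
    ("sql", 14, 5),
    ("scikit-learn", 0, 7),
]

def share_gap(count_high, count_low):
    """Difference in the share of resumes listing the skill, in percentage points."""
    return 100 * (count_high / HIGH_SIZE - count_low / LOW_SIZE)

for skill, high, low in report:
    print(f"{skill}: raw gap {high - low}, share gap {share_gap(high, low):.1f} pp")
```

On a share basis, `sql` shows a gap of roughly 32 percentage points even though its raw gap is only 9, while the negative raw gap for `scikit-learn` shrinks to about 1.4 points. A normalized column like this could complement `Gap_Score` when the groups are this unbalanced.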


In [0]:
# CELL X: DASHBOARD CHART 5: Skill Gap Analysis Visualization

# Read the final gap report table
gap_analysis_df = spark.table("gold_skill_gap_report")

# We only want to visualize the POSITIVE gaps (skills the high cluster has, but the low cluster needs)
# Limit to the top 10 most critical missing skills
critical_gap_df = gap_analysis_df.filter(F.col("Gap_Score") > 0).orderBy(F.col("Gap_Score").desc()).limit(10)

print("Visualization: 5. Critical Skill Gaps to Address")

# Use the native display() function to render the chart
display(critical_gap_df) 

# --- After running this cell, configure the chart: ---
# 1. Select the 'Bar' chart type.
# 2. Set X-Axis to 'skill'.
# 3. Set Y-Axis to 'Gap_Score'.
# 4. Save the chart as "5. Critical Skill Gaps".

Visualization: 5. Critical Skill Gaps to Address


| skill | count_low | count_high | Gap_Score |
| --- | --- | --- | --- |
| excel | 0 | 28 | 28 |
| azure | 0 | 10 | 10 |
| power bi | 0 | 9 | 9 |
| sql | 5 | 14 | 9 |
| tableau | 0 | 5 | 5 |


Databricks visualization. Run in Databricks to view.