In [0]:
# Load the parsed resume records; the later cells derive the dimension, bridge and stage tables from this frame.
parsed_df = spark.read.table("workspace.resume_project.resume_parsed_data")

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
from pyspark.sql.functions import explode, split, trim, monotonically_increasing_id


# Rank each candidate's rows from the most recent upload_date to the oldest.
w = Window.partitionBy("candidate_id").orderBy(col("upload_date").desc())

# Keep only the top-ranked row per candidate; the helper rank column is removed again.
latest_df = (
    parsed_df
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .drop("rn")
)

# One row per distinct, trimmed, non-empty entry of the comma-separated "skills" column.
incoming_skills_df = (
    parsed_df
    .select(explode(split("skills", ",")).alias("skill"))
    .withColumn("skill", trim("skill"))
    .where("skill != '' AND skill IS NOT NULL")
    .distinct()
)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException

dim_skill_table = "workspace.resume_project.dim_skill"

# Surrogate keys are handed out in alphabetical skill order so the numbering is deterministic.
window = Window.orderBy("skill")

# Test for the table explicitly rather than catching AnalysisException: the join and
# unionByName below raise the same exception type (e.g. on a column mismatch), and
# treating that as "table missing" would renumber every skill from 1 and overwrite
# the existing dimension.
if spark.catalog.tableExists(dim_skill_table):
    existing_skills = spark.read.table(dim_skill_table)

    # Continue numbering after the highest id already issued (1 if the table is empty).
    max_id = existing_skills.agg({"skill_id": "max"}).collect()[0][0]
    start_id = max_id + 1 if max_id is not None else 1

    # Only skills that are not in the dimension yet receive a new id.
    new_skills = (
        incoming_skills_df
        .join(existing_skills, on="skill", how="left_anti")
        .withColumn("skill_id", row_number().over(window) + start_id - 1)
    )

    dim_skill_df = existing_skills.unionByName(new_skills)
else:
    # First load: number every incoming skill starting at 1.
    dim_skill_df = incoming_skills_df.withColumn("skill_id", row_number().over(window))

# Persist the full dimension (existing rows plus any new ones).
dim_skill_df.write.format("delta").mode("overwrite").saveAsTable(dim_skill_table)




In [0]:
# Print up to 1000 rows of the skill dimension to inspect the assigned skill_id values.
dim_skill_df.show(1000)

+-----------------+--------+
|            skill|skill_id|
+-----------------+--------+
|     adaptability|       1|
|                c|       2|
|    collaboration|       3|
|    communication|       4|
|       confluence|       5|
|       creativity|       6|
|critical thinking|       7|
|              css|       8|
|  decision making|       9|
|            excel|      10|
|               go|      11|
|    google sheets|      12|
|             html|      13|
|      illustrator|      14|
|             java|      15|
|       javascript|      16|
|       leadership|      17|
|            linux|      18|
|      negotiation|      19|
|       networking|      20|
|           oracle|      21|
|       peopleSoft|      22|
|        photoshop|      23|
|  problem solving|      24|
|           puppet|      25|
|                r|      26|
|             safe|      27|
|              sap|      28|
|     social media|      29|
|           spring|      30|
|       sql server|      31|
|            s

In [0]:
def upsert_dimension(df_new, dim_table, key_col):
    """Create or extend a dimension table keyed by ``key_col``.

    Values of ``key_col`` in ``df_new`` that are not yet present in ``dim_table``
    receive consecutive surrogate ids in a ``<key_col>_id`` column, continuing
    after the highest id already stored. If the table does not exist, every row
    of ``df_new`` is numbered starting at 1.

    Args:
        df_new: DataFrame holding the candidate values in a column named ``key_col``.
        dim_table: Name of the Delta table that stores the dimension.
        key_col: Name of the natural-key column; the id column is ``f"{key_col}_id"``.

    Returns:
        The DataFrame that was written to ``dim_table`` (existing rows plus new ones).
    """
    id_col = f"{key_col}_id"
    # Ordering by the key makes the id assignment deterministic.
    key_order = Window.orderBy(key_col)

    # Check existence explicitly instead of catching AnalysisException: the join and
    # unionByName below can raise that same exception (e.g. column mismatch), and
    # handling it as "first load" would renumber and overwrite the whole dimension.
    if spark.catalog.tableExists(dim_table):
        dim_df = spark.read.table(dim_table)
        max_id = dim_df.agg({id_col: "max"}).collect()[0][0]
        start_id = (max_id or 0) + 1  # an empty table yields None -> start at 1

        # Only values not already in the dimension get a new id.
        to_insert = (
            df_new.join(dim_df, on=key_col, how="left_anti")
            .withColumn(id_col, row_number().over(key_order) + start_id - 1)
        )
        dim_df = dim_df.unionByName(to_insert)
    else:
        # First load: number every incoming value starting at 1.
        dim_df = df_new.withColumn(id_col, row_number().over(key_order))

    # Save (overwrite or create) the dimension table.
    dim_df.write.format("delta").mode("overwrite").saveAsTable(dim_table)
    return dim_df


In [0]:
# Distinct education levels from the parsed resumes, trimmed, with NULL/blank values removed.
trimmed_education = trim(col("education_level")).alias("education_level")
edu_df_new = (
    parsed_df.select(trimmed_education)
    .where("education_level IS NOT NULL AND education_level != ''")
    .distinct()
)

# Create or extend the education-level dimension with these values.
dim_education = upsert_dimension(
    edu_df_new,
    "workspace.resume_project.dim_education_level",
    "education_level",
)


In [0]:
# Distinct categories from the parsed resumes, trimmed, with NULL/blank values removed.
trimmed_category = trim(col("Category")).alias("Category")
cat_df_new = (
    parsed_df.select(trimmed_category)
    .where("Category IS NOT NULL AND Category != ''")
    .distinct()
)

# Create or extend the category dimension with these values.
dim_category = upsert_dimension(
    cat_df_new,
    "workspace.resume_project.dim_category",
    "Category",
)


In [0]:
# Skill dimension as persisted, used to translate skill names into skill_id values.
skill_lookup = spark.read.table("workspace.resume_project.dim_skill")

# One (candidate_id, skill) row per comma-separated entry, trimmed, with blanks removed.
candidate_skills = (
    parsed_df
    .select("candidate_id", explode(split("skills", ",")).alias("skill"))
    .withColumn("skill", trim("skill"))
    .where("skill IS NOT NULL AND skill != ''")
)

# Resolve each skill to its id; the inner join drops skills that have no dimension row.
bridge_candidate_skill = (
    candidate_skills
    .join(skill_lookup, on="skill", how="inner")
    .select("candidate_id", "skill_id")
    .distinct()
)

bridge_candidate_skill.write.format("delta").mode("overwrite").saveAsTable(
    "workspace.resume_project.bridge_candidate_skill"
)


In [0]:
# Preview the three dimensions and the candidate-skill bridge (show() prints the top 20 rows by default).
dim_skill_df.show()
dim_education.show()
dim_category.show()
bridge_candidate_skill.show()



+-----------------+--------+
|            skill|skill_id|
+-----------------+--------+
|     adaptability|       1|
|                c|       2|
|    collaboration|       3|
|    communication|       4|
|       confluence|       5|
|       creativity|       6|
|critical thinking|       7|
|              css|       8|
|  decision making|       9|
|            excel|      10|
|               go|      11|
|    google sheets|      12|
|             html|      13|
|      illustrator|      14|
|             java|      15|
|       javascript|      16|
|       leadership|      17|
|            linux|      18|
|      negotiation|      19|
|       networking|      20|
+-----------------+--------+
only showing top 20 rows
+--------------------+------------------+
|     education_level|education_level_id|
+--------------------+------------------+
|            BACHELOR|                 1|
|            Bachelor|                 2|
|                 MBA|                 3|
|            MBA, B.E|     

In [0]:
# Build the candidate stage rows: parsed resume attributes plus the surrogate ids
# looked up from the education-level and category dimensions.
#
# Both dimensions were built from TRIMMED values, so the join keys are trimmed here
# as well; joining on the raw columns would leave any value with stray whitespace
# unmatched and silently give it a NULL surrogate id. Blank/NULL values still get a
# NULL id because they were excluded from the dimensions.
# NOTE(review): this uses every row of parsed_df, not latest_df, so a candidate with
# several uploads yields several stage rows — confirm that is intended.
cand_stage = (
    parsed_df
    .withColumn("education_level", trim(col("education_level")))
    .withColumn("Category", trim(col("Category")))
    .join(dim_education, on="education_level", how="left")
    .join(dim_category, on="Category", how="left")
    .select(
        "candidate_id",
        "Category",
        "category_id",
        "education_level",
        "education_level_id",
        "emails",
        "urls",
        "is_usable",
        "effective_start_date",
        "effective_end_date",
        "is_current"
    )
)


In [0]:
# Persist the stage rows as a Delta table, replacing any previous contents;
# the SCD Type 2 cells read their input from this table.
cand_stage.write.format("delta").mode("overwrite").saveAsTable(
    "workspace.resume_project.dim_candidate_stage"
)

In [0]:
# Print up to 1000 stage rows for inspection.
# NOTE(review): the output includes the emails column — clear it before sharing the notebook.
cand_stage.show(1000)



+------------+--------------------+-----------+--------------------+------------------+------+--------------------+---------+--------------------+------------------+----------+
|candidate_id|            Category|category_id|     education_level|education_level_id|emails|                urls|is_usable|effective_start_date|effective_end_date|is_current|
+------------+--------------------+-----------+--------------------+------------------+------+--------------------+---------+--------------------+------------------+----------+
|      000038|               Other|         17|                    |              NULL|      |                    |    false|          2023-01-01|        3000-01-01|     false|
|      000047|               Other|         17|                    |              NULL|      |                    |    false|          2023-01-01|        3000-01-01|     false|
|      000052|               Other|         17|                    |              NULL|      |                    |

In [0]:
# Candidate dimension rows for SCD2 merge: print column names and types of the stage frame.
cand_stage.printSchema()

root
 |-- candidate_id: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- education_level: string (nullable = true)
 |-- education_level_id: integer (nullable = true)
 |-- emails: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- is_usable: boolean (nullable = true)
 |-- effective_start_date: date (nullable = true)
 |-- effective_end_date: string (nullable = true)
 |-- is_current: boolean (nullable = true)



In [0]:
# Print column names and types of the candidate-skill bridge.
bridge_candidate_skill.printSchema()

root
 |-- candidate_id: string (nullable = true)
 |-- skill_id: integer (nullable = true)



In [0]:
# Print column names and types of the skill dimension.
dim_skill_df.printSchema()

root
 |-- skill: string (nullable = true)
 |-- skill_id: integer (nullable = true)



In [0]:
# Print column names and types of the category dimension.
dim_category.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Category_id: integer (nullable = true)



In [0]:
# Print column names and types of the education-level dimension.
dim_education.printSchema()

root
 |-- education_level: string (nullable = true)
 |-- education_level_id: integer (nullable = true)



## SCD Type 2

In [0]:
from pyspark.sql.functions import col, lit, to_date, date_sub

# Input of the SCD2 merge: the stage rows persisted earlier in the notebook.
stage_df = spark.read.table("workspace.resume_project.dim_candidate_stage")

# Target candidate dimension.
table_name = "workspace.resume_project.dim_candidate"

# The first run is the one where the target table does not exist yet; on later runs
# the current table contents are loaded for comparison (existing_df is only defined then).
first_run = not spark.catalog.tableExists(table_name)
if not first_run:
    existing_df = spark.read.table(table_name)

In [0]:
# Preview the stage rows as read back from the Delta table (top 20 rows).
stage_df.show()

+------------+--------+-----------+---------------+------------------+------+----+---------+--------------------+------------------+----------+
|candidate_id|Category|category_id|education_level|education_level_id|emails|urls|is_usable|effective_start_date|effective_end_date|is_current|
+------------+--------+-----------+---------------+------------------+------+----+---------+--------------------+------------------+----------+
|      000038|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000047|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000052|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000077|      HR|         10|               |              NULL|      |    |     true|          2023-01-01|        3000-01-01|    

In [0]:
# SCD Type 2 merge of the stage rows into the candidate dimension.
#
# First run: the stage rows become the dimension as they are.
# Later runs: candidates that are new, or whose tracked attributes differ from their
# current row, get the old current row closed (is_current = false, end date set) and
# the stage row added as the new version.
if first_run:
    stage_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    print("First run: dim_candidate table created.")
else:
    current_df = existing_df.filter("is_current = true")

    stage = stage_df.alias("stage")
    curr = current_df.alias("curr")

    # NULL-safe comparison: a plain `!=` evaluates to NULL when either side is NULL,
    # so a change from/to NULL (e.g. education_level_id) would be filtered out and
    # never recorded. A missing current row marks a brand-new candidate.
    has_changed = (
        col("curr.candidate_id").isNull()
        | ~col("stage.category_id").eqNullSafe(col("curr.category_id"))
        | ~col("stage.education_level_id").eqNullSafe(col("curr.education_level_id"))
        | ~col("stage.emails").eqNullSafe(col("curr.emails"))
        | ~col("stage.urls").eqNullSafe(col("curr.urls"))
        | ~col("stage.is_usable").eqNullSafe(col("curr.is_usable"))
    )

    # NOTE(review): stage rows are inserted with whatever is_current they carry. A stage
    # row with is_current = false never appears in current_df on the next run, so that
    # candidate is treated as new again and re-inserted — confirm the intended meaning
    # of is_current in the stage data.
    changes_df = (
        stage.join(curr, col("stage.candidate_id") == col("curr.candidate_id"), "left")
        .filter(has_changed)
        .select("stage.*")
    )

    # Count once, before the table is rewritten: changes_df is lazy, so counting again
    # after the overwrite may re-evaluate it against the new table contents and report
    # a different number.
    n_changes = changes_df.count()

    if n_changes == 0:
        print("No new or updated candidate records.")
    else:
        changes_df.show(n=10)

        stage = changes_df.alias("stage")

        # Close the current row of every changed candidate. The old version ends the
        # day before the NEW version starts; using the old row's own start date would
        # produce an end date that precedes its start date.
        expired_df = (
            curr.join(stage, col("curr.candidate_id") == col("stage.candidate_id"), "inner")
            .select(
                col("curr.candidate_id").alias("candidate_id"),
                col("curr.Category").alias("Category"),
                col("curr.category_id").alias("category_id"),
                col("curr.education_level").alias("education_level"),
                col("curr.education_level_id").alias("education_level_id"),
                col("curr.emails").alias("emails"),
                col("curr.urls").alias("urls"),
                col("curr.is_usable").alias("is_usable"),
                col("curr.effective_start_date").alias("effective_start_date"),
                # Cast to string to match the stage schema, where effective_end_date is a string.
                date_sub(col("stage.effective_start_date"), 1).cast("string").alias("effective_end_date"),
                lit(False).alias("is_current"),
            )
        )

        # Only the historical rows, the freshly expired rows and the new versions are
        # written back; current rows of unchanged candidates are not part of this union.
        final_df = (
            existing_df.filter("is_current = false")
            .unionByName(expired_df)
            .unionByName(changes_df)
        )

        final_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        print(f"SCD2 merge complete. {n_changes} candidate(s) updated.")


+------------+--------+-----------+---------------+------------------+------+----+---------+--------------------+------------------+----------+
|candidate_id|Category|category_id|education_level|education_level_id|emails|urls|is_usable|effective_start_date|effective_end_date|is_current|
+------------+--------+-----------+---------------+------------------+------+----+---------+--------------------+------------------+----------+
|      000038|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000047|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000052|   Other|         17|               |              NULL|      |    |    false|          2023-01-01|        3000-01-01|     false|
|      000077|      HR|         10|               |              NULL|      |    |     true|          2023-01-01|        3000-01-01|    