In [0]:
from pyspark.sql.functions import *

# Flatten the bronze hiring feed to one row per employee: explode the
# `employees` array, then lift the identity and nested contact fields into
# flat, prefixed columns.
employee_rows = (
    spark.read.table("surakshadb.bronze_stg.hiring_raw")
    .withColumn("employees", explode(col("employees")))
)

# Nested source path -> flat silver column name (insertion order is the
# output column order).
employee_fields = {
    "employees.id": "employee_id",
    "employees.name": "employee_name",
    "employees.profile.contact.email": "employee_email",
    "employees.profile.contact.phone": "employee_phone",
    "employees.profile.contact.address.street": "employee_address",
    "employees.profile.contact.address.city": "employee_city",
    "employees.profile.contact.address.location.state": "employee_state",
    "employees.profile.contact.address.location.country": "employee_country",
}

silver_df = employee_rows.select(
    *[col(path).alias(alias) for path, alias in employee_fields.items()],
    # Audit column recording when the row was produced.
    current_timestamp().alias("ingested_at"),
)

# Preview a handful of rows only; never render the whole table.
silver_df.limit(3).display()


employee_id,employee_name,employee_email,employee_phone,employee_address,employee_city,employee_state,employee_country,ingested_at
E00001,Crystal Cooper,bautistaalexis@example.org,(463)417-5451,434 Christopher Place,Bryanfort,CT,USA,2026-01-29T11:45:55.875Z
E00002,Joseph Alexander,brianna31@example.net,(357)722-0455,960 Michelle Rapids Suite 254,Lake Derrickchester,TN,USA,2026-01-29T11:45:55.875Z
E00003,Larry Gomez,david10@example.com,638.780.9891,857 Alexandra Points Apt. 746,South Scott,MT,USA,2026-01-29T11:45:55.875Z


In [0]:
# Flatten the bronze hiring feed to one row per task: explode employees, then
# each employee's projects, then each project's tasks.
task_rows = (
    spark.read.table("surakshadb.bronze_stg.hiring_raw")
    .withColumn("employees", explode(col("employees")))
    .withColumn("projects", explode(col("employees.profile.projects")))
    .withColumn("tasks", explode(col("projects.tasks")))
)

# Nested source path -> flat silver column name (insertion order is the
# output column order). Column names are kept exactly as the silver table
# expects them, including their mixed casing.
task_fields = {
    "employees.id": "employee_id",
    "projects.projectId": "project_id",
    "projects.name": "project_name",
    "tasks.taskId": "task_Id",
    "tasks.description": "description",
    "tasks.assignedTo.id": "assignedTo",
    "tasks.assignedTo.skills.primary": "primary_skills",
    "tasks.assignedTo.skills.secondary": "secondary_skill",
    "tasks.assignedTo.skills.experience.years": "experience",
    "tasks.assignedTo.skills.experience.domains": "domains",
    "tasks.assignedTo.skills.experience.certifications.current": "certifications",
    "tasks.assignedTo.skills.experience.certifications.meta.verified": "IsVerified",
}

silver_df_proj = task_rows.select(
    *[col(path).alias(alias) for path, alias in task_fields.items()],
    # Audit column recording when the row was produced.
    current_timestamp().alias("ingested_at"),
)

# Preview a handful of rows only; never render the whole table.
silver_df_proj.limit(3).display()

employee_id,project_id,project_name,task_Id,description,assignedTo,primary_skills,secondary_skill,experience,domains,certifications,IsVerified,ingested_at
E00001,P945,Unleash Value-Added Communities,T740,Robust uniform middleware,E00001,Python,"List(AWS, GCP)",6,"List(Healthcare, AI)","List(AWS Certified Developer, Scrum Master)",True,2026-01-29T11:46:19.201Z
E00002,P725,Generate Best-Of-Breed Infrastructures,T566,Business-focused full-range matrices,E00002,JavaScript,"List(Node.js, Kubernetes)",2,"List(DevOps, AI)","List(Scrum Master, AWS Certified Developer)",True,2026-01-29T11:46:19.201Z
E00003,P291,Evolve Leading-Edge Bandwidth,T495,Switchable neutral installation,E00003,Go,"List(AWS, Docker)",6,"List(E-commerce, Finance)","List(Scrum Master, AWS Solutions Architect)",True,2026-01-29T11:46:19.201Z


In [0]:
from delta.tables import DeltaTable

# Upsert the employee-level rows into the silver demography table, keyed on
# employee_id.
demography_table = "surakshadb.silver_stg.hiring_demography"

if not spark.catalog.tableExists(demography_table):
    # First run: no target yet, so create it from the current batch.
    silver_df.write.format("delta").saveAsTable(demography_table)
else:
    deltaTable = DeltaTable.forName(spark, demography_table)

    # Existing employees are overwritten with the incoming row; unseen
    # employees are inserted.
    (
        deltaTable.alias("target")
        .merge(
            silver_df.alias("source"),
            "target.employee_id = source.employee_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
from delta.tables import DeltaTable

# Upsert the task-level rows into the silver domain table.
#
# silver_df_proj is built by exploding projects and then tasks, so it carries
# one row per (employee, project, task). Matching on employee_id alone lets
# several source rows hit the same target row, which Delta rejects for a MERGE
# that has an update clause. The match condition therefore has to cover the
# full grain of the rows, not just the employee.
# NOTE(review): assumes task_Id is unique within an employee/project pair and
# that the key columns are non-null - confirm against the bronze feed.
if spark.catalog.tableExists("surakshadb.silver_stg.hiring_domain"):
    deltaTable = DeltaTable.forName(spark, "surakshadb.silver_stg.hiring_domain")

    # Existing tasks are overwritten with the incoming row; unseen tasks are
    # inserted.
    (
        deltaTable.alias("target")
        .merge(
            silver_df_proj.alias("source"),
            "target.employee_id = source.employee_id "
            "AND target.project_id = source.project_id "
            "AND target.task_Id = source.task_Id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: no target yet, so create it from the current batch.
    silver_df_proj.write.format("delta").saveAsTable("surakshadb.silver_stg.hiring_domain")
