In [0]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Attach to the running Spark session, or start one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Fully qualified table names used by every cell below:
# the bronze source and the silver target.
bronze_table, silver_table = (
    "workspace.applied_research_bronze.hr_bronze_data",
    "workspace.applied_research_silver.hr_silver_data_test",
)

In [0]:
# Bootstrap: when the silver table is missing, create it empty with the
# bronze-derived layout plus the four SCD2 bookkeeping columns.
if not spark.catalog.tableExists(silver_table):
    print("Silver table not found. Creating it")

    # limit(0) keeps the bronze schema but carries no rows.
    bronze_schema_df = spark.table(bronze_table).limit(0)

    # Bronze columns that are not carried into silver.
    unused_bronze_columns = (
        "snapshot_date", "time_in_company", "previous_job_level",
        "last_raise_year", "month", "promotion_count",
    )

    # Bronze column name -> silver (snake_case) column name.
    bronze_to_silver_names = {
        "Employee_ID": "employee_id",
        "Full_Name": "full_name",
        "Department": "department",
        "Job_Title": "job_title",
        "Hire_Date": "hire_date",
        "Location": "location",
        "Performance_Rating": "performance_rating",
        "Experience_Years": "experience_years",
        "Status": "status",
        "Work_Mode": "work_mode",
        "Annual_Salary": "annual_salary",
        "Job_Level": "job_level"
    }

    empty_silver_df = (
        bronze_schema_df
            .drop(*unused_bronze_columns)
            .withColumnsRenamed(bronze_to_silver_names)
            .select(
                # Business columns, in the silver column order.
                "employee_id", "full_name", "department", "job_title", "hire_date",
                "location", "performance_rating", "experience_years",
                "status", "work_mode", "annual_salary", "job_level",
                "ingestion_timestamp",
                # SCD2 bookkeeping columns; typed NULLs only fix the schema.
                f.lit(None).cast("string").alias("data_hash"),
                f.lit(None).cast("timestamp").alias("start_effectivity_date"),
                f.lit(None).cast("timestamp").alias("end_effectivity_date"),
                f.lit(False).alias("is_active"),
            )
    )

    silver_writer = empty_silver_df.write.format("delta").mode("overwrite")
    silver_writer.saveAsTable(silver_table)
    print("Silver table created.")

# Delta handle on the silver table; the merge loop further down uses this name.
silver_tabler = DeltaTable.forName(spark, silver_table)

In [0]:
# Distinct bronze load timestamps, oldest first; each one is replayed as a
# separate batch by the merge loop.
# NULL timestamps are excluded: an equality filter on NULL matches no bronze
# row, so such a "batch" would be empty and every active silver employee
# would then look departed.
timestamps = (
    spark.table(bronze_table)
         .select("ingestion_timestamp")
         .where(f.col("ingestion_timestamp").isNotNull())
         .distinct()
         .orderBy("ingestion_timestamp")
         .collect()
)

# Plain Python values (one per batch) extracted from the collected rows.
timestamp_list = [row["ingestion_timestamp"] for row in timestamps]

In [0]:
# Columns that feed data_hash. hire_date, experience_years and annual_salary
# are carried into silver but are not part of the hash.
tracked_columns = [
    "employee_id", "full_name", "department", "job_title",
    "location", "performance_rating", "status", "work_mode", "job_level"
]

def prepare_dataframe(df):
    """Shape a bronze batch into the silver layout and stamp SCD2 metadata.

    Drops the bronze-only columns, renames the remaining business columns to
    snake_case, fixes the column order, and appends the SCD2 columns:

    * ``data_hash``: SHA-256 over the ``tracked_columns`` values
    * ``start_effectivity_date``: copy of ``ingestion_timestamp``
    * ``end_effectivity_date``: NULL timestamp
    * ``is_active``: True

    Args:
        df: Spark DataFrame with the bronze columns (including
            ``ingestion_timestamp``).

    Returns:
        A new Spark DataFrame in the silver column layout.
    """
    df = df.drop("snapshot_date", "time_in_company", "previous_job_level",
                 "last_raise_year", "month", "promotion_count")

    df = df.withColumnsRenamed({
        "Employee_ID": "employee_id",
        "Full_Name": "full_name",
        "Department": "department",
        "Job_Title": "job_title",
        "Hire_Date": "hire_date",
        "Location": "location",
        "Performance_Rating": "performance_rating",
        "Experience_Years": "experience_years",
        "Status": "status",
        "Work_Mode": "work_mode",
        "Annual_Salary": "annual_salary",
        "Job_Level": "job_level"
    })

    df = df.select(
        "employee_id", "full_name", "department", "job_title", "hire_date",
        "location", "performance_rating", "experience_years",
        "status", "work_mode", "annual_salary", "job_level",
        "ingestion_timestamp"
    )

    # concat_ws skips NULL arguments, so ("a", NULL, "b") and ("a", "b", NULL)
    # would otherwise produce the same hash and hide a real change. Each value
    # is cast to string explicitly and NULL is replaced by a placeholder so
    # every tracked column always occupies its own position.
    # The "_" separator is kept so rows with no NULLs in the tracked columns
    # are expected to hash as before. NOTE(review): confirm against existing
    # silver data.
    null_token = "<NULL>"
    hash_inputs = [
        f.coalesce(f.col(name).cast("string"), f.lit(null_token))
        for name in tracked_columns
    ]

    df = df.withColumn("data_hash", f.sha2(f.concat_ws("_", *hash_inputs), 256)) \
           .withColumn("start_effectivity_date", f.col("ingestion_timestamp")) \
           .withColumn("end_effectivity_date", f.lit(None).cast("timestamp")) \
           .withColumn("is_active", f.lit(True))

    return df

In [0]:
# Replay each bronze load in timestamp_list order and apply it to silver as
# an SCD2 step: close the versions that changed or disappeared, then insert
# the new versions.

# Loop-invariant handle on bronze; each iteration narrows it to one load.
bronze_df = spark.table(bronze_table)

for ts in timestamp_list:

    # Select the batch with a typed literal instead of splicing str(ts) into
    # SQL text, so the match does not depend on how the session would
    # re-parse the string form of the timestamp.
    bronze_batch = bronze_df.filter(f.col("ingestion_timestamp") == f.lit(ts))

    df = prepare_dataframe(bronze_batch)

    # Only currently open versions take part in change detection.
    current_silver_df = silver_tabler.toDF().filter(f.col("is_active"))

    # Employees present on both sides whose tracked attributes differ.
    changed_df = (
        df.alias("bronze")
          .join(current_silver_df.alias("silver"), on="employee_id")
          .filter(f.col("bronze.data_hash") != f.col("silver.data_hash"))
          .select("bronze.*")
    )

    # Employees in the batch with no open silver version.
    new_employees_df = (
        df.alias("bronze")
          .join(current_silver_df.select("employee_id").alias("silver"),
                on="employee_id", how="left_anti")
    )

    # Open silver versions whose employee is absent from the batch.
    departed_employees_df = (
        current_silver_df.alias("silver")
            .join(df.select("employee_id").alias("bronze"),
                  on="employee_id", how="left_anti")
    )

    # The close step matches on employee_id only, so its source is reduced to
    # distinct keys. Duplicate employee rows in a batch then cannot make the
    # merge fail with several source rows matching one target row.
    changed_records = (
        changed_df.select("employee_id")
            .unionByName(departed_employees_df.select("employee_id"))
            .distinct()
    )
    new_records = changed_df.unionByName(new_employees_df)

    # isEmpty() avoids a full count just to decide whether to merge.
    if not changed_records.isEmpty():
        silver_tabler.alias("silver").merge(
            changed_records.alias("bronze"),
            "silver.employee_id = bronze.employee_id AND silver.is_active = True"
        ).whenMatchedUpdate(set={
            "is_active": f.lit(False),
            # Close at the batch timestamp, not wall-clock time, so the end
            # date lines up with the start_effectivity_date of the version
            # inserted below when history is replayed.
            "end_effectivity_date": f.lit(ts)
        }).execute()

    # Employees closed above no longer have an active silver row, so their
    # new version falls through to the insert clause together with the
    # brand-new employees.
    if not new_records.isEmpty():
        silver_tabler.alias("silver").merge(
            new_records.alias("bronze"),
            "silver.employee_id = bronze.employee_id AND silver.is_active = True"
        ).whenNotMatchedInsertAll().execute()




In [0]:
# Read back the silver table that was just written. silver_table is used
# instead of a repeated literal so this check cannot drift from the table the
# merge loop targets.
test = spark.read.table(silver_table)
print(test.count())

In [0]:
# Row counts per bronze load, with one count column per is_active value.
rows_per_load = test.groupBy('ingestion_timestamp').pivot('is_active').count()

(
    rows_per_load
        .orderBy('ingestion_timestamp')
        .show(100)
)