In [0]:
from pyspark.sql.functions import input_file_name, current_timestamp


In [0]:
# Source registry for the raw load, keyed by target table name.
# Each value is a two-item list:
#   [0] folder the table's CSV files are read from
#   [1] comma-separated column name(s) used as the table's primary key
raw_tables = dict(
    departments=["dbfs:/mnt/raw/departments/", "dept_no"],
    dept_emp=["dbfs:/mnt/raw/dept_emp/", "emp_no, dept_no"],
    dept_manager=["dbfs:/mnt/raw/dept_manager/", "emp_no, dept_no"],
    employees=["dbfs:/mnt/raw/employees/", "emp_no"],
    salaries=["dbfs:/mnt/raw/salaries/", "emp_no, from_date"],
    titles=["dbfs:/mnt/raw/titles/", "emp_no, from_date"],
)

In [0]:
# Make sure the employee_catalog.raw schema exists before tables are loaded into it.
# Plain string literal: the statement has no placeholders, so no f-prefix is needed.
spark.sql("CREATE SCHEMA IF NOT EXISTS employee_catalog.raw")

DataFrame[]

In [0]:
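# Disabled manual test: if uncommented, this cell would overwrite first_name
# for emp_no 10001 in employee_catalog.silver.employees.
# %sql
# update employee_catalog.silver.employees
# set first_name = 'Chidera'
# where emp_no = 10001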


In [0]:
%sql
-- Spot-check the silver-layer row for a single employee (emp_no 10001).
SELECT *
FROM employee_catalog.silver.employees
WHERE emp_no = 10001

emp_no,first_name,last_name,gender,hire_date,birth_date,updated_at,full_name,load_date
10001,Georgi,Facello,M,1986-06-26,1953-09-02,2025-02-12T15:07:18Z,Georgi Facello,2025-02-12T14:46:50.424Z


In [0]:
def _quote_ident(name):
    """Backtick-quote a column name for Spark SQL, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


# Incrementally load every entry of raw_tables into employee_catalog.raw.<table>
# as SCD Type 1:
#   1. read the CSV folder and tag each row with its source file and a load timestamp
#   2. keep only the newest row per primary key (by _airbyte_emitted_at)
#   3. MERGE into the target: overwrite matched rows when the source row is newer,
#      insert rows whose key is not present yet
for table, table_info in raw_tables.items():
    path, primary_key = table_info
    primary_key_cols = [key.strip() for key in primary_key.split(",")]

    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(path)
        .withColumn("_file_name", input_file_name())
        .withColumn("load_date", current_timestamp())
    )

    # Quote every identifier spliced into SQL so header names containing spaces
    # or reserved words cannot break the generated statements.
    quoted_cols = [_quote_ident(name) for name in df.columns]
    quoted_keys = [_quote_ident(name) for name in primary_key_cols]
    column_list = ", ".join(quoted_cols)

    # Register the freshly read data first; the dedup query below reads from it.
    df.createOrReplaceTempView("temp_view")

    # Keep only the latest record for each primary key. Selecting the original
    # column list drops the helper column rn from the result.
    dedup_query = f"""
    WITH RankedRecords AS (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY {", ".join(quoted_keys)}
                   ORDER BY _airbyte_emitted_at DESC
               ) as rn
        FROM temp_view
    )
    SELECT {column_list}
    FROM RankedRecords
    WHERE rn = 1
    """
    spark.sql(dedup_query).createOrReplaceTempView("deduplicated_source")

    target_table = f"employee_catalog.raw.{table}"

    # MERGE requires an existing target. On a first run, create an empty table
    # with the source's schema so the same MERGE path also handles the initial
    # load; this is a no-op when the table already exists.
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {target_table} AS "
        f"SELECT {column_list} FROM deduplicated_source LIMIT 0"
    )

    # spark.sql(f"TRUNCATE TABLE {target_table}")

    on_clause = " AND ".join(f"target.{key} = source.{key}" for key in quoted_keys)
    set_clause = ", ".join(f"target.{name} = source.{name}" for name in quoted_cols)
    values_clause = ", ".join(f"source.{name}" for name in quoted_cols)

    # Merge statement implementing SCD Type 1: matched rows are overwritten only
    # when the source record was emitted later than the stored one.
    merge_query = f"""
    MERGE INTO {target_table} AS target
    USING deduplicated_source AS source
    ON {on_clause}
    WHEN MATCHED
        AND source._airbyte_emitted_at > target._airbyte_emitted_at
    THEN
        UPDATE SET {set_clause}
    WHEN NOT MATCHED THEN
        INSERT ({column_list})
        VALUES ({values_clause})
    """

    # Execute the MERGE statement
    spark.sql(merge_query)

    print(f"Incrementally loaded {table} into {target_table} with SCD Type 1")

Incrementally loaded departments into employee_catalog.raw.departments with SCD Type 1
Incrementally loaded dept_emp into employee_catalog.raw.dept_emp with SCD Type 1
Incrementally loaded dept_manager into employee_catalog.raw.dept_manager with SCD Type 1
Incrementally loaded employees into employee_catalog.raw.employees with SCD Type 1
Incrementally loaded salaries into employee_catalog.raw.salaries with SCD Type 1
Incrementally loaded titles into employee_catalog.raw.titles with SCD Type 1
