# Setup Logic:

In [0]:
# Azure storage account details, used below to build the
# `fs.azure.account.key.<account>.dfs.core.windows.net` Spark setting.
# NOTE(review): both values are redacted placeholders here. Avoid committing a
# real account key in the notebook; presumably it should be read from a secret
# store at run time -- TODO confirm against the workspace setup.
account_name = "***************"
account_key = "***********************"


In [0]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lit, current_timestamp, current_date, col, xxhash64, concat, row_number, desc, when, lag
import pandas as pd
import numpy as np

# Obtain the Spark session for this job (an existing one is reused).
spark = (
    SparkSession.builder
    .appName("DWH Population")
    .getOrCreate()
)

# Hand the storage-account key to Spark under the account-specific config key.
spark.conf.set(
    f"fs.azure.account.key.{account_name}.dfs.core.windows.net",
    account_key,
)

# Root folders that the per-table input and output paths are built from.
input_base_path = "abfss://conformed@atomicatraining.dfs.core.windows.net/Input/Lego"
output_base_path = "abfss://conformed@atomicatraining.dfs.core.windows.net/Output/Lego"


# SCD type 2 implementation

- Make sure to append only the new and unique rows.
    - Use a `left_anti` join between the source and target tables on a hash column to compare their rows:
        - If both rows have the same hash value -> ignore
        - If both rows don't have the same hash value -> append the new record to the target table
- Append all the source records that have no equivalent among the target table records, then close out the superseded versions:
    - Apply the `row_number()` window function partitioned by the primary-key columns and ordered by `start_date` descending.
    - Deactivate every record whose `row_number` is greater than 1 (set its `end_date` and mark `is_active` as false).

In [0]:


def _with_row_hash(df, excluded_columns):
    """Return *df* with a ``hash64`` column fingerprinting its non-excluded columns."""
    hash_inputs = []
    for name in df.columns:
        if name in excluded_columns:
            continue
        # Hash the value together with its null flag. Feeding the columns to
        # xxhash64 directly (instead of through concat) matters because concat
        # yields NULL as soon as one input is NULL, which would give every row
        # with a missing value the same hash. The null flag keeps
        # (NULL, "a") and ("a", NULL) distinguishable.
        hash_inputs.append(col(name))
        hash_inputs.append(col(name).isNull())
    return df.withColumn("hash64", xxhash64(*hash_inputs))


def upsert_to_delta(input_path, output_path, join_columns, zorder_columns):
    """
    Apply an SCD type 2 upsert from one Delta table into another.

    If no Delta table exists at ``output_path`` the source is written as the
    initial version. Otherwise the source rows whose content hash is not found
    in the target are appended, older versions of each key are closed
    (``end_date`` set, ``is_active`` set to False), the table is rewritten and
    then optimized.

    Args:
        input_path: Location of the source Delta table.
        output_path: Location of the target Delta table.
        join_columns: Names of the columns identifying one business key; used
            to partition the versions of a record.
        zorder_columns: Names of the columns to Z-Order by when optimizing.
            When empty, a plain ``OPTIMIZE`` is run instead.

    Returns:
        None. The result is written to ``output_path``.

    NOTE(review): the hash comparison runs against every target row, including
    closed ones, so a record whose values revert to a previous version is
    treated as already present. Confirm whether that is intended.
    """
    # Columns managed by the SCD2 process itself; they are not part of a
    # record's content and are left out of the hash.
    tracking_columns = ["start_date", "end_date", "is_active"]

    source_df = spark.read.format("delta").load(input_path)

    if not DeltaTable.isDeltaTable(spark, output_path):
        # First load: cast both validity columns to timestamp so later appends
        # match the target schema, then create the table.
        source_df = source_df.withColumn("start_date", col("start_date").cast("timestamp"))
        source_df = source_df.withColumn("end_date", col("end_date").cast("timestamp"))
        source_df.coalesce(1).write.format("delta").option("mergeSchema", "true").mode("overwrite").save(output_path)
        return

    target_df = spark.read.format("delta").load(output_path)

    # 1) Fingerprint the content of every source and target row.
    df_source = _with_row_hash(source_df, tracking_columns)
    df_target = _with_row_hash(target_df, tracking_columns)

    # 2) Keep only the source rows whose hash is absent from the target.
    df_result = df_source.join(
        df_target.select("hash64"), on="hash64", how="leftanti"
    ).drop("hash64")
    # Same casts as the first-load branch, so the appended rows carry the
    # types the target table was created with.
    df_result = df_result.withColumn("start_date", col("start_date").cast("timestamp"))
    df_result = df_result.withColumn("end_date", col("end_date").cast("timestamp"))

    if df_result.isEmpty():
        print("######> No New Records Were Detected <######")
        return

    df_result.coalesce(1).write.format("delta").option("mergeSchema", "true").mode("append").save(output_path)

    # 3) Re-read the target (now including the appended rows) and rank the
    #    versions of each key, newest start_date first.
    current_df = spark.read.format("delta").load(output_path)
    key_columns = [col(c) for c in join_columns]
    newest_first = Window.partitionBy(*key_columns).orderBy(col("start_date").desc())
    df_ranked = current_df.withColumn("row_number", row_number().over(newest_first))

    # 4) Every version that is not the newest ends where the next newer
    #    version starts: ordered by rank, lag() returns the start_date of the
    #    row ranked immediately before (i.e. the next newer one).
    by_rank = Window.partitionBy(*key_columns).orderBy(col("row_number"))
    df_closed = df_ranked.withColumn(
        "end_date",
        when(col("row_number") > 1, lag("start_date", 1).over(by_rank)).otherwise(col("end_date")),
    )
    # A row with an end_date is no longer the active version.
    df_final = df_closed.withColumn(
        "is_active",
        when(col("end_date").isNotNull(), lit(False)).otherwise(col("is_active")),
    ).drop("row_number")

    df_final.coalesce(1).write.format("delta").option("mergeSchema", "true").mode("overwrite").save(output_path)

    # Compact the table; Z-Order only when columns were supplied, because
    # `ZORDER BY ()` is not valid SQL.
    if zorder_columns:
        zorder_columns_str = ", ".join(zorder_columns)
        spark.sql(f"OPTIMIZE delta.`{output_path}` ZORDER BY ({zorder_columns_str})")
    else:
        spark.sql(f"OPTIMIZE delta.`{output_path}`")


In [0]:


# One entry per warehouse table, processed top to bottom:
# (table folder name, key columns identifying a record, Z-Order columns).
_TABLE_SPECS = [
    # Fact_Inventory
    (
        "Fact_Inventory",
        ["inventory_id", "set_num", "part_num", "fig_num", "color_id", "is_spare"],
        ["inventory_id"],
    ),
    # Dim_Set
    ("Dim_sets", ["set_num"], ["set_num"]),
    # Dim_Theme
    ("Dim_themes", ["theme_id"], ["theme_id"]),
    # Dim_Minifig
    ("Dim_minifig", ["fig_num"], ["fig_num"]),
    # Dim_Part
    ("Dim_parts", ["part_num"], ["part_num"]),
    # Dim_Color
    ("Dim_colors", ["color_id"], ["color_id"]),
    # Dim_Part_Relationships
    (
        "Dim_part_relationships",
        ["rel_type", "child_part_num", "parent_part_num"],
        ["child_part_num", "parent_part_num"],
    ),
]

# Run the SCD type 2 upsert for every table, reading from the input root and
# writing to the folder of the same name under the output root.
for _table_name, _key_columns, _zorder_by in _TABLE_SPECS:
    upsert_to_delta(
        input_path=f"{input_base_path}/{_table_name}",
        output_path=f"{output_base_path}/{_table_name}",
        join_columns=_key_columns,
        zorder_columns=_zorder_by,
    )
