In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
from delta.tables import *

## **Read the customer data from the silver layer, apply SCD Type 1, and write it as a dimension table**

In [0]:
# SCD Type 1 load of dim_customers: changed attributes are overwritten in place and
# customers not yet in the dimension receive surrogate keys after the current maximum.
gold_customer_path = "/Volumes/sampleproject/practice/gold/dim_customers"
source_customers = spark.read.format('delta').load("/Volumes/sampleproject/practice/silver/customers")

# Attributes that are overwritten in place when they differ between silver and gold.
customer_scd1_columns = ["email", "city", "state", "full_name", "email_domain"]

is_gold_table_exists = DeltaTable.isDeltaTable(spark, gold_customer_path)

if not is_gold_table_exists:

    # First run: number every customer 1..N in customer_id order.
    window_spec = Window.orderBy("customer_id")
    df_with_sk = source_customers.withColumn("customer_sk", row_number().over(window_spec))

    df_with_sk.write.format("delta").mode("overwrite").save(gold_customer_path)
    print("First run complete. Dimension created.")

else:
    target_table = DeltaTable.forPath(spark, gold_customer_path)
    target_df = target_table.toDF()

    # `max` resolves to pyspark.sql.functions.max via the star import, not the Python builtin.
    max_sk_row = target_df.agg(max("customer_sk")).collect()[0][0]
    max_sk = max_sk_row if max_sk_row is not None else 0

    # Customers absent from the dimension get keys continuing after the current maximum.
    new_rows = source_customers.join(target_df, "customer_id", "left_anti")

    window_spec = Window.orderBy("customer_id")
    new_rows_with_sk = new_rows.withColumn("customer_sk", lit(max_sk) + row_number().over(window_spec))

    # Existing customers: the -1 placeholder is never written, because the matched
    # branch below does not set customer_sk.
    existing_rows = source_customers.join(target_df, "customer_id", "inner").select(source_customers["*"])
    existing_rows_staged = existing_rows.withColumn("customer_sk", lit(-1))

    staging_df = new_rows_with_sk.unionByName(existing_rows_staged)

    # Only rewrite rows whose attributes actually changed. `<=>` is null-safe equality,
    # so a NULL -> value (or value -> NULL) change is still detected, and unchanged
    # rows are no longer rewritten on every run.
    changed_condition = " OR ".join(
        f"NOT (target.{c} <=> source.{c})" for c in customer_scd1_columns
    )

    target_table.alias("target") \
        .merge(
            staging_df.alias("source"),
            "target.customer_id = source.customer_id"
        ) \
        .whenMatchedUpdate(
            condition=changed_condition,
            set={f"target.{c}": f"source.{c}" for c in customer_scd1_columns}
        ) \
        .whenNotMatchedInsertAll() \
        .execute()



## **Read the product data from the silver layer, apply SCD Type 2, and write it as a dimension table**

In [0]:
# SCD Type 2 load of dim_products: when a tracked attribute changes, the current version
# is closed (is_current = false, end_date set) and a new current version is inserted.
gold_path_products_scd2 = "/Volumes/sampleproject/practice/gold/dim_products"
source_df = spark.read.format('delta').load("/Volumes/sampleproject/practice/silver/products")

if not DeltaTable.isDeltaTable(spark, gold_path_products_scd2):

    # First run: every product starts as its current, open-ended version.
    df_first_run = source_df \
        .withColumn("start_date", current_timestamp()) \
        .withColumn("end_date", lit(None).cast("timestamp")) \
        .withColumn("is_current", lit(True))

    window_spec = Window.orderBy("product_id")
    df_with_sk = df_first_run.withColumn("product_sk", row_number().over(window_spec))

    df_with_sk.write.format("delta").mode("overwrite").save(gold_path_products_scd2)
    print("Dim_Products_SCD2 created successfully.")

else:

    target_table = DeltaTable.forPath(spark, gold_path_products_scd2)
    target_df = target_table.toDF()

    # `max` resolves to pyspark.sql.functions.max via the star import.
    max_sk_row = target_df.agg(max("product_sk")).collect()[0][0]
    max_sk = max_sk_row if max_sk_row is not None else 0

    current_target = target_df.filter(col("is_current") == True)

    # Explicit join condition (instead of on="product_id") so that src.product_id and
    # tgt.product_id both remain addressable after the join.
    join_df = source_df.alias("src").join(
        current_target.alias("tgt"),
        col("src.product_id") == col("tgt.product_id"),
        how="left"
    )

    is_new = col("tgt.product_id").isNull()
    # Null-safe comparison: with `!=`, a NULL on either side evaluates to NULL and the
    # filter would silently treat the row as unchanged.
    is_changed = (
        ~col("src.price").eqNullSafe(col("tgt.price"))
        | ~col("src.price_tier").eqNullSafe(col("tgt.price_tier"))
        | ~col("src.data_quality_flag").eqNullSafe(col("tgt.data_quality_flag"))
    )

    # New products and changed products both get a fresh current version ...
    rows_to_insert = join_df.filter(is_new | is_changed).select("src.*")
    # ... and for changed products the existing current version must be closed.
    rows_to_close = join_df.filter(~is_new & is_changed).select("tgt.*")

    window_spec_new = Window.orderBy("product_id")
    rows_to_insert_with_sk = rows_to_insert \
        .withColumn("product_sk", lit(max_sk) + row_number().over(window_spec_new)) \
        .withColumn("start_date", current_timestamp()) \
        .withColumn("end_date", lit(None).cast("timestamp")) \
        .withColumn("is_current", lit(True))

    # Rows to close only need their surrogate key; every other column is a NULL cast to
    # the insert schema's type so the union lines up without relying on NullType coercion.
    rows_to_close_prepared = rows_to_close.select(
        col("product_sk"),
        *[
            lit(None).cast(field.dataType).alias(field.name)
            for field in rows_to_insert_with_sk.schema.fields
            if field.name != "product_sk"
        ]
    )

    staging_df = rows_to_insert_with_sk.unionByName(rows_to_close_prepared)

    # New versions carry keys greater than max_sk, so they never match and are inserted.
    # Versions being closed match on their existing product_sk. No separate mergeKey
    # column is staged, so nothing extra can leak into the table under schema auto-merge.
    target_table.alias("target") \
        .merge(
            staging_df.alias("source"),
            "target.product_sk = source.product_sk"
        ) \
        .whenMatchedUpdate(
            condition="target.is_current = true",
            set={
                "target.is_current": lit(False),
                "target.end_date": current_timestamp()
            }
        ) \
        .whenNotMatchedInsertAll() \
        .execute()



Dim_Products_SCD2 created successfully.


## **Read the region data from the silver layer, apply SCD Type 1, and write it as a dimension table**

In [0]:
# SCD Type 1 load of dim_regions: changed attributes are overwritten in place and
# regions not yet in the dimension receive surrogate keys after the current maximum.
gold_path_regions = "/Volumes/sampleproject/practice/gold/dim_regions"
source_df_regions = spark.read.format('delta').load("/Volumes/sampleproject/practice/silver/regions")

# Attributes that are overwritten in place when they differ between silver and gold.
region_scd1_columns = ["region", "region_code", "zone_type"]

if not DeltaTable.isDeltaTable(spark, gold_path_regions):
    print("Gold Region table does not exist. Creating first version...")

    # First run: number every region 1..N in region_id order.
    window_spec = Window.orderBy("region_id")
    df_with_sk = source_df_regions.withColumn("region_sk", row_number().over(window_spec))

    df_with_sk.write.format("delta").mode("overwrite").save(gold_path_regions)

else:
    target_table = DeltaTable.forPath(spark, gold_path_regions)
    target_df = target_table.toDF()

    # `max` resolves to pyspark.sql.functions.max via the star import.
    max_sk_row = target_df.agg(max("region_sk")).collect()[0][0]
    max_sk = max_sk_row if max_sk_row is not None else 0

    # Regions absent from the dimension get keys continuing after the current maximum.
    new_rows = source_df_regions.join(target_df, "region_id", "left_anti")

    window_spec = Window.orderBy("region_id")
    new_rows_with_sk = new_rows.withColumn("region_sk", lit(max_sk) + row_number().over(window_spec))

    # Existing regions: the -1 placeholder is never written, because the matched
    # branch below does not set region_sk.
    existing_rows = source_df_regions.join(target_df, "region_id", "inner").select(source_df_regions["*"])
    existing_rows_staged = existing_rows.withColumn("region_sk", lit(-1))

    staging_df = new_rows_with_sk.unionByName(existing_rows_staged)

    # Only rewrite rows whose attributes actually changed. `<=>` is null-safe equality,
    # so a NULL -> value (or value -> NULL) change is still detected.
    changed_condition = " OR ".join(
        f"NOT (target.{c} <=> source.{c})" for c in region_scd1_columns
    )

    target_table.alias("target") \
        .merge(
            staging_df.alias("source"),
            "target.region_id = source.region_id"
        ) \
        .whenMatchedUpdate(
            condition=changed_condition,
            set={f"target.{c}": f"source.{c}" for c in region_scd1_columns}
        ) \
        .whenNotMatchedInsertAll() \
        .execute()

Gold Region table does not exist. Creating first version...




## **Build the orders fact table**

In [0]:
# Orders fact table: resolve each order's customer_sk and product_sk from the gold
# dimensions, then create the table on the first run or upsert into it afterwards.
gold_fact_path = "/Volumes/sampleproject/practice/gold/fact_orders"

df = spark.read.format("delta").load("/Volumes/sampleproject/practice/silver/orders")

df_dimcus = spark.read.format("delta").load("/Volumes/sampleproject/practice/gold/dim_customers") \
    .withColumnRenamed("customer_id", "dim_customer_id")

# dim_products is SCD Type 2, so one product_id can have several version rows. Joining on
# product_id alone would fan each order out across all versions and duplicate fact rows.
# Instead, give each version the window [previous version's end_date, own end_date):
# the first version is open towards the past (so orders placed before the first load still
# resolve) and the current version is open towards the future. An order therefore matches
# the version valid at its order_date. That choice stays stable when later versions are
# added, which keeps the product_sk part of the merge key stable across runs.
# NOTE(review): assumes order_date is comparable with the dimension's timestamp columns.
product_version_window = Window.partitionBy("dim_product_id").orderBy("start_date", "product_sk")
df_dimpro = spark.read.format("delta").load("/Volumes/sampleproject/practice/gold/dim_products") \
    .withColumnRenamed("product_id", "dim_product_id") \
    .withColumn("version_valid_from", lag("end_date").over(product_version_window))

df_fact = df.join(
    df_dimcus,
    df['customer_id'] == df_dimcus['dim_customer_id'],
    how='left'
).join(
    df_dimpro,
    (df['product_id'] == df_dimpro['dim_product_id'])
    & (df_dimpro['version_valid_from'].isNull() | (df['order_date'] >= df_dimpro['version_valid_from']))
    & (df_dimpro['end_date'].isNull() | (df['order_date'] < df_dimpro['end_date'])),
    how='left'
)

# Project the fact columns once, qualified by their source DataFrame. The first write and
# the merge then share one schema, and dimension attributes (or same-named columns from
# different inputs) never reach the fact table.
df_fact_new = df_fact.select(
    df['order_id'],
    df['order_date'],
    df['quantity'],
    df['total_amount'],
    df['year'],
    df_dimcus['customer_sk'],
    df_dimpro['product_sk'],
)

if DeltaTable.isDeltaTable(spark, gold_fact_path):

    dlt_obj = DeltaTable.forPath(spark, gold_fact_path)

    # `<=>` (null-safe equality): after the left joins, an order whose customer or product
    # did not resolve has a NULL key. With `=` such rows would never match and would be
    # re-inserted on every run.
    dlt_obj.alias("trg").merge(
        df_fact_new.alias("src"),
        "trg.order_id = src.order_id AND trg.customer_sk <=> src.customer_sk AND trg.product_sk <=> src.product_sk"
    )\
    .whenMatchedUpdateAll()\
    .whenNotMatchedInsertAll()\
    .execute()

else:
    df_fact_new.write.format("delta").mode("overwrite").save(gold_fact_path)

In [0]:
%sql
-- Publish the gold Delta tables as views in sampleproject.powerbi
-- (presumably the schema a Power BI connection reads from).
CREATE SCHEMA IF NOT EXISTS sampleproject.powerbi;

-- Each view is a pass-through SELECT * over the path-based gold Delta table.

-- Customer dimension (SCD Type 1).
CREATE OR REPLACE VIEW sampleproject.powerbi.dim_customers
AS
    SELECT *
    FROM delta.`/Volumes/sampleproject/practice/gold/dim_customers`;

-- Product dimension (SCD Type 2; includes all versions, filter on is_current if needed).
CREATE OR REPLACE VIEW sampleproject.powerbi.dim_products
AS
    SELECT *
    FROM delta.`/Volumes/sampleproject/practice/gold/dim_products`;

-- Region dimension (SCD Type 1).
CREATE OR REPLACE VIEW sampleproject.powerbi.dim_regions
AS
    SELECT *
    FROM delta.`/Volumes/sampleproject/practice/gold/dim_regions`;

-- Orders fact table.
CREATE OR REPLACE VIEW sampleproject.powerbi.fact_orders
AS
    SELECT *
    FROM delta.`/Volumes/sampleproject/practice/gold/fact_orders`;


  File <command-8799344854048558>, line 10
    CREATE OR REPLACE VIEW sampleproject.powerbi.dim_regions AS
           ^
SyntaxError: invalid syntax
