## Create the dimensional model table `dim_branch` from the big silver `carsales` table

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id
from delta.tables import DeltaTable

In [0]:
%sql
-- Preview the silver-layer carsales Delta table that the dim_branch cells below read from.
select * from delta.`abfss://silver@projectandstorage.dfs.core.windows.net/carsales`

In [0]:
# Source rows for the dimension: one row per distinct (Branch_ID, BranchName)
# pair in the silver carsales table. DISTINCT applies to the whole select list,
# not to Branch_ID alone.
# Rows with a NULL Branch_ID are excluded. A NULL business key never matches in
# the left join against dim_branch, so it would receive a fresh surrogate key
# on every incremental run.
# NOTE(review): if one Branch_ID can appear with several BranchName values, this
# still yields several rows for that ID — confirm the source guarantees one name per ID.
df_src = spark.sql("""
    select distinct Branch_ID, BranchName
    from delta.`abfss://silver@projectandstorage.dfs.core.windows.net/carsales`
    where Branch_ID is not null
""")

df_src.show()

In [0]:
# Build an empty DataFrame that carries the dim_branch schema:
# a surrogate key column plus the business columns. `where 1=0` selects no
# rows, so only the column names and types come through.
empty_dim_branch_query = '''
select 1 as dim_branch_key, Branch_ID, BranchName
from delta.`abfss://silver@projectandstorage.dfs.core.windows.net/carsales`
where 1=0'''
df_sink = spark.sql(empty_dim_branch_query)

df_sink.show()

In [0]:
# Load the current contents of the gold dim_branch table. On the very first run,
# before that table exists, use an empty DataFrame with the same schema instead.
# The table name must match the one the final cell writes with saveAsTable,
# otherwise the incremental branch can never be taken.
dim_branch_table = "catalog_for_sales.gold_schema.dim_branch"

if spark.catalog.tableExists(dim_branch_table):
    # Incremental load: reuse the existing surrogate keys (don't recreate the table).
    df_sink = spark.sql(f"""
                        select dim_branch_key, Branch_ID, BranchName
                        from {dim_branch_table}
                        """)
else:
    # Initial load: empty frame carrying the dim_branch schema (where 1=0 returns no rows).
    df_sink = spark.sql("""
                       select 1 as  dim_branch_key, Branch_ID, BranchName
                       from delta.`abfss://silver@projectandstorage.dfs.core.windows.net/carsales`
                       where 1=0
                       """)

In [0]:
# Left-join the incoming branches (df_src) to the rows already in the dimension
# (df_sink). Branches that are already present pick up their dim_branch_key.
# Branches not yet present get a NULL key. When df_sink is empty, every key is NULL.
branch_id_matches = df_src["Branch_ID"] == df_sink["Branch_ID"]
joined = df_src.join(df_sink, on=branch_id_matches, how="Left")
df_filter = joined.select(
    df_src["Branch_ID"],
    df_src["BranchName"],
    df_sink["dim_branch_key"],
)

In [0]:
# Split the joined rows by whether they already have a surrogate key.
has_key = col("dim_branch_key").isNotNull()

# Branches already in dim_branch keep their existing key.
df_filter_old = df_filter.filter(has_key)
df_filter_old.show()

# Branches not yet in dim_branch (NULL key). Keep only the business columns so a
# fresh surrogate key can be generated for them.
df_filter_new = df_filter.filter(~has_key).select(
    df_src["Branch_ID"], df_src["BranchName"]
)

df_filter_new.show()

In [0]:
# Notebook parameter controlling where surrogate-key numbering starts:
#   "0" (default) -> initial load, keys start at 1
#   anything else -> incremental load, keys continue after the highest existing key
# NOTE: the widget name keeps its existing spelling ("increamental_flag") because
# jobs pass notebook parameters by this exact name.
dbutils.widgets.text("increamental_flag", "0")
incremental_flag_value = dbutils.widgets.get("increamental_flag")

dim_branch_table = "catalog_for_sales.gold_schema.dim_branch"

if incremental_flag_value == "0":
    # Initial load
    max_value = 1
else:
    # Incremental load. Use the public Catalog API rather than spark._jsparkSession.
    if not spark.catalog.tableExists(dim_branch_table):
        raise Exception(f"Table {dim_branch_table} does not exist.")
    # max() over zero rows is NULL; coalesce to 0 so an existing-but-empty table
    # starts numbering at 1 instead of failing on None + 1.
    max_value = spark.sql(f"""
        select coalesce(max(dim_branch_key), 0) + 1
        from {dim_branch_table}
    """).collect()[0][0]

print(f"Max Value: {max_value}")


In [0]:
# Assign surrogate keys to the new branches as consecutive integers starting at
# max_value (1 on an initial load, highest existing key + 1 on an incremental load).
# row_number() is used instead of monotonically_increasing_id(). The latter is only
# unique, not consecutive: it stores the partition id in the upper bits, so keys
# would jump by about 2**33 between partitions.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# An unpartitioned window moves all new rows to a single partition. That is
# acceptable for a small dimension table, and ordering by Branch_ID makes key
# assignment deterministic.
key_window = Window.orderBy("Branch_ID")
df_filter_new = df_filter_new.withColumn(
    "dim_branch_key",
    # cast to long keeps the bigint column type the table had with monotonically_increasing_id()
    (max_value + row_number().over(key_window) - 1).cast("long"),
)
df_filter_new.show()

# 277 records

In [0]:
# Combine the newly keyed branches with the branches that already had a key.
# df_final is the complete set of rows to write to dim_branch.
# Both inputs have the columns Branch_ID, BranchName, dim_branch_key in that
# order, so matching by name lines up exactly the same columns as a positional union.
df_final = df_filter_new.unionByName(df_filter_old)
df_final.tail(5)

# 277 records

In [0]:
# Persist dim_branch to the gold layer.
# - Table exists: MERGE on the surrogate key. Existing branches are updated in
#   place and newly keyed branches are inserted.
# - Table missing: create it as a Delta table at the gold path.
# The existence check must use the same name that saveAsTable writes; otherwise it
# is always False and every run overwrites the table instead of merging.
dim_branch_table = "catalog_for_sales.gold_schema.dim_branch"
dim_branch_path = "abfss://gold@projectandstorage.dfs.core.windows.net/dim_branch"

if spark.catalog.tableExists(dim_branch_table):
    # Incremental load
    delta_table_obj = DeltaTable.forPath(spark, dim_branch_path)
    (
        delta_table_obj.alias("t")
        .merge(df_final.alias("s"), "t.dim_branch_key = s.dim_branch_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load: create the table, storing its data at the gold path.
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", dim_branch_path)
        .saveAsTable(dim_branch_table)
    )