In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

## Creating a FLAG to indicate initial loading or incremental loading

In [0]:
# Register the 'load_flg' text widget with a default value of '0'.
dbutils.widgets.text(name='load_flg', defaultValue='0')

In [0]:
# Read back the current value of the 'load_flg' widget. As the last expression of the
# cell the value is only displayed; it is not stored in a variable here.
dbutils.widgets.get('load_flg')

'0'

## Reading silver layer data

In [0]:
# Load the distinct (Model_ID, Model_Category) pairs from the silver-layer parquet data.
# NOTE: DISTINCT de-duplicates whole rows, not Model_ID alone, so the previous
# `DISTINCT(Model_ID)` spelling was misleading; the result set is unchanged.
silver_layer_data = spark.sql("""
    SELECT DISTINCT Model_ID, Model_Category
    FROM PARQUET.`abfss://silver@firstendtoenddeproject.dfs.core.windows.net/carsales`
""")

In [0]:
# Preview the source rows read from the silver layer.
display(silver_layer_data)

Model_ID,Model_Category
Hon-M220,Hon
Hon-M215,Hon
Vol-M110,Vol
Vol-M260,Vol
BMW-M2,BMW
Acu-M60,Acu
Bui-M31,Bui
Hyu-M157,Hyu
Ren-M130,Ren
Tat-M179,Tat


## Creating Dimensional table

In [0]:
# Pick the query that produces the current contents of the dimension table:
#   - table already in the catalog -> read every existing dimension row (incremental load)
#   - table missing                -> build an empty frame with the same columns (initial load)
dim_table_exists = spark.catalog.tableExists('carsales.gold.model_dim_table')

if dim_table_exists:
    data_sink_query = """
            SELECT dim_model_id, Model_ID, Model_Category 
            FROM carsales.gold.model_dim_table 
            """
else:
    # WHERE 1=0 keeps the schema but returns no rows.
    data_sink_query = """
            SELECT 1 as dim_model_id, Model_ID, Model_Category 
            FROM PARQUET.`abfss://silver@firstendtoenddeproject.dfs.core.windows.net/carsales` 
            WHERE 1=0
            """

data_sink = spark.sql(data_sink_query)


## Perform a LEFT JOIN between source table and dim table

In [0]:
# Preview the rows currently held by the dimension table (empty on the initial run).
display(data_sink)

dim_model_id,Model_ID,Model_Category
2,Hon-M215,Hon
3,Vol-M110,Vol
4,Vol-M260,Vol
5,BMW-M2,BMW
6,Acu-M60,Acu
7,Bui-M31,Bui
8,Hyu-M157,Hyu
9,Ren-M130,Ren
10,Tat-M179,Tat
11,Lin-M29,Lin


In [0]:
# Look up the existing surrogate key by the business key (Model_ID) only.
# Joining on Model_Category as well would make a model whose category changed look like a
# brand-new record (new surrogate key, duplicate Model_ID) instead of an SCD Type 1 update.
# Model_Category is taken from the source so the later merge overwrites the stored value.
joined_dim_table = silver_layer_data.join(
    data_sink.select('Model_ID', 'dim_model_id'),
    on='Model_ID',
    how='left',
)

In [0]:
# Preview the source rows together with their looked-up surrogate key (null when unmatched).
display(joined_dim_table)

Model_ID,Model_Category,dim_model_id
Hon-M220,Hon,1.0
Hon-M215,Hon,2.0
Vol-M110,Vol,3.0
Vol-M260,Vol,4.0
BMW-M2,BMW,5.0
Acu-M60,Acu,6.0
Bui-M31,Bui,7.0
Hyu-M157,Hyu,8.0
Ren-M130,Ren,9.0
Tat-M179,Tat,10.0


### Separate the data to INSERT from the data to UPDATE and add the SURROGATE KEY

**Splitting old_records**

In [0]:
# Rows that already carry a surrogate key were matched in the dimension table.
has_surrogate_key = col('dim_model_id').isNotNull()
dim_old_records = joined_dim_table.where(has_surrogate_key)

**Splitting new_records**

In [0]:
# Rows without a surrogate key were not found in the dimension table and must be inserted.
lacks_surrogate_key = col('dim_model_id').isNull()
dim_new_records = joined_dim_table.where(lacks_surrogate_key)

#### Creating SURROGATE KEY

In [0]:
# Report how many source rows are already in the dimension and how many are new.
# Each count() runs a Spark action.
old_record_count = dim_old_records.count()
print(f"length of old data: {old_record_count}")
new_record_count = dim_new_records.count()
print(f"length of new data: {new_record_count}")

length of old data: 277
length of new data: 1


In [0]:
# Determine the first surrogate key to hand out to new records.
#   - no dimension table yet          -> start at 1
#   - dimension table exists, empty   -> max() is NULL, so also start at 1
#   - dimension table has rows        -> continue after the current maximum
if not spark.catalog.tableExists('carsales.gold.model_dim_table'):
  max_value = 1
else:
  max_value_df = spark.sql("SELECT max(dim_model_id) FROM carsales.gold.model_dim_table")
  current_max = max_value_df.collect()[0][0]
  # Guard against None: previously `None + 1` raised TypeError on an empty table.
  max_value = 1 if current_max is None else current_max + 1

In [0]:
# Show the first surrogate key that will be assigned to new records.
print(f"{max_value}")

278


In [0]:

# Assign surrogate keys to the new records, continuing densely from max_value.
# monotonically_increasing_id() is unique but NOT consecutive (the partition id is encoded in
# the upper bits), so keys could jump by billions and inflate the next run's max().
# row_number() over a deterministic ordering yields 1, 2, 3, ... instead.
# The un-partitioned window moves all new rows to one partition.
new_key_order = Window.orderBy('Model_ID', 'Model_Category')
df_filtered_new = dim_new_records.withColumn(
    'dim_model_id',
    # Cast to long to keep the key type produced by the previous implementation.
    (row_number().over(new_key_order) + lit(max_value - 1)).cast('long'),
)

In [0]:
# Preview the new records together with their freshly assigned surrogate keys.
display(df_filtered_new)

Model_ID,Model_Category,dim_model_id
ZYXM13,ZYXM13,278


In [0]:
# Combine new and existing records into the frame that will be upserted.
# unionByName matches columns by name, so the result stays correct even if the column
# order of the two frames ever diverges (plain union() is purely positional).
df_final = df_filtered_new.unionByName(dim_old_records)

## SCD TYPE - I (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 write:
#   - table not in the catalog -> create it as a Delta table at the gold path
#   - table already exists     -> merge df_final into it, keyed on the surrogate key
gold_table_name = "carsales.gold.model_dim_table"
gold_table_path = "abfss://gold@firstendtoenddeproject.dfs.core.windows.net/model_dim_table"

if not spark.catalog.tableExists(gold_table_name):
  (
    df_final.write
    .format("delta")
    .mode("overwrite")
    .option("path", gold_table_path)
    .saveAsTable(gold_table_name)
  )
else:
  delta_tab = DeltaTable.forPath(spark, gold_table_path)

  # Matched keys are overwritten with the source values; unmatched keys are inserted.
  (
    delta_tab.alias("trg")
    .merge(df_final.alias("src"), "trg.dim_model_id = src.dim_model_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

In [0]:
%sql
-- Inspect the full contents of the gold-layer model dimension table.

SELECT * FROM carsales.gold.model_dim_table;

Model_ID,Model_Category,dim_model_id
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Ren-M130,Ren,9
Tat-M179,Tat,10
