In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# CREATE FLAG PARAMETER

In [0]:
dbutils.widgets.text("increamental_flag","0")

In [0]:
increamental_flag = dbutils.widgets.get("increamental_flag")
print(increamental_flag)

0


In [0]:
df_src=spark.sql('''select distinct Dealer_ID as Dealer_ID,DealerName from parquet.`abfss://silver@carmetricdatalake.dfs.core.windows.net/carsales`''')
df_src.display()

Dealer_ID,DealerName
DLR0058,Fiat do Brasil Motors
DLR0107,Land Rover Motors
DLR0129,Mia Motors
DLR0111,Lotus Motors
DLR0085,Humber Motors
DLR0001,AC Cars Motors
DLR0218,Lagonda Motors
DLR0082,Honda Motors
DLR0063,Ford do Brasil Motors
DLR0193,Tazzari Motors


# DIM_MODEL SINK-INITIAL,INCREAMENTAL-just bring the Schema if not exist

In [0]:

if spark.catalog.tableExists("cars_catalog.gold.dim_dealer"):
    df_sink=spark.sql('''select  dimdealer_key,Dealer_ID,DealerName
                  from cars_catalog.gold.dim_dealer
                  
                ''')
else:
    df_sink=spark.sql('''select 1 as dimdealer_key,Dealer_ID,DealerName
                  from  parquet.`abfss://silver@carmetricdatalake.dfs.core.windows.net/carsales`
                  where 1=0
                ''')


df_sink.display()

dimdealer_key,Dealer_ID,DealerName
1,DLR0058,Fiat do Brasil Motors
2,DLR0107,Land Rover Motors
3,DLR0129,Mia Motors
4,DLR0111,Lotus Motors
5,DLR0085,Humber Motors
6,DLR0001,AC Cars Motors
7,DLR0218,Lagonda Motors
8,DLR0082,Honda Motors
9,DLR0063,Ford do Brasil Motors
10,DLR0193,Tazzari Motors


# filtering new records and old records

In [0]:
df_filter=df_src.join(df_sink,df_src["Dealer_ID"]==df_sink["Dealer_ID"],'left').select(df_src["Dealer_ID"],df_src["DealerName"],df_sink["dimdealer_key"])
display(df_filter)

Dealer_ID,DealerName,dimdealer_key
DLR0058,Fiat do Brasil Motors,
DLR0107,Land Rover Motors,
DLR0129,Mia Motors,
DLR0111,Lotus Motors,
DLR0085,Humber Motors,
DLR0001,AC Cars Motors,
DLR0218,Lagonda Motors,
DLR0082,Honda Motors,
DLR0063,Ford do Brasil Motors,
DLR0193,Tazzari Motors,


In [0]:
df_filter_old = df_filter.filter(col('dimdealer_key').isNotNull())

In [0]:
df_filter_new = df_filter.filter(col('dimdealer_key').isNull()).select(df_src["Dealer_ID"],df_src["DealerName"])
df_filter_new .display()

Dealer_ID,DealerName
DLR0058,Fiat do Brasil Motors
DLR0107,Land Rover Motors
DLR0129,Mia Motors
DLR0111,Lotus Motors
DLR0085,Humber Motors
DLR0001,AC Cars Motors
DLR0218,Lagonda Motors
DLR0082,Honda Motors
DLR0063,Ford do Brasil Motors
DLR0193,Tazzari Motors


# CREATE SURROGATE KEY

### FETCH MAX SURROGATE KEY

In [0]:
if increamental_flag=="0":
    max_value=1
else:
    max_value_df=spark.sql('''select max("dimdealer_key") from cars_catalog.gold.dim_dealer''')
    max_value=max_value_df.collect()[0][0]

In [0]:
df_filter_new=df_filter_new.withColumn("dimdealer_key",max_value +monotonically_increasing_id()+1)


In [0]:
display(df_filter_new)


Dealer_ID,DealerName,dimdealer_key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10


In [0]:
df_final=df_filter_new.union(df_filter_old)
display(df_final)

Dealer_ID,DealerName,dimdealer_key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10


In [0]:
from delta.tables import DeltaTable

# SCD TYPE1-UPSERT

In [0]:
if spark.catalog.tableExists("cars_catalog.gold.dim_dealer"):
    delta_tb1=DeltaTable.forPath(spark, "abfss://gold@carmetricdatalake.dfs.core.windows.net/dim_dealer")
    delta_tbl1=delta_tb1.alias("trg").merge(df_final.alias("src"),"trg.dimdealer_key=src.dimdealer_key")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
#initial run
else:
    df_final.write.format("delta")\
        .mode("overwrite")\
        .option("path","abfss://gold@carmetricdatalake.dfs.core.windows.net/dim_dealer")\
        .saveAsTable("cars_catalog.gold.dim_dealer")
                                                          


In [0]:
%sql
select*from cars_catalog.gold.dim_dealer

Dealer_ID,DealerName,dimdealer_key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10
