# **Incremental Parameter**

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 3, Finished, Available, Finished)

# **CREATING DIMENSION - dealer**

In [2]:
# Source side of the dimension load: the dealer attributes found in the
# silver-layer car sales parquet files.
# Note: DISTINCT applies to the whole select list, so this returns distinct
# (Dealer_ID, DealerName) pairs; the parentheses around Dealer_ID have no effect.
dealer_src_query = '''
SELECT DISTINCT(Dealer_ID),DealerName
FROM parquet.`abfss://incre_ws@onelake.dfs.fabric.microsoft.com/lake_san.Lakehouse/Files/silver/carsales`
'''
df_src = spark.sql(dealer_src_query)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 4, Finished, Available, Finished)

In [3]:
# Preview the distinct dealer rows read from the silver car sales files.
display(df_src)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f93f375c-4d45-49e4-a60d-32e2c354a59d)

## Initial and incremental load

In [4]:
# Build the "sink" side of the comparison: the dealers already present in the
# dimension. Pick the query first, then run it once.
if spark.catalog.tableExists("dim_dealer"):
    # Incremental run: read the current contents of the dimension table.
    sink_query = '''
    SELECT dim_Dealer_key,Dealer_ID,DealerName
    FROM dim_dealer'''
else:
    # Initial run: the table does not exist yet, so produce an empty frame
    # with the expected columns (WHERE 1=0 filters out every row).
    sink_query = '''
    SELECT 1 AS dim_Dealer_key,Dealer_ID,DealerName
    FROM parquet.`abfss://incre_ws@onelake.dfs.fabric.microsoft.com/lake_san.Lakehouse/Files/silver/carsales`
    WHERE 1=0 '''

df_sink = spark.sql(sink_query)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 6, Finished, Available, Finished)

In [5]:
# Left-join the source dealers to the dimension on the business key so every
# source row carries its existing surrogate key, or NULL when it has none.
df_filter = (
    df_src
    .join(df_sink, on=df_src['Dealer_ID'] == df_sink['Dealer_ID'], how='left')
    .select(df_src['Dealer_ID'], df_src['DealerName'], df_sink['dim_Dealer_key'])
)

# Dealers already in the dimension keep the key they were given.
df_old = df_filter.where(col('dim_Dealer_key').isNotNull())

# Dealers without a key are new; drop the empty key column so a fresh key
# can be attached later.
df_new = (
    df_filter
    .where(col('dim_Dealer_key').isNull())
    .select(df_src['Dealer_ID'], df_src['DealerName'])
)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 7, Finished, Available, Finished)

In [6]:
# Preview the source dealers that have no surrogate key in the dimension yet.
display(df_new)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1b5f5bbb-b598-4cf8-ab7f-d7e6a329ed4f)

## Surrogate key creation

In [7]:
# max_value = highest surrogate key already stored in dim_dealer
# (0 when no key has been issued yet).
if spark.catalog.tableExists("dim_dealer"):
    max_value_df=spark.sql('''SELECT MAX(dim_Dealer_key) FROM dim_dealer''') #return as a dataset
    max_value=max_value_df.collect()[0][0]
    # MAX() over an empty table yields NULL, which collects as None;
    # treat that the same as "no keys issued yet".
    if max_value is None:
        max_value=0
else:
    # No dimension table yet, so no key has been issued.
    max_value=0

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 9, Finished, Available, Finished)

In [8]:
# Attach a surrogate key to every new dealer.
# Keys continue after the highest key already in the dimension (max_value), so
# an incremental run never re-issues a key that dim_dealer already contains.
# row_number() over a fixed ordering gives dense, repeatable keys, whereas
# monotonically_increasing_id() is only guaranteed to be unique, not consecutive.
# The window has no partitioning, so Spark processes it on a single partition;
# that is acceptable for a small dimension table.
key_offset = max_value if max_value is not None else 0
key_order = Window.orderBy("Dealer_ID", "DealerName")
df_new = df_new.withColumn(
    "dim_Dealer_key",
    # Cast to long to keep the same column type the key had before.
    (row_number().over(key_order) + lit(key_offset)).cast("long"))


StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 10, Finished, Available, Finished)

In [9]:
# Preview the new dealers again, now with the dim_Dealer_key column attached.
display(df_new)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6adb384c-cfd3-4a3d-ae77-ea485d7ab7bb)

In [10]:
# Stack the newly keyed dealers on top of the existing ones (union is
# positional: both frames are Dealer_ID, DealerName, dim_Dealer_key) and keep
# a single row per surrogate key.
# NOTE(review): de-duplicating on dim_Dealer_key silently discards a row
# whenever two dealers share a key - confirm the keys are unique at this point.
df_final = (
    df_new
    .union(df_old)
    .dropDuplicates(['dim_Dealer_key'])
)

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 12, Finished, Available, Finished)

# **SCD TYPE-1: UPSERT**

In [11]:
from delta.tables import DeltaTable

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 13, Finished, Available, Finished)

In [12]:
# SCD Type 1 load of dim_dealer: create the table on the first run, otherwise
# merge df_final into it on the business key Dealer_ID.
gold_dim_dealer_path = 'abfss://incre_ws@onelake.dfs.fabric.microsoft.com/lake_san.Lakehouse/Files/gold/dim_dealer'

if not spark.catalog.tableExists('dim_dealer'):
    # Initial run: write df_final as a Delta table at the gold path and
    # register it under the name dim_dealer.
    (
        df_final.write
        .format("delta")
        .mode('overwrite')
        .option('path', gold_dim_dealer_path)
        .saveAsTable('dim_dealer')
    )
else:
    # Incremental run: matched dealers are overwritten with the source values
    # (no history kept), unmatched dealers are inserted.
    delta_table = DeltaTable.forPath(spark, gold_dim_dealer_path)
    (
        delta_table.alias("trg")
        .merge(df_final.alias("src"), "trg.Dealer_ID=src.Dealer_ID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 14, Finished, Available, Finished)

In [13]:
%%sql
-- Inspect the full contents of the dealer dimension after the upsert.
SELECT * FROM dim_dealer

StatementMeta(, 42a44620-bd81-4d0e-bc35-8346b5e0147c, 15, Finished, Available, Finished)

<Spark SQL result set with 269 rows and 3 fields>

In [2]:
%%sql
-- Row count of the dealer dimension.
SELECT COUNT(*) FROM dim_dealer

StatementMeta(, dc934166-b97d-46d4-bd85-d1316a0251f3, 3, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>