In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Job/notebook parameter; declared with an empty default so the notebook can
# also be run interactively without supplying it.
_flag_widget = "initial_load_flag"
dbutils.widgets.text(_flag_widget, "")
initial_load_flag = dbutils.widgets.get(_flag_widget)

## Data Reading


In [0]:
# Source for this dimension: every column of the silver-layer customers table.
silver_customers_query = "select * from ete_project.silver.customers"
df = spark.sql(silver_customers_query)
display(df)

## Removing Duplicates

In [0]:
# Keep a single row per customer_id.
# NOTE(review): no ordering is applied first, so which of several duplicate
# rows survives is not controlled here.
df = df.dropDuplicates(['customer_id'])
df.count()

## Dividing New vs. Old Records

In [0]:
# Keys and audit dates of customers already in the gold dimension. On the very
# first run the gold table does not exist yet, so an empty frame with the same
# column names is built instead (the `where 1=0` filter yields no rows).
df_old = spark.sql(
  "select dimcustomerkey,customer_id,create_date,update_date from ete_project.gold.dimcustomers"
  if spark.catalog.tableExists('ete_project.gold.dimcustomers')
  else "select 0 dimcustomerkey,customer_id, 0 create_date, 0 update_date from ete_project.silver.customers where 1=0"
)
df_old.display()


## Renaming Columns of df_old

In [0]:
# Prefix the existing-dimension columns with "old_" so they cannot clash with
# the silver columns of the same name after the join.
old_column_renames = {
  'dimcustomerkey': 'old_dimcustomerkey',
  'customer_id': 'old_customer_id',
  'create_date': 'old_create_date',
  'update_date': 'old_update_date',
}
for existing_name, prefixed_name in old_column_renames.items():
  df_old = df_old.withColumnRenamed(existing_name, prefixed_name)
df_old.display()

## Applying Join With Old Records

In [0]:
# Left join: every silver customer is kept; the old_* columns stay null for
# customers that are not yet in the gold dimension.
df_join = df.join(
  df_old,
  on=df['customer_id'] == df_old['old_customer_id'],
  how='left',
)
display(df_join)



## Separating New vs. Old Records

In [0]:
# A matched surrogate key means the customer already exists in gold.
has_existing_key = df_join['old_dimcustomerkey'].isNotNull()
df_new = df_join.filter(~has_existing_key)
df_old = df_join.filter(has_existing_key)
df_old.display()



## Preparing df_old

In [0]:
# Existing customers keep their surrogate key and original create_date
# (restored to the un-prefixed names); update_date is stamped with the time of
# this run. The old customer_id/update_date copies are no longer needed.
df_old = (
  df_old
  .drop('old_customer_id', 'old_update_date')
  .withColumnRenamed('old_dimcustomerkey', 'dimcustomerkey')
  .withColumnRenamed('old_create_date', 'create_date')
  .withColumn('create_date', to_timestamp(col('create_date')))
  .withColumn('update_date', current_timestamp())
)
df_old.display()



## Preparing df_new


In [0]:
display(df_new)  # new customers, still carrying the (null) old_* join columns

In [0]:
# New customers: discard the all-null old_* join columns and stamp both audit
# dates with the time of this run.
df_new = (
  df_new
  .drop('old_dimcustomerkey', 'old_customer_id', 'old_update_date', 'old_create_date')
  .withColumn('update_date', current_timestamp())
  .withColumn('create_date', current_timestamp())
)


In [0]:
display(df_new)  # new customers with fresh create_date/update_date

## Adding Surrogate Key to df_new

In [0]:
# Provisional 1-based surrogate keys for the new customers; a later cell shifts
# them past the highest key already stored in gold.
#
# monotonically_increasing_id() is unique but not consecutive (its value encodes
# the partition id), so it produced keys with very large gaps, and the values
# may change if the frame is recomputed with a different partition layout.
# row_number() ordered by customer_id (unique after dropDuplicates) gives dense,
# deterministic keys 1..n instead. The cast keeps the original bigint type.
# NOTE: a window without partitionBy moves all new rows into one partition,
# which is fine for a dimension-sized batch; revisit if df_new grows very large.
df_new = df_new.withColumn(
  'dimcustomerkey',
  row_number().over(Window.orderBy('customer_id')).cast('long'),
)
df_new.display()

## Offsetting New Keys by the Current Max Surrogate Key

In [0]:
# Offset added to the provisional keys of new customers so they continue after
# the highest surrogate key already stored in the gold dimension.
#
# The gold table itself decides this (consistent with the tableExists checks
# elsewhere in this notebook) instead of initial_load_flag:
#   * with the widget at its default "" on a first run, the old code queried a
#     table that did not exist yet and failed;
#   * with the flag at "0" while gold already held rows, keys restarted at 1 and
#     would collide with existing keys in the dimcustomerkey-based merge.
if spark.catalog.tableExists('ete_project.gold.dimcustomers'):
  df_maxsur = spark.sql("select max(dimcustomerkey) as max_surrogate_key from ete_project.gold.dimcustomers")

  ##Converting df_maxsur to max_surrogate_key variable
  max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
else:
  max_surrogate_key = None

# max() over an empty table returns NULL; adding NULL would null every new key.
if max_surrogate_key is None:
  max_surrogate_key = 0

  

In [0]:
# Shift the provisional 1-based keys past the current maximum surrogate key.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("dimcustomerkey", col("dimcustomerkey") + key_offset)
display(df_new)


## Union of df_old and df_new

In [0]:
# Stack new and existing customers. Columns are matched by name, and the result
# keeps df_new's column order, so df_new must stay on the left.
df_final = df_new.unionByName(other=df_old)

In [0]:
display(df_final)  # rows about to be upserted into the gold dimension


## SCD Type 1 Upsert into Gold

In [0]:
# SCD Type 1: rows whose surrogate key already exists are overwritten in place
# (no history kept) and unseen keys are inserted. On the first run the table is
# created as a Delta table registered under the catalog name, with its data
# stored at the ADLS path below.
gold_table_name = "ete_project.gold.dimcustomers"
gold_table_path = "abfss://sales@adls7428.dfs.core.windows.net/gold/dimcustomers"

if not spark.catalog.tableExists(gold_table_name):
  (
    df_final.write
    .format("delta")
    .mode("append")
    .option('path', gold_table_path)
    .saveAsTable(gold_table_name)
  )
else:
  dlt_obj = DeltaTable.forPath(spark, gold_table_path)
  (
    dlt_obj.alias('trgt')
    .merge(df_final.alias('src'), 'trgt.dimcustomerkey = src.dimcustomerkey')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

In [0]:
%sql

-- Inspect the gold customer dimension after the load.
SELECT * FROM ete_project.gold.dimcustomers;