In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
dbutils.widgets.text('init_load','0')

In [0]:
init_load = int(dbutils.widgets.get("init_load"))

In [0]:
init_load

In [0]:
type(init_load)

In [0]:
df = spark.read.table('dbx_cata.silver.customers')

In [0]:
df = df.dropDuplicates(subset = ['customer_id'])
display(df)

In [0]:
df = df.withColumn('sk', monotonically_increasing_id() + lit (1))

In [0]:
df = df.withColumnRenamed('sk', 'dimCustomerKey')
df.display()

In [0]:
windowspec = Window.orderBy(col('customer_id').asc())

df.withColumn('dimCustomerKey', row_number().over(windowspec)).display()

In [0]:
# Build the "previous state" of the dimension so the current batch can be split into
# new vs. existing customers with a left join on customer_id.
#   init_load == 0 (incremental run): read the keys and dates already stored in gold.
#   otherwise (first load): gold does not exist yet, so use an empty frame with the
#   same four columns; every incoming customer is then treated as new.
if init_load == 0:
    df_old = spark.sql('select dimCustomerKey, customer_id, create_date, update_date from dbx_cata.gold.customers')
else:
    # Typed placeholders instead of INT 0: customer_id keeps silver's own type so the
    # join key types match, the key is BIGINT like the generated surrogate keys, and the
    # dates are TIMESTAMP like the current_timestamp() values written to gold, so the
    # later to_timestamp() / unionByName() calls see the same types as on a rerun.
    df_old = spark.sql(
        'select cast(null as bigint) as dimCustomerKey, customer_id, '
        'cast(null as timestamp) as create_date, cast(null as timestamp) as update_date '
        'from dbx_cata.silver.customers where 1 = 0'
    )

In [0]:
df_old.display()

In [0]:
df_old = df_old.withColumnRenamed('dimCustomerKey', 'dimCustomerKey_old')\
    .withColumnRenamed('customer_id', 'customer_id_old')\
    .withColumnRenamed("create_date", 'create_date_old')\
    .withColumnRenamed("update_date", 'update_date_old')

In [0]:
df_join =  df.join(df_old, df.customer_id == df_old.customer_id_old, 'left')

df_join.display()

In [0]:
df_join.filter(col('dimCustomerKey_old').isNull()).display()

In [0]:
df_new_data = df_join.filter(col('dimCustomerKey_old').isNull())

In [0]:
df_old_data = df_join.filter(col('dimCustomerKey_old').isNotNull())


In [0]:
# Existing customers must keep the surrogate key already stored in gold. The MERGE
# into gold matches on dimCustomerKey, so giving them the freshly generated key from
# `df` would either insert duplicate customers or overwrite another customer's row.
# Drop the fresh key and promote the stored one instead.
df_old_data = df_old_data.drop('dimCustomerKey', 'customer_id_old', 'update_date_old')

df_old_data = df_old_data.withColumnRenamed('dimCustomerKey_old', 'dimCustomerKey')\
    .withColumnRenamed('create_date_old', 'create_date')

# Preserve the original creation time; only the update time moves forward.
df_old_data = df_old_data.withColumn('create_date', to_timestamp(col('create_date')))

df_old_data = df_old_data.withColumn('update_date', current_timestamp())

In [0]:
df_old_data.display()

In [0]:
df_new_data.display()

In [0]:
df_new_data = df_new_data.drop('customer_id_old', 'dimCustomerKey_old', 'create_date_old', 'update_date_old')

df_new_data = df_new_data.withColumn('create_date', current_timestamp())

df_new_data = df_new_data.withColumn('update_date', current_timestamp())

In [0]:
df_new_data.display()

In [0]:
df_new_data = df_new_data.withColumn('dimCustomerKey', monotonically_increasing_id() + lit(1))
df_new_data.display()


In [0]:
if init_load == 1:
    max_surrogate_key = 0
else:
    df_max_sur_key = spark.sql('select max(dimCustomerKey) from dbx_cata.gold.customers')
    max_surrogate_key = df_max_sur_key.collect()[0][0] #converts df to value

In [0]:
df_new_data = df_new_data.withColumn('dimCustomerKey', col("dimCustomerKey") + lit(max_surrogate_key))

In [0]:
df_final = df_new_data.unionByName(df_old_data)

In [0]:
df_final.display()

In [0]:
df_old_data.display()

In [0]:
#SCD Type 1 - Upsert
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert into the gold dimension: matched keys are overwritten in place and
# unmatched rows are inserted. On the first run (table absent) the table is created as
# a Delta table stored at the gold location.
gold_table = 'dbx_cata.gold.customers'
gold_path = 'abfss://gold@etedbxcrop.dfs.core.windows.net/customers'

if spark.catalog.tableExists(gold_table):
    # Resolve the MERGE target by the same name that tableExists() checked, so the
    # existence test and the MERGE cannot silently point at different tables.
    delta_obj = DeltaTable.forName(spark, gold_table)
    delta_obj.alias('trg').merge(df_final.alias('src'), "trg.dimCustomerKey = src.dimCustomerKey")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
else:
    df_final.write.format('delta').mode('overwrite')\
        .option('path', gold_path)\
        .saveAsTable(gold_table)


In [0]:
%sql
-- Inspect the gold customer dimension after the upsert.
select * from dbx_cata.gold.customers