In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# 0 = Merge / Upsert
# 1 = Initial load
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

### **Data Reading From Source**

In [0]:
df = spark.sql("SELECT * FROM databricks_cata.silver.customers_silver")

In [0]:
df.display()

### **Removing Duplicates**

In [0]:
# ลบแถวที่มี pk ซ้ำ
df = df.dropDuplicates(subset=['customer_id'])

# **Dividing New vs Old Records**

In [0]:
if init_load_flag == 0:
    # Incremental load (Upsert: Update Old data or Inert New data)
    df_old = spark.sql('''
                        SELECT 
                            DimCustomerKey, 
                            customer_id, 
                            create_date, 
                            update_date 
                        FROM databricks_cata.gold.DimCustomers
                        ''')
else:
    # Initial load
    df_old = spark.sql('''
                    SELECT 
                        0 DimCustomersKey, 
                        0 customer_id, 
                        0 create_date, 
                        0 update_date
                    FROM databricks_cata.silver.customers_silver
                    WHERE 1=0
                    ''')
    

In [0]:
df_old.display()

**Renaming Columns of df_old**

In [0]:
df_old = df_old.withColumnRenamed("DimCustomerKey", "old_DimCustomerKey") \
            .withColumnRenamed("customer_id", "old_customer_id") \
            .withColumnRenamed("create_date", "old_create_date") \
            .withColumnRenamed("update_date", "old_update_date")

### **Applying Join with the Old Records**

In [0]:
df_join = df.join(df_old, df.customer_id == df_old.old_customer_id, 'left')


In [0]:
df_join.display()

**Seperating New vs Old Records**

In [0]:
# จะเขียน df_join.old_customer_id หรือ df_join['old_customer_id'] ก็ได้เหมือนกัน
df_new = df_join.filter(df_join['old_customer_id'].isNull())

In [0]:
df_old = df_join.filter(df_join['old_customer_id'].isNotNull())

In [0]:
df_new.limit(5).display()
df_old.limit(5).display()

**Preparing df_old**

In [0]:
# Dropping all the columns which are not required
df_old = df_old.drop("old_customer_id", "old_update_date")

# Renaming "old_DimCustomerKey" column to "DimCustomerKey"
df_old = df_old.withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")

# Renaming "old_create_date" column to "create_date" and then convert data type to timestamp
df_old = df_old.withColumnRenamed("old_create_date", "create_date") \
                .withColumn("create_date", to_timestamp(col("create_date")))

# Recreating "update_date" column with current timestamp
df_old = df_old.withColumn("update_date", current_timestamp())

In [0]:
df_old.display()

**Preparing df_new**

In [0]:
df_new.limit(5).display()

In [0]:
# Dropping all the columns which are not required
df_new = df_new.drop("old_DimCustomerKey", "old_customer_id", "old_create_date", "old_update_date")

# Recreating "create_date" and "update_date" column with current timestamp
df_new = df_new.withColumn("create_date", current_timestamp()) \
                .withColumn("update_date", current_timestamp())

In [0]:
df_new.display()

### **Surrogate Key - From 1**

In [0]:
# ฟังก์ชั่น monotonically_increasing_id() ใน PySpark จะสร้างคอลัมน์ที่มีค่าเป็นตัวเลขที่เพิ่มขึ้นเรื่อย ๆ โดยไม่ซ้ำกัน ซึ่งสามารถใช้เป็น primary key หรือ identifier ได้ ฟังก์ชั่นนี้จะสร้างค่าเริ่มต้นจาก 0 และเพิ่มขึ้นเรื่อย ๆ ตามลำดับของแถวใน DataFrame

# การใช้ lit(1) แทนที่จะใช้ +1 ธรรมดาเป็นเพราะว่าใน PySpark การดำเนินการทางคณิตศาสตร์ระหว่างคอลัมน์และค่าคงที่ต้องใช้ฟังก์ชั่น lit() เพื่อแปลงค่าคงที่ให้เป็นคอลัมน์ก่อน
df_new = df_new.withColumn("DimCustomerKey", monotonically_increasing_id() + lit(1))

# ใน PySpark การใช้ monotonically_increasing_id() + 1 อาจทำงานได้ในบางกรณี แต่การใช้ lit(1) เป็นวิธีที่ถูกต้องและปลอดภัยกว่า เนื่องจาก lit() ใช้ในการแปลงค่าคงที่ให้เป็นคอลัมน์ ซึ่งเป็นวิธีที่แนะนำในการดำเนินการทางคณิตศาสตร์ระหว่างคอลัมน์และค่าคงที่ใน PySpark.

In [0]:
df_new.display()

**Adding Max Surrogate Key**

In [0]:
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    # ใน PySpark, การใช้ .collect() ต่อท้าย spark.sql เป็นการดึงข้อมูลทั้งหมดจาก DataFrame ที่ได้จากการ query มาเก็บไว้ในรูปแบบของ list ของ Row objects ใน driver node. ถ้าคุณต้องการแค่ค่าเดียว (เช่น max_surrogate_key), คุณสามารถใช้ .first() แทน .collect() เพื่อดึงแค่แถวแรกของผลลัพธ์ได้.
    df_maxsur = spark.sql("SELECT MAX(DimCustomerKey) as max_surrogate_key FROM databricks_cata.gold.DimCustomers")
    
    # Convert df_maxsur to max_sorrogate_key variable
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
    # หรือใช้ .first() max_surrogate_key = df_maxsur.first()['max_surrogate_key']


    # ตัวอย่างวิธีอื่นๆ
    # ######### first() ########
    # df_maxsur = spark.sql("SELECT MAX(DimCustomerKey) as max_surrogate_key FROM databricks_cata.gold.DimCustomers").first()
    # max_surrogate_key = df_maxsur['max_surrogate_key']

    # ######### collect() ########
    # df_maxsur = spark.sql("SELECT MAX(DimCustomerKey) as max_surrogate_key FROM databricks_cata.gold.DimCustomers").collect()
    # max_surrogate_key = df_maxsur[0]['max_surrogate_key']

In [0]:
print(max_surrogate_key)

In [0]:
# สมมุติ max_sorrogate_key = 2000 และ DimCustomerKey = 1, 2, 3, .....
# ค่าต่อไปในแต่ละ rows ก็จะเป็น 2000+1, 2000+2, 2000+3, ....
df_new = df_new.withColumn("DimCustomerKey", lit(max_surrogate_key) + col("DimCustomerKey"))

In [0]:
df_new.limit(5).display()
df_old.limit(5).display()

### **Union of df_old and df_new**

In [0]:
df_final = df_new.unionByName(df_old)

In [0]:
df_final.display()

## **SCD Type - 1**

In [0]:
from delta.tables import DeltaTable
# ถ้าจะใช้ Merge / Upsert ต้อง Import อันนี้


# if init_load_flag == 1:
# หรือจะใช่ if statement ด้านล่างเพื่อเช็คค่าได้เหมือนกัน
if spark.catalog.tableExists("databricks_cata.gold.DimCustomers"):
    dlt_obj = DeltaTable.forPath(spark, "abfss://gold@dlsdatabrickseteteamea.dfs.core.windows.net/DimCustomers")

    # whenMatchedUpdateAll(), whenNotMatchedInsertAll() จะหมายถึง Update ทุกคอลัมน์ หรือ Inert ทุกคอลัมน์ และไม่ต้องระบุคอลัมน์
    # whenMatchedUpdate(), whenNotMatchedInsert() จะหมายถึง Update บางคอลัมน์ หรือ Inert บางคอลัมน์ และต้องระบุคอลัมน์
    dlt_obj.alias("trg") \
        .merge(
            df_final.alias("src"), 
            "trg.DimCustomerKey = src.DimCustomerKey"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    df_final.write.mode("overwrite") \
        .format("delta") \
        .option("path", "abfss://gold@dlsdatabrickseteteamea.dfs.core.windows.net/DimCustomers") \
        .saveAsTable("databricks_cata.gold.DimCustomers")

In [0]:
df_gold = spark.sql("SELECT * FROM databricks_cata.gold.DimCustomers")
df_gold.display()