In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import *

stage_1: load the raw data from the landing (bronze) table

In [0]:
# Stage 1: read the raw bronze table, tag every row with a generated key and
# discard the two load-audit columns.
stage_1 = (
    spark.table("dev_catalog.bronze.shop_name_raw")
    # "sk" is a generated row id (unique, not consecutive); the later
    # left-anti joins use it to identify individual rows
    .withColumn("sk", monotonically_increasing_id())
    # load-audit columns are not needed downstream
    .drop(*["_load_dt", "_load_dttm"])
)
stage_1.display()

stage_2: filter invalid data
- create invalid_df based on stage_1
  - invalid data type
  - key is null
- left anti join stage_1 with invalid_df so that only the valid rows remain


In [0]:
# function control column
def list_all_control_col(df: "DataFrame") -> list[str]:
    """Return the names of the columns of ``df`` that start with an underscore.

    Args:
        df: Object exposing a ``columns`` list of column names (a Spark
            DataFrame in this notebook).

    Returns:
        The underscore-prefixed column names, in the order they appear in
        ``df.columns``. Empty list when there are none.
    """
    # The annotation is quoted on purpose: this notebook never imports
    # ``DataFrame`` explicitly, and an unquoted annotation is evaluated when
    # the function is defined, so it would depend on a star-import leaking
    # that name into the namespace.
    return [_column_name for _column_name in df.columns if _column_name.startswith("_")]

In [0]:
# Stage 2, step 1: flag rule violations on stage_1.
# Every "_"-prefixed boolean column added here is True when the row breaks
# the corresponding rule.
int_rules = "^[0-9]+$"
date_rules = "^\\d{4}-\\d{2}-\\d{2}$"

invalid_df = (
    stage_1
    # key is missing
    .withColumn("_shop_id_missing", col("shop_id").isNull())
    # key is not made of digits only; a NULL key does not satisfy the when()
    # condition either, so it falls through to otherwise(True)
    .withColumn(
        "_shop_id_invalid",
        when(col("shop_id").rlike(int_rules), False).otherwise(True),
    )
    # file date does not have the dddd-dd-dd shape (digits only are checked,
    # not whether the value is a real calendar date)
    .withColumn(
        "_file_dt_invalid",
        when(col("file_dt").rlike(date_rules), False).otherwise(True),
    )
)

invalid_df.display()

In [0]:
# Preview: names of the "_"-prefixed columns currently on invalid_df
list_all_control_col(invalid_df)

In [0]:
# Keep the "_"-prefixed column names of invalid_df in a list and show it
invalid_col = list_all_control_col(invalid_df)
invalid_col

In [0]:
# str.join: concatenate the list items into one string, with " OR " between them
" OR ".join(invalid_col)

In [0]:
# Stage 2, step 2: keep only the rows that break at least one rule.
#
# The filter is built from the flag columns that were added on top of stage_1.
# Any other "_"-prefixed column that already exists on stage_1 (the bronze
# table carries such columns; two of them are dropped in stage 1) is left out,
# so it cannot end up inside the boolean expression.
invalid_col = " OR ".join(
    _flag_name
    for _flag_name in list_all_control_col(invalid_df)
    if _flag_name not in stage_1.columns
)

invalid_df = (
    invalid_df
    # same as: col("_shop_id_missing") | col("_shop_id_invalid") | col("_file_dt_invalid")
    .filter(invalid_col)
)
invalid_df.display()

In [0]:
# Stage 2 result: the stage_1 rows whose sk is not present in invalid_df.
stage_2 = stage_1.join(invalid_df, on=["sk"], how="left_anti")

stage_2.display()

stage_3: filter duplicate data
- create the duplicate data based on stage_2
  - row duplicates
  - key duplicates
  - combine both into duplicate_df
- left anti join stage_2 with duplicate_df

Which should be handled first: key duplicates or row duplicates? The cells below flag row duplicates first, then look for key duplicates among the remaining rows.


In [0]:
# Row duplicates: rows that repeat the same values in all four business columns.
partition_by_all = (
    Window
    .partitionBy(*["shop_id", "shop_name", "branch_name", "file_dt"])
    .orderBy("shop_id")
)

row_dup_df = (
    stage_2
    # number the rows inside each group of identical rows
    .withColumn("rn", row_number().over(partition_by_all))
    # the row numbered 1 is kept; every further copy is reported as a duplicate
    .where(col("rn") > 1)
    .drop("rn")
    .withColumn("_remarks", lit("row_dup"))
)
row_dup_df.display()

In [0]:
# Key duplicates: after the exact row copies are set aside, any shop_id that
# still appears on more than one row. All rows of such a key are reported.
partition_by_key = Window.partitionBy("shop_id").orderBy("shop_id")

key_dup_df = (
    stage_2
    # ignore the rows already reported as row duplicates
    .join(row_dup_df, ["sk"],"left_anti")
    # rows of one partition share the same shop_id, so they are all peers in
    # the ordering and the count covers the whole partition
    .withColumn("key_count", count("*").over(partition_by_key))
    # "> 1" rather than "== 2": a key occurring three or more times is a
    # duplicate as well, and would otherwise reach the final table and give
    # the later MERGE several source rows for one shop_id
    .filter(col("key_count") > 1)
    .drop("key_count")
    .withColumn("_remarks",lit("key_dup"))
)
key_dup_df.display()

In [0]:
# Combine both kinds of duplicates into one frame.
# unionByName matches columns by name, so the result stays correct even if the
# two frames ever end up with their columns in a different order (a plain
# union matches by position only).
duplicate_df = row_dup_df.unionByName(key_dup_df)
duplicate_df.display()

In [0]:
# Stage 3 result: the stage_2 rows whose sk is not present in duplicate_df.
stage_3 = stage_2.join(duplicate_df, on=["sk"], how="left_anti")

stage_3.display()

cast_data_type_df

In [0]:
# Give every business column its target type; sk is carried over unchanged.
cast_df = stage_3.select(
    col("sk"),
    col("shop_id").cast("int"),
    col("shop_name").cast("string"),
    col("branch_name").cast("string"),
    col("file_dt").cast("date"),
)

cast_df.display()

final_df: apply the business-rule filters, if any
  - create error_df
    - select all rows whose branch name is null

In [0]:
# Business rule: a row without a branch name is an error row.
error_df = cast_df.where(col("branch_name").isNull())
print("error_df")
error_df.display()

# Final frame: drop the error rows, stamp the audit columns, and remove the
# technical key, which is only needed for the joins inside this notebook.
final_df = (
    cast_df
    .join(error_df, on=["sk"], how="left_anti")
    .withColumn("_insert_date", current_date())
    .withColumn("_update_date", current_date())
    .withColumn("_last_mod_ts", current_timestamp())
    .drop("sk")
)
print("final_df")
final_df.display()


write to table

In [0]:

# Recreate the silver table from scratch: remove any existing table, then
# save final_df under the same name.
# NOTE(review): the explicit drop presumably resets the table history so the
# version numbers used further down are predictable - confirm before removing.
spark.sql("drop table if exists dev_catalog.silver.shop_name_dwh")

final_df.write.mode("overwrite").saveAsTable("dev_catalog.silver.shop_name_dwh")

# Read the table back to check what was written
spark.table("dev_catalog.silver.shop_name_dwh").display()

check CDF

In [0]:
# spark.sql("desc extended dev_catalog.silver.shop_name_dwh").display()
# spark.sql("desc detail dev_catalog.silver.shop_name_dwh").display()

# Turn on the Change Data Feed property and show the resulting table properties.
# The statements are run directly here because the enable_cdf helper is only
# defined in a later cell: calling it at this point raises NameError when the
# notebook is executed top to bottom on a fresh kernel.
spark.sql("ALTER TABLE dev_catalog.silver.shop_name_dwh SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
spark.sql("desc detail dev_catalog.silver.shop_name_dwh").select("name","properties").display()

Enable the Change Data Feed (CDF) table property

In [0]:
# function to open properties CDF
def enable_cdf(table_name:str):
    """Switch on Delta Change Data Feed for a table and display its properties.

    Args:
        table_name: Name of the table, inserted as-is into the SQL statements.

    Returns:
        None. The "name" and "properties" columns of ``desc detail`` are
        displayed as a side effect.
    """
    alter_stmt = f"ALTER TABLE {table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    spark.sql(alter_stmt)

    # show the table name with its properties so the new setting can be checked
    detail_df = spark.sql(f"desc detail {table_name}")
    detail_df.select("name","properties").display()

In [0]:
# Show the version history of the silver table
spark.sql("desc history dev_catalog.silver.shop_name_dwh").display()

In [0]:
# Small change set for the merge below: same pipeline as the first final_df,
# with shop_name overwritten and only two rows kept.
final_df = (
    cast_df
    .join(error_df,["sk"],"left_anti")
    .withColumn("_insert_date",current_date())
    .withColumn("_update_date",current_date())
    .withColumn("_last_mod_ts",current_timestamp())
    .drop("sk")
    .withColumn("shop_name",lit("new_shop_name"))
    # Order by the merge key before limiting. final_df is evaluated again by
    # every action (the display cell, then the merge), and limit() without an
    # ordering may return a different pair of rows each time.
    .orderBy("shop_id")
    .limit(2)
)

In [0]:
# Show the rows that will be merged into the silver table
final_df.display()

Merge


In [0]:
from delta.tables import DeltaTable

# Upsert final_df into the silver table, matching rows on shop_id.
delta_table = DeltaTable.forName(spark,"dev_catalog.silver.shop_name_dwh")
merge_result = (
    delta_table.alias("target")
    .merge(final_df.alias("source"),"target.shop_id = source.shop_id")
    # Matched rows: refresh the business columns and the modification stamps.
    # _insert_date is not part of the set, so the stored value is kept.
    .whenMatchedUpdate(set = {
        "shop_name":"source.shop_name",
        "branch_name":"source.branch_name",
        "file_dt":"source.file_dt",
        "_update_date":"source._update_date",
        "_last_mod_ts":"source._last_mod_ts"})
    # Unmatched rows: insert the source row with all of its columns.
    .whenNotMatchedInsertAll()
    .execute()
)

# Some delta-spark releases return None from execute() instead of a DataFrame
# with the merge metrics; only display the result when there is one, so the
# cell does not fail right after the merge has already been applied.
if merge_result is not None:
    merge_result.display()



In [0]:
# Show the silver table after the merge
spark.table("dev_catalog.silver.shop_name_dwh").display()

In [0]:
# spark.sql("select * from table_changes('dev_catalog.silver.shop_name_dwh',2,2 )").display()