In [0]:
import ast

from delta.tables import DeltaTable
from pyspark.sql.functions import col,current_timestamp,lit,row_number
from pyspark.sql.window import Window

### **Parameters**


In [0]:
# Disabled parameter preset for the flights dimension (silver_flights -> dim_flights).
# The whole cell is a single string literal, so none of the assignments inside it run;
# the parameters actually in effect are the ones assigned outside a string in a later cell.
"""
catalog="flights"                                                                       #catalog_name

source_schema="silver"                                                                  #source schema
source_object="silver_flights"                                                          #source table

target_schema="gold"                                                                    #target schema
target_object="dim_flights"                                                             #target table

cdc_col="modified_date"                                                                 #cdc column name

surrogate_key="dims_flights_key"                                                        #surrogate key column name

key_columns="['flight_id']"                                                             #key columns name
key_col_list=eval(key_columns)                                                          #key columns list

backdated_refresh=""                                                                    #backdate fill value
"""


In [0]:
# Disabled parameter preset for the passengers dimension (silver_passengers -> dim_passengers).
# The whole cell is a single string literal, so none of the assignments inside it run;
# the parameters actually in effect are the ones assigned outside a string in a later cell.
"""
catalog="flights"                                                                       #catalog_name

source_schema="silver"                                                                  #source schema
source_object="silver_passengers"                                                          #source table

target_schema="gold"                                                                    #target schema
target_object="dim_passengers"                                                             #target table

cdc_col="modified_date"                                                                 #cdc column name

surrogate_key="dims_passengers_key"                                                        #surrogate key column name

key_columns="['passenger_id']"                                                             #key columns name
key_col_list=eval(key_columns)                                                          #key columns list

backdated_refresh=""
"""                                                                  

In [0]:

# Active parameter set: load gold.dim_airports from silver.silver_airports.
catalog = "flights"                     # catalog name

source_schema = "silver"                # source schema
source_object = "silver_airports"       # source table

target_schema = "gold"                  # target schema
target_object = "dim_airports"          # target table

cdc_col = "modified_date"               # column compared against the last load date

surrogate_key = "dims_airports_key"     # surrogate key column name

# Business key column(s), written as the string form of a Python list.
# ast.literal_eval only accepts literals, so unlike eval() it cannot run
# arbitrary code if this string is ever supplied from outside the notebook.
key_columns = "['airport_id']"
key_col_list = ast.literal_eval(key_columns)

# Leave empty to continue from the latest cdc value already in the target;
# set to a timestamp string to reload everything changed after that point.
backdated_refresh = ""
                                                          

### **Incremental Ingestion**

In [0]:
# Compute the last load date: only source rows changed after this value are fetched.
# Precedence: explicit backdated_refresh > latest cdc value in the target > fixed floor date.
last_load_date = backdated_refresh

if not backdated_refresh:
    # Floor date used when the target offers nothing to compare against.
    last_load_date = "1900-01-01 00:00:00"

    if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
        latest_cdc_value = spark.sql(f"""
                                 SELECT max({cdc_col}) From {catalog}.{target_schema}.{target_object}
                                 """).collect()[0][0]
        # max() is NULL for an empty table (or an all-NULL cdc column); keep the floor
        # date in that case instead of letting None reach the source filter.
        if latest_cdc_value is not None:
            last_load_date = latest_cdc_value

# Show the value that will be used
last_load_date


In [0]:
# Read only the source rows whose cdc column is strictly greater than the last load date.
incremental_query = f"""
                 SELECT * FROM {catalog}.{source_schema}.{source_object} WHERE {cdc_col}>'{last_load_date}'    
                 """
df_src = spark.sql(incremental_query)
df_src.display()


In [0]:
# Build the target-side lookup frame: key columns, surrogate key and audit dates.

if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    # No target yet: select placeholder values under the expected column names and
    # filter everything out, so the result has the columns but zero rows.
    key_col_initial = ', '.join(f"'' as {key_name}" for key_name in key_col_list)

    target_query = f"""
                     Select {key_col_initial},0 as {surrogate_key},CAST('1900-01-01 00:00:00' as timestamp) as create_date,CAST('1900-01-01 00:00:00' as timestamp) as update_date 
                     where 1=0
                     """
else:
    # Target exists: read the same columns from the table itself.
    key_col_incremental = ', '.join(f"{key_name}" for key_name in key_col_list)

    target_query = f"""
                     Select {key_col_incremental},{surrogate_key},create_date,update_date 
                     from {catalog}.{target_schema}.{target_object}
                     """

df_tgt = spark.sql(target_query)

# Show the target lookup frame
df_tgt.display()
    


In [0]:
# Register both frames as temp views so they can be referenced by name in Spark SQL.
for view_name, frame in (("df_src", df_src), ("df_tgt", df_tgt)):
    frame.createOrReplaceTempView(view_name)

In [0]:
# Build the ON clause that matches source rows to target rows on every key column.
# The separator needs a space on both sides: with 'AND ' a composite key would render
# as "... = df_tgt.aAND df_src.b = ...", which is not valid SQL.
join_condition = ' AND '.join(
    f"df_src.{key_name} = df_tgt.{key_name}" for key_name in key_col_list
)

# Show the generated condition
join_condition

In [0]:
# Left-join source to target on the key columns. Source rows with no match in the
# target come back with NULL surrogate key and audit dates.
join_query = f"""
                    Select df_src.*,df_tgt.{surrogate_key},df_tgt.create_date,df_tgt.update_date 
                    from df_src 
                    left join df_tgt on {join_condition}
                    """
df_joined = spark.sql(join_query)

# Show the joined frame
df_joined.display()

### **Segregating Old and New Records**

In [0]:
# Split the joined rows by whether the left join found a surrogate key in the target:
#   df_old - key present (row already exists in the target)
#   df_new - key missing (row is new)
has_key = f"{surrogate_key} is not null"
missing_key = f"{surrogate_key} is null"

df_old = df_joined.filter(has_key)
df_new = df_joined.filter(missing_key)

# Show both subsets
for subset in (df_old, df_new):
    subset.display()

In [0]:
# Stamp audit columns with the current timestamp.
load_ts = current_timestamp()

# Existing rows keep their create_date; only update_date is refreshed.
df_old = df_old.withColumn("update_date", load_ts)

# New rows get both create_date and update_date.
df_new = df_new.withColumn("create_date", load_ts)
df_new = df_new.withColumn("update_date", load_ts)

# Show both subsets
for subset in (df_old, df_new):
    subset.display()

In [0]:
# Assign surrogate keys to the new rows, continuing after the highest key in the target.
# Start from 0 when the target table is missing, and also when it exists but holds no
# keys yet: max() is NULL in that case and would otherwise break the key arithmetic.
max_surrogate_key = 0

if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    max_surrogate_key = spark.sql(f"""
                                SELECT max({surrogate_key}) From {catalog}.{target_schema}.{target_object}
                                """).collect()[0][0] or 0

# Every row gets the same constant "flag", so the window spans the whole frame and
# row_number() yields 1..N. Ordering by a constant means which row gets which
# number is arbitrary.
key_window = Window.partitionBy("flag").orderBy(col("flag"))

df_new = df_new.withColumn("flag", lit(1))\
        .withColumn(f"{surrogate_key}", max_surrogate_key + row_number().over(key_window))\
        .drop("flag")

# Show new rows (now keyed) and existing rows
df_new.display()
df_old.display()

In [0]:
#Unioning df_old and df_new
# unionByName matches columns by name rather than by position, so the two frames
# need the same column names but not the same column order.

df_enriched=df_old.unionByName(df_new)

df_enriched.display()


### ***Upserting data into target***

In [0]:
# Upsert the enriched rows: create the target on first load, merge into it afterwards.
target_table_name = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table_name):
    # First load: saving the frame creates the Delta table.
    table_writer = df_enriched.write.format("delta").mode("append")
    table_writer.saveAsTable(target_table_name)
else:
    delta_table = DeltaTable.forName(spark, target_table_name)

    # Match rows on the surrogate key.
    merge_builder = delta_table.alias("tgt").merge(
        df_enriched.alias("src"),
        f"tgt.{surrogate_key}=src.{surrogate_key}",
    )
    # Overwrite a matched row only when the incoming cdc value is not older than the stored one.
    merge_builder = merge_builder.whenMatchedUpdateAll(condition=f"src.{cdc_col}>=tgt.{cdc_col}")
    merge_builder.whenNotMatchedInsertAll().execute()

In [0]:
# Show the target table contents after the load.
target_preview_query = f"""
          SELECT * FROM {catalog}.{target_schema}.{target_object}
          """
spark.sql(target_preview_query).display()