In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *

Creating Variables

In [0]:
# ##################################### Creating Variables ###################
# One entry per dimension this notebook can build. Select the dimension to load with
# DIMENSION instead of commenting/uncommenting whole configuration cells.
DIMENSION_CONFIGS = {
    "flightDim": {
        "keycol": "['flight_id']",
        "source_object": "silver_flights",
        "target_object": "DimFlights",
        "surrogate_key": "DimFlightsKey",
    },
    "airportDim": {
        "keycol": "['airport_id']",
        "source_object": "silver_airports",
        "target_object": "DimAirports",
        "surrogate_key": "DimAirportsKey",
    },
    "customerDim": {
        "keycol": "['passenger_id']",
        "source_object": "silver_customers",
        "target_object": "DimPassengers",
        "surrogate_key": "DimPassengersKey",
    },
}

# Dimension built by this run.
DIMENSION = "customerDim"

# Settings shared by every dimension.
catalog = 'flight'
cdc_col = "modifiedDate"
# "" = load incrementally from the target's max(cdc_col); any other value is used
# as-is as the lower bound on cdc_col (back-dated refresh).
backdated_refresh = ""
source_schema = "silver"
target_schema = "gold"

# Settings specific to the selected dimension.
_dim_config = DIMENSION_CONFIGS[DIMENSION]
keycol = _dim_config["keycol"]
# keycol is a Python list literal held as a string (e.g. "['passenger_id']").
# ast.literal_eval parses it without executing arbitrary code, unlike eval().
key_cols_list = ast.literal_eval(keycol)
source_object = _dim_config["source_object"]
target_object = _dim_config["target_object"]
surrogate_key = _dim_config["surrogate_key"]

Get the last load timestamp (`last_load`) from the target table

In [0]:
# Lower bound for the incremental pull: source rows with cdc_col greater than
# last_load are loaded by the next cell.
LAST_LOAD_DEFAULT = "1900-01-01 00:00:00"
target_table = f"{catalog}.{target_schema}.{target_object}"

# No Back Dated Refresh
if not backdated_refresh:

  # If Table Exists In The Destination: resume from the newest CDC value already loaded.
  # Query the same catalog-qualified table that tableExists() checked.
  if spark.catalog.tableExists(target_table):
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table}").collect()[0][0]
    # max() is NULL for an empty table. Without this fallback the source filter
    # would compare cdc_col against the literal string 'None'.
    if last_load is None:
      last_load = LAST_LOAD_DEFAULT

  else:
    last_load = LAST_LOAD_DEFAULT

# Yes Back Dated Refresh: use the supplied value as the lower bound.
else:
  last_load = backdated_refresh

# Test The Last Load
last_load

'1900-01-01 00:00:00'

Get the new or changed source records (those whose CDC column is newer than `last_load`)

In [0]:
# Incremental pull: only source rows whose CDC column is strictly newer than last_load.
source_table = f"{catalog}.{source_schema}.{source_object}"
cdc_filter = f"{cdc_col} > '{last_load}'"
df_src = spark.sql(f"SELECT * FROM {source_table} WHERE {cdc_filter}")

Create the target DataFrame for incremental and initial loads.

In [0]:
# Existing target rows (business keys + surrogate key + audit columns), used to recognise
# records that were already loaded. On the first run there is no target table yet, so an
# empty frame with the same column names stands in for it.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

  # Key Columns String For Initial: each business key as an empty-string placeholder column
  key_cols_string_init = ", ".join(f"'' AS {key_col}" for key_col in key_cols_list)

  # WHERE 1=0 yields the column layout with no rows.
  df_trg = spark.sql(f"""SELECT {key_cols_string_init}, CAST('0' AS INT) AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0""")

else:

  # Key Columns String For Incremental: the business keys as a plain column list
  key_cols_string_incremental = ", ".join(key_cols_list)

  df_trg = spark.sql(f"""SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date 
                      FROM {catalog}.{target_schema}.{target_object}""")

In [0]:
# df_trg.display()

Join the source table and target table.

In [0]:
# Equality predicate on every business key, e.g. "src.passenger_id = trg.passenger_id".
join_condition = " AND ".join(f"src.{key_col} = trg.{key_col}" for key_col in key_cols_list)

In [0]:
# Expose both frames to Spark SQL under the names the join below refers to.
for view_name, frame in (("src", df_src), ("trg", df_trg)):
    frame.createOrReplaceTempView(view_name)

# LEFT JOIN keeps every source row; rows with no target match get NULL in the
# surrogate key and audit columns, which is how new records are told apart later.
df_join = spark.sql(f"""
            SELECT src.*, 
                   trg.{surrogate_key},
                   trg.create_date,
                   trg.update_date
            FROM src
            LEFT JOIN trg
            ON {join_condition}
            """)

In [0]:
# df_join.display()

Split the joined rows into existing (old) and new records.

In [0]:
# A matched target row supplies a non-NULL surrogate key; unmatched source rows
# come out of the LEFT JOIN with NULL in that column.
surrogate_key_col = col(surrogate_key)

# OLD RECORDS
df_old = df_join.where(surrogate_key_col.isNotNull())

# NEW RECORDS
df_new = df_join.where(surrogate_key_col.isNull())

### Enriching DataFrames

In [0]:
# df_old.display()

Preparing old dataframe

In [0]:
# Existing rows keep their surrogate key and create_date; only update_date is re-stamped.
update_stamp = current_timestamp()
df_old_enr = df_old.withColumn("update_date", update_stamp)

In [0]:
# df_old_enr.display()

Preparing new dataframe

In [0]:
# df_new.display()

In [0]:
target_table = f"{catalog}.{target_schema}.{target_object}"

# Highest surrogate key already issued. It is 0 when the target does not exist yet OR is empty:
# max() over an empty table is NULL, and lit(None) + ... would make every new key NULL.
max_surrogate_key = 0
if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(
        f"SELECT max({surrogate_key}) FROM {target_table}"
    ).collect()[0][0] or 0

# New keys start above max_surrogate_key. monotonically_increasing_id() is unique but
# NOT consecutive (it encodes the partition id in the upper bits), so keys may have gaps.
df_new_enr = (
    df_new
    .withColumn(surrogate_key, lit(max_surrogate_key) + lit(1) + monotonically_increasing_id())
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)


In [0]:
# df_new_enr.display()

Union both enriched DataFrames.

In [0]:
# Stack updated and new rows, matching columns by name rather than by position.
df_union = df_old_enr.unionByName(other=df_new_enr)

In [0]:
# df_union.display()

### **Performing Upsert**

In [0]:
from delta.tables import DeltaTable

In [0]:
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):

    # First run: create the Delta table from the enriched rows.
    df_union.write.format("delta").mode("append").saveAsTable(target_table)

else:

    # Upsert keyed on the surrogate key. A matched row is overwritten only when the
    # incoming cdc_col is not older than the stored one; unmatched rows are inserted.
    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), f"trg.{surrogate_key} = src.{surrogate_key}")
        .whenMatchedUpdateAll(condition=f"src.{cdc_col} >= trg.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )
