In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
-- Ad-hoc lookup of passenger 'P0049' in the silver passengers table.
select * from workspace.silver.silver_passengers where passenger_id = 'P0049'

### **Parameters**

In [0]:
# Declare the notebook's text widgets; every one defaults to an empty string.
#   keycols           -> key columns
#   cdccol            -> CDC column
#   backdated_refresh -> back-dated refresh
#   source_object     -> source object
#   source_schema     -> source schema
for widget_name in (
    "keycols",
    "cdccol",
    "backdated_refresh",
    "source_object",
    "source_schema",
):
    dbutils.widgets.text(widget_name, "")

### **Fetching Parameters & Creating Variables**

In [0]:
# Catalog Name
catalog = "workspace"

# Key Cols List
# The widget is expected to hold a Python list literal of column names
# (e.g. "['flight_id']") -- TODO confirm against the calling job.
# It is parsed with ast.literal_eval instead of eval: the widget text is supplied
# by the caller, and eval would execute any expression placed in it.
keycols_raw = dbutils.widgets.get("keycols")
if not keycols_raw.strip():
    raise ValueError("Widget 'keycols' is empty; expected a list literal of key column names")
key_cols_list = ast.literal_eval(keycols_raw)

# Tolerate a single column given as a bare string literal.
if isinstance(key_cols_list, str):
    key_cols_list = [key_cols_list]

# Later cells join these names into SQL text, so reject anything that is not a
# non-empty sequence of strings before it reaches a query.
if not isinstance(key_cols_list, (list, tuple)) or not key_cols_list:
    raise ValueError("Widget 'keycols' must be a non-empty list of key column names")
for key_col in key_cols_list:
    if not isinstance(key_col, str):
        raise ValueError(f"Widget 'keycols' contains a non-string entry: {key_col!r}")
key_cols_list = list(key_cols_list)

# CDC Column (used for the incremental filter and the merge condition)
cdc_col = dbutils.widgets.get("cdccol")

# Backdated Refresh (empty string means "no backdated refresh")
backdated_refresh = dbutils.widgets.get("backdated_refresh")

# Source Object
source_object = dbutils.widgets.get("source_object")

# Source Schema
source_schema = dbutils.widgets.get("source_schema")

# Target Schema
target_schema = "gold"

# Target Object
target_object = "DimFlights"

# Surrogate Key
surrogate_key = "DimFlightsKey"

In [0]:
# Inactive alternate configuration for the DimAirports dimension.
# Every line below is commented out; compared with the active configuration cell
# only target_object and surrogate_key differ.

# # Catalog Name
# catalog = "workspace"

# # Key Cols List
# key_cols_list = eval(dbutils.widgets.get("keycols"))

# # CDC Column
# cdc_col = dbutils.widgets.get("cdccol")

# # Backdated Refresh
# backdated_refresh = dbutils.widgets.get("backdated_refresh")

# # Source Object
# source_object = dbutils.widgets.get("source_object")

# # Source Schema
# source_schema = dbutils.widgets.get("source_schema")

# # Target Schema
# target_schema = "gold"

# # Target Object
# target_object = "DimAirports"

# # Surrogate Key
# surrogate_key = "DimAirportsKey"

In [0]:
# Inactive alternate configuration for the DimPassengers dimension.
# Every line below is commented out; compared with the active configuration cell
# only target_object and surrogate_key differ.

# # Catalog Name
# catalog = "workspace"

# # Key Cols List
# key_cols_list = eval(dbutils.widgets.get("keycols"))

# # CDC Column
# cdc_col = dbutils.widgets.get("cdccol")

# # Backdated Refresh
# backdated_refresh = dbutils.widgets.get("backdated_refresh")

# # Source Object
# source_object = dbutils.widgets.get("source_object")

# # Source Schema
# source_schema = dbutils.widgets.get("source_schema")

# # Target Schema
# target_schema = "gold"

# # Target Object
# target_object = "DimPassengers"

# # Surrogate Key
# surrogate_key = "DimPassengersKey"

## **INCREMENTAL DATA INGESTION**

#### **Last Load Date**

In [0]:
# Determine the watermark: only source rows whose CDC value is greater than
# last_load are picked up by the incremental read.

# No backdated refresh
if not backdated_refresh:

    # If table exists in the destination, select max value of cdc column
    if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
        # Use the same {catalog} as the existence check instead of a hard-coded name.
        last_load = spark.sql(
            f"SELECT max({cdc_col}) FROM {catalog}.{target_schema}.{target_object}"
        ).collect()[0][0]

        # max() over an existing but empty table is NULL (None here). Fall back to the
        # initial-load watermark so the later filter is not built against the text 'None'.
        if last_load is None:
            last_load = "1900-01-01 00:00:00"

    # If table does not exist in the destination
    else:
        last_load = "1900-01-01 00:00:00"

# Yes Backdated refresh
else:
    last_load = backdated_refresh

# Test the last load
last_load

In [0]:
# Read only the source rows whose CDC column is later than the last-load watermark.
incremental_query = f"SELECT * FROM {source_schema}.{source_object} WHERE {cdc_col} > '{last_load}'"
df_src = spark.sql(incremental_query)

In [0]:
# Preview the incremental source rows.
display(df_src)

#### OLD vs NEW RECORDS 

In [0]:
# Build df_tgt: the key columns, surrogate key and audit dates of the target,
# or an empty frame with the same column names when the target does not exist yet.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

    # Key Columns String - Initial (each key projected as an empty-string placeholder)
    key_cols_string_init = ", ".join(f"'' as {i}" for i in key_cols_list)

    # "where 1=0" keeps the schema but returns no rows.
    df_tgt = spark.sql(f"""
                       select {key_cols_string_init}, CAST('0' AS INT) as {surrogate_key}, CAST('1900-01-01 00:00:00' AS TIMESTAMP) as create_date, 
                       CAST('1900-01-01 00:00:00' AS TIMESTAMP) as update_date where 1=0""")

else:

    # Key Columns String - Incremental
    key_cols_string_incremental = ", ".join(key_cols_list)
    df_tgt = spark.sql(
        f"select {key_cols_string_incremental}, {surrogate_key}, create_date, update_date from {catalog}.{target_schema}.{target_object}"
    )

In [0]:
# Preview the target-side keys and audit columns.
display(df_tgt)


In [0]:
# One "src.<key> = tgt.<key>" equality per key column, combined with AND.
join_condition = ' AND '.join(
    f"src.{key_col} = tgt.{key_col}"
    for key_col in key_cols_list
)

In [0]:
# Register both frames as temp views named after the aliases used in join_condition.
df_src.createOrReplaceTempView("src")
df_tgt.createOrReplaceTempView("tgt")

# Left join keeps every source row; the three tgt columns are NULL for source
# rows whose keys have no match in the target.
sql_query = f"""
    select  src.*,
            tgt.{surrogate_key},
            tgt.create_date,
            tgt.update_date
    from src
    left join tgt
    on {join_condition}
"""

df_join = spark.sql(sql_query)


In [0]:
# Preview the source rows joined to their target surrogate keys.
display(df_join)


In [0]:
# Old records
df_old = df_join.filter(col(f'{surrogate_key}').isNotNull())

# New records
df_new = df_join.filter(col(f'{surrogate_key}').isNull())


In [0]:
# Preview the rows that already have a surrogate key.
display(df_old)


## **Enriching Dataframes**

#### **Preparing df_old**

In [0]:
# Stamp existing rows with a fresh update_date; create_date is left as read from the target.
# current_timestamp() already returns a Column, so it needs no lit() wrapper
# (this also matches how df_new is enriched).
df_old_enriched = df_old.withColumn('update_date', current_timestamp())


In [0]:
# Preview the existing rows after update_date was refreshed.
display(df_old_enriched)

#### **Preparing df_new**


In [0]:
# Preview the rows that have no surrogate key yet.
display(df_new)

In [0]:
# Find the highest surrogate key already present in the target.
if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    max_surrogate_key = spark.sql(
        f"select max({surrogate_key}) from {catalog}.{target_schema}.{target_object}"
    ).collect()[0][0]

    # max() over an existing but empty table is NULL (None here); adding to a NULL
    # literal would make every new surrogate key NULL, so start from 0 instead.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# Assign keys above the current maximum and stamp both audit dates.
# monotonically_increasing_id() yields unique, increasing values that are not
# guaranteed to be consecutive, so the generated keys may contain gaps.
df_new_enriched = (
    df_new
    .withColumn(surrogate_key, lit(max_surrogate_key) + lit(1) + monotonically_increasing_id())
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)

In [0]:
# Show the highest surrogate key found in the target (0 when the target table did not exist).
max_surrogate_key

In [0]:
# Preview the new rows with their surrogate keys and audit dates.
display(df_new_enriched)

#### **Unioning OLD and NEW records**

In [0]:
# Combine the updated existing rows and the new rows into one frame;
# unionByName matches columns by name rather than by position.
df_union = df_old_enriched.unionByName(df_new_enriched)


In [0]:
# Preview the combined set of rows that will be upserted.
display(df_union)

## **UPSERT**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert df_union into the target: create the table on first load, merge afterwards.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    # Target does not exist yet: write df_union as a new Delta table.
    (
        df_union.write
        .format("delta")
        .mode("append")
        .saveAsTable(f"{catalog}.{target_schema}.{target_object}")
    )
else:
    # Target exists: match on the surrogate key. Matched rows are overwritten only when
    # the incoming CDC value is not older than the stored one; unmatched rows are inserted.
    dlt_obj = DeltaTable.forName(spark, f"{catalog}.{target_schema}.{target_object}")
    (
        dlt_obj.alias("tgt")
        .merge(
            df_union.alias("src"),
            f"tgt.{surrogate_key} = src.{surrogate_key}",
        )
        .whenMatchedUpdateAll(condition = f"src.{cdc_col} >= tgt.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Ad-hoc lookup of passenger 'P0049' in gold.dimpassengers.
-- NOTE(review): the table name is hard-coded and is not derived from target_object -- confirm it matches the dimension this run loaded.
select * from gold.dimpassengers where passenger_id = 'P0049'