In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *

##### A `surrogate key` is a system-generated number used to uniquely identify a row in a table. It has no business meaning and never changes; in other words, it is a made-up ID that the system assigns to each row.

### **Parameters for `FLIGHTS`**

In [0]:
# # Key Cols list
# key_col = "['flight_id']"
# key_col_list = eval(key_col)

# # CDC Cols [Change Data Capture] is a method of identifying and capturing changes made to a database. It captures data changes and enables businesses to keep track of all modifications 
# cdc_col = 'modified_date'

# # Back-Dated Refresh (usually called backfilling) is the process of going back in time to fix or fill in data that is missing, incorrect, or outdated. 
# back_dated_refresh = ''

# # Source Catalog
# my_catalog = "flightsproj"

# # Source Schema
# src_schema = "silver"

# # Source Object
# src_obj = "silver_flights"

# # Target Schema
# tgt_schema = "gold"

# # Target Object
# tgt_obj = "Dim_flights"

# # Surrogate Key
# surrogate_key = "DimFlights_SK"


### **Parameters for `AIRPORTS`**

In [0]:
# # Key Cols list
# key_col = "['airport_id']"
# key_col_list = eval(key_col)

# # CDC Cols [Change Data Capture] is a method of identifying and capturing changes made to a database. It captures data changes and enables businesses to keep track of all modifications 
# cdc_col = 'modified_date'

# # Back-Dated Refresh (usually called backfilling) is the process of going back in time to fix or fill in data that is missing, incorrect, or outdated. 
# back_dated_refresh = ''

# # Source Catalog
# my_catalog = "flightsproj"

# # Source Schema
# src_schema = "silver"

# # Source Object
# src_obj = "silver_airports"

# # Target Schema
# tgt_schema = "gold"

# # Target Object
# tgt_obj = "Dim_Airports"

# # Surrogate Key
# surrogate_key = "DimAirports_SK"


### **Parameters for `PASSENGERS`**

In [0]:
# Key columns: the business-key column(s) used to match source rows to target rows.
# The list is held as a string, so it is parsed with ast.literal_eval, which accepts
# only Python literals and (unlike eval) cannot execute arbitrary expressions.
key_col = "['passenger_id']"
key_col_list = ast.literal_eval(key_col)

# CDC (Change Data Capture) column: the column compared against the last load date
# to pick up only the rows that changed since the previous run.
cdc_col = 'modified_date'

# Back-dated refresh (backfill): set to a date/timestamp string to reload everything
# changed after that point; leave empty to continue from the last load date.
back_dated_refresh = ''

# Source catalog
my_catalog = "flightsproj"

# Source schema
src_schema = "silver"

# Source object (table read from the source schema)
src_obj = "silver_passengers"

# Target schema
tgt_schema = "gold"

# Target object (dimension table written to the target schema)
tgt_obj = "Dim_Passengers"

# Name of the surrogate key column in the target object
surrogate_key = "DimPassengers_SK"


## **_INCREMENTAL DATA INGESTION_** DYNAMIC SOLUTION

#### **Last Load Date**

In [0]:
# Resolve `last_load`, the lower bound (exclusive) applied to the CDC column of the source.
if back_dated_refresh:
    # An explicit back-dated refresh always wins over the stored watermark.
    last_load = back_dated_refresh
elif spark.catalog.tableExists(f"{my_catalog}.{tgt_schema}.{tgt_obj}"):
    # The target exists: continue from the latest CDC value already loaded.
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {my_catalog}.{tgt_schema}.{tgt_obj}").collect()[0][0]
else:
    # No target table yet, so there is no watermark to read.
    last_load = None

# max() returns NULL for an existing-but-empty target; without this fallback the
# source filter would compare the CDC column against the text 'None'.
if last_load is None:
    # A date that is guaranteed to be before the first load date.
    last_load = '1900-01-01 00:00:00'

In [0]:
# Echo the resolved watermark so it can be checked before the source is filtered with it.
last_load

In [0]:
# Preview: source rows whose CDC column is later than `last_load`.
(
    spark
    .sql(f"SELECT * FROM {my_catalog}.{src_schema}.{src_obj} WHERE {cdc_col} > '{last_load}'")
    .display()
)

In [0]:
# Incremental slice of the source: only rows changed after `last_load`.
df_src = spark.sql(
    f"SELECT * FROM {my_catalog}.{src_schema}.{src_obj} "
    f"WHERE {cdc_col} > '{last_load}'"
)

#### OLD vs NEW Records

In [0]:
# Build `df_tgt`: the key columns, surrogate key and audit dates currently in the target.
if spark.catalog.tableExists(f"{my_catalog}.{tgt_schema}.{tgt_obj}"):
    # Key column list for incremental load
    key_col_string_incremental = ', '.join(key_col_list)

    df_tgt = spark.sql(f"select {key_col_string_incremental}, {surrogate_key}, create_date, update_date from {my_catalog}.{tgt_schema}.{tgt_obj}")
else:
    # Key column list for initial load: one empty-string placeholder per key column.
    key_col_string_init = ', '.join(f"'' as {k}" for k in key_col_list)

    # Empty placeholder (where 1=0) standing in for the not-yet-created target.
    # The surrogate key is BIGINT and the audit dates are TIMESTAMP so that they match
    # the types produced for new records (monotonically_increasing_id / current_timestamp)
    # and the later union does not depend on implicit string/timestamp coercion.
    df_tgt = spark.sql(f"""
        select cast(0 as BIGINT) as {surrogate_key},
               {key_col_string_init},
               cast('1900-01-01 00:00:00' as TIMESTAMP) as create_date,
               cast('1900-01-01 00:00:00' as TIMESTAMP) as update_date
        where 1=0
    """)


In [0]:
# Inspect the target-side keys (empty when the target table does not exist yet).
df_tgt.display()

In [0]:
%skip
# Scratch example: a two-column key list for trying out the join-condition builder.
# NOTE(review): %skip is presumably the notebook magic that keeps this cell from running — confirm.
# If this cell were executed it would overwrite the `key_col_list` parameter set above.
key_col_list = ['flight_id', 'flight_name']

In [0]:
%skip
# Scratch: list the per-key equality predicates (src.<key> = tgt.<key>), one per key column.
# NOTE(review): %skip is presumably the notebook magic that keeps this cell from running — confirm.
[f"src.{i} = tgt.{i}" for i in key_col_list]

In [0]:
%skip
'and '.join([f"src.{i} = tgt.{i}" for i in key_col_list])

In [0]:
# Join condition: every key column must be equal on the source and target side.
join_condition = ' and '.join(
    f"src.{key_name} = tgt.{key_name}" for key_name in key_col_list
)

In [0]:
# Expose both DataFrames to SQL under the aliases used by `join_condition`.
df_tgt.createOrReplaceTempView("tgt")
df_src.createOrReplaceTempView("src")

# Keep every source row; the surrogate key and audit dates are filled in only
# where the key columns already exist in the target (left join).
df_joined = spark.sql(f"""
           select src.*,
           tgt.{surrogate_key},
           tgt.create_date,
           tgt.update_date
           from src
           left join tgt
           on {join_condition}
           """)

In [0]:
# Inspect the source rows together with the surrogate key and audit dates matched from the target.
df_joined.display()

In [0]:
# Split the joined rows on whether the left join found a surrogate key in the target.
# OLD records: the key already exists in the target.
df_old = df_joined.where(col(surrogate_key).isNotNull())
# NEW records: no match in the target, so no surrogate key yet.
df_new = df_joined.where(col(surrogate_key).isNull())

df_old.display()
df_new.display()

##### Preparing `df_old`

In [0]:
# Existing rows keep the surrogate key and create_date brought over by the join;
# only update_date is overwritten with the current timestamp.
df_old_enriched = df_old.withColumn('update_date', current_timestamp())

In [0]:
# Inspect the existing records after their update_date has been refreshed.
df_old_enriched.display()

##### Preparing `df_new`

In [0]:
# Find the highest surrogate key already issued so new keys continue after it.
if spark.catalog.tableExists(f"{my_catalog}.{tgt_schema}.{tgt_obj}"):
    max_surrogate_key = spark.sql(f"""
                                  select max({surrogate_key}) from {my_catalog}.{tgt_schema}.{tgt_obj}
                                  """).collect()[0][0]
    # max() is NULL for an existing-but-empty table; adding to NULL would make
    # every new surrogate key NULL, so start from 0 in that case.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# New rows get a fresh surrogate key and both audit dates set to now.
# monotonically_increasing_id() yields unique, increasing but not consecutive ids,
# so the keys are unique while gaps between them are expected.
df_new_enriched = (
    df_new
    .withColumn(surrogate_key, lit(max_surrogate_key) + 1 + monotonically_increasing_id())
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)

In [0]:
# Inspect the new records with their assigned surrogate keys and audit dates.
display(df_new_enriched)


#### **Union of old and new records**

In [0]:
# Stack the refreshed existing rows and the newly keyed rows into one DataFrame.
# unionByName matches columns by name, so the column order of the two inputs does not matter.
df_union = df_old_enriched.unionByName(df_new_enriched)
df_union.display()

## **UPSERT**

In [0]:
from delta.tables import DeltaTable

if not spark.catalog.tableExists(f"{my_catalog}.{tgt_schema}.{tgt_obj}"):
    # First load: the target does not exist yet, so write the rows as a new Delta table.
    (
        df_union.write
        .format('delta')
        .mode('append')
        .saveAsTable(f"{my_catalog}.{tgt_schema}.{tgt_obj}")
    )
else:
    # Later loads: merge on the surrogate key. Matching rows are updated only when the
    # incoming CDC value is not older than the stored one; unmatched rows are inserted.
    dlt_object = DeltaTable.forName(spark, f'{my_catalog}.{tgt_schema}.{tgt_obj}')
    (
        dlt_object.alias("tgt")
        .merge(df_union.alias("src"), f"tgt.{surrogate_key} = src.{surrogate_key}")
        .whenMatchedUpdateAll(condition=f'src.{cdc_col} >= tgt.{cdc_col}')
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Show the contents of the target table after the load.
spark.sql(f"select * from {my_catalog}.{tgt_schema}.{tgt_obj}").display()