### Fetching parameters & Creating variables

In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Parameters for one gold dimension load (active set: DimFlights).

# Business key column(s) of the source, given as a Python list literal string.
# ast.literal_eval only accepts literals, so a malformed or malicious value
# cannot execute code the way eval() could.
key_cols = "['flight_id']"
key_cols_list = ast.literal_eval(key_cols)
# Downstream cells join these names into SQL, so reject anything that is not a
# non-empty sequence of column-name strings (a bare string would be split into chars).
assert (
    isinstance(key_cols_list, (list, tuple))
    and len(key_cols_list) > 0
    and all(isinstance(c, str) for c in key_cols_list)
), f"key_cols must be a non-empty list of column names, got {key_cols!r}"

# CDC column: only source rows with a value greater than the last load are processed
cdc_col = "modifiedDate"

# Back-dated refresh: leave empty for a normal incremental run, or set a
# timestamp string to reprocess every source row changed after it
backdated_refresh = ""

# Catalog: first part of the three-part table names used below
catalog = "flightsproject"

# Source object (table) name
source_object = "silver_flights"

# Source schema
source_schema = "silver"

# Target schema
target_schema = "gold"

# Target object (gold dimension table) name
target_object = "DimFlights"

# Surrogate key column generated for the target dimension
surrogate_key = "DimFlightsKey"

In [0]:
# Alternate parameter set for the DimAirports dimension (inactive).
# Uncommenting this cell overrides the values set by the active DimFlights
# parameter cell, because this cell runs after it.
# # Key cols list
# key_cols = "['airport_id']"
# key_cols_list = ast.literal_eval(key_cols)

# # CDC column
# cdc_col = "modifiedDate"

# # Back-dated refresh
# backdated_refresh = ""

# # Source database
# catalog = "flightsproject"

# # Source object
# source_object = "silver_airports"

# # Source schema
# source_schema = "silver"

# # Target schema
# target_schema = "gold"

# # Target object
# target_object = "DimAirports"

# # Surrogate key
# surrogate_key = "DimAirportsKey"

In [0]:
# Alternate parameter set for the DimPassengers dimension (inactive).
# Uncommenting this cell overrides the values set by the earlier parameter
# cells, because this cell runs after them.
# # Key cols list
# key_cols = "['passenger_id']"
# key_cols_list = ast.literal_eval(key_cols)

# # CDC column
# cdc_col = "modifiedDate"

# # Back-dated refresh
# backdated_refresh = ""

# # Source database
# catalog = "flightsproject"

# # Source object
# source_object = "silver_passengers"

# # Source schema
# source_schema = "silver"

# # Target schema
# target_schema = "gold"

# # Target object
# target_object = "DimPassengers"

# # Surrogate key
# surrogate_key = "DimPassengersKey"

### Incremental Data Ingestion

### Last load date (CDC watermark)

In [0]:
# Determine the CDC watermark: only source rows whose cdc_col is greater than
# last_load are processed in this run.
initial_watermark = "1900-01-01 00:00:00"  # earlier than any real data -> load everything
target_table = f"{catalog}.{target_schema}.{target_object}"

if backdated_refresh:
    # Back-dated refresh: reprocess everything after the requested timestamp
    last_load = backdated_refresh
elif spark.catalog.tableExists(target_table):
    # Incremental run: resume from the newest CDC value already in the target.
    # The existence check uses the fully qualified name, like the max() query,
    # so the answer does not depend on the session's current catalog.
    last_load = spark.sql(f"select max({cdc_col}) from {target_table}").collect()[0][0]
    if last_load is None:
        # Table exists but is empty: max() is NULL; without this the source
        # filter would compare against the literal string 'None'
        last_load = initial_watermark
else:
    # Initial load: the target does not exist yet
    last_load = initial_watermark

# Show the watermark used for this run
last_load

In [0]:
# Incremental extract: source rows changed after the watermark
source_table = f"{catalog}.{source_schema}.{source_object}"
incremental_query = f"select * from {source_table} where {cdc_col} > '{last_load}'"
df_src = spark.sql(incremental_query)
display(df_src)

### Old vs New Records

In [0]:
# Preview the comma-separated key column list used in the target projection
key_cols_string_incremental = str.join(", ", key_cols_list)
key_cols_string_incremental

In [0]:
# df_tgt: business keys, surrogate key and audit dates of the existing gold
# table, or a zero-row frame with the same column names on the first run.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    # First run: no gold table yet. `where 1=0` yields no rows but keeps the
    # column names so the left join below still resolves.
    key_cols_string_initial = ", ".join(f"'' as {key}" for key in key_cols_list)
    df_tgt = spark.sql(
        f"select {key_cols_string_initial}, "
        f"cast('0' as int) as {surrogate_key}, "
        "cast('1900-01-01 00:00:00' as timestamp) as create_date, "
        "cast('1900-01-01 00:00:00' as timestamp) as update_date "
        "where 1=0"
    )
else:
    # Existing gold table: fetch only the columns needed to match and carry over
    key_cols_string_incremental = ", ".join(key_cols_list)
    df_tgt = spark.sql(
        f"select {key_cols_string_incremental}, {surrogate_key}, create_date, update_date "
        f"from {catalog}.{target_schema}.{target_object}"
    )

df_tgt.display()

### Join Condition

In [0]:
# ON-clause pairing every business key between the src and tgt views
join_condition = " AND ".join(
    "src.{0} = tgt.{0}".format(key) for key in key_cols_list
)
print(join_condition)

In [0]:
# Expose both DataFrames to SQL under short aliases
df_src.createOrReplaceTempView("src")
df_tgt.createOrReplaceTempView("tgt")

# Left join: every source row, plus the target's surrogate key and audit dates
# when its business key already exists in the target (NULLs otherwise)
join_sql = (
    "select src.*, "
    f"tgt.{surrogate_key}, tgt.create_date, tgt.update_date "
    "from src left join tgt "
    f"on {join_condition}"
)
df_join = spark.sql(join_sql)
df_join.display()

In [0]:
# Rows whose business key matched the target carry its surrogate key
sk_column = col(surrogate_key)

# Old records: already present in the target
df_old = df_join.filter(sk_column.isNotNull())
display(df_old)

# New records: no match in the target
df_new = df_join.filter(sk_column.isNull())
display(df_new)


### Preparing df_old (existing records: refresh `update_date`)

In [0]:
# Existing rows keep their surrogate key and create_date; only update_date is refreshed
df_old_enriched = df_old.withColumn(colName="update_date", col=current_timestamp())

### Preparing df_new (new records: assign surrogate keys and audit dates)

In [0]:
# Assign surrogate keys and audit dates to records not yet in the target.
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    # collect()[0][0] extracts the single scalar returned by the aggregate
    max_surrogate_key = spark.sql(
        f"select max({surrogate_key}) from {target_table}"
    ).collect()[0][0]
    if max_surrogate_key is None:
        # Table exists but is empty: max() is NULL; start numbering as on a first load
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# New keys start at max_surrogate_key + 1. monotonically_increasing_id() is unique
# and increasing but NOT consecutive (the partition id is encoded in its upper bits),
# so the keys are unique and above the current max but may contain gaps.
df_new_enriched = (
    df_new
    .withColumn(surrogate_key, lit(max_surrogate_key + 1) + monotonically_increasing_id())
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)

display(df_new_enriched)

### Union between old and new records

In [0]:
# Inspect the two frames that are actually unioned below, and fail early with a
# clear message if unionByName would reject them for mismatched column names.
df_old_enriched.printSchema()
df_new_enriched.printSchema()
assert set(df_old_enriched.columns) == set(df_new_enriched.columns), (
    "column mismatch between old and new records: "
    f"{sorted(set(df_old_enriched.columns) ^ set(df_new_enriched.columns))}"
)

In [0]:
# Refreshed existing rows + newly keyed rows, matched by column name (not position)
df_union = df_old_enriched.unionByName(other=df_new_enriched)
display(df_union)

### Upsert

In [0]:
from delta.tables import DeltaTable

In [0]:
target_table_name = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table_name):
    # First run: create the gold Delta table from the prepared rows
    (
        df_union.write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table_name)
    )
else:
    # Later runs: upsert on the surrogate key. A matched row is overwritten only
    # when the incoming CDC value is not older than the stored one; unmatched
    # rows are inserted.
    # NOTE(review): if the increment contains several rows for one business key,
    # they share one target surrogate key and Delta MERGE rejects multiple source
    # matches — consider de-duplicating df_src on key_cols_list upstream.
    merge_condition = f"tgt.{surrogate_key} = src.{surrogate_key}"
    update_condition = f"src.{cdc_col} >= tgt.{cdc_col}"
    dlt_obj = DeltaTable.forName(spark, target_table_name)
    (
        dlt_obj.alias("tgt")
        .merge(df_union.alias("src"), merge_condition)
        .whenMatchedUpdateAll(condition=update_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Preview the loaded dimension. The name is built from the parameters so this
# shows whichever table this run targeted (not always DimFlights).
display(spark.table(f"{catalog}.{target_schema}.{target_object}"))