In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
%sql
-- Quick preview of the silver source table that feeds this gold load.
SELECT *
FROM workspace_bryanq.silver.silver_flights

## **INCREMENTAL DATA INGESTION**

### **Incremental Variables**

In [0]:
# Human-readable label of the entity handled by this notebook, printed for reference.
file_name = str.capitalize("flights")
print(file_name)

In [0]:
# Key Col List
# Business (natural) key column(s), written as a Python list literal inside a string
# (presumably so it can be supplied as a job/widget parameter — confirm).
# ast.literal_eval only accepts literals, so unlike eval() it cannot execute arbitrary code.
key_col = "['flight_id']"
key_col_list = ast.literal_eval(key_col)
if not isinstance(key_col_list, (list, tuple)) or not key_col_list:
  # An empty or non-list value would later produce invalid SQL (empty SELECT/ON clauses).
  raise ValueError(f"key_col must be a non-empty list literal such as \"['flight_id']\"; got {key_col!r}")

# CDC Column: rows whose value is greater than the last watermark are treated as changed
cdc_col = "modifiedDate"

# Backdated Refresh: optional timestamp string; when non-empty it overrides the watermark
backdated_refresh = ""

# Surrogate Key column in the gold table
# NOTE(review): "DimFlighstKey" looks like a typo of "DimFlightsKey". It is left unchanged
# because an existing gold table would already carry this column name — confirm before renaming.
surrogate_key = "DimFlighstKey"

# Catalog
catalog_name = "workspace_bryanq"

# Source Object
source_object = "silver_flights"

# Source Schema
source_schema = "silver"

# Target Object
target_object = "flights"

# Target Schema
target_schema = "gold"

### **Last Load Date**

In [0]:
# Resolve the CDC watermark for this run. The next cell loads only source rows whose
# cdc_col is strictly greater than last_load.
# Priority: 1) backdated_refresh override, 2) MAX(cdc_col) already loaded into gold,
# 3) a 1900-01-01 floor that selects everything (first run, or gold table still empty).
default_last_load = "1900-01-01 00:00:00"

if backdated_refresh:
  last_load = backdated_refresh

elif spark.catalog.tableExists(f"{catalog_name}.{target_schema}.{target_object}"):
  last_load = spark.sql(
    f"SELECT MAX({cdc_col}) FROM {catalog_name}.{target_schema}.{target_object}"
  ).collect()[0][0]

  # MAX() over an existing but empty table is NULL (None in Python). Without this fallback
  # the next cell would compare cdc_col against the literal string 'None'.
  if last_load is None:
    last_load = default_last_load

else:
  last_load = default_last_load

last_load

In [0]:
# Pull only the silver rows changed after the watermark (strictly greater than last_load).
source_query = (
  f"SELECT * FROM {catalog_name}.{source_schema}.{source_object} "
  f"WHERE {cdc_col} > '{last_load}' "
)
df_source = spark.sql(source_query)

df_source.display()

### **New Records vs Old Records**

In [0]:
# Collect the business key(s), surrogate key and audit dates already present in gold.
# The source increment is then split into existing rows and brand-new rows.
if not spark.catalog.tableExists(f"{catalog_name}.{target_schema}.{target_object}"):
  # Initial load: there is no gold table yet. Build a zero-row frame (WHERE 1 = 0) that
  # exposes the same column names as the incremental branch below.
  key_col_list_init = ', '.join(f"'' AS {key}" for key in key_col_list)
  df_target = spark.sql(
    f"SELECT {key_col_list_init}, CAST('0' AS INT) AS {surrogate_key}, "
    "CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS create_date, "
    "CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS update_date WHERE 1 = 0"
  )
else:
  # Incremental load: read the keys and audit columns straight from the gold table.
  key_col_string = ', '.join(key_col_list)
  df_target = spark.sql(
    f"SELECT {key_col_string}, {surrogate_key}, create_date, update_date "
    f"FROM {catalog_name}.{target_schema}.{target_object}"
  )

In [0]:
# Preview the comparison set: existing gold keys (incremental run) or the empty placeholder (first run).
df_target.display()

### **Joining df_source with df_target (LEFT JOIN on the business key)**

In [0]:
# Preview the ON clause that matches source rows to gold rows on every business key column.
join_condition = ' AND '.join(
  "source.{0} = target.{0}".format(key) for key in key_col_list
)
join_condition

In [0]:
# Match each source row to its gold counterpart on the business key(s).
# The LEFT JOIN keeps every source row; rows with no gold match get a NULL surrogate key.
# join_condition is rebuilt here so this cell does not depend on the preview cell above.
join_condition = ' AND '.join(map("source.{0} = target.{0}".format, key_col_list))

# Expose both frames to Spark SQL under the aliases used in the query.
df_source.createOrReplaceTempView("source")
df_target.createOrReplaceTempView("target")

df_join = spark.sql(f"""
          SELECT 
            source.*
            ,target.{surrogate_key}
            ,target.create_date
            ,target.update_date
          FROM source
          LEFT JOIN target
          ON {join_condition}
          """)

In [0]:
# Preview the source increment enriched with the matched gold surrogate key and audit dates
# (NULL where the business key is not in gold yet).
df_join.display()

In [0]:
# Split the joined increment:
#   - a surrogate key inherited from gold means the row already exists (update candidate);
#   - a NULL surrogate key means the business key is new (insert candidate).
has_surrogate = col(surrogate_key).isNotNull()
df_old = df_join.where(has_surrogate)
df_new = df_join.where(~has_surrogate)

df_old.display()
df_new.display()

### **Setting Surrogate Key**

In [0]:
# Give brand-new rows surrogate keys that continue after the highest key already in gold,
# and stamp both audit columns with the current time.
if spark.catalog.tableExists(f"{catalog_name}.{target_schema}.{target_object}"):
  max_surrogate_key = spark.sql(f"""
                                SELECT MAX({surrogate_key}) FROM {catalog_name}.{target_schema}.{target_object}
                                """).collect()[0][0]
else:
  max_surrogate_key = None

# MAX() over an existing but empty table is NULL. Adding None to a Column yields NULL,
# which would give every new row a NULL surrogate key. Treat "no rows" like "no table",
# so keys start at 1.
if max_surrogate_key is None:
  max_surrogate_key = 0

# monotonically_increasing_id() is unique and non-negative within this DataFrame but NOT
# consecutive (the partition id occupies the upper bits), so new keys exceed the previous
# max but may contain large gaps.
df_new = (
  df_new
  .withColumn(surrogate_key, monotonically_increasing_id() + max_surrogate_key + 1)
  .withColumn('create_date', current_timestamp())
  .withColumn('update_date', current_timestamp())
)

In [0]:
# Preview the new rows after surrogate keys and audit timestamps were assigned.
df_new.display()

In [0]:
# Schema check: df_new's columns must line up by name with df_old for the unionByName below.
df_new.printSchema()

In [0]:
# Schema check for the existing-row side of the union (compare against df_new above).
df_old.printSchema() 

In [0]:
# Stack existing (update) rows and new (insert) rows into one frame for the upsert.
# Columns are aligned by name rather than position.
df_union = (
  df_old
  .unionByName(df_new)
)
df_union.display()

## **UPSERT Command**

In [0]:
# Upsert the combined increment into the gold table.
#   - Table exists: MERGE on the surrogate key. A matched row is overwritten only when the
#     source CDC value is newer. Every column except create_date/update_date is copied from
#     source, and update_date is set to current_timestamp(). Unmatched rows are inserted as-is.
#   - Table missing (first run): create it by writing df_union as a Delta table.
gold_table = f"{catalog_name}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(gold_table):
    df_union.write.format("delta")\
                  .mode("append")\
                  .saveAsTable(gold_table)
else:
    # Named column_name (not col) to avoid visually shadowing pyspark.sql.functions.col.
    update_map = {
        column_name: f"source.{column_name}"
        for column_name in df_union.columns
        if column_name not in ["create_date", "update_date"]
    }
    update_map["update_date"] = "current_timestamp()"

    dlt_obj = DeltaTable.forName(spark, gold_table)
    (
        dlt_obj.alias("target")
        .merge(
            df_union.alias("source"),
            f"target.{surrogate_key} = source.{surrogate_key}",
        )
        .whenMatchedUpdate(
            condition=f"source.{cdc_col} > target.{cdc_col}",
            set=update_map,
        )
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
# Inspect the gold table, highest surrogate keys (most recently inserted batch) first.
gold_preview_sql = (
  f"SELECT * FROM {catalog_name}.{target_schema}.{target_object} "
  f"ORDER BY {surrogate_key} DESC"
)
spark.sql(gold_preview_sql).display()

In [0]:
# Spot-check one hard-coded flight in gold (presumably a manual verification aid).
spark.sql(
  f"select * from {catalog_name}.{target_schema}.{target_object} "
  "WHERE flight_id = 'F0045'"
).display()

In [0]:
# NOTE(review): this DROPs a table in the *silver* schema that no other cell in this gold-load
# notebook reads or writes. It looks like leftover cleanup — confirm it is intended before
# running this notebook as a scheduled job, since the statement is destructive.
# (Plain string: the previous f-string had no placeholders.)
spark.sql("drop table if exists workspace_bryanq.silver.transform_business")