In [0]:
from pyspark.sql.functions import *

In [0]:
# catalog
catalog = "workspace"

# source schema
source_schema = "silver"

# Source Object
source_object = "silver_bookings"

# cdc column
cdc_col = "modifiedDate"

# back-date refresh
backdated_refresh = ""

# Source fact table
fact_table = f"{catalog}.{source_schema}.{source_object}"

# Target schema
target_schema = "gold"

# Target object
target_object = "FactBookings"

# fact key cols list
fact_key_cols = ["DimPassengerskey", "Dimflightskey", "DimAirportskey", "booking_date"]


In [0]:
dimensions = [
    {
        "table" : f"{catalog}.{target_schema}.DimPassengers",
        "alias" : "DimPassengers",
        "join_key" : [("passenger_id", "passenger_id")]  #(fact_col, dim_col)
    },
     {
        "table" : f"{catalog}.{target_schema}.Dimflights",
        "alias" : "Dimflights",
        "join_key" : [("flight_id", "flight_id")]  #(fact_col, dim_col)
    },
     {
        "table" : f"{catalog}.{target_schema}.DimAirports",
        "alias" : "DimAirports",
        "join_key" : [("airport_id", "airport_id")]  #(fact_col, dim_col)
    },
]


#Columns you want to keep from fact table (besides the surrogate keys)
fact_columns = ["amount", "booking_date", "modifiedDate"]

##  ***Last Load Data***

In [0]:
# no backdated refresh
if len(backdated_refresh) == 0:

  #if table EXIST in the destination 
  if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

    last_load = spark.sql(f"select max({cdc_col}) from workspace.{target_schema}.{target_object}").collect()[0][0]
  
  else:
    last_load = "1900-01-01 00:00:00"
    
# do back data refresh
else:
  last_load = backdated_refresh


## test
last_load

'1900-01-01 00:00:00'

## **Dynamic Fact Query [Bring Key]**

In [0]:
def generate_fact_query_incremental(fact_table, dimensions, cdc_col, processing_date):
    fact_alias = "f"

    #Base columns to select
    select_cols = [f"{fact_alias}.{col}" for col in fact_columns]

    # Build joins dynamically
    join_clauses = []
    for dim in dimensions:
        table_full = dim["table"]
        alias = dim["alias"]
        table_name = table_full.split('.')[-1]
        surrogate_key = f"{alias}.{table_name}key"
        select_cols.append(surrogate_key)

        # Build ON Clause
        on_conditions = [
            f"{fact_alias}.{fk} = {alias}.{dk}" for fk, dk in dim["join_key"]
        ]
        join_clause = f"LEFT JOIN {table_full} {alias} ON " + " AND ".join(on_conditions)
        join_clauses.append(join_clause)
    
    #Final SELECT and JOIN Clauses
    select_clause = ",\n  ".join(select_cols)
    joins = "\n".join(join_clauses)

    #WHERE Clause for incremental filtering
    where_clause = f"{fact_alias}.{cdc_col} >= DATE('{last_load}')"

    #Final Query
    query = f"""
SELECT
      {select_clause}
FROM
      {fact_table} {fact_alias}
    {joins}
WHERE
      {where_clause}
      """.strip()
    return query
# DBTITLE 1)
        


In [0]:
query = generate_fact_query_incremental(fact_table, dimensions, cdc_col, last_load)


## DF_FACT

In [0]:
df_fact = spark.sql(query)

In [0]:
fact_key_col_str = " AND ".join([f"src.{col} = tgt.{col}" for col in fact_key_cols])
fact_key_col_str

'src.DimPassengerskey = tgt.DimPassengerskey AND src.Dimflightskey = tgt.Dimflightskey AND src.DimAirportskey = tgt.DimAirportskey AND src.booking_date = tgt.booking_date'

In [0]:
from delta.tables import DeltaTable

if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    
    dlt_obj = DeltaTable.forName(spark, f"{catalog}.{target_schema}.{target_object}")
    dlt_obj.alias("tgt").merge(df_fact.alias("src"), fact_key_col_str)\
                        .whenMatchedUpdateAll(condition = f"src.{cdc_col} >= tgt.{cdc_col}")\
                        .whenNotMatchedInsertAll()\
                        .execute()

else:
    df_fact.write.format("delta")\
      .mode("overwrite")\
      .saveAsTable(f"{catalog}.{target_schema}.{target_object}")