# **Parameters**

In [0]:
# ---- Source (silver layer) ------------------------------------------------
catalog = "workspace"             # catalog shared by source and target objects
source_schema = "silver"
source_object = "silver_bookings"
# Fully qualified source fact table: <catalog>.<schema>.<object>
fact_table = ".".join([catalog, source_schema, source_object])

# ---- Incremental load control ----------------------------------------------
# Column whose max value in the target becomes the incremental watermark.
cdc_column = "modifiedDate"
# Empty string = normal incremental run; any other value is used verbatim as the
# watermark (back-dated refresh).
backdated_refresh = ""

# ---- Target (gold layer) ----------------------------------------------------
target_schema = "gold"
target_object = "FactBookings"

# Columns that identify a fact row in the MERGE condition.
fact_key_cols = [
    "DimPassengersKey",
    "DimFlightsKey",
    "DimAirportsKey",
    "booking_date",
]

In [0]:
# Dimension tables LEFT JOINed onto the fact rows. For each entry:
#   table     - fully qualified dimension table name; the query selects
#               <alias>.<last part of table>Key as the surrogate key
#   alias     - SQL alias used for the dimension in the generated query
#   join_keys - (fact_col, dim_col) pairs ANDed together in the ON clause
dimensions = [
    {
        "table": f"{catalog}.{target_schema}.DimPassengers",
        "alias": "DimPassengers",
        "join_keys": [("passenger_id", "passenger_id")],  # (fact_col, dim_col)
    },
    {
        "table": f"{catalog}.{target_schema}.DimFlights",
        "alias": "DimFlights",
        "join_keys": [("flight_id", "flight_id")],  # (fact_col, dim_col)
    },
    {
        "table": f"{catalog}.{target_schema}.DimAirports",
        "alias": "DimAirports",
        "join_keys": [("airport_id", "airport_id")],  # (fact_col, dim_col)
    },
]

# Fact columns to keep besides the surrogate keys.
# The CDC column is taken from `cdc_column` instead of being hard-coded. Both the
# last-load lookup (max(cdc_column) on the target) and the MERGE update condition
# (src.<cdc_column>) require it in the output. The guard avoids selecting the
# same column twice if cdc_column is already listed.
fact_columns = ["amount", "booking_date"]
if cdc_column not in fact_columns:
    fact_columns.append(cdc_column)

### Last Load Date (Incremental Watermark)

In [0]:
# Determine the incremental watermark `last_load`:
#   * backdated_refresh set  -> use it verbatim (manual back-dated reload)
#   * target table exists    -> max(cdc_column) already loaded into the target
#   * otherwise (first run)  -> far-past sentinel so every source row is loaded
if backdated_refresh:
    # Back-dated refresh: reload from the user-supplied point in time.
    last_load = backdated_refresh
elif spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    last_load = spark.sql(
        f"SELECT max({cdc_column}) FROM {catalog}.{target_schema}.{target_object}"
    ).collect()[0][0]
    # max() over an empty table is NULL -> None. Without this fallback the query
    # filter becomes DATE('None'), which either fails to cast or matches no rows.
    if last_load is None:
        last_load = "1900-01-01 00:00:00"
else:
    # First run: no target table yet, so load everything.
    last_load = "1900-01-01 00:00:00"

last_load

datetime.datetime(2025, 7, 14, 7, 0, 46, 165000)

## Dynamic Fact Query (Bring in Dimension Surrogate Keys)

In [0]:
def generate_fact_query_incremental(fact_table, dimensions, fact_columns, cdc_column, processing_date):
    """Build the incremental SELECT that enriches fact rows with dimension surrogate keys.

    Each dimension is LEFT JOINed to the fact table. Its surrogate key column
    ``<alias>.<TableName>Key`` is appended to the projection, where
    ``<TableName>`` is the last dot-separated part of ``dim["table"]``.

    Args:
        fact_table: Fully qualified source fact table name (aliased as ``f``).
        dimensions: List of dicts with keys ``"table"`` (fully qualified name),
            ``"alias"`` (SQL alias) and ``"join_keys"`` (list of
            ``(fact_col, dim_col)`` pairs ANDed together in the ON clause).
        fact_columns: Fact columns to keep in addition to the surrogate keys.
        cdc_column: Change-tracking column used for the incremental filter.
        processing_date: Watermark value. Rows whose ``cdc_column`` is on or
            after ``DATE(processing_date)`` are selected.

    Returns:
        The SQL query as a string.
    """
    fact_alias = "f"

    # Fact columns first, then one surrogate key per dimension.
    select_cols = [f"{fact_alias}.{col}" for col in fact_columns]

    join_clauses = []
    for dim in dimensions:
        table_full = dim["table"]
        alias = dim["alias"]
        table_name = table_full.split(".")[-1]
        select_cols.append(f"{alias}.{table_name}Key")

        on_conditions = [f"{fact_alias}.{fk} = {alias}.{dk}" for fk, dk in dim["join_keys"]]
        # LEFT JOIN keeps fact rows even when a dimension lookup finds no match
        # (the surrogate key is then NULL).
        join_clauses.append(f"LEFT JOIN {table_full} {alias} ON " + " AND ".join(on_conditions))

    select_clause = ",\n   ".join(select_cols)
    joins = "\n".join(join_clauses)

    # Filter on the caller-supplied watermark. Before this fix the function read
    # the notebook global `last_load` and ignored `processing_date`. DATE()
    # truncates the watermark to its calendar day, so rows from that whole day
    # are re-read on every run.
    where_clause = f"{fact_alias}.{cdc_column} >= DATE('{processing_date}')"

    query = f"""
        SELECT
        {select_clause}
    FROM  {fact_table} {fact_alias}
        {joins}
    WHERE {where_clause}
    """.strip()

    return query

In [0]:
# Render the incremental fact query for the current watermark.
query = generate_fact_query_incremental(
    fact_table=fact_table,
    dimensions=dimensions,
    fact_columns=fact_columns,
    cdc_column=cdc_column,
    processing_date=last_load,
)

## Build `df_fact` (Source Rows Enriched with Surrogate Keys)

In [0]:
# Run the generated incremental query; df_fact holds the fact columns plus one
# surrogate key per dimension.
df_fact = spark.sql(query)

In [0]:
# Preview the first 10 enriched fact rows.
df_fact.limit(10).display()

amount,booking_date,modifiedDate,DimPassengersKey,DimFlightsKey,DimAirportsKey
850.72,2025-05-29,2025-07-14T07:00:46.165Z,128,2,28
376.63,2025-06-09,2025-07-14T07:00:46.165Z,44,4,15
534.02,2025-06-03,2025-07-14T07:00:46.165Z,105,38,1
1333.7,2025-06-16,2025-07-14T07:00:46.165Z,141,61,18
1334.96,2025-06-17,2025-07-14T07:00:46.165Z,43,21,40
296.13,2025-05-18,2025-07-14T07:00:46.165Z,151,13,26
460.14,2025-04-05,2025-07-14T07:00:46.165Z,181,47,33
1402.02,2025-06-04,2025-07-14T07:00:46.165Z,98,74,34
1444.51,2025-05-16,2025-07-14T07:00:46.165Z,92,17,33
292.39,2025-05-16,2025-07-14T07:00:46.165Z,74,59,24


## Upsert into the Gold Fact Table

In [0]:
#Fact Key Columns Merge Condition
fact_key_cols_str = " AND ".join([f"src.{col} = trg.{col}" for col in fact_key_cols])
fact_key_cols_str

'src.DimPassengersKey = trg.DimPassengersKey AND src.DimFlightsKey = trg.DimFlightsKey AND src.DimAirportsKey = trg.DimAirportsKey AND src.booking_date = trg.booking_date'

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert df_fact into the gold fact table:
#   * first run (table missing) -> create it by writing df_fact as Delta
#   * later runs                -> MERGE on the fact key columns
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    (
        df_fact.write
        .format("delta")
        .mode("append")
        .saveAsTable(f"{catalog}.{target_schema}.{target_object}")
    )
else:
    dlt_obj = DeltaTable.forName(spark, f"{catalog}.{target_schema}.{target_object}")
    (
        dlt_obj.alias("trg")
        .merge(df_fact.alias("src"), fact_key_cols_str)
        # Overwrite a matched row only when the incoming record is at least as recent.
        .whenMatchedUpdateAll(condition=f"src.{cdc_column} >= trg.{cdc_column}")
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Spot-check rows in the gold fact table after the upsert.
SELECT *
FROM workspace.gold.factbookings
LIMIT 10

amount,booking_date,modifiedDate,DimPassengersKey,DimFlightsKey,DimAirportsKey
1368.6,2025-06-07,2025-07-14T07:00:46.165Z,42,88,13
427.56,2025-03-28,2025-07-14T07:00:46.165Z,42,66,15
320.52,2025-05-05,2025-07-14T07:00:46.165Z,42,95,5
220.26,2025-06-16,2025-07-14T07:00:46.165Z,42,43,18
716.33,2025-06-01,2025-07-14T07:00:46.165Z,131,54,29
940.65,2025-04-19,2025-07-14T07:00:46.165Z,131,79,6
265.87,2025-05-17,2025-07-14T07:00:46.165Z,131,25,39
835.11,2025-06-10,2025-07-14T07:00:46.165Z,131,37,19
910.26,2025-05-21,2025-07-14T07:00:46.165Z,131,85,27
915.79,2025-04-01,2025-07-14T07:00:46.165Z,131,76,12


In [0]:
%sql
-- Preview the downstream business view built on the gold layer.
-- NOTE(review): dbt_amohite looks like a developer-specific dbt schema; confirm
-- the intended schema before sharing this notebook.
SELECT *
FROM workspace.dbt_amohite.my_first_business_view
LIMIT 10

country,total_amount
South Georgia and the South Sandwich Islands,55720.81999999999
Libyan Arab Jamahiriya,37338.86
Switzerland,37338.81000000001
Cote d'Ivoire,28785.210000000003
Cayman Islands,28575.560000000005
Macedonia,26128.290000000005
Korea,25090.83
Tokelau,25013.709999999995
Hong Kong,24416.46
Ireland,24156.390000000003
