In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

#### Parameters

In [0]:
# Parameters for the bookings fact load (silver -> gold).

# --- Source (silver layer) ---
catalog_name = "flights"
source_schema = "silver"
source_object = "silver_bookings"

# Fully qualified name of the source table: <catalog>.<schema>.<object>
fact_table = ".".join((catalog_name, source_schema, source_object))

# --- Incremental load settings ---
# Column used to detect changed rows (change data capture).
cdc_col = "modified_date"

# Back-dated refresh extends the incremental refresh: set a date here to
# reprocess historical data from that point. Leave empty for a normal run.
backdated_refresh = ""

# --- Target (gold layer) ---
target_schema = "gold"
target_object = "fact_bookings"

# Columns that together identify one fact row; used to build the merge condition.
fact_key_cols = [
    "DimPassengersKey",
    "DimFlightsKey",
    "DimAirportsKey",
    "booking_date",
]



In [0]:
# One entry per dimension the fact joins to:
#   (dimension table name, [(fact_col, dim_col), ...])
# The table name doubles as the SQL alias in the generated query.
_dimension_specs = [
    ("dim_passengers", [("passenger_id", "passenger_id")]),
    ("dim_flights", [("flight_id", "flight_id")]),
    ("dim_airports", [("airport_id", "airport_id")]),
]

dimensions = [
    {
        "table": f"{catalog_name}.{target_schema}.{dim_name}",
        "alias": dim_name,
        "join_keys": key_pairs,
    }
    for dim_name, key_pairs in _dimension_specs
]

# Fact-table columns carried into the result alongside the surrogate keys.
fact_columns = ["amount", "booking_date", "modified_date"]

In [0]:
# Show the resolved dimension configuration (table names are fully qualified).
dimensions

[{'table': 'flights.gold.dim_passengers',
  'alias': 'dim_passengers',
  'join_keys': [('passenger_id', 'passenger_id')]},
 {'table': 'flights.gold.dim_flights',
  'alias': 'dim_flights',
  'join_keys': [('flight_id', 'flight_id')]},
 {'table': 'flights.gold.dim_airports',
  'alias': 'dim_airports',
  'join_keys': [('airport_id', 'airport_id')]}]

#### Last Load Date

In [0]:
# Determine the lower bound (watermark) of the incremental load.
#
# Priority:
#   1. An explicit back-dated refresh date, when one is configured.
#   2. The newest CDC value already present in the target table.
#   3. A fixed early date, which makes the run a full initial load.

target_table_name = f"{catalog_name}.{target_schema}.{target_object}"
initial_load_date = "1900-01-01 00:00:00"

if backdated_refresh:
    # Back-dated refresh requested: reprocess from the configured date.
    last_load = backdated_refresh

elif spark.catalog.tableExists(target_table_name):
    # Target exists: continue from the newest CDC value already loaded.
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table_name}").collect()[0][0]

    # max() yields NULL on an empty table; without this fallback the value
    # None would be rendered into the query text and match no rows.
    if last_load is None:
        last_load = initial_load_date

else:
    # Target does not exist yet: initial load.
    last_load = initial_load_date

# Test the Last Load
last_load


datetime.datetime(2025, 7, 11, 0, 9, 11, 806000)

## Dynamic Fact Query (Bring Surrogate Keys from the Dimensions)

In [0]:
def generate_fact_query_incremental(fact_table, dimensions, fact_columns, cdc_col, processing_date):
    """Build the SQL text that selects incremental fact rows with their dimension keys.

    The fact table is aliased as ``f`` and LEFT JOINed to every dimension in
    ``dimensions``. For each dimension, a surrogate key column named
    ``<PascalCaseTableName>Key`` (e.g. ``dim_flights`` -> ``DimFlightsKey``)
    is added to the select list.

    Args:
        fact_table: Name of the source fact table, as it should appear in FROM.
        dimensions: Iterable of dicts with the keys ``"table"`` (dimension table
            name; the part after the last ``.`` is used to derive the surrogate
            key name), ``"alias"`` (SQL alias for the dimension) and
            ``"join_keys"`` (list of ``(fact_col, dim_col)`` pairs that are
            AND-ed together in the ON clause).
        fact_columns: Fact columns to select, in order.
        cdc_col: Fact column compared against ``processing_date``.
        processing_date: Lower bound of the incremental window. It is formatted
            into the query as ``DATE('<processing_date>')``.

    Returns:
        The query as a string. Nothing is executed here.
    """
    fact_alias = "f"

    # Fact columns come first, qualified with the fact alias.
    select_columns = [f"{fact_alias}.{column}" for column in fact_columns]

    join_clauses = []
    for dim in dimensions:
        table_full = dim["table"]
        alias = dim["alias"]

        # Surrogate key name is derived from the unqualified table name:
        # dim_passengers -> DimPassengers -> DimPassengersKey
        table_name = table_full.split('.')[-1]
        pascal_name = ''.join(word.capitalize() for word in table_name.split('_'))
        select_columns.append(f"{alias}.{pascal_name}Key")

        on_conditions = [
            f"{fact_alias}.{fk} = {alias}.{dk}" for fk, dk in dim["join_keys"]
        ]
        join_clauses.append(
            f"LEFT JOIN {table_full} {alias} ON " + " AND ".join(on_conditions)
        )

    select_clause = ",\n    ".join(select_columns)
    joins = "\n".join(join_clauses)

    # Filter on the caller-supplied processing_date (not on a notebook global),
    # so the function gives the same result regardless of kernel state.
    # NOTE(review): DATE(...) presumably drops the time portion, which would
    # re-select every row from the watermark day - confirm this is intended.
    where_clause = f"{fact_alias}.{cdc_col} >= DATE('{processing_date}')"

    query = f"""
SELECT
    {select_clause}
FROM
    {fact_table} {fact_alias}
{joins}
WHERE
    {where_clause}
    """.strip()

    return query

In [0]:
# Render the incremental fact query, using the computed watermark as the lower bound.
query = generate_fact_query_incremental(
    fact_table=fact_table,
    dimensions=dimensions,
    fact_columns=fact_columns,
    cdc_col=cdc_col,
    processing_date=last_load,
)

In [0]:
# Print the generated SQL so it can be reviewed before it is executed.
print(query)

SELECT
    f.amount,
    f.booking_date,
    f.modified_date,
    dim_passengers.DimPassengersKey,
    dim_flights.DimFlightsKey,
    dim_airports.DimAirportsKey
FROM
    flights.silver.silver_bookings f
LEFT JOIN flights.gold.dim_passengers dim_passengers ON f.passenger_id = dim_passengers.passenger_id
LEFT JOIN flights.gold.dim_flights dim_flights ON f.flight_id = dim_flights.flight_id
LEFT JOIN flights.gold.dim_airports dim_airports ON f.airport_id = dim_airports.airport_id
WHERE
    f.modified_date >= DATE('2025-07-11 00:09:11.806000')


#### Data Frame DF_FACT

In [0]:
# Run the generated query; df_fact holds the incremental fact rows with their dimension keys.
df_fact = spark.sql(query)

In [0]:
# Preview the incremental fact rows.
df_fact.display()

amount,booking_date,modified_date,DimPassengersKey,DimFlightsKey,DimAirportsKey
850.72,2025-05-29,2025-07-11T00:09:11.806Z,119,2,28
376.63,2025-06-09,2025-07-11T00:09:11.806Z,42,4,15
534.02,2025-06-03,2025-07-11T00:09:11.806Z,97,38,1
1333.7,2025-06-16,2025-07-11T00:09:11.806Z,132,61,18
1334.96,2025-06-17,2025-07-11T00:09:11.806Z,41,21,40
296.13,2025-05-18,2025-07-11T00:09:11.806Z,142,13,26
460.14,2025-04-05,2025-07-11T00:09:11.806Z,172,47,33
1402.02,2025-06-04,2025-07-11T00:09:11.806Z,90,74,34
1444.51,2025-05-16,2025-07-11T00:09:11.806Z,84,17,33
292.39,2025-05-16,2025-07-11T00:09:11.806Z,69,59,24


In [0]:
# Number of fact rows per (passenger, airport, flight) surrogate-key combination.
key_combination_counts = df_fact.groupBy(
    "DimPassengersKey",
    "DimAirportsKey",
    "DimFlightsKey",
).count()
key_combination_counts.display()

DimPassengersKey,DimAirportsKey,DimFlightsKey,count
71,15,48,1
125,4,79,1
177,46,3,1
179,20,96,1
90,13,51,1
89,31,57,1
21,1,32,1
186,9,24,1
134,31,22,1
185,23,11,1


In [0]:
# Surrogate-key combinations that occur on more than one fact row.
# These three keys alone therefore do not identify a fact row uniquely.
rows_per_key_combination = df_fact.groupBy(
    "DimPassengersKey",
    "DimAirportsKey",
    "DimFlightsKey",
).count()
rows_per_key_combination.filter("count > 1").display()

DimPassengersKey,DimAirportsKey,DimFlightsKey,count
180,44,8,2
115,42,47,2


In [0]:
# Duplicated Record Example
# Show the rows behind one repeated key combination.
# The column name is spelled exactly as in the query output (DimAirportsKey), so
# the filter does not rely on Spark resolving column names case-insensitively.
df_fact.filter(
    (col('DimPassengersKey') == 180)
    & (col('DimAirportsKey') == 44)
    & (col('DimFlightsKey') == 8)
).display()


amount,booking_date,modified_date,DimPassengersKey,DimFlightsKey,DimAirportsKey
397.77,2025-06-06,2025-07-11T00:09:11.806Z,180,8,44
1472.47,2025-05-04,2025-07-11T00:09:11.806Z,180,8,44


### Upsert

In [0]:
# Merge condition: one src/trg equality per fact key column, joined with AND.
key_predicates = (f"src.{key_col} = trg.{key_col}" for key_col in fact_key_cols)
fact_key_cols_str = " AND ".join(key_predicates)
fact_key_cols_str


'src.DimPassengersKey = trg.DimPassengersKey AND src.DimFlightsKey = trg.DimFlightsKey AND src.DimAirportsKey = trg.DimAirportsKey AND src.booking_date = trg.booking_date'

In [0]:
# Upsert the incremental rows into the gold fact table.
target_fqn = f"{catalog_name}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_fqn):

    # First run: the target does not exist yet, so create it from the incremental rows.
    (
        df_fact.write
        .format("delta")
        .mode("append")
        .saveAsTable(target_fqn)
    )

else:

    dlt_obj = DeltaTable.forName(spark, target_fqn)

    # Rows matching on the fact key columns are updated only when the incoming
    # CDC value is at least as new as the stored one; all other rows are inserted.
    (
        dlt_obj.alias("trg")
        .merge(df_fact.alias("src"), fact_key_cols_str)
        .whenMatchedUpdateAll(condition=f"src.{cdc_col} >= trg.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the rows currently stored in the gold fact table.
SELECT *
FROM flights.gold.fact_bookings

amount,booking_date,modified_date,DimPassengersKey,DimFlightsKey,DimAirportsKey
1368.6,2025-06-07,2025-07-11T00:09:11.806Z,40,88,13
427.56,2025-03-28,2025-07-11T00:09:11.806Z,40,66,15
320.52,2025-05-05,2025-07-11T00:09:11.806Z,40,95,5
220.26,2025-06-16,2025-07-11T00:09:11.806Z,40,43,18
716.33,2025-06-01,2025-07-11T00:09:11.806Z,122,54,29
940.65,2025-04-19,2025-07-11T00:09:11.806Z,122,79,6
265.87,2025-05-17,2025-07-11T00:09:11.806Z,122,25,39
835.11,2025-06-10,2025-07-11T00:09:11.806Z,122,37,19
910.26,2025-05-21,2025-07-11T00:09:11.806Z,122,85,27
915.79,2025-04-01,2025-07-11T00:09:11.806Z,122,76,12


In [0]:
# Uniqueness check: list every DimAirportsKey that appears more than once in dim_airports.
# An empty result means each key value occurs on at most one row.
df = (
    spark.sql("SELECT * FROM flights.gold.dim_airports")
    .groupBy("DimAirportsKey")
    .count()
    .filter(col("count") > 1)
)
df.display()

DimAirportsKey,count


In [0]:
# Uniqueness check: list every DimFlightsKey that appears more than once in dim_flights.
# An empty result means each key value occurs on at most one row.
df = (
    spark.sql("SELECT * FROM flights.gold.dim_flights")
    .groupBy("DimFlightsKey")
    .count()
    .filter(col("count") > 1)
)
df.display()

DimFlightsKey,count


In [0]:
# Uniqueness check: list every DimPassengersKey that appears more than once in dim_passengers.
# An empty result means each key value occurs on at most one row.
df = (
    spark.sql("SELECT * FROM flights.gold.dim_passengers")
    .groupBy("DimPassengersKey")
    .count()
    .filter(col("count") > 1)
)
df.display()

DimPassengersKey,count
