In [0]:
%sql
SELECT * FROM workspace.default.silver_flights

flight_id,airline,origin,destination,flight_date,modifiedDate
F0001,Delta,Kellyfort,South Kathleen,2025-05-04,2026-02-09T06:19:05.812Z
F0002,Qatar Airways,Lake Stephen,New Vincent,2025-04-29,2026-02-09T06:19:05.812Z
F0003,Lufthansa,East Patrickborough,North Mary,2025-05-11,2026-02-09T06:19:05.812Z
F0004,Delta,Maddenshire,Johnchester,2025-05-16,2026-02-09T06:19:05.812Z
F0005,Qatar Airways,Bennettside,New Mistyhaven,2025-06-13,2026-02-09T06:19:05.812Z
F0006,Air Canada,New Richardside,South Jamesborough,2025-05-16,2026-02-09T06:19:05.812Z
F0007,Delta,Berryport,Miguelburgh,2025-05-24,2026-02-09T06:19:05.812Z
F0008,Lufthansa,Briannachester,Cervantesland,2025-05-26,2026-02-09T06:19:05.812Z
F0009,Delta,Alexandraborough,North Alexishaven,2025-06-10,2026-02-09T06:19:05.812Z
F0010,Emirates,Kruegerchester,Martintown,2025-05-20,2026-02-09T06:19:05.812Z


In [0]:
val = 'flight_id'
eval(f"['{val}']")

['flight_id']

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# ==========================================
# 1. CONFIGURATION (Fixed Column Name)
# ==========================================
key_columns_list = ["flight_id"] 

# CHANGED: Updated to match your table's actual column name
cdc_column = "modifiedDate" 

source_schema = "default" 
source_table = "silver_flights"

target_schema = "gold"
target_table = "dim_flights"
surrogate_key = "dim_flights_key"

catalog = "workspace"

# Construct full paths
source_table_path = f"{catalog}.{source_schema}.{source_table}"
target_table_path = f"{catalog}.{target_schema}.{target_table}"

print(f"Reading from: {source_table_path}")
print(f"Writing to:   {target_table_path}")
print(f"CDC Column:   {cdc_column}")

# ==========================================
# 2. CHECK SOURCE DATA
# ==========================================
try:
    df_source_check = spark.table(source_table_path)
    print(f"Success! Found {df_source_check.count()} rows in source.")
except Exception as e:
    print(f"CRITICAL ERROR: Could not find {source_table_path}.")
    dbutils.notebook.exit("Source table missing")

# ==========================================
# 3. DETERMINE LOAD DATE
# ==========================================
if spark.catalog.tableExists(target_table_path):
    try:
        max_date_row = spark.sql(f"SELECT MAX({cdc_column}) FROM {target_table_path}").collect()[0][0]
        last_load_date = max_date_row if max_date_row else "1900-01-01 00:00:00"
    except:
        last_load_date = "1900-01-01 00:00:00"
else:
    last_load_date = "1900-01-01 00:00:00"

print(f"Processing data newer than: {last_load_date}")

# ==========================================
# 4. READ & FILTER SOURCE
# ==========================================
df_source = spark.table(source_table_path).filter(col(cdc_column) > lit(last_load_date))

if df_source.count() == 0:
    print("No new data to process.")
else:
    # ==========================================
    # 5. PREPARE JOIN LOGIC
    # ==========================================
    df_source.createOrReplaceTempView("source_view")
    
    if spark.catalog.tableExists(target_table_path):
        spark.table(target_table_path).createOrReplaceTempView("target_view")
    else:
        # Create empty temp view if table doesn't exist
        cols = ", ".join([f"CAST(NULL AS STRING) AS {c}" for c in key_columns_list])
        spark.sql(f"""
            SELECT {cols}, CAST(NULL AS LONG) AS {surrogate_key}, 
            CAST(NULL AS TIMESTAMP) AS create_date, 
            CAST(NULL AS TIMESTAMP) AS update_date 
            WHERE 1=0
        """).createOrReplaceTempView("target_view")

    join_cond = " AND ".join([f"source_view.{c} = target_view.{c}" for c in key_columns_list])
    
    df_joined = spark.sql(f"""
        SELECT source_view.*, 
               target_view.{surrogate_key} as existing_key,
               target_view.create_date as existing_create_date
        FROM source_view
        LEFT JOIN target_view ON {join_cond}
    """)

    df_new = df_joined.filter("existing_key IS NULL")
    df_old = df_joined.filter("existing_key IS NOT NULL")

    # ==========================================
    # 6. ENRICHMENT & KEY GENERATION
    # ==========================================
    if spark.catalog.tableExists(target_table_path):
        mx = spark.sql(f"SELECT MAX({surrogate_key}) FROM {target_table_path}").collect()[0][0]
        max_key = mx if mx else 0
    else:
        max_key = 0

    df_new_enriched = df_new.withColumn(surrogate_key, (monotonically_increasing_id() + 1 + max_key).cast("long")) \
                            .withColumn("create_date", current_timestamp()) \
                            .withColumn("update_date", current_timestamp()) \
                            .drop("existing_key", "existing_create_date")

    df_old_enriched = df_old.withColumn(surrogate_key, col("existing_key").cast("long")) \
                            .withColumn("create_date", col("existing_create_date")) \
                            .withColumn("update_date", current_timestamp()) \
                            .drop("existing_key", "existing_create_date")

    # ==========================================
    # 7. MERGE (UPSERT)
    # ==========================================
    df_final = df_new_enriched.unionByName(df_old_enriched)

    # Ensure Gold schema exists
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{target_schema}")

    if spark.catalog.tableExists(target_table_path):
        dt = DeltaTable.forName(spark, target_table_path)
        dt.alias("t").merge(df_final.alias("s"), f"t.{surrogate_key} = s.{surrogate_key}") \
            .whenMatchedUpdateAll(condition = f"s.{cdc_column} >= t.{cdc_column}") \
            .whenNotMatchedInsertAll() \
            .execute()
        print(f"Upsert Complete. {df_final.count()} rows processed.")
    else:
        df_final.write.format("delta").saveAsTable(target_table_path)
        print(f"Table {target_table_path} created with {df_final.count()} rows.")

Reading from: workspace.default.silver_flights
Writing to:   workspace.gold.dim_flights
CDC Column:   modifiedDate
Success! Found 110 rows in source.
Processing data newer than: 1900-01-01 00:00:00
Table workspace.gold.dim_flights created with 110 rows.


In [0]:
# --- Configuration for Airports ---
key_columns_list = ["airport_id"] 
cdc_column = "modifiedDate"

source_schema = "default"
source_table = "silver_airports"

target_schema = "gold"
target_table = "dim_airports"
surrogate_key = "dim_airports_key"

catalog = "workspace"

In [0]:
# --- Configuration for Passengers ---
key_columns_list = ["passenger_id"] 
cdc_column = "modifiedDate"

source_schema = "default"
source_table = "silver_passengers"

target_schema = "gold"
target_table = "dim_passengers"
surrogate_key = "dim_passengers_key"

catalog = "workspace"

In [0]:
from pyspark.sql.functions import *
from delta.tables import *

# ==========================================
# 1. CONFIGURATION
# ==========================================
catalog = "workspace"
silver_schema = "default"  # Where your DLT created the tables
gold_schema = "gold"       # Where we are writing

# Table Names
source_table = f"{catalog}.{silver_schema}.silver_bookings"
fact_table = f"{catalog}.{gold_schema}.fact_bookings"

# Dimension Tables to Join
dim_flights = f"{catalog}.{gold_schema}.dim_flights"
dim_passengers = f"{catalog}.{gold_schema}.dim_passengers"
dim_airports = f"{catalog}.{gold_schema}.dim_airports"

print(f"Building Fact Table: {fact_table}")

# ==========================================
# 2. READ DATA
# ==========================================
# Read the transactional data (Bookings)
df_bookings = spark.table(source_table)

# Read the Dimensions (We only need the Keys and the Natural IDs for joining)
df_dim_flights = spark.table(dim_flights).select("flight_id", "dim_flights_key")
df_dim_passengers = spark.table(dim_passengers).select("passenger_id", "dim_passengers_key")
df_dim_airports = spark.table(dim_airports).select("airport_id", "dim_airports_key")

# ==========================================
# 3. JOINING (Replacing IDs with Surrogate Keys)
# ==========================================
# This creates the "Star Schema" by linking facts to dimensions
df_fact = df_bookings.alias("b") \
    .join(df_dim_flights.alias("f"), "flight_id", "left") \
    .join(df_dim_passengers.alias("p"), "passenger_id", "left") \
    .join(df_dim_airports.alias("a"), "airport_id", "left") \
    .select(
        col("b.booking_id"),
        col("b.booking_date"),
        col("b.amount"),
        col("f.dim_flights_key"),
        col("p.dim_passengers_key"),
        col("a.dim_airports_key"),
        col("b.modifiedDate").alias("modify_date") # Keeping lineage
    )

# ==========================================
# 4. WRITE TO GOLD (Fact Table)
# ==========================================
# Fact tables are usually large, so we append or overwrite. 
# For this project, we will overwrite to ensure a clean state.

print("Writing data to Gold Layer...")

df_fact.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(fact_table)

print(f"Success! Fact Table created at {fact_table}")
display(spark.sql(f"SELECT * FROM {fact_table} LIMIT 5"))

Building Fact Table: workspace.gold.fact_bookings
Writing data to Gold Layer...


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4791962059803034>, line 62[0m
[1;32m     52[0m [38;5;66;03m# 4. WRITE TO GOLD (Fact Table)[39;00m
[1;32m     54[0m [38;5;66;03m# Fact tables are usually large, so we append or overwrite. [39;00m
[1;32m     55[0m [38;5;66;03m# For this project, we will overwrite to ensure a clean state.[39;00m
[1;32m     57[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mWriting data to Gold Layer...[39m[38;5;124m"[39m)
[1;32m     59[0m df_fact[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m     60[0m     [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m) \
[1;32m     61[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124moverwriteSchema[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# --- CONFIGURATION FOR PASSENGERS ---
key_columns_list = ["passenger_id"] 
cdc_column = "modifiedDate"
source_table_path = "workspace.default.silver_passengers"
target_table_path = "workspace.gold.dim_passengers"
surrogate_key = "dim_passengers_key"

print(f"FORCING BUILD: {target_table_path}...")

# --- BUILDER LOGIC (Compact Version) ---
# 1. Check Source
try:
    df_source = spark.table(source_table_path)
except:
    print(f"Error: {source_table_path} missing. Run Silver pipeline.")
    dbutils.notebook.exit("Source missing")

# 2. Last Load Date
if spark.catalog.tableExists(target_table_path):
    try:
        mx = spark.sql(f"SELECT MAX({cdc_column}) FROM {target_table_path}").collect()[0][0]
        last_load = mx if mx else "1900-01-01"
    except: last_load = "1900-01-01"
else:
    last_load = "1900-01-01"

# 3. Read & Join
df_src = df_source.filter(col(cdc_column) > lit(last_load))
df_src.createOrReplaceTempView("src_view")

if not spark.catalog.tableExists(target_table_path):
    # Init Table Logic
    spark.sql(f"""
        SELECT CAST(NULL AS STRING) AS passenger_id, CAST(0 AS LONG) AS {surrogate_key}, 
        CAST(NULL AS TIMESTAMP) AS create_date, CAST(NULL AS TIMESTAMP) AS update_date WHERE 1=0
    """).createOrReplaceTempView("tgt_view")
else:
    spark.table(target_table_path).createOrReplaceTempView("tgt_view")

# 4. Filter New vs Old
df_joined = spark.sql(f"""
    SELECT s.*, t.{surrogate_key} as k, t.create_date as cd FROM src_view s 
    LEFT JOIN tgt_view t ON s.passenger_id = t.passenger_id
""")
df_new = df_joined.filter("k IS NULL")
df_old = df_joined.filter("k IS NOT NULL")

# 5. Generate Keys
max_k = 0
if spark.catalog.tableExists(target_table_path):
    r = spark.sql(f"SELECT MAX({surrogate_key}) FROM {target_table_path}").collect()[0][0]
    if r: max_k = r

df_new_final = df_new.withColumn(surrogate_key, (monotonically_increasing_id() + 1 + max_k).cast("long")) \
    .withColumn("create_date", current_timestamp()).withColumn("update_date", current_timestamp()).drop("k", "cd")
df_old_final = df_old.withColumn(surrogate_key, col("k").cast("long")) \
    .withColumn("create_date", col("cd")).withColumn("update_date", current_timestamp()).drop("k", "cd")

df_final = df_new_final.unionByName(df_old_final)

# 6. Save
if spark.catalog.tableExists(target_table_path):
    DeltaTable.forName(spark, target_table_path).alias("t").merge(df_final.alias("s"), f"t.{surrogate_key}=s.{surrogate_key}") \
        .whenMatchedUpdateAll(condition=f"s.{cdc_column} >= t.{cdc_column}").whenNotMatchedInsertAll().execute()
    print("Upsert Done.")
else:
    df_final.write.format("delta").saveAsTable(target_table_path)
    print("Table Created.")

FORCING BUILD: workspace.gold.dim_passengers...
Table Created.


In [0]:
# --- CONFIGURATION FOR AIRPORTS ---
key_columns_list = ["airport_id"] 
cdc_column = "modifiedDate"
source_table_path = "workspace.default.silver_airports"
target_table_path = "workspace.gold.dim_airports"
surrogate_key = "dim_airports_key"

print(f"FORCING BUILD: {target_table_path}...")

# (Re-running the exact same logic as above with these new variables)
# ... [Logic is identical to Step 1, this block ensures variables update] ...

# 1. Check Source
try:
    df_source = spark.table(source_table_path)
except:
    dbutils.notebook.exit("Source missing")

# 2. Last Load
if spark.catalog.tableExists(target_table_path):
    try:
        mx = spark.sql(f"SELECT MAX({cdc_column}) FROM {target_table_path}").collect()[0][0]
        last_load = mx if mx else "1900-01-01"
    except: last_load = "1900-01-01"
else:
    last_load = "1900-01-01"

# 3. Read & Join
df_src = df_source.filter(col(cdc_column) > lit(last_load))
df_src.createOrReplaceTempView("src_view")

if not spark.catalog.tableExists(target_table_path):
    spark.sql(f"""
        SELECT CAST(NULL AS STRING) AS airport_id, CAST(0 AS LONG) AS {surrogate_key}, 
        CAST(NULL AS TIMESTAMP) AS create_date, CAST(NULL AS TIMESTAMP) AS update_date WHERE 1=0
    """).createOrReplaceTempView("tgt_view")
else:
    spark.table(target_table_path).createOrReplaceTempView("tgt_view")

# 4. Filter
df_joined = spark.sql(f"""
    SELECT s.*, t.{surrogate_key} as k, t.create_date as cd FROM src_view s 
    LEFT JOIN tgt_view t ON s.airport_id = t.airport_id
""")
df_new = df_joined.filter("k IS NULL")
df_old = df_joined.filter("k IS NOT NULL")

# 5. Generate Keys
max_k = 0
if spark.catalog.tableExists(target_table_path):
    r = spark.sql(f"SELECT MAX({surrogate_key}) FROM {target_table_path}").collect()[0][0]
    if r: max_k = r

df_new_final = df_new.withColumn(surrogate_key, (monotonically_increasing_id() + 1 + max_k).cast("long")) \
    .withColumn("create_date", current_timestamp()).withColumn("update_date", current_timestamp()).drop("k", "cd")
df_old_final = df_old.withColumn(surrogate_key, col("k").cast("long")) \
    .withColumn("create_date", col("cd")).withColumn("update_date", current_timestamp()).drop("k", "cd")

df_final = df_new_final.unionByName(df_old_final)

# 6. Save
if spark.catalog.tableExists(target_table_path):
    DeltaTable.forName(spark, target_table_path).alias("t").merge(df_final.alias("s"), f"t.{surrogate_key}=s.{surrogate_key}") \
        .whenMatchedUpdateAll(condition=f"s.{cdc_column} >= t.{cdc_column}").whenNotMatchedInsertAll().execute()
    print("Upsert Done.")
else:
    df_final.write.format("delta").saveAsTable(target_table_path)
    print("Table Created.")

FORCING BUILD: workspace.gold.dim_airports...
Table Created.


In [0]:
from pyspark.sql.functions import *
from delta.tables import *

# ==========================================
# 1. CONFIGURATION
# ==========================================
catalog = "workspace"
silver_schema = "default"  # Where your DLT pipeline put the tables
gold_schema = "gold"       # Where we are writing

# Source Table (Bookings)
source_table = f"{catalog}.{silver_schema}.silver_bookings"

# Target Table (Fact)
fact_table = f"{catalog}.{gold_schema}.fact_bookings"

# Dimension Tables to Join (These must exist now!)
dim_flights = f"{catalog}.{gold_schema}.dim_flights"
dim_passengers = f"{catalog}.{gold_schema}.dim_passengers"
dim_airports = f"{catalog}.{gold_schema}.dim_airports"

print(f"Building Fact Table: {fact_table}")

# ==========================================
# 2. READ DATA
# ==========================================
# Read the transactional data
try:
    df_bookings = spark.table(source_table)
    print("Read Bookings Table.")
except:
    print(f"Error: Could not find {source_table}")
    dbutils.notebook.exit("Source missing")

# Read the Dimensions
# We only need the Natural Keys (ID) and Surrogate Keys (Key)
df_dim_flights = spark.table(dim_flights).select("flight_id", "dim_flights_key")
df_dim_passengers = spark.table(dim_passengers).select("passenger_id", "dim_passengers_key")
df_dim_airports = spark.table(dim_airports).select("airport_id", "dim_airports_key")

# ==========================================
# 3. JOINING (The "Star Schema" Logic)
# ==========================================
# This replaces the text IDs with your new Gold Surrogate Keys
df_fact = df_bookings.alias("b") \
    .join(df_dim_flights.alias("f"), "flight_id", "left") \
    .join(df_dim_passengers.alias("p"), "passenger_id", "left") \
    .join(df_dim_airports.alias("a"), "airport_id", "left") \
    .select(
        col("b.booking_id"),
        col("b.booking_date"),
        col("b.amount"),
        col("f.dim_flights_key"),
        col("p.dim_passengers_key"),
        col("a.dim_airports_key"),
        col("b.modifiedDate").alias("modify_date") 
    )

# ==========================================
# 4. WRITE TO GOLD
# ==========================================
print("Writing to Gold Layer...")

df_fact.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(fact_table)

print(f"Success! Fact Table created at {fact_table}")
display(spark.sql(f"SELECT * FROM {fact_table} LIMIT 5"))

Building Fact Table: workspace.gold.fact_bookings
Read Bookings Table.
Writing to Gold Layer...
Success! Fact Table created at workspace.gold.fact_bookings


booking_id,booking_date,amount,dim_flights_key,dim_passengers_key,dim_airports_key,modify_date
B00001,2025-05-29,850.72,14,48,48,2026-02-09T06:19:14.396Z
B00002,2025-06-09,376.63,52,11,3,2026-02-09T06:19:14.396Z
B00003,2025-06-03,534.02,23,79,12,2026-02-09T06:19:14.396Z
B00004,2025-06-16,1333.7,1,68,39,2026-02-09T06:19:14.396Z
B00005,2025-06-17,1334.96,19,189,8,2026-02-09T06:19:14.396Z
