In [1]:
# List every table registered in the attached Lakehouse.
# truncate=False keeps long table names fully visible in the printed grid.
lakehouse_tables = spark.sql("SHOW TABLES")
lakehouse_tables.show(truncate=False)



StatementMeta(, 92df48ae-e772-4e05-87aa-aa9589907da5, 3, Finished, Available, Finished)

+---------------+-------------------------------+-----------+
|namespace      |tableName                      |isTemporary|
+---------------+-------------------------------+-----------+
|sales_lakehouse|bronze_dim_customers           |false      |
|sales_lakehouse|bronze_dim_products            |false      |
|sales_lakehouse|bronze_fact_sales              |false      |
|sales_lakehouse|bronze_dim_salespersons        |false      |
|sales_lakehouse|bronze_dim_date                |false      |
|sales_lakehouse|bronze_fx_rates                |false      |
|sales_lakehouse|bronze_dimension_changes_events|false      |
+---------------+-------------------------------+-----------+



In [4]:
%run ./nb_utils_silver


StatementMeta(, f10d53f9-cd0a-43f0-820b-44dc466e0258, 6, Finished, Available, Finished)

In [2]:
# Silver Layer - Customers: bronze_dim_customers -> silver_dim_customers
#
# Steps performed:
# 1. Load data from Bronze.
# 2. Trim every string column (trim_all, loaded from nb_utils_silver).
# 3. Remove duplicates on customer_id. This runs AFTER trimming so that IDs
#    differing only by surrounding spaces collapse into a single customer.
# 4. Standardize casing (customer_name -> Proper Case, tier -> UPPER).
# 5. Replace a NULL status with the default "Inactive".
# 6. Convert `created_date` to a proper Date (to_date_safe, from nb_utils_silver).
# 7. Save the cleaned data into a Silver Delta table.

from pyspark.sql.functions import col, initcap, upper, coalesce, lit

# Load Bronze
b = spark.table("bronze_dim_customers")

# Clean & standardize
silver = (
    b.transform(trim_all)                          # trim all string columns first
     # 1 row per customer. dropDuplicates does not define WHICH of several
     # duplicate rows survives; it only guarantees one row per key.
     .dropDuplicates(["customer_id"])
     .withColumn("customer_name", initcap(col("customer_name")))  # "john doe" -> "John Doe"
     .withColumn("tier", upper(col("tier")))       # gold/Gold -> GOLD
     .withColumn("status", coalesce(col("status"), lit("Inactive")))  # default if null
)
silver = to_date_safe(silver, "created_date")      # cast to DateType safely

# Save as managed Delta table (idempotent)
write_delta_table(silver, "silver_dim_customers")

# Quick peek
spark.table("silver_dim_customers").show(5, truncate=False)


StatementMeta(, 14afe12c-0c9f-452f-8cec-7d40b55c08e3, 4, Finished, Available, Finished)

✅ Wrote silver_dim_customers: 400 rows
+-----------+-------------+------+--------------+-------------+---------+------------+----------------+-----------+------------+----------------------------+------------+------+
|customer_id|customer_name|tier  |address_line1 |address_line2|city     |country     |reporting_region|postal_code|phone       |email                       |created_date|status|
+-----------+-------------+------+--------------+-------------+---------+------------+----------------+-----------+------------+----------------------------+------------+------+
|CUST00001  |Gulftech 773 |GOLD  |189 Palm Ave  |NULL         |Singapore|Singapore   |SEA             |584269     |+96-11223252|gulftech7736904@example.com |2024-07-04  |Active|
|CUST00002  |Meera Das    |SILVER|51 MG Road    |NULL         |Dammam   |Saudi Arabia|GCC             |345139     |+87-81590817|meeradas9072@example.com    |2024-12-02  |Active|
|CUST00003  |Asternet 469 |SILVER|134 Bay Street|Block-23     |Mumbai  

In [3]:
# Rich preview of the first five rows of the Silver customers table.
customers_preview = spark.table("silver_dim_customers").limit(5)
display(customers_preview)


StatementMeta(, 14afe12c-0c9f-452f-8cec-7d40b55c08e3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 350c753e-b602-4d67-8a11-cb980cca44a1)

In [8]:
# Bronze -> Silver: Products (bronze_dim_products -> silver_dim_products)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Columns handled "if present": a name missing from Bronze is simply skipped.
LABEL_COLUMNS = ["product_name", "category", "brand", "uom"]   # proper-cased
DOUBLE_COLUMNS = ["unit_price", "unit_cost"]                   # cast to double

# 1) + 2) Read Bronze and trim every string column (helper from nb_utils_silver)
prod = trim_all(spark.table("bronze_dim_products"))

# 3) Standardize casing for label-like columns
for label in LABEL_COLUMNS:
    if label in prod.columns:
        prod = prod.withColumn(label, F.initcap(F.col(label)))

# 4) Type-cast numeric columns, then the date column
for numeric in DOUBLE_COLUMNS:
    if numeric in prod.columns:
        prod = prod.withColumn(numeric, F.col(numeric).cast("double"))

has_created_date = "created_date" in prod.columns
if has_created_date:
    prod = to_date_safe(prod, "created_date")

# 5) A product without an id cannot be keyed, so drop those rows
prod = prod.where(F.col("product_id").isNotNull())

# 6) Keep one row per product_id.
#    With created_date: rows that have a date come first (False sorts before
#    True), newest date first, so row 1 is the latest dated row. This avoids
#    desc_nulls_last so it works on all Spark versions.
#    Without created_date: the sort key is the partition key itself, so it
#    cannot break ties; which duplicate survives is then arbitrary.
if has_created_date:
    created = F.col("created_date")
    newest_first = [created.isNull().asc(), created.desc()]
else:
    newest_first = [F.col("product_id").desc()]

w = Window.partitionBy("product_id").orderBy(*newest_first)
ranked = prod.withColumn("rn", F.row_number().over(w))
prod = ranked.where(F.col("rn") == 1).drop("rn")

# 7) Optional standard boolean from a status column (kept neutral in Silver):
#    known "on" spellings -> True, known "off" spellings -> False, else NULL.
if "status" in prod.columns and "is_active_std" not in prod.columns:
    status_upper = F.upper(F.col("status"))
    is_active = (
        F.when(status_upper.isin("ACTIVE", "A", "TRUE", "Y"), F.lit(True))
         .when(status_upper.isin("INACTIVE", "I", "FALSE", "N"), F.lit(False))
         .otherwise(F.lit(None).cast("boolean"))
    )
    prod = prod.withColumn("is_active_std", is_active)

# 8) Write Silver table (managed Delta in Lakehouse catalog)
write_delta_table(prod, "silver_dim_products")

# 9) Pretty previews
print("Top 10 after cleaning:")
display(prod.limit(10))

if "category" in prod.columns:
    print("Counts by category:")
    category_counts = prod.groupBy("category").count()
    display(category_counts.orderBy(F.desc("count")))


StatementMeta(, ff2ad395-0244-4bcc-813a-6ced8d2bd418, 10, Finished, Available, Finished)

✅ Wrote silver_dim_products: 176 rows
Top 10 after cleaning:


SynapseWidget(Synapse.DataFrame, 428a2fcc-e9bc-46cf-b31b-a417f482c809)

Counts by category:


SynapseWidget(Synapse.DataFrame, 48067aef-a694-4669-98a7-568a813dfe5f)

In [18]:
# Bronze -> Silver: sales (bronze_fact_sales -> silver_fact_sales)
from pyspark.sql import functions as F

# Numeric inputs of the amount calculation. Each one gets a temporary
# "<name>_d" double column in which NULL, an uncastable value, or a missing
# source column all become 0.0.
NUMERIC_INPUTS = ["qty", "unit_price", "discount", "tax_amount"]
AMOUNT_COLUMNS = ["extended_amount", "net_amount", "gross_amount"]
PREVIEW_COLUMNS = ["order_id", "qty", "unit_price"] + AMOUNT_COLUMNS + ["currency"]

# 1) Read Bronze
df = spark.table("bronze_fact_sales")

# 2) Null-safe numeric helpers (temporary numeric fields for the calculation)
for name in NUMERIC_INPUTS:
    if name in df.columns:
        helper = F.coalesce(F.col(name).cast("double"), F.lit(0.0))
    else:
        helper = F.lit(0.0)
    df = df.withColumn(f"{name}_d", helper)

# 3) Amounts (guaranteed non-NULL because every helper is non-NULL)
df = (
    df
    .withColumn("extended_amount", F.round(F.col("qty_d") * F.col("unit_price_d"), 2))
    .withColumn("net_amount",      F.round(F.col("extended_amount") - F.col("discount_d"), 2))
    .withColumn("gross_amount",    F.round(F.col("net_amount") + F.col("tax_amount_d"), 2))
)

# 4) Currency cleanup only (no conversion)
if "currency" in df.columns:
    df = df.withColumn("currency", F.upper(F.trim(F.col("currency"))))

# 5) Drop temp helper columns
df = df.drop(*[f"{name}_d" for name in NUMERIC_INPUTS])

# 6) Write Silver
(df.write
   .mode("overwrite")
   .option("overwriteSchema","true")   # keeps table, replaces schema if needed
   .format("delta")
   .saveAsTable("silver_fact_sales"))

print("✅ silver_fact_sales rows:", spark.table("silver_fact_sales").count())

# 7) Verification: the amount columns must contain no NULLs.
#    Checked on the table that was actually written, not just eyeballed.
null_amount_rows = (
    spark.table("silver_fact_sales")
    .filter(
        F.col("extended_amount").isNull()
        | F.col("net_amount").isNull()
        | F.col("gross_amount").isNull()
    )
    .count()
)
assert null_amount_rows == 0, f"{null_amount_rows} silver_fact_sales rows have a NULL amount"

# 8) Quick preview. Only columns that exist are selected, so the preview cannot
#    fail when an optional source column (qty, unit_price, currency, ...) is
#    missing, which the guards above explicitly allow. Membership is tested
#    case-insensitively so a column Spark can resolve is not dropped.
available = {c.lower() for c in df.columns}
preview_cols = [c for c in PREVIEW_COLUMNS if c.lower() in available]
display(df.select(*preview_cols).limit(20))


StatementMeta(, ff2ad395-0244-4bcc-813a-6ced8d2bd418, 20, Finished, Available, Finished)

✅ silver_fact_sales rows: 333500


SynapseWidget(Synapse.DataFrame, ab607884-b0e4-4271-af06-2540b4a7bf86)

In [19]:
# bronze_dim_salespersons -> silver_dim_salespersons
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-column normalisation and target types, applied only when the column exists.
TEXT_NORMALISERS = {
    "email": F.lower,
    "full_name": F.initcap,
    "country": F.initcap,
}
TARGET_TYPES = {
    "quota": "double",
    "target": "double",
    "base_salary": "double",
    "is_active": "boolean",
    "active_flag": "boolean",
}

sp = spark.table("bronze_dim_salespersons")

# Trim all strings (inline, so this cell does not depend on a utils cell)
string_columns = [name for name, dtype in sp.dtypes if dtype == "string"]
for name in string_columns:
    sp = sp.withColumn(name, F.trim(F.col(name)))

# Normalise text columns, then cast numeric / flag columns
for name, normalise in TEXT_NORMALISERS.items():
    if name in sp.columns:
        sp = sp.withColumn(name, normalise(F.col(name)))

for name, target_type in TARGET_TYPES.items():
    if name in sp.columns:
        sp = sp.withColumn(name, F.col(name).cast(target_type))

# De-dupe on salesperson_id: keep the most recent row by created_date when that
# column exists (NULL dates sorted last). Otherwise the sort key is the
# partition key itself, which cannot break ties between duplicates.
if "created_date" in sp.columns:
    sp = sp.withColumn("created_date", F.to_date(F.col("created_date")))
    row_order = F.col("created_date").desc_nulls_last()
else:
    row_order = F.col("salesperson_id")

w = Window.partitionBy("salesperson_id").orderBy(row_order)
ranked = sp.withColumn("rn", F.row_number().over(w))
sp = ranked.where(F.col("rn") == 1).drop("rn")

# Save as a Delta table, replacing both data and schema
(sp.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema","true")
   .saveAsTable("silver_dim_salespersons"))

print("✅ silver_dim_salespersons:", spark.table("silver_dim_salespersons").count())
display(spark.table("silver_dim_salespersons").limit(10))


StatementMeta(, ff2ad395-0244-4bcc-813a-6ced8d2bd418, 21, Finished, Available, Finished)

✅ silver_dim_salespersons: 55


SynapseWidget(Synapse.DataFrame, 258851cb-e41b-4891-a5e2-882ca7c4fad5)

In [21]:
# bronze_dim_date -> silver_dim_date
from pyspark.sql import functions as F

dt = spark.table("bronze_dim_date")

# ---- Case-insensitive column lookup ----
cols = dt.columns
lc_map = {c.lower(): c for c in cols}   # lower-cased name -> actual Bronze name


# The helpers are deliberately NOT named `has` / `col`: a notebook-global `col`
# would shadow pyspark.sql.functions.col for any cell that imported it by name.
def _has_column(colname):
    """Return True if Bronze has a column called `colname`, ignoring case."""
    return colname.lower() in lc_map


def _bronze_col(colname):
    """Return a Column for `colname` using its actual casing in Bronze.

    Raises KeyError when no column matches; check with `_has_column` first.
    """
    return F.col(lc_map[colname.lower()])


# ---- Build canonical `date` column from best available source ----
if _has_column("date"):
    dt = dt.withColumn("date", F.to_date(_bronze_col("date")))
elif _has_column("calendar_date"):
    dt = dt.withColumn("date", F.to_date(_bronze_col("calendar_date")))
elif _has_column("full_date") or _has_column("fulldate"):
    src = "full_date" if _has_column("full_date") else "fulldate"
    # Try a direct to_date first, then fall back to explicit string formats;
    # coalesce keeps the first attempt that yields a non-NULL date.
    dt = dt.withColumn("date",
           F.coalesce(F.to_date(_bronze_col(src)),
                      F.to_date(_bronze_col(src).cast("string"), "yyyy-MM-dd"),
                      F.to_date(_bronze_col(src).cast("string"), "dd/MM/yyyy"),
                      F.to_date(_bronze_col(src).cast("string"), "MM/dd/yyyy")))
elif _has_column("datekey"):
    # Typical int 20240101 or string '20240101'
    dt = dt.withColumn("date", F.to_date(_bronze_col("datekey").cast("string"), "yyyyMMdd"))
else:
    raise ValueError(
        "No usable date column found (looked for: date, calendar_date, full_date/FullDate, datekey/DateKey)."
    )

# ---- Drop rows whose date could not be parsed ----
# Without this, dropDuplicates(["date"]) below would collapse all unparsable
# rows into one arbitrary row with a NULL key and NULL year/month/day parts.
unparsed_rows = dt.filter(F.col("date").isNull()).count()
if unparsed_rows:
    print(f"⚠️ Dropping {unparsed_rows} row(s) with no parsable date")
    dt = dt.filter(F.col("date").isNotNull())

# ---- Derive standard parts ----
dt = (dt
      .withColumn("year",    F.year("date"))
      .withColumn("month",   F.month("date"))
      .withColumn("day",     F.dayofmonth("date"))
      .withColumn("quarter", F.quarter("date"))
      .withColumn("week",    F.weekofyear("date"))
      .dropDuplicates(["date"])          # one row per calendar date
     )

# ---- Write Silver ----
(dt.write
   .mode("overwrite")
   .format("delta")
   .option("overwriteSchema", "true")
   .saveAsTable("silver_dim_date"))

print("✅ silver_dim_date:", spark.table("silver_dim_date").count())
display(spark.table("silver_dim_date").orderBy("date").limit(10))



StatementMeta(, ff2ad395-0244-4bcc-813a-6ced8d2bd418, 23, Finished, Available, Finished)

✅ silver_dim_date: 656


SynapseWidget(Synapse.DataFrame, 981b9285-00d1-4f76-b5bf-06f9ddcfc2e9)

In [28]:
# bronze_fx_rates -> silver_fx_rates
# Columns are only renamed and projected: the date is not transformed and the
# currency values are kept exactly as they are in Bronze.

from pyspark.sql import functions as F

# Bronze column name -> Silver column name (also fixes the output column order)
FX_COLUMN_RENAMES = {
    "Date": "date",
    "Currency": "currency",
    "rate_to_USD": "rate",
}

fx = spark.table("bronze_fx_rates")
for bronze_name, silver_name in FX_COLUMN_RENAMES.items():
    fx = fx.withColumnRenamed(bronze_name, silver_name)
fx = fx.select(*FX_COLUMN_RENAMES.values())

# Replace the Silver table (data and schema) with the renamed projection
(fx.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema","true")
   .saveAsTable("silver_fx_rates"))

display(spark.table("silver_fx_rates").limit(20))


StatementMeta(, ff2ad395-0244-4bcc-813a-6ced8d2bd418, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 32bf36ba-b8d7-4724-bf2c-831f6868d597)

In [2]:
# Drop the table named silver_dimension_changes_events if it exists.
# NOTE(review): the SCD2 cell in this notebook writes to silver_dim_events, not
# to this name. Confirm this is the table that is meant to be removed.
stale_table = "silver_dimension_changes_events"
spark.sql(f"DROP TABLE IF EXISTS {stale_table}")


StatementMeta(, f10d53f9-cd0a-43f0-820b-44dc466e0258, 4, Finished, Available, Finished)

DataFrame[]

In [7]:
# Bronze -> Silver (SCD2) for dimension change events
# bronze_dimension_changes_events -> silver_dim_events

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1) + 2) Read Bronze and standardize the key columns:
#    trim the keys, lower-case the attribute name, parse the event timestamp.
ch = (
    spark.table("bronze_dimension_changes_events")
    .withColumn("entity", F.trim(F.col("entity")))
    .withColumn("business_key", F.trim(F.col("business_key")))
    .withColumn("attribute", F.lower(F.trim(F.col("attribute"))))
    .withColumn("effective_ts", F.to_timestamp("effective_ts"))
)

# 3) Pivot: one row per (entity, business_key, effective_ts), one column per
#    distinct attribute value. An attribute with no event at that timestamp is
#    NULL in that row; if several events share the same key, timestamp and
#    attribute, first() keeps just one of their values.
version_keys = ["entity", "business_key", "effective_ts"]
pivoted = ch.groupBy(*version_keys).pivot("attribute").agg(F.first("new_value"))

# 4) Build SCD2 validity ranges: a version ends when the next change for the
#    same entity/business_key starts; the last version has no end.
w = Window.partitionBy("entity","business_key").orderBy("effective_ts")
next_change_ts = F.lead("effective_ts").over(w)

silver = (
    pivoted
    .withColumn("effective_start", F.col("effective_ts"))
    .withColumn("effective_end", next_change_ts)
    .withColumn("is_current", F.col("effective_end").isNull())   # open-ended = current
    .drop("effective_ts")
)

# 5) Write Silver table (replace data and schema)
(silver.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema","true")
   .saveAsTable("silver_dim_events"))

print("✅ silver_dim_events (SCD2):", spark.table("silver_dim_events").count())
display(spark.table("silver_dim_events").limit(20))


StatementMeta(, f10d53f9-cd0a-43f0-820b-44dc466e0258, 9, Finished, Available, Finished)

✅ silver_dim_events (SCD2): 187


SynapseWidget(Synapse.DataFrame, 77320a8b-6499-45c3-891e-2a99b6f6b7a0)