In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import *

In [0]:
# Point the session at the adventureworks catalog and its silver schema.
for _statement in ("USE CATALOG adventureworks", "USE SCHEMA silver"):
    spark.sql(_statement)

In [0]:
# Remove every widget currently attached to the notebook before the widget
# definitions in the following cell are (re)created.
dbutils.widgets.removeAll()

In [0]:
# Notebook parameters: source (bronze) and target (silver) schema names,
# each defaulting to the adventureworks catalog.
dbutils.widgets.text("bronze_schema", "adventureworks.bronze", "Bronze Schema")
dbutils.widgets.text("silver_schema", "adventureworks.silver", "Silver Schema")

# Read back whatever the widgets currently hold.
bronze_schema = dbutils.widgets.get("bronze_schema")
silver_schema = dbutils.widgets.get("silver_schema")

# Both layers use the same table name inside their schema.
bronze_table = f"{bronze_schema}.products"
silver_table = f"{silver_schema}.products"

In [0]:
# Load the bronze products table and preview it.
df = spark.table(bronze_table)

display(df)

## Data Cleaning

In [0]:
# Replace NULL weights with 0; no other column is touched.
df = df.fillna({"weight": 0})

### Drop Nulls

In [0]:
# Discard rows where every column is NULL, then rows without a product_id.
df = (
    df.na.drop(how="all")
    .where(F.col("product_id").isNotNull())
)

### Drop Duplicates

In [0]:
# Keep a single row per product_id.
# NOTE(review): no ordering is applied first, so which of several rows sharing
# a product_id survives is not controlled here.
df = df.dropDuplicates(subset=["product_id"])

## Data Enrichment

In [0]:
# is_active flag: True when discontinued_date is NULL, False otherwise.
# isNull() already yields a plain boolean (never NULL), so it is used directly.
df = df.withColumn("is_active", F.col("discontinued_date").isNull())

In [0]:
# margin = list_price - standard_cost, rounded to 4 decimal places.
df = df.withColumn(
    "margin",
    F.round(F.col("list_price") - F.col("standard_cost"), 4),
)

In [0]:
# margin_percentage = margin / list_price * 100, rounded to 2 decimal places.
# try_divide returns NULL instead of failing (e.g. on a zero list_price);
# coalesce then turns any NULL result into 0.
margin_pct = F.try_divide(F.col("margin"), F.col("list_price")) * 100
df = df.withColumn(
    "margin_percentage",
    F.round(F.coalesce(margin_pct, F.lit(0)), 2),
)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Python classifier used as the body of the price-range UDF.
def price_range(list_price):
    """Bucket a list price into a coarse category.

    Args:
        list_price: numeric price, or None.

    Returns:
        None when list_price is None; otherwise "low" for prices below 200,
        "mid" for prices from 200 to 500 inclusive, and "high" for anything
        else.
    """
    if list_price is None:
        return None
    if list_price < 200:
        return "low"
    if 200 <= list_price <= 500:
        return "mid"
    return "high"

# Kept so anything that still references the UDF by name keeps working; the
# column below no longer goes through it.
price_range_udf = udf(price_range, StringType())

# Bucket list_price with native column expressions instead of the Python UDF:
# same thresholds as price_range(), but evaluated inside Spark rather than
# row by row in a Python worker.
#   list_price < 200          -> "low"
#   200 <= list_price <= 500  -> "mid"
#   any other non-NULL price  -> "high"
#   NULL                      -> NULL (no branch matches, no otherwise())
df = df.withColumn(
    "list_price_range",
    F.when(F.col("list_price") < 200, "low")
    .when(F.col("list_price").between(200, 500), "mid")
    .when(F.col("list_price").isNotNull(), "high"),
)

In [0]:
# Stamp every row with the time of this run.
# NOTE(review): to_utc_timestamp is told the input zone is already "UTC", so
# the conversion looks like a no-op - confirm whether a plain
# current_timestamp() was intended.
df = df.withColumn(
    "ingestion_timestamp_utc",
    F.to_utc_timestamp(F.current_timestamp(), "UTC"),
)

In [0]:
# Columns written to the silver table, in output order.
silver_columns = [
    "product_id",
    "name",
    "product_number",
    "is_active",
    "list_price",
    "standard_cost",
    "margin",
    "margin_percentage",
    "list_price_range",
    "ingestion_timestamp_utc",
]
df = df.select(*silver_columns)
df.display()

## Upsert

In [0]:
from delta.tables import DeltaTable

# Handle on the silver Delta table that receives the upsert.
deltaTable = DeltaTable.forName(spark, silver_table)

# Row count before the merge, for a quick before/after comparison.
before_count = spark.read.table(silver_table).count()
print(f"Rows before merge: {before_count}")

# Upsert keyed on product_id: matching target rows are overwritten with all
# source columns, and source rows without a match are inserted.
(
    deltaTable.alias("target")
    .merge(df.alias("source"), "target.product_id = source.product_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Row count after the merge.
after_count = spark.read.table(silver_table).count()
print(f"Rows after merge: {after_count}")

In [0]:
from pyspark.sql.functions import desc

# Show the silver row with the latest ingestion timestamp as a sanity check.
most_recent_row = (
    spark.table(silver_table)
    .sort(F.col("ingestion_timestamp_utc").desc())
    .limit(1)
)
display(most_recent_row)