## Import Dependencies
Import the necessary functions and classes from PySpark, including `pyspark.sql.functions` for transformations and `Window` for deduplication logic.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

## Configure Catalog and Schema
Set the active catalog and schema for the current session. This ensures that all subsequent table references will resolve to our target `lakehouse_sales_bronze` schema without needing to fully qualify them.

In [0]:
lakehouse_catalog = "main"
bronze_schema = "lakehouse_sales_bronze"

spark.sql(f"USE CATALOG {lakehouse_catalog}")
spark.sql(f"USE SCHEMA {bronze_schema}")
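
Because the catalog and schema names are interpolated into SQL text, it can be worth backtick-quoting them first so that unusual characters do not change the statement. The following is a minimal sketch under assumptions: `quote_ident` is a hypothetical helper (not a PySpark API), and it relies on the Spark SQL rule that a backtick inside a quoted identifier is escaped by doubling it.

```python
def quote_ident(name: str) -> str:
    """Backtick-quote one identifier part for use in Spark SQL text.

    Hypothetical helper, not part of PySpark. An embedded backtick is
    doubled, which is how Spark SQL escapes it inside a quoted identifier.
    """
    if not name:
        raise ValueError("identifier must be a non-empty string")
    return "`" + name.replace("`", "``") + "`"


# Same statements as in the cell above, built with quoted identifiers.
use_catalog_stmt = f"USE CATALOG {quote_ident('main')}"
use_schema_stmt = f"USE SCHEMA {quote_ident('lakehouse_sales_bronze')}"
```

The resulting strings can be passed to `spark.sql(...)` in place of the raw f-strings.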

## Read Bronze Layer Data
Load the raw `bronze_sales` table into a Spark DataFrame. We'll inspect its schema to confirm the initial data types before applying transformations.

In [0]:
bronze_sales = spark.table("bronze_sales")
bronze_sales.printSchema()

## Apply Schema and Type Casting
Define the Silver layer schema by selecting and transforming columns from the Bronze DataFrame. This step involves:
- Casting columns to their appropriate data types (e.g., `long` for IDs, `date` for dates).
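- Adding a new `ingestion_ts` column to track when the record was processed into the Silver layer.

Using a single `select` keeps the schema definition in one place. It also adds one projection to the plan, whereas each `withColumn` call adds its own, and long chains of them can produce large plans that are slow to analyze.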

In [0]:
silver_df = bronze_sales.select(
    F.col("order_id").cast("long"),
    F.to_date(F.col("order_date")).alias("order_date"),
    F.col("country").cast("string"),
    F.col("sku").cast("string"),
    F.col("qty").cast("integer"),
    F.col("unit_price").cast("double"),
    F.current_timestamp().alias("ingestion_ts")
)

silver_df.printSchema()
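
One way to keep the Silver schema in a single place is to describe it as data and generate the expressions from it. The sketch below is an alternative under assumptions, not the notebook's method: `SILVER_EXPRS` and `build_select_exprs` are hypothetical names, and the generated strings are intended for `DataFrame.selectExpr`.

```python
# Hypothetical spec: output column name -> Spark SQL expression, mirroring
# the casts in the cell above. Dict order becomes the column order.
SILVER_EXPRS = {
    "order_id": "CAST(order_id AS BIGINT)",
    "order_date": "to_date(order_date)",
    "country": "CAST(country AS STRING)",
    "sku": "CAST(sku AS STRING)",
    "qty": "CAST(qty AS INT)",
    "unit_price": "CAST(unit_price AS DOUBLE)",
    "ingestion_ts": "current_timestamp()",
}


def build_select_exprs(spec):
    """Turn the spec into '<expression> AS <name>' strings for selectExpr."""
    return [f"{expr} AS {name}" for name, expr in spec.items()]


select_exprs = build_select_exprs(SILVER_EXPRS)

# In the notebook (not executed in this sketch):
# silver_df = bronze_sales.selectExpr(*select_exprs)
```

Adding or retyping a column then means editing one dictionary entry rather than the `select` call itself.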

## Deduplicate and Handle Nulls

Clean the dataset by removing duplicates and rows with missing critical information.

- We use a window that partitions by `order_id` and orders by `ingestion_ts` descending, then keep the row where `row_number()` is 1, so a single record survives per order. Because `ingestion_ts` is assigned by `current_timestamp()`, which returns the same value for every row in a query, duplicates processed in one run tie on it and the surviving row is arbitrary. Picking the **most recent record** reliably needs an ordering column that already differs between duplicates in Bronze.
- For ranking functions used in deduplication (e.g., `row_number`, `rank`, `dense_rank`), **we do not define `rowsBetween(...)` explicitly**. Spark/Databricks requires these functions to use the **`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`** frame and applies it automatically. A frame ending at `UNBOUNDED FOLLOWING` conflicts with this requirement and fails with an `AnalysisException` when the query is analyzed.
- **Window aggregate functions** are different: with `ORDER BY`, their default frame ends at the current row, so a function that must see the entire partition (e.g., a "global" `last_value`) needs an **explicitly defined frame** from `UNBOUNDED PRECEDING` to `UNBOUNDED FOLLOWING`.
- Finally, we discard records with `null` in essential business columns (`order_id`, `order_date`, etc.) to ensure data quality.

In [0]:
window = (
    Window
    .partitionBy("order_id")
    .orderBy(F.col("ingestion_ts").desc())
)

silver_deduped_df = (silver_df
    .withColumn("rn", F.row_number().over(window))
    .filter("rn = 1")
    .drop("rn")
    .dropna(subset=["order_id", "order_date", "sku", "qty", "unit_price"])
)
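
The keep-one-row-per-`order_id` rule can be mirrored in plain Python, which is sometimes handy for working out expected results on a small sample before running the Spark job. This is a minimal sketch; `latest_per_key` and the sample rows are hypothetical and not part of the notebook.

```python
def latest_per_key(rows, key, order_by):
    """Return one row per distinct `key`: the row with the largest `order_by`.

    Plain-Python reference for filtering on row_number() = 1 over a window
    partitioned by `key` and ordered by `order_by` descending. On ties this
    sketch keeps the first row seen; Spark gives no such guarantee.
    """
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[order_by] > best[k][order_by]:
            best[k] = row
    return list(best.values())


# Made-up sample rows for illustration only.
sample = [
    {"order_id": 1, "ingestion_ts": "2024-01-01T10:00:00", "qty": 2},
    {"order_id": 1, "ingestion_ts": "2024-01-02T09:30:00", "qty": 3},
    {"order_id": 2, "ingestion_ts": "2024-01-01T08:00:00", "qty": 1},
]
deduped = latest_per_key(sample, key="order_id", order_by="ingestion_ts")
```

Here `deduped` holds two rows: the later of the two `order_id = 1` records and the single `order_id = 2` record.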

## Write to Silver Layer
Create the target Silver schema if it doesn't already exist and write the cleaned, deduplicated DataFrame as a managed Delta table named `silver_sales`.
- `mode("overwrite")` ensures the table is fully replaced during development runs.
- `option("overwriteSchema", "true")` lets the overwrite replace the table's existing schema, so added, removed, or retyped columns do not cause a schema-mismatch failure.

In [0]:
silver_schema = "lakehouse_sales_silver"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {lakehouse_catalog}.{silver_schema}")
spark.sql(f"USE SCHEMA {silver_schema}")

# saveAsTable returns None, so the result is not assigned to a variable.
(
    silver_deduped_df
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver_sales")
)

## Validation Query: Total Row Count
Run a simple SQL query to verify that the `silver_sales` table was created and populated with data. Counting the total rows is a basic sanity check.

In [0]:
%sql
SELECT
  COUNT(*) AS total_rows
FROM
  silver_sales;

## Validation Query: Revenue by Country
Execute a more complex aggregation to validate business logic. This query calculates total revenue per country, which helps confirm that numeric types (`qty`, `unit_price`) are correct and that aggregate functions perform as expected without errors.

In [0]:
%sql
SELECT
  country,
  SUM(qty * unit_price) AS revenue
FROM
  silver_sales
GROUP BY
  country
ORDER BY
  revenue DESC;
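
A further check in the same spirit is to confirm that deduplication left at most one row per `order_id`. The sketch below only builds the SQL text; `duplicate_key_query` is a hypothetical helper, and running the query is left to the notebook.

```python
def duplicate_key_query(table: str, key: str) -> str:
    """Build a Spark SQL query listing key values that occur more than once."""
    return (
        f"SELECT {key}, COUNT(*) AS n "
        f"FROM {table} "
        f"GROUP BY {key} "
        f"HAVING COUNT(*) > 1"
    )


dup_check_sql = duplicate_key_query("silver_sales", "order_id")

# In the notebook (not executed in this sketch), an empty result means
# every order_id is unique:
# assert spark.sql(dup_check_sql).count() == 0
```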