# Medallion Architecture Lab: Bronze → Silver → Gold (PySpark) 🥉🥈🥇

Welcome to the Medallion Architecture lab! In this hands-on lab, you'll build a complete data pipeline using PySpark.

---

## 🎯 Learning Objectives

By the end of this lab, you will be able to:

1. ✅ Understand the medallion architecture (bronze, silver, gold)
2. ✅ Use **Auto Loader** to ingest raw data into bronze tables
3. ✅ Use **Delta merge API** to clean and deduplicate data for silver tables
4. ✅ Create aggregated business metrics in gold tables
5. ✅ Build an end-to-end data pipeline using PySpark
6. ✅ Apply data quality checks and transformations

---

## 🏗️ What is Medallion Architecture?

The **Medallion Architecture** organizes data into three layers:

### **🥉 Bronze Layer (Raw)**
* **Purpose:** Ingest raw data as-is
* **Characteristics:** Minimal processing, append-only, full history
* **Quality:** May have duplicates, nulls, bad data
* **Method:** Auto Loader for incremental ingestion

### **🥈 Silver Layer (Cleaned)**
* **Purpose:** Clean, validate, and deduplicate
* **Characteristics:** Business-ready, no duplicates, validated
* **Quality:** High quality, consistent schema
* **Method:** Delta merge API for upserts and deduplication

### **🥇 Gold Layer (Aggregated)**
* **Purpose:** Business-level aggregations and metrics
* **Characteristics:** Optimized for analytics, pre-aggregated
* **Quality:** Report-ready, fast queries
* **Method:** PySpark aggregations and DataFrame API
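Here is a small plain-Python sketch of how the three layers relate, using a handful of made-up rows and no Spark. Bronze keeps everything. Silver applies the same validity and deduplication rules this lab uses. Gold rolls silver up per date. It is only a mental model: the lab itself uses Auto Loader, Delta tables, and PySpark.

```python
from collections import defaultdict

# Bronze: raw rows kept exactly as they arrived (hypothetical sample data)
bronze = [
    {"order_id": 1, "order_date": "2024-01-01", "product_name": "Mouse", "quantity": 2, "unit_price": 10.0},
    {"order_id": 2, "order_date": "2024-01-01", "product_name": None, "quantity": 1, "unit_price": 50.0},
    {"order_id": 3, "order_date": "2024-01-02", "product_name": "Laptop", "quantity": -1, "unit_price": 900.0},
    {"order_id": 4, "order_date": "2024-01-02", "product_name": "Monitor", "quantity": 1, "unit_price": 200.0},
    {"order_id": 1, "order_date": "2024-01-01", "product_name": "Mouse", "quantity": 2, "unit_price": 10.0},  # duplicate
]

# Silver: skip invalid rows, keep one row per order_id, add a calculated column
silver = {}
for row in bronze:
    if row["product_name"] is None or row["quantity"] <= 0 or row["unit_price"] <= 0:
        continue
    silver[row["order_id"]] = {**row, "total_amount": row["quantity"] * row["unit_price"]}

# Gold: one row per order_date with pre-aggregated metrics
gold = defaultdict(lambda: {"total_orders": 0, "total_revenue": 0.0})
for row in silver.values():
    gold[row["order_date"]]["total_orders"] += 1
    gold[row["order_date"]]["total_revenue"] += row["total_amount"]

print(len(bronze), len(silver), dict(gold))
```

The five bronze rows shrink to two silver rows, because one duplicate, one null product, and one negative quantity are removed. Those two silver rows become two gold rows, one per date.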

---

## 📊 Lab Scenario: E-Commerce Orders

You're building a data pipeline for an e-commerce company:

**Raw data:** Order files arrive in cloud storage (CSV format)

**Your pipeline:**
1. **Bronze:** Ingest raw order files with Auto Loader
2. **Silver:** Clean data, remove duplicates, validate with Delta merge
3. **Gold:** Create daily sales metrics using PySpark aggregations

---

## 🛠️ Lab Structure

This lab has **10 tasks** to complete:

**Setup (Tasks 1-2):**
1. Create volumes and generate sample order files
2. Explore the raw data

**Bronze Layer (Tasks 3-4):**
3. Create bronze table
4. Use Auto Loader to ingest raw data

**Silver Layer (Tasks 5-6):**
5. Create silver table with data quality rules
6. Use Delta merge API to clean and deduplicate

**Gold Layer (Tasks 7-8):**
7. Create gold table with daily metrics
8. Build aggregation with PySpark

**Validation (Tasks 9-10):**
9. Verify data quality across layers
10. Test incremental updates

**Each task includes:**
* 📝 Clear instructions
* 💡 Hints to guide you
* ✅ Solutions at the end (try first!)

---

**Let's get started!** 🚀

## Task 1: Setup - Create Volume and Generate Raw Data 🛠️

**Your Challenge:**

Create a Unity Catalog volume and generate sample order files to simulate raw data arriving from an e-commerce system.

**Requirements:**

**Part A: Create Volume**
1. Create a volume: `main.default.ecommerce_raw_data`
2. This will store your raw CSV files

**Part B: Generate Sample Order Files**
1. Create 2 batches of order data (simulating data arriving at different times)
2. Each batch should be a CSV file with these columns:
   * `order_id` - INT
   * `customer_id` - INT
   * `order_date` - STRING (YYYY-MM-DD format)
   * `product_name` - STRING
   * `quantity` - INT
   * `unit_price` - DOUBLE
   * `status` - STRING ('completed', 'pending', 'cancelled')

**Batch 1:** 100 orders (order_id 1-100)  
**Batch 2:** 55 rows: 50 new orders (order_id 101-150) plus **5 duplicates** of Batch 1 orders (order_id 1-5)

**Data quality issues to include:**
* Some null values in product_name
* Some negative quantities (data errors)
* Duplicates in Batch 2
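The generator calls `random` without a seed, so every run produces different counts of nulls and negatives. You can make the batches reproducible with a dedicated, seeded `random.Random` instance; this is an optional tweak, not a lab requirement. `make_orders` is a hypothetical helper that returns tuples in the same column order as the Task 1 schema:

```python
import random
from datetime import datetime, timedelta

PRODUCTS = ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones', None]  # None -> null product_name
STATUSES = ['completed', 'pending', 'cancelled']


def make_orders(order_ids, rng):
    """Build order tuples in the Task 1 column order using a dedicated Random instance."""
    return [
        (i,
         rng.randint(1, 50),
         (datetime(2024, 1, 1) + timedelta(days=rng.randint(0, 30))).strftime("%Y-%m-%d"),
         rng.choice(PRODUCTS),
         rng.randint(-2, 10),                 # -2, -1 and 0 are invalid quantities
         round(rng.uniform(10, 500), 2),
         rng.choice(STATUSES))
        for i in order_ids
    ]


rng = random.Random(42)                       # fixed seed -> same data on every run
data_batch1 = make_orders(range(1, 101), rng)
data_batch2 = data_batch1[0:5] + make_orders(range(101, 151), rng)  # 5 duplicates + 50 new
```

Pass `data_batch1` and `data_batch2` to `spark.createDataFrame(..., schema)` as in the hints. The exact counts you see still depend on the seed you choose.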

---

**Write your code in the cell below:**

```python
# TODO: Create volume and generate 2 batches of CSV files
# Batch 1: 100 orders
# Batch 2: 50 orders + 5 duplicates from Batch 1
# Include data quality issues (nulls, negatives, duplicates)
```



### 💡 Hints for Task 1

<details>
<summary><b>Hint 1:</b> Create volume (click to expand)</summary>

```python
spark.sql("""
  CREATE VOLUME IF NOT EXISTS main.default.ecommerce_raw_data
  COMMENT 'Raw order files for medallion architecture lab'
""")
```
</details>

<details>
<summary><b>Hint 2:</b> Generate sample data (click to expand)</summary>

```python
import random
from datetime import datetime, timedelta

products = ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones', None]  # Include None for nulls
statuses = ['completed', 'pending', 'cancelled']

data_batch1 = [
    (i, 
     random.randint(1, 50),
     (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d"),
     random.choice(products),
     random.randint(-2, 10),  # Include negative for errors
     round(random.uniform(10, 500), 2),
     random.choice(statuses))
    for i in range(1, 101)
]
```
</details>

<details>
<summary><b>Hint 3:</b> Write CSV files (click to expand)</summary>

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("order_date", StringType()),
    StructField("product_name", StringType()),
    StructField("quantity", IntegerType()),
    StructField("unit_price", DoubleType()),
    StructField("status", StringType())
])

df = spark.createDataFrame(data_batch1, schema)  # data_batch1 from Hint 2
df.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/main/default/ecommerce_raw_data/batch1")
```
</details>

<details>
<summary><b>Hint 4:</b> Create duplicates in Batch 2 (click to expand)</summary>

```python
# Batch 2: New orders + duplicates from Batch 1
data_batch2 = data_batch1[0:5]  # First 5 orders (duplicates)
data_batch2 += [(i, ...) for i in range(101, 151)]  # New orders
```
</details>

## Task 2: Explore the Raw Data 🔍

**Your Challenge:**

Examine the raw CSV files you created to understand the data quality issues using PySpark.

**Requirements:**

1. Use `spark.read.csv()` to load the CSV files
2. Look for:
   * Null values in product_name
   * Negative quantities
   * Duplicate order_ids (between batch1 and batch2)
3. Count total rows across both batches

**Questions to answer:**
* How many total rows are in the raw files?
* How many rows have null product_name?
* How many rows have negative quantity?
* Are there duplicate order_ids?

**PySpark approach:**
```python
df = spark.read.csv(
    "/Volumes/main/default/ecommerce_raw_data/",
    header=True,
    inferSchema=True
)
```

---

**Write your code in the cell below:**

```python
# TODO: Use spark.read.csv() to explore the raw CSV data
# Check for nulls, negatives, and duplicates
# Use display() to show results
```



### 💡 Hints for Task 2

<details>
<summary><b>Hint 1:</b> Read all CSV files (click to expand)</summary>

```python
df_raw = spark.read.csv(
    "/Volumes/main/default/ecommerce_raw_data/",
    header=True,
    inferSchema=True
)

display(df_raw.limit(20))
```
</details>

<details>
<summary><b>Hint 2:</b> Count data quality issues (click to expand)</summary>

```python
from pyspark.sql.functions import col, count, when

quality_check = df_raw.select(
    count("*").alias("total_rows"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

display(quality_check)
```
</details>

<details>
<summary><b>Hint 3:</b> Find duplicates (click to expand)</summary>

```python
duplicates = df_raw.groupBy("order_id") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy("order_id")

display(duplicates)
```
</details>

---

# 🥉 Bronze Layer: Raw Data Ingestion

The bronze layer stores raw data exactly as it arrives - no cleaning, no transformations.

**Characteristics:**
* Append-only (keep all data)
* Minimal processing
* May contain duplicates and errors
* Full audit trail
* Uses **Auto Loader** for incremental loading

**Auto Loader vs COPY INTO:**
* Auto Loader is a Structured Streaming source (`cloudFiles`) that you drive from PySpark; COPY INTO is a SQL command
* Better suited to continuous or frequently repeated ingestion
* Automatic schema inference and evolution
* Scalable file discovery
* Checkpoint-based progress tracking

---

## Task 3: Create Bronze Table 🥉

**Your Challenge:**

Create a Delta table for the bronze layer using PySpark.

**Requirements:**

1. Table name: `main.default.orders_bronze`
2. Columns (match the CSV structure):
   * `order_id` INT
   * `customer_id` INT
   * `order_date` STRING (we'll convert to DATE in silver)
   * `product_name` STRING
   * `quantity` INT
   * `unit_price` DOUBLE
   * `status` STRING
   * `ingestion_timestamp` TIMESTAMP (add this for tracking)
   * `_rescued_data` STRING (Auto Loader metadata column)
3. Use Delta format

**You can create the table using SQL or let Auto Loader create it automatically.**

**Bronze layer principle:** Store raw data as-is, add metadata columns for tracking.

---

**Write your code in the cell below:**

```python
# TODO: Create the bronze table
# You can use spark.sql() or create a DataFrame with the schema
# Or skip this and let Auto Loader create it automatically
```



### 💡 Hints for Task 3

<details>
<summary><b>Hint 1:</b> CREATE TABLE with SQL (click to expand)</summary>

```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.orders_bronze (
    order_id INT,
    customer_id INT,
    order_date STRING,
    product_name STRING,
    quantity INT,
    unit_price DOUBLE,
    status STRING,
    ingestion_timestamp TIMESTAMP,
    _rescued_data STRING
  )
  USING DELTA
  COMMENT 'Raw order data - Bronze layer'
""")
```
</details>

<details>
<summary><b>Hint 2:</b> Or let Auto Loader create it (click to expand)</summary>

You can skip creating the table manually.
Auto Loader will create it automatically when you run the stream in Task 4.
</details>

<details>
<summary><b>Hint 3:</b> Why _rescued_data? (click to expand)</summary>

Auto Loader adds `_rescued_data` column:
* Captures data that doesn't match the schema
* Prevents data loss from schema mismatches
* Useful for debugging
</details>

## Task 4: Ingest Data with Auto Loader 📥

**Your Challenge:**

Use Auto Loader to incrementally load data from CSV files into the bronze table.

**Requirements:**

1. Use `spark.readStream.format("cloudFiles")` (Auto Loader)
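2. Set options:
   * `cloudFiles.format` = "csv"
   * `cloudFiles.schemaLocation` = a directory where Auto Loader stores the inferred schema
   * `header` = "true"
   * `cloudFiles.schemaHints` = "order_id INT, customer_id INT, quantity INT, unit_price DOUBLE" (Auto Loader infers every CSV column as STRING by default; the hints keep the numeric types from Task 3)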
3. Add `ingestion_timestamp` column with `current_timestamp()`
4. Write as a stream to the bronze table
5. Use `.trigger(availableNow=True)` for one-time processing
6. Set checkpoint location

**Auto Loader benefits:**
* Automatically discovers new files
* Tracks processed files (idempotent)
* Handles schema evolution
* Scalable for millions of files

**Expected results:**
* After first run: 155 rows (batch1 + batch2)
* Rerunning without new files adds no rows (already-processed files are skipped, so the load is idempotent)
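The idempotency comes from the checkpoint: Auto Loader records which files it has already ingested and skips them on the next run. The plain-Python model below illustrates only that bookkeeping. The file names are made up, and this is not how Auto Loader actually stores its state. It shows why a rerun adds nothing while a newly arrived file is still picked up:

```python
def ingest_available_now(source_files, processed, bronze):
    """Toy model of one availableNow run: load every file not seen before, then stop.

    source_files: dict of file path -> rows (stands in for the volume)
    processed:    set of paths already ingested (stands in for the checkpoint)
    bronze:       list that rows are appended to (stands in for the bronze table)
    """
    new_files = sorted(p for p in source_files if p not in processed)
    for path in new_files:
        bronze.extend(source_files[path])  # append-only: bronze never deduplicates
        processed.add(path)
    return len(new_files)


files = {
    "batch1/part-00000.csv": [{"order_id": i} for i in range(1, 101)],
    "batch2/part-00000.csv": [{"order_id": i} for i in [1, 2, 3, 4, 5, *range(101, 151)]],
}
checkpoint, bronze = set(), []

print(ingest_available_now(files, checkpoint, bronze), len(bronze))  # 2 155
print(ingest_available_now(files, checkpoint, bronze), len(bronze))  # 0 155
```

Note that the 5 duplicate order_ids still land in bronze. Skipping already-processed *files* is not the same as deduplicating *rows*, which is silver's job.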

---

**Write your code in the cell below:**

```python
# TODO: Use Auto Loader to load CSV files into bronze
# Use cloudFiles format with trigger(availableNow=True)
# Add ingestion_timestamp column
```



```python
# TODO: Check the bronze table
# Count total rows, check for duplicates and data quality issues
# Use display() to show results
```



### 💡 Hints for Task 4

<details>
<summary><b>Hint 1:</b> Auto Loader basic structure (click to expand)</summary>

```python
from pyspark.sql.functions import current_timestamp

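# Auto Loader infers CSV columns as STRING unless hinted;
# the schema hints match the numeric columns of the Task 3 bronze table
df_stream = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", "/Volumes/main/default/ecommerce_raw_data/_schema") \
    .option("cloudFiles.schemaHints", "order_id INT, customer_id INT, quantity INT, unit_price DOUBLE") \
    .option("header", "true") \
    .load("/Volumes/main/default/ecommerce_raw_data/") \
    .withColumn("ingestion_timestamp", current_timestamp())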
```
</details>

<details>
<summary><b>Hint 2:</b> Write stream to table (click to expand)</summary>

```python
df_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/Volumes/main/default/ecommerce_raw_data/_checkpoint") \
    .trigger(availableNow=True) \
    .toTable("main.default.orders_bronze") \
    .awaitTermination()
```
</details>

<details>
<summary><b>Hint 3:</b> Verify data (click to expand)</summary>

```python
from pyspark.sql.functions import col, count, when, countDistinct

df_bronze = spark.table("main.default.orders_bronze")

# Count total rows
print(f"Total rows: {df_bronze.count()}")

# Check for issues
quality_metrics = df_bronze.select(
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

display(quality_metrics)
```
</details>

---

# 🥈 Silver Layer: Cleaned & Validated Data

The silver layer contains cleaned, validated, and deduplicated data ready for business use.

**Transformations:**
* Remove duplicates (keep latest version)
* Filter out invalid data (nulls, negatives)
* Convert data types (STRING → DATE)
* Add calculated columns (total_amount)
* Enforce data quality rules

**Method:** Delta Lake's Python merge API (`DeltaTable.merge()`), the programmatic counterpart of SQL `MERGE INTO`

**Delta merge API vs SQL MERGE:**
* Programmatic control in Python
* Chainable methods (.whenMatchedUpdate(), .whenNotMatchedInsert())
* Better for complex transformations
* Integrates with PySpark DataFrame operations
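Row by row, the merge behaves like an upsert into a table keyed by `order_id`. The plain-Python model below mirrors the `.whenMatchedUpdate()` / `.whenNotMatchedInsert()` pair used in Task 6. `upsert` is a hypothetical helper; Delta runs the real merge as a single transaction on the table. The model also shows why the source must be deduplicated first. In Delta, several source rows matching one target row make the merge fail, and duplicate source rows with no match would all be inserted.

```python
from datetime import datetime, timezone


def upsert(target, source_rows, now=None):
    """Model of merge(...).whenMatchedUpdate(...).whenNotMatchedInsert(...).

    target: dict of order_id -> row dict (stands in for the silver table)
    source_rows: row dicts, assumed already deduplicated on order_id
    """
    now = now or datetime.now(timezone.utc)
    inserted = updated = 0
    for row in source_rows:
        key = row["order_id"]
        if key in target:                                   # whenMatchedUpdate
            target[key].update(row)
            target[key]["updated_at"] = now                 # created_at is left untouched
            updated += 1
        else:                                               # whenNotMatchedInsert
            target[key] = {**row, "created_at": now, "updated_at": now}
            inserted += 1
    return inserted, updated


silver = {}
t1 = datetime(2024, 2, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 2, 2, tzinfo=timezone.utc)
print(upsert(silver, [{"order_id": 1, "quantity": 2}, {"order_id": 2, "quantity": 1}], now=t1))  # (2, 0)
print(upsert(silver, [{"order_id": 1, "quantity": 3}, {"order_id": 3, "quantity": 5}], now=t2))  # (1, 1)
```

Running the same source through `upsert` again only updates rows and never adds new ones. That is why rerunning the Task 6 merge does not create duplicates in silver.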

---

## Task 5: Create Silver Table 🥈

**Your Challenge:**

Create a cleaned version of the orders table with proper data types and calculated columns.

**Requirements:**

1. Table name: `main.default.orders_silver`
2. Columns:
   * `order_id` INT (primary key)
   * `customer_id` INT
   * `order_date` DATE (converted from STRING)
   * `product_name` STRING
   * `quantity` INT
   * `unit_price` DOUBLE
   * `total_amount` DOUBLE (calculated: quantity * unit_price)
   * `status` STRING
   * `created_at` TIMESTAMP (when first inserted)
   * `updated_at` TIMESTAMP (when last updated)
3. Use Delta format

**Key differences from bronze:**
* order_date is DATE (not STRING)
* Added total_amount (calculated column)
* Added created_at and updated_at for tracking
* No _rescued_data column

---

**Write your code in the cell below:**

```python
# TODO: Create the silver table with proper data types
# Use spark.sql() or DeltaTable.create()
```



### 💡 Hints for Task 5

<details>
<summary><b>Hint 1:</b> Using SQL (click to expand)</summary>

```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.orders_silver (
    order_id INT,
    customer_id INT,
    order_date DATE,
    product_name STRING,
    quantity INT,
    unit_price DOUBLE,
    total_amount DOUBLE,
    status STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
  )
  USING DELTA
  COMMENT 'Cleaned and validated orders - Silver layer'
""")
```
</details>

<details>
<summary><b>Hint 2:</b> Using DeltaTable API (click to expand)</summary>

```python
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType

schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("order_date", DateType()),
    StructField("product_name", StringType()),
    StructField("quantity", IntegerType()),
    StructField("unit_price", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("status", StringType()),
    StructField("created_at", TimestampType()),
    StructField("updated_at", TimestampType())
])

DeltaTable.createIfNotExists(spark) \
    .tableName("main.default.orders_silver") \
    .addColumns(schema) \
    .comment("Cleaned and validated orders - Silver layer") \
    .execute()
```
</details>

## Task 6: Clean and Load Silver with Delta Merge API ✨

**Your Challenge:**

Use Delta merge API to load cleaned data from bronze to silver.

**Requirements:**

**Data Quality Rules:**
1. **Remove duplicates** - Keep only one row per order_id (latest by ingestion_timestamp)
2. **Filter out invalid data:**
   * Skip rows where product_name IS NULL
   * Skip rows where quantity <= 0
   * Skip rows where unit_price <= 0
3. **Transform data:**
   * Convert order_date from STRING to DATE
   * Calculate total_amount = quantity * unit_price
4. **Use Delta merge API:**
   * `.whenMatchedUpdate()` - Update existing orders
   * `.whenNotMatchedInsert()` - Insert new orders

**Steps:**
1. Read bronze table and apply transformations
2. Deduplicate using window functions
3. Use DeltaTable.forName().merge() API
4. Set created_at on INSERT, updated_at on both

**Expected results:**
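* Around 96 rows on average: the 150 unique orders minus those with a null product_name or a quantity <= 0 (the data is random, so your count will vary)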
* No duplicates (order_id is unique)
* All data quality rules applied
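The Task 1 generator is random, so the silver row count changes from run to run. Here is a back-of-the-envelope estimate. It assumes the generator exactly as written: product drawn from 6 options including `None`, quantity from `randint(-2, 10)`, and unit price always at least 10.

```python
from fractions import Fraction

# Probabilities implied by the Task 1 generator as written
P_PRODUCT_OK = Fraction(5, 6)     # None is 1 of the 6 product choices
P_QUANTITY_OK = Fraction(10, 13)  # randint(-2, 10) has 13 outcomes; only 1..10 pass quantity > 0
# unit_price comes from uniform(10, 500), so the unit_price > 0 rule never removes a row


def expected_silver_rows(unique_orders):
    """Expected number of unique orders that pass every silver rule."""
    return unique_orders * P_PRODUCT_OK * P_QUANTITY_OK


bronze_rows = 100 + 55            # batch1 + batch2 (50 new orders + 5 duplicates)
unique_orders = bronze_rows - 5   # the 5 duplicates collapse into existing order_ids
print(bronze_rows, unique_orders, f"{float(expected_silver_rows(unique_orders)):.1f}")  # 155 150 96.2
```

So expect roughly 96 silver rows, give or take several. Task 10's batch 3 contains only valid orders, so it should add about 50 more.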

---

**Write your code in the cell below:**

```python
# TODO: Clean bronze data and use Delta merge API
# 1. Read bronze, filter invalid data, deduplicate
# 2. Use DeltaTable.forName().merge() with whenMatchedUpdate and whenNotMatchedInsert
```



```python
# TODO: Verify silver table
# Check row count, no duplicates, no invalid data
```



### 💡 Hints for Task 6

<details>
<summary><b>Hint 1:</b> Clean and deduplicate (click to expand)</summary>

```python
from pyspark.sql.functions import col, to_date, current_timestamp, row_number
from pyspark.sql.window import Window

# Read and clean bronze data
df_bronze = spark.table("main.default.orders_bronze")

# Filter invalid data
df_cleaned = df_bronze.filter(
    col("product_name").isNotNull() &
    (col("quantity") > 0) &
    (col("unit_price") > 0)
)

# Deduplicate (keep latest)
window_spec = Window.partitionBy("order_id").orderBy(col("ingestion_timestamp").desc())
df_deduped = df_cleaned.withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")

# Transform
df_transformed = df_deduped \
    .withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("total_amount", col("quantity") * col("unit_price")) \
    .drop("ingestion_timestamp", "_rescued_data")
```
</details>

<details>
<summary><b>Hint 2:</b> Delta merge API structure (click to expand)</summary>

```python
from delta.tables import DeltaTable

delta_silver = DeltaTable.forName(spark, "main.default.orders_silver")

delta_silver.alias("target").merge(
    df_transformed.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set = {
    "customer_id": "source.customer_id",
    "order_date": "source.order_date",
    # ... other columns
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values = {
    "order_id": "source.order_id",
    # ... all columns
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()
```
</details>

<details>
<summary><b>Hint 3:</b> Complete merge (click to expand)</summary>

You need to:
1. Clean and transform bronze data
2. Create DeltaTable reference
3. Call merge() with join condition
4. Chain whenMatchedUpdate() and whenNotMatchedInsert()
5. Call execute() to run the merge
</details>

---

# 🥇 Gold Layer: Business Metrics

The gold layer contains aggregated, business-ready data optimized for analytics and reporting.

**Characteristics:**
* Pre-aggregated metrics
* Optimized for dashboards
* Fast query performance
* Business-friendly column names
* Often materialized views or summary tables

**Method:** PySpark DataFrame aggregations with groupBy() and agg()

**Common patterns:**
* Daily/monthly aggregations
* Customer metrics
* Product performance
* KPIs and business metrics

---

## Task 7: Create Gold Table 🥇

**Your Challenge:**

Create a gold table for daily sales metrics.

**Requirements:**

1. Table name: `main.default.daily_sales_gold`
2. Columns:
   * `order_date` DATE
   * `total_orders` INT
   * `total_revenue` DOUBLE
   * `avg_order_value` DOUBLE
   * `total_quantity_sold` INT
   * `unique_customers` INT
   * `completed_orders` INT
   * `cancelled_orders` INT
   * `updated_at` TIMESTAMP
3. Use Delta format

**This table will store one row per date with aggregated metrics.**

---

**Write your code in the cell below:**

```python
# TODO: Create the gold table for daily metrics
# Use spark.sql() or DeltaTable.create()
```



### 💡 Hints for Task 7

<details>
<summary><b>Hint 1:</b> Using SQL (click to expand)</summary>

```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.daily_sales_gold (
    order_date DATE,
    total_orders INT,
    total_revenue DOUBLE,
    avg_order_value DOUBLE,
    total_quantity_sold INT,
    unique_customers INT,
    completed_orders INT,
    cancelled_orders INT,
    updated_at TIMESTAMP
  )
  USING DELTA
  COMMENT 'Daily sales metrics - Gold layer'
""")
```
</details>

<details>
<summary><b>Hint 2:</b> Why these metrics? (click to expand)</summary>

These are common business metrics:
* `total_orders` - Volume metric
* `total_revenue` - Financial metric
* `avg_order_value` - Performance metric
* `unique_customers` - Customer metric
* `completed_orders` / `cancelled_orders` - Status metrics
</details>

## Task 8: Populate Gold Table with PySpark Aggregations 📈

**Your Challenge:**

Use PySpark DataFrame API to calculate daily metrics from silver and write to gold.

**Requirements:**

1. Read the silver table
2. Group by order_date using `.groupBy()`
3. Calculate metrics using `.agg()`:
   * `total_orders` - count()
   * `total_revenue` - sum("total_amount")
   * `avg_order_value` - avg("total_amount")
   * `total_quantity_sold` - sum("quantity")
   * `unique_customers` - countDistinct("customer_id")
   * `completed_orders` - sum(when(status='completed', 1))
   * `cancelled_orders` - sum(when(status='cancelled', 1))
4. Add updated_at column with current_timestamp()
5. Write using `.write.mode("overwrite").saveAsTable()`

**PySpark aggregation functions:**
* `count()`, `sum()`, `avg()`, `countDistinct()`
* `when()` for conditional logic
* `round()` for formatting
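The conditional sums work because `when(...).otherwise(0)` maps each row to 1 or 0, so summing them counts the matching rows. Below is the same per-date logic in plain Python, using made-up rows; `daily_metrics` is a hypothetical helper. Run it in a fresh Python session, because importing `sum` from `pyspark.sql.functions` shadows Python's built-in `sum`.

```python
from collections import defaultdict


def daily_metrics(silver_rows):
    """Group silver rows by order_date and compute the Task 8 metrics."""
    groups = defaultdict(list)
    for row in silver_rows:
        groups[row["order_date"]].append(row)

    metrics = {}
    for day, rows in groups.items():
        revenue = sum(r["total_amount"] for r in rows)
        metrics[day] = {
            "total_orders": len(rows),
            "total_revenue": revenue,
            "avg_order_value": revenue / len(rows),
            "total_quantity_sold": sum(r["quantity"] for r in rows),
            "unique_customers": len({r["customer_id"] for r in rows}),
            # when(...).otherwise(0) turns each row into 1 or 0, so the sum is a count
            "completed_orders": sum(1 if r["status"] == "completed" else 0 for r in rows),
            "cancelled_orders": sum(1 if r["status"] == "cancelled" else 0 for r in rows),
        }
    return metrics


rows = [
    {"order_date": "2024-01-05", "customer_id": 7, "quantity": 2, "total_amount": 40.0, "status": "completed"},
    {"order_date": "2024-01-05", "customer_id": 7, "quantity": 1, "total_amount": 60.0, "status": "cancelled"},
    {"order_date": "2024-01-06", "customer_id": 9, "quantity": 3, "total_amount": 90.0, "status": "pending"},
]
print(daily_metrics(rows)["2024-01-05"])
```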

---

**Write your code in the cell below:**

```python
# TODO: Read silver, group by order_date, calculate metrics
# Use .agg() with multiple aggregation functions
# Write with mode("overwrite")
```



```python
# TODO: Query the gold table to see daily metrics
# Use display() and orderBy("order_date")
```



### 💡 Hints for Task 8

<details>
<summary><b>Hint 1:</b> Import functions (click to expand)</summary>

```python
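# Note: importing sum and round this way shadows Python's built-in sum() and round()
# for the rest of the notebook; `import pyspark.sql.functions as F` avoids that.
from pyspark.sql.functions import (
    col, count, sum, avg, countDistinct,
    when, round, current_timestamp
)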
```
</details>

<details>
<summary><b>Hint 2:</b> Aggregation structure (click to expand)</summary>

```python
df_silver = spark.table("main.default.orders_silver")

df_gold = df_silver.groupBy("order_date").agg(
    count("*").alias("total_orders"),
    round(sum("total_amount"), 2).alias("total_revenue"),
    round(avg("total_amount"), 2).alias("avg_order_value"),
    sum("quantity").alias("total_quantity_sold"),
    countDistinct("customer_id").alias("unique_customers"),
    sum(when(col("status") == "completed", 1).otherwise(0)).alias("completed_orders"),
    sum(when(col("status") == "cancelled", 1).otherwise(0)).alias("cancelled_orders")
).withColumn("updated_at", current_timestamp())
```
</details>

<details>
<summary><b>Hint 3:</b> Write to table (click to expand)</summary>

```python
df_gold.write.mode("overwrite").saveAsTable("main.default.daily_sales_gold")

print("✅ Gold table populated")
```
</details>

---

# ✅ Validation: Verify Your Pipeline

Let's verify that your medallion architecture pipeline works correctly!

---

## Task 9: Validate Data Quality Across Layers 🔍

**Your Challenge:**

Verify that data quality improves as it flows through the pipeline using PySpark.

**Requirements:**

Create a DataFrame that compares all three layers:

1. **Bronze metrics:**
   * Total rows
   * Duplicate count
   * Null product_name count
   * Negative quantity count

2. **Silver metrics:**
   * Total rows (should be less than bronze)
   * Duplicate count (should be 0)
   * Null count (should be 0)
   * Negative count (should be 0)

3. **Gold metrics:**
   * Total rows (number of unique dates)
   * Total revenue sum

**Expected results:**
* Bronze: 155 rows, with duplicates, nulls, and negative quantities
* Silver: around 96 rows (varies with the random data), clean
* Gold: up to 31 rows (one per order date in January 2024)

**Use `.union()` to combine metrics from all three layers.**
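`.union()` matches columns by position, not by name, and both sides need the same number of columns. Gold's metrics differ from bronze's and silver's, so one option is `bronze_metrics.unionByName(gold_metrics, allowMissingColumns=True)` (PySpark 3.1+), which fills absent columns with nulls. The plain-Python sketch below models that alignment for intuition only. `union_by_name` is a hypothetical helper, and the metric values are made up.

```python
def union_by_name(*tables):
    """Model of DataFrame.unionByName(other, allowMissingColumns=True):
    rows are aligned on column names, and missing columns become None."""
    columns = []
    for table in tables:
        for row in table:
            for name in row:
                if name not in columns:
                    columns.append(name)  # first-seen order: left table's columns come first
    return columns, [tuple(row.get(c) for c in columns) for table in tables for row in table]


bronze = [{"layer": "Bronze", "total_rows": 155, "duplicates": 5}]
silver = [{"layer": "Silver", "total_rows": 96, "duplicates": 0}]
gold = [{"layer": "Gold", "total_rows": 31, "total_revenue": 12345.67}]

cols, rows = union_by_name(bronze, silver, gold)
print(cols)
for r in rows:
    print(r)
```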

---

**Write your code in the cell below:**

```python
# TODO: Create DataFrames with metrics from each layer
# Use union() to combine them
# Use display() to show the comparison
```



### 💡 Hints for Task 9

<details>
<summary><b>Hint 1:</b> Calculate metrics for each layer (click to expand)</summary>

```python
from pyspark.sql.functions import lit, count, countDistinct, when, col

# Bronze metrics
df_bronze = spark.table("main.default.orders_bronze")
bronze_metrics = df_bronze.select(
    lit("Bronze").alias("layer"),
    count("*").alias("total_rows"),
    (count("*") - countDistinct("order_id")).alias("duplicates"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

# Similar for silver and gold...
```
</details>

<details>
<summary><b>Hint 2:</b> Union all layers (click to expand)</summary>

```python
# Combine all metrics
all_metrics = bronze_metrics \
    .union(silver_metrics) \
    .union(gold_metrics)

display(all_metrics)
```
</details>

<details>
<summary><b>Hint 3:</b> What to verify (click to expand)</summary>

**Bronze should have:**
* More rows than silver
* Duplicates present
* Null values present
* Negative quantities present

**Silver should have:**
* Fewer rows (invalid data removed)
* No duplicates
* No nulls
* No negative values

**Gold should have:**
* Much fewer rows (aggregated by date)
* Summary metrics only
</details>

## Task 10: Test Incremental Updates 🔄

**Your Challenge:**

Test that your pipeline handles new data correctly.

**Requirements:**

**Part A: Add new raw data**
1. Generate a 3rd batch of orders (order_id 151-200)
2. Write to `/Volumes/main/default/ecommerce_raw_data/batch3/`

**Part B: Run the pipeline**
1. Run Auto Loader again to load batch3 into bronze
2. Run Delta merge to update silver
3. Recalculate gold metrics

**Part C: Verify**
1. Check row counts increased appropriately
2. Verify no duplicates in silver
3. Verify gold metrics updated

**This tests the end-to-end incremental processing!**

---

**Write your code in the cells below:**

```python
# TODO: Generate batch 3 data (50 more orders, order_id 151-200)
# Write to batch3 directory
```



```python
# TODO: Run Auto Loader again to load batch3
# Same code as Task 4
```



```python
# TODO: Run your Delta merge code again to update silver
# Same code as Task 6
```



```python
# TODO: Recalculate gold metrics
# Same aggregation code as Task 8
```



```python
# TODO: Check row counts in all three layers
# Bronze: should have 205 rows (155 + 50)
# Silver: should have ~146 rows (~96 before + 50 new, all valid)
# Gold: up to 31 rows (one per date)
```



### 💡 Hints for Task 10

<details>
<summary><b>Hint 1:</b> Generate batch 3 (click to expand)</summary>

```python
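import builtins
import random
from datetime import datetime, timedelta

# Similar to Task 1, but a different ID range and no invalid values.
# builtins.round is used because importing round from pyspark.sql.functions (Task 8)
# shadows Python's built-in round() in the notebook.
data_batch3 = [
    (i,
     random.randint(1, 50),
     (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d"),
     random.choice(['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones']),
     random.randint(1, 10),
     builtins.round(random.uniform(10, 500), 2),
     random.choice(['completed', 'pending', 'cancelled']))
    for i in range(151, 201)
]

# `schema` is the StructType defined in Task 1
df_batch3 = spark.createDataFrame(data_batch3, schema)
df_batch3.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/main/default/ecommerce_raw_data/batch3")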
```
</details>

<details>
<summary><b>Hint 2:</b> Rerun the pipeline (click to expand)</summary>

Just run the same code again:
1. Auto Loader for bronze (the checkpoint makes it load only batch3)
2. Delta merge for silver (re-reads the whole bronze table; existing order_ids are updated and only new ones are inserted)
3. Aggregation for gold (recalculates all metrics)
</details>

<details>
<summary><b>Hint 3:</b> Verify counts (click to expand)</summary>

```python
from pyspark.sql.functions import lit, count

bronze_count = spark.table("main.default.orders_bronze").select(lit("Bronze").alias("layer"), count("*").alias("row_count"))
silver_count = spark.table("main.default.orders_silver").select(lit("Silver").alias("layer"), count("*").alias("row_count"))
gold_count = spark.table("main.default.daily_sales_gold").select(lit("Gold").alias("layer"), count("*").alias("row_count"))

all_counts = bronze_count.union(silver_count).union(gold_count)
display(all_counts)
```
</details>

---

# 📝 Complete Solutions

**⚠️ Only look at these if you're stuck or want to verify your work!**

Try to solve the challenges yourself first. Learning happens through problem-solving!

---

## ✅ Solution: Task 1 (Setup)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
import random
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Create volume
spark.sql("""
  CREATE VOLUME IF NOT EXISTS main.default.ecommerce_raw_data
  COMMENT 'Raw order files for medallion architecture lab'
""")

print("✅ Volume created")

# Define schema
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("order_date", StringType()),
    StructField("product_name", StringType()),
    StructField("quantity", IntegerType()),
    StructField("unit_price", DoubleType()),
    StructField("status", StringType())
])

products = ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones', None]  # Include None
statuses = ['completed', 'pending', 'cancelled']

# Batch 1: 100 orders
data_batch1 = [
    (i, 
     random.randint(1, 50),
     (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d"),
     random.choice(products),
     random.randint(-2, 10),  # Include negatives
     round(random.uniform(10, 500), 2),
     random.choice(statuses))
    for i in range(1, 101)
]

df_batch1 = spark.createDataFrame(data_batch1, schema)
df_batch1.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/main/default/ecommerce_raw_data/batch1")
print("✅ Batch 1 created: 100 orders")

# Batch 2: 50 new orders + 5 duplicates
data_batch2 = data_batch1[0:5]  # Duplicates
data_batch2 += [
    (i, 
     random.randint(1, 50),
     (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d"),
     random.choice(products),
     random.randint(-2, 10),
     round(random.uniform(10, 500), 2),
     random.choice(statuses))
    for i in range(101, 151)
]

df_batch2 = spark.createDataFrame(data_batch2, schema)
df_batch2.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/main/default/ecommerce_raw_data/batch2")
print("✅ Batch 2 created: 50 orders + 5 duplicates")

print("\n✅ Setup complete! Raw data files ready.")
```

</details>

## ✅ Solution: Task 2 (Explore Raw Data)

<details>
<summary><b>Click to reveal solution</b></summary>

**View raw data:**
```python
df_raw = spark.read.csv(
    "/Volumes/main/default/ecommerce_raw_data/",
    header=True,
    inferSchema=True
)

print("Sample raw data:")
display(df_raw.limit(20))
```

**Check data quality issues:**
```python
from pyspark.sql.functions import col, count, countDistinct, when

quality_check = df_raw.select(
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    (count("*") - countDistinct("order_id")).alias("duplicates"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

print("Data quality issues:")
display(quality_check)
```

**Find duplicate order_ids:**
```python
duplicates = df_raw.groupBy("order_id") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy("order_id")

print("Duplicate order_ids:")
display(duplicates)
```

**Expected findings:**
* Total: 155 rows
* Duplicates: 5 (order_id 1-5)
* Nulls: ~10-15 rows
* Negatives: ~10-15 rows
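
The quality-check expressions rely on two PySpark behaviors: `count()` skips nulls, and `when(cond, 1)` with no `.otherwise()` returns null when the condition is false, so `count(when(...))` counts only the matching rows. The sketch below models those metrics in plain Python on a few made-up rows; it does not call Spark, and the helper names (`py_when`, `py_count`, `model_quality_metrics`) are hypothetical.

```python
# Plain-Python model of the Task 2 quality metrics (illustration only, not PySpark).
# Each row is (order_id, product_name, quantity); the sample rows are made up.


def py_when(condition, value):
    """Like F.when(cond, value) without .otherwise(): None (null) when the condition is False."""
    return value if condition else None


def py_count(values):
    """Like F.count(expr): counts only non-null values."""
    return len([v for v in values if v is not None])


def model_quality_metrics(rows):
    total_rows = len(rows)
    unique_orders = len({order_id for order_id, _, _ in rows})
    return {
        "total_rows": total_rows,
        "unique_orders": unique_orders,
        "duplicates": total_rows - unique_orders,
        "null_products": py_count(py_when(product is None, 1) for _, product, _ in rows),
        # A null quantity makes `quantity < 0` null in Spark, so it is not counted either
        "negative_quantities": py_count(
            py_when(qty is not None and qty < 0, 1) for _, _, qty in rows
        ),
    }


sample_rows = [
    (1, "Laptop", 2),
    (2, None, 1),
    (3, "Mouse", -1),
    (1, "Laptop", 2),  # exact duplicate of order 1
]
print(model_quality_metrics(sample_rows))
```

This prints `{'total_rows': 4, 'unique_orders': 3, 'duplicates': 1, 'null_products': 1, 'negative_quantities': 1}`: the repeated order 1 counts once in `duplicates`, the same way `count("*") - countDistinct("order_id")` counts extra copies.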

</details>

## ✅ Solution: Task 3 (Create Bronze Table)

<details>
<summary><b>Click to reveal solution</b></summary>

**Option 1: Using SQL**
```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.orders_bronze (
    order_id INT,
    customer_id INT,
    order_date STRING,
    product_name STRING,
    quantity INT,
    unit_price DOUBLE,
    status STRING,
    ingestion_timestamp TIMESTAMP,
    _rescued_data STRING
  )
  USING DELTA
  COMMENT 'Raw order data - Bronze layer'
""")

print("✅ Bronze table created")
```

**Option 2: Let Auto Loader create it**
```python
# Skip table creation - Auto Loader will create it automatically
print("✅ Will let Auto Loader create the table")
```

**Key points:**
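* `order_date` is STRING (raw format from CSV)
* Added `ingestion_timestamp` to record when each row was loaded
* Added `_rescued_data`: Auto Loader adds this column automatically when it infers the schema and stores values that don't fit the schema there
* Auto Loader infers every CSV column as STRING by default, so the typed columns (INT, DOUBLE) only line up if the stream supplies those types (for example with `cloudFiles.schemaHints`)
* Using DELTA format for ACID transactions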

</details>

## ✅ Solution: Task 4 (Ingest with Auto Loader)

<details>
<summary><b>Click to reveal solution</b></summary>

**Load data with Auto Loader:**
```python
from pyspark.sql.functions import current_timestamp

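# Read stream with Auto Loader.
# Auto Loader infers CSV columns as STRING by default; schema hints give the
# numeric columns the same types as the bronze table DDL from Task 3.
df_stream = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", "/Volumes/main/default/ecommerce_raw_data/_schema") \
    .option("cloudFiles.schemaHints", "order_id INT, customer_id INT, quantity INT, unit_price DOUBLE") \
    .option("header", "true") \
    .load("/Volumes/main/default/ecommerce_raw_data/") \
    .withColumn("ingestion_timestamp", current_timestamp())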

# Write stream to bronze table
query = df_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/Volumes/main/default/ecommerce_raw_data/_checkpoint") \
    .trigger(availableNow=True) \
    .toTable("main.default.orders_bronze")

query.awaitTermination()

print("✅ Data loaded to bronze")
```

**Verify:**
```python
from pyspark.sql.functions import col, count, countDistinct, when

df_bronze = spark.table("main.default.orders_bronze")

print(f"Total rows: {df_bronze.count()}")

# Check data quality
quality_metrics = df_bronze.select(
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

display(quality_metrics)
```

**Key concepts:**
* Auto Loader automatically tracks processed files
* `trigger(availableNow=True)` processes all available files once
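* The checkpoint stores that file-tracking state, so rerunning the same code skips files that were already loaded (idempotent ingestion)
* `_rescued_data` captures values that don't match the schema instead of dropping them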
* Bronze contains ALL data (including bad data)
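
Conceptually, the checkpoint is a record of which files the stream has already consumed, which is what makes reruns with `availableNow=True` safe. The plain-Python model below illustrates only that bookkeeping under simplified assumptions: a set of paths stands in for the checkpoint, the file names are made up, and `model_ingest_run` is a hypothetical helper. Auto Loader's real checkpoint format and discovery modes are more involved.

```python
# Conceptual model of checkpoint-based file tracking (not Auto Loader's implementation).
# A set of file paths stands in for the checkpoint; the file names are made up.


def model_ingest_run(listed_files, checkpoint):
    """Return the files this run would ingest and record them in the checkpoint."""
    new_files = sorted(f for f in listed_files if f not in checkpoint)
    checkpoint.update(new_files)
    return new_files


checkpoint = set()
source_files = ["batch1/part-00000.csv", "batch2/part-00000.csv"]

print(model_ingest_run(source_files, checkpoint))  # both files on the first run
print(model_ingest_run(source_files, checkpoint))  # [] on a rerun: nothing new

source_files.append("batch3/part-00000.csv")
print(model_ingest_run(source_files, checkpoint))  # only the batch 3 file
```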

</details>

## ✅ Solution: Task 5 (Create Silver Table)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.orders_silver (
    order_id INT,
    customer_id INT,
    order_date DATE,
    product_name STRING,
    quantity INT,
    unit_price DOUBLE,
    total_amount DOUBLE,
    status STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
  )
  USING DELTA
  COMMENT 'Cleaned and validated orders - Silver layer'
""")

print("✅ Silver table created")
```

**Key differences from bronze:**
* `order_date` is DATE (not STRING)
* Added `total_amount` (calculated column)
* Added `created_at` and `updated_at` for tracking
* No ingestion_timestamp or _rescued_data

</details>

## ✅ Solution: Task 6 (Clean and Load Silver)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_date, current_timestamp, row_number
from pyspark.sql.window import Window

# Step 1: Read and clean bronze data
df_bronze = spark.table("main.default.orders_bronze")

# Step 2: Filter invalid data
df_cleaned = df_bronze.filter(
    col("product_name").isNotNull() &
    (col("quantity") > 0) &
    (col("unit_price") > 0)
)

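# Step 3: Deduplicate (keep latest by ingestion_timestamp).
# Rows loaded in the same Auto Loader micro-batch share one timestamp, so ties are
# broken arbitrarily; that is harmless here because the duplicate rows are identical.
window_spec = Window.partitionBy("order_id").orderBy(col("ingestion_timestamp").desc())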
df_deduped = df_cleaned.withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")

# Step 4: Transform data
df_transformed = df_deduped \
    .withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("total_amount", col("quantity") * col("unit_price")) \
    .drop("ingestion_timestamp", "_rescued_data") \
    .select(
        "order_id", "customer_id", "order_date", "product_name",
        "quantity", "unit_price", "total_amount", "status"
    )

# Step 5: Merge into silver using Delta merge API
delta_silver = DeltaTable.forName(spark, "main.default.orders_silver")

delta_silver.alias("target").merge(
    df_transformed.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set = {
    "customer_id": "source.customer_id",
    "order_date": "source.order_date",
    "product_name": "source.product_name",
    "quantity": "source.quantity",
    "unit_price": "source.unit_price",
    "total_amount": "source.total_amount",
    "status": "source.status",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values = {
    "order_id": "source.order_id",
    "customer_id": "source.customer_id",
    "order_date": "source.order_date",
    "product_name": "source.product_name",
    "quantity": "source.quantity",
    "unit_price": "source.unit_price",
    "total_amount": "source.total_amount",
    "status": "source.status",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()

print("✅ Silver table updated")
```

**Verify:**
```python
from pyspark.sql.functions import col, count, countDistinct, when

df_silver = spark.table("main.default.orders_silver")

quality_check = df_silver.select(
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") <= 0, 1)).alias("invalid_quantities")
)

display(quality_check)
```

**What this does:**
1. Filters out invalid data (null product names, non-positive quantities or prices)
2. Window function deduplicates (keeps the most recently ingested row per order_id)
3. Converts order_date to DATE type
4. Calculates total_amount
5. Delta merge API upserts into silver
6. Sets created_at on INSERT, updated_at on both
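
The filter, dedupe, and merge steps can be modeled in plain Python to make the upsert semantics concrete. This is a simplified sketch, not Delta Lake: rows are dicts with only a few columns, the silver "table" is a dict keyed by `order_id`, and the helper names are hypothetical. Ties on `ingestion_timestamp` keep the first row seen, whereas `row_number()` breaks ties arbitrarily.

```python
# Plain-Python model of the Task 6 silver logic (illustration only, not Delta Lake).
# Rows are simplified dicts (customer_id, order_date, status omitted); the silver
# "table" is a dict keyed by order_id.
from datetime import datetime


def is_valid(row):
    """Same rules as the filter: non-null product, positive quantity and price."""
    return (
        row["product_name"] is not None
        and row["quantity"] is not None and row["quantity"] > 0
        and row["unit_price"] is not None and row["unit_price"] > 0
    )


def dedupe_latest(rows):
    """Keep one row per order_id: the one with the latest ingestion_timestamp."""
    latest = {}
    for row in rows:
        kept = latest.get(row["order_id"])
        if kept is None or row["ingestion_timestamp"] > kept["ingestion_timestamp"]:
            latest[row["order_id"]] = row
    return list(latest.values())


def merge_into_silver(silver, source_rows, now):
    """Upsert on order_id: created_at is set only on insert, updated_at on every write."""
    for row in source_rows:
        values = {k: v for k, v in row.items() if k != "ingestion_timestamp"}
        values["total_amount"] = row["quantity"] * row["unit_price"]
        existing = silver.get(row["order_id"])
        created_at = existing["created_at"] if existing else now  # matched vs. not matched
        silver[row["order_id"]] = {**values, "created_at": created_at, "updated_at": now}
    return silver


t1, t2 = datetime(2024, 2, 1, 9, 0), datetime(2024, 2, 1, 10, 0)
bronze_rows = [
    {"order_id": 1, "product_name": "Laptop", "quantity": 1, "unit_price": 900.0, "ingestion_timestamp": t1},
    {"order_id": 1, "product_name": "Laptop", "quantity": 2, "unit_price": 900.0, "ingestion_timestamp": t2},
    {"order_id": 2, "product_name": None, "quantity": 1, "unit_price": 25.0, "ingestion_timestamp": t1},
    {"order_id": 3, "product_name": "Mouse", "quantity": -1, "unit_price": 25.0, "ingestion_timestamp": t1},
]
silver_table = {}
merge_into_silver(silver_table, dedupe_latest([r for r in bronze_rows if is_valid(r)]), now=t2)
print(sorted(silver_table))          # [1]: orders 2 and 3 fail the filter
print(silver_table[1]["quantity"])   # 2: the later bronze row wins
```

Running `merge_into_silver` again with a later `now` leaves `created_at` unchanged and moves `updated_at` forward, which mirrors the split between `whenMatchedUpdate` and `whenNotMatchedInsert`.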

**Expected results:**
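* Fewer than 150 rows: the 150 unique order_ids minus every row removed by the quality filters (the exact count depends on the random data)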
* 0 duplicates
* 0 nulls
* 0 negative quantities

</details>

## ✅ Solution: Task 7 (Create Gold Table)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
spark.sql("""
  CREATE TABLE IF NOT EXISTS main.default.daily_sales_gold (
    order_date DATE,
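    total_orders BIGINT,        -- count() returns BIGINT
    total_revenue DOUBLE,
    avg_order_value DOUBLE,
    total_quantity_sold BIGINT, -- sum() of an INT column returns BIGINT
    unique_customers BIGINT,    -- countDistinct() returns BIGINT
    completed_orders BIGINT,
    cancelled_orders BIGINT,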
    updated_at TIMESTAMP
  )
  USING DELTA
  COMMENT 'Daily sales metrics - Gold layer'
""")

print("✅ Gold table created")
```

**Key points:**
* One row per order_date
* All metrics are aggregated
* Business-friendly column names
* Ready for dashboards and reports

</details>

## ✅ Solution: Task 8 (Populate Gold Table)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
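# Note: importing sum and round this way shadows Python's built-in sum() and round()
# for the rest of the notebook session.
from pyspark.sql.functions import (
    col, count, sum, avg, countDistinct,
    when, round, current_timestamp
)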

# Read silver table
df_silver = spark.table("main.default.orders_silver")

# Calculate daily metrics
df_gold = df_silver.groupBy("order_date").agg(
    count("*").alias("total_orders"),
    round(sum("total_amount"), 2).alias("total_revenue"),
    round(avg("total_amount"), 2).alias("avg_order_value"),
    sum("quantity").alias("total_quantity_sold"),
    countDistinct("customer_id").alias("unique_customers"),
    sum(when(col("status") == "completed", 1).otherwise(0)).alias("completed_orders"),
    sum(when(col("status") == "cancelled", 1).otherwise(0)).alias("cancelled_orders")
).withColumn("updated_at", current_timestamp()) \
 .orderBy("order_date")

# Write to gold table (overwrite mode for full refresh)
df_gold.write.mode("overwrite").saveAsTable("main.default.daily_sales_gold")

print("✅ Gold table populated")
```

**Verify:**
```python
df_gold_check = spark.table("main.default.daily_sales_gold")
print(f"Total dates: {df_gold_check.count()}")
display(df_gold_check.orderBy("order_date"))
```

**What this does:**
1. Groups by order_date
2. Uses .agg() with multiple aggregation functions
3. Uses when() for conditional counts
4. round() for clean numbers
5. mode("overwrite") replaces all data (full refresh)
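
To see what each aggregate in `.agg()` computes, here is a plain-Python model of the same per-date metrics on three made-up silver rows. It is an illustration, not PySpark: `daily_metrics` is a hypothetical name, and Python's `round()` can differ from Spark's HALF_UP `round()` on exact ties. The built-ins are imported under aliases because, after the Task 8 import, `sum` and `round` refer to the PySpark functions.

```python
# Plain-Python model of the Task 8 daily aggregation (illustration only, not PySpark).
# Built-ins are imported under aliases because Task 8's import shadows sum() and round().
from builtins import round as py_round, sum as py_sum
from collections import defaultdict
from datetime import date


def daily_metrics(silver_rows):
    groups = defaultdict(list)
    for row in silver_rows:
        groups[row["order_date"]].append(row)                          # groupBy("order_date")
    result = []
    for order_date in sorted(groups):
        rows = groups[order_date]
        revenue = py_sum(r["total_amount"] for r in rows)
        result.append({
            "order_date": order_date,
            "total_orders": len(rows),                                  # count("*")
            "total_revenue": py_round(revenue, 2),                      # round(sum(...), 2)
            "avg_order_value": py_round(revenue / len(rows), 2),        # round(avg(...), 2)
            "total_quantity_sold": py_sum(r["quantity"] for r in rows), # sum("quantity")
            "unique_customers": len({r["customer_id"] for r in rows}),  # countDistinct
            # sum(when(status == ..., 1).otherwise(0))
            "completed_orders": py_sum(1 if r["status"] == "completed" else 0 for r in rows),
            "cancelled_orders": py_sum(1 if r["status"] == "cancelled" else 0 for r in rows),
        })
    return result


sample_silver = [
    {"order_date": date(2024, 1, 1), "customer_id": 7, "quantity": 2, "total_amount": 20.0, "status": "completed"},
    {"order_date": date(2024, 1, 1), "customer_id": 7, "quantity": 1, "total_amount": 10.0, "status": "cancelled"},
    {"order_date": date(2024, 1, 2), "customer_id": 9, "quantity": 3, "total_amount": 45.0, "status": "pending"},
]
for metrics in daily_metrics(sample_silver):
    print(metrics)
```

For 2024-01-01 this yields `total_orders` 2, `total_revenue` 30.0, `avg_order_value` 15.0, one unique customer, and one completed plus one cancelled order; 2024-01-02 has a single pending order, so both status counts are 0.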

**Expected results:**
* ~30 rows (one per unique date)
* Each row has aggregated metrics for that day
* Ready for dashboard visualization

</details>

## ✅ Solution: Task 9 (Data Quality Validation)

<details>
<summary><b>Click to reveal solution</b></summary>

```python
from pyspark.sql.functions import lit, count, countDistinct, when, col

# Bronze metrics
df_bronze = spark.table("main.default.orders_bronze")
bronze_metrics = df_bronze.select(
    lit("Bronze").alias("layer"),
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    (count("*") - countDistinct("order_id")).alias("duplicates"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

# Silver metrics
df_silver = spark.table("main.default.orders_silver")
silver_metrics = df_silver.select(
    lit("Silver").alias("layer"),
    count("*").alias("total_rows"),
    countDistinct("order_id").alias("unique_orders"),
    (count("*") - countDistinct("order_id")).alias("duplicates"),
    count(when(col("product_name").isNull(), 1)).alias("null_products"),
    count(when(col("quantity") < 0, 1)).alias("negative_quantities")
)

# Gold metrics
df_gold = spark.table("main.default.daily_sales_gold")
gold_metrics = df_gold.select(
    lit("Gold").alias("layer"),
    count("*").alias("total_rows"),
    lit(0).alias("unique_orders"),
    lit(0).alias("duplicates"),
    lit(0).alias("null_products"),
    lit(0).alias("negative_quantities")
)

# Combine all metrics
all_metrics = bronze_metrics.union(silver_metrics).union(gold_metrics)

print("Data quality comparison across layers:")
display(all_metrics)
```

**Expected results:**

| layer | total_rows | unique_orders | duplicates | null_products | negative_quantities |
|-------|------------|---------------|------------|---------------|--------------------|
| Bronze | 155 | 150 | 5 | ~10 | ~10 |
| Silver | < 150 | < 150 | 0 | 0 | 0 |
| Gold | ~30 | 0 | 0 | 0 | 0 |

**Key insights:**
* Bronze has all issues (duplicates, nulls, negatives)
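* Silver is clean: duplicates and invalid rows are removed, so it holds the 150 unique orders minus the rows that failed validation
* Gold is aggregated (far fewer rows, one per order_date)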
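
The comparison table can also serve as an automated gate that stops a scheduled run when silver breaks its invariants. A minimal sketch, assuming the metrics are collected to the driver as dictionaries (for example with `[row.asDict() for row in all_metrics.collect()]`); the function name and the rules it checks are assumptions, not part of the lab, and the numbers in the example are made up.

```python
# Hypothetical quality gate over the Task 9 metrics (function name and rules are assumptions).
# In the notebook, the input could be [row.asDict() for row in all_metrics.collect()].


def check_layer_metrics(metrics):
    """Return True if silver satisfies the expected invariants; otherwise raise ValueError."""
    by_layer = {m["layer"]: m for m in metrics}
    bronze, silver = by_layer["Bronze"], by_layer["Silver"]
    problems = []
    for column in ("duplicates", "null_products", "negative_quantities"):
        if silver[column] != 0:
            problems.append(f"silver {column} = {silver[column]} (expected 0)")
    if silver["total_rows"] > bronze["unique_orders"]:
        problems.append("silver has more rows than bronze has unique order_ids")
    if problems:
        raise ValueError("; ".join(problems))
    return True


# Made-up numbers shaped like the Task 9 output
example_metrics = [
    {"layer": "Bronze", "total_rows": 155, "unique_orders": 150, "duplicates": 5,
     "null_products": 12, "negative_quantities": 11},
    {"layer": "Silver", "total_rows": 128, "unique_orders": 128, "duplicates": 0,
     "null_products": 0, "negative_quantities": 0},
]
print(check_layer_metrics(example_metrics))  # True
```

Raising an exception, rather than only displaying the table, makes a scheduled job fail visibly when a rule is violated.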

</details>

## ✅ Solution: Task 10 (Test Incremental Updates)

<details>
<summary><b>Click to reveal solution</b></summary>

**Generate Batch 3:**
```python
import builtins  # Task 8's PySpark import shadows Python's round(); call the built-in explicitly

# Generate 50 more orders (order_ids 151-200, all passing the silver quality rules)
data_batch3 = [
    (i,
     random.randint(1, 50),
     (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d"),
     random.choice(['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones']),
     random.randint(1, 10),
     builtins.round(random.uniform(10, 500), 2),
     random.choice(['completed', 'pending', 'cancelled']))
    for i in range(151, 201)
]

df_batch3 = spark.createDataFrame(data_batch3, schema)
df_batch3.coalesce(1).write.mode("overwrite").option("header", "true").csv("/Volumes/main/default/ecommerce_raw_data/batch3")
print("✅ Batch 3 created: 50 orders")
```

**Load to Bronze (rerun Auto Loader):**
```python
# Same Auto Loader code from Task 4
df_stream = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
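    .option("cloudFiles.schemaLocation", "/Volumes/main/default/ecommerce_raw_data/_schema") \
    .option("cloudFiles.schemaHints", "order_id INT, customer_id INT, quantity INT, unit_price DOUBLE") \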
    .option("header", "true") \
    .load("/Volumes/main/default/ecommerce_raw_data/") \
    .withColumn("ingestion_timestamp", current_timestamp())

query = df_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/Volumes/main/default/ecommerce_raw_data/_checkpoint") \
    .trigger(availableNow=True) \
    .toTable("main.default.orders_bronze")

query.awaitTermination()
print("✅ Batch 3 loaded to bronze")
```

**Update Silver (rerun Delta merge):**
```python
# Same cleaning and merge code from Task 6
df_bronze = spark.table("main.default.orders_bronze")

df_cleaned = df_bronze.filter(
    col("product_name").isNotNull() &
    (col("quantity") > 0) &
    (col("unit_price") > 0)
)

window_spec = Window.partitionBy("order_id").orderBy(col("ingestion_timestamp").desc())
df_deduped = df_cleaned.withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")

df_transformed = df_deduped \
    .withColumn("order_date", to_date(col("order_date"))) \
    .withColumn("total_amount", col("quantity") * col("unit_price")) \
    .drop("ingestion_timestamp", "_rescued_data") \
    .select(
        "order_id", "customer_id", "order_date", "product_name",
        "quantity", "unit_price", "total_amount", "status"
    )

delta_silver = DeltaTable.forName(spark, "main.default.orders_silver")

delta_silver.alias("target").merge(
    df_transformed.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set = {
    "customer_id": "source.customer_id",
    "order_date": "source.order_date",
    "product_name": "source.product_name",
    "quantity": "source.quantity",
    "unit_price": "source.unit_price",
    "total_amount": "source.total_amount",
    "status": "source.status",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values = {
    "order_id": "source.order_id",
    "customer_id": "source.customer_id",
    "order_date": "source.order_date",
    "product_name": "source.product_name",
    "quantity": "source.quantity",
    "unit_price": "source.unit_price",
    "total_amount": "source.total_amount",
    "status": "source.status",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()

print("✅ Silver updated")
```

**Update Gold (rerun aggregation):**
```python
# Same aggregation code from Task 8
df_silver = spark.table("main.default.orders_silver")

df_gold = df_silver.groupBy("order_date").agg(
    count("*").alias("total_orders"),
    round(sum("total_amount"), 2).alias("total_revenue"),
    round(avg("total_amount"), 2).alias("avg_order_value"),
    sum("quantity").alias("total_quantity_sold"),
    countDistinct("customer_id").alias("unique_customers"),
    sum(when(col("status") == "completed", 1).otherwise(0)).alias("completed_orders"),
    sum(when(col("status") == "cancelled", 1).otherwise(0)).alias("cancelled_orders")
).withColumn("updated_at", current_timestamp())

df_gold.write.mode("overwrite").saveAsTable("main.default.daily_sales_gold")
print("✅ Gold updated")
```

**Verify:**
```python
from pyspark.sql.functions import lit, count

bronze_count = spark.table("main.default.orders_bronze").select(lit("Bronze").alias("layer"), count("*").alias("row_count"))
silver_count = spark.table("main.default.orders_silver").select(lit("Silver").alias("layer"), count("*").alias("row_count"))
gold_count = spark.table("main.default.daily_sales_gold").select(lit("Gold").alias("layer"), count("*").alias("row_count"))

all_counts = bronze_count.union(silver_count).union(gold_count)
display(all_counts)
```

**Expected results:**
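* Bronze: 205 rows (155 + 50; Auto Loader picks up only the new batch 3 file)
* Silver: 50 more rows than before (batch 3 contains only new, valid order_ids)
* Gold: still one row per order_date (~30 rows)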

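**Key insight:** The pipeline is reusable: rerun the same code for new data. Bronze ingests only the new files, while silver and gold as written here reprocess every row; the merge and the overwrite make those reruns safe.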

</details>

## 🔄 Common Pipeline Patterns

### **Pattern 1: Batch Pipeline (This Lab)**

```
Raw Files → Bronze (Auto Loader) → Silver (Delta merge) → Gold (PySpark agg)
```

**Schedule:**
* Bronze: Every hour (ingest new files)
* Silver: Every hour (clean new data)
* Gold: Daily (aggregate for reports)

**Code pattern:**
```python
# Bronze
spark.readStream.format("cloudFiles").load(...).writeStream.toTable(...)

# Silver
DeltaTable.forName(...).merge(...).whenMatchedUpdate(...).execute()

# Gold
df.groupBy(...).agg(...).write.mode("overwrite").saveAsTable(...)
```

---

### **Pattern 2: Streaming Pipeline**

```
Raw Files → Bronze (Auto Loader) → Silver (Stream) → Gold (Stream)
```

**Use when:**
* Need real-time data
* Continuous file arrival
* Low latency requirements

**Code pattern:**
```python
# All layers use streaming
spark.readStream...writeStream.trigger(processingTime="1 minute")
```

---

### **Pattern 3: CDC Pipeline**

```
Change Data → Bronze (Auto Loader) → Silver (MERGE with SCD) → Gold (Agg)
```

**Use when:**
* Capturing database changes
* Need historical tracking
* SCD Type 2 requirements
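
SCD Type 2 keeps history by closing the current version of a record and adding a new current version instead of overwriting it. The plain-Python model below shows only that bookkeeping; the `valid_from`, `valid_to`, and `is_current` column names and the `apply_scd2` helper are hypothetical, and a Delta implementation would typically express the same steps as a merge over the change data.

```python
# Plain-Python model of an SCD Type 2 update (illustration only; column names are hypothetical).
from datetime import date


def apply_scd2(history, change, change_date):
    """Close the current version of change["order_id"] if it differs, then add a new current version."""
    key = change["order_id"]
    attrs = {k: v for k, v in change.items() if k != "order_id"}
    current = next((r for r in history if r["order_id"] == key and r["is_current"]), None)
    if current is not None:
        if all(current.get(k) == v for k, v in attrs.items()):
            return history  # nothing changed: keep the current version
        current["is_current"] = False  # close the old version
        current["valid_to"] = change_date
    history.append({"order_id": key, **attrs,
                    "valid_from": change_date, "valid_to": None, "is_current": True})
    return history


history = []
apply_scd2(history, {"order_id": 1, "status": "pending"}, date(2024, 1, 1))
apply_scd2(history, {"order_id": 1, "status": "completed"}, date(2024, 1, 3))
for version in history:
    print(version)
```

After the two changes, `history` holds the closed `pending` version (valid until 2024-01-03) and the current `completed` version; reapplying an identical change adds nothing.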

---

### **This Lab's Pipeline**

```
CSV Files in Volume
    ↓
    Auto Loader (streaming)
    ↓
🥉 Bronze: orders_bronze (155 rows, raw data)
    ↓
    Delta merge API (clean, dedupe)
    ↓
🥈 Silver: orders_silver (< 150 rows, validated)
    ↓
    PySpark aggregations
    ↓
🥇 Gold: daily_sales_gold (~30 rows, metrics)
    ↓
    Dashboards & Reports
```

## 💡 Key Concepts Summary

### **Medallion Architecture**

**Bronze → Silver → Gold** = **Raw → Cleaned → Aggregated**

**Why use it?**
* Clear separation of concerns
* Incremental processing
* Data quality improvement
* Reprocessing capability
* Audit trail

---

### **Auto Loader (Bronze)**

**Purpose:** Incremental file ingestion

**Key features:**
* Automatic file discovery
* Schema inference and evolution
* Checkpoint-based tracking
* Scalable (millions of files)
* Streaming API

**Syntax:**
```python
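spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", schema_path) \
    .load(path) \
    .writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .toTable(table)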
```

**When to use:**
* Production file ingestion
* Continuous data arrival
* Need schema evolution
* Bronze layer ingestion

---

### **Delta Merge API (Silver)**

**Purpose:** Programmatic upsert and deduplication

**Key features:**
* Python-based control
* Chainable methods
* Integrates with DataFrame ops
* Atomic operation

**Syntax:**
```python
DeltaTable.forName(spark, table).alias("t").merge(
    source.alias("s"),
    "t.id = s.id"
).whenMatchedUpdate(set={...}) \
 .whenNotMatchedInsert(values={...}) \
 .execute()
```

**When to use:**
* Complex transformations before merge
* Programmatic control needed
* Silver layer updates
* SCD patterns

---

### **PySpark Aggregations (Gold)**

**Purpose:** Business metrics

**Key features:**
* DataFrame API
* Multiple aggregation functions
* Conditional logic with when()
* Chainable operations

**Syntax:**
```python
df.groupBy("date").agg(
    count("*").alias("total"),
    sum("amount").alias("revenue"),
    avg("amount").alias("avg_value")
)
```

**When to use:**
* Creating KPIs
* Dashboard data
* Report tables
* Gold layer metrics

---

### **Data Quality**

**Bronze:** Accept all data (good and bad)  
**Silver:** Filter and validate with DataFrame operations  
**Gold:** Aggregate clean data  

**Quality improves at each layer!**

## ⚖️ PySpark vs SQL: When to Use Each

### **Bronze Layer Ingestion**

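| Feature | SQL (COPY INTO) | PySpark (Auto Loader) |
|---------|-----------------|----------------------|
| **Syntax** | Simple SQL | Streaming API |
| **File discovery** | Re-lists the source path on each run | Incremental (directory listing or file notifications) |
| **Schema evolution** | Opt-in (`mergeSchema` copy option) | Automatic (new columns, `_rescued_data`) |
| **Scalability** | Thousands of files | Millions of files |
| **Load tracking** | Built-in (already-loaded files are skipped) | Checkpoint location you configure |
| **Best for** | Simple batch | Production pipelines |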

**Recommendation:** Use **Auto Loader** for production - better scalability and features.

---

### **Silver Layer Transformation**

| Feature | SQL (MERGE INTO) | PySpark (Delta merge API) |
|---------|------------------|---------------------------|
| **Syntax** | Declarative SQL | Programmatic Python |
| **Complexity** | Simple | More verbose |
| **Flexibility** | Limited | High |
| **Integration** | Standalone | With DataFrame ops |
| **Best for** | Simple upserts | Complex transformations |

**Recommendation:** 
* Use **SQL MERGE** for simple upserts
* Use **Delta merge API** when you need complex DataFrame transformations before merge

---

### **Gold Layer Aggregation**

| Feature | SQL | PySpark |
|---------|-----|----------|
| **Syntax** | Familiar SQL | DataFrame API |
| **Performance** | Same | Same |
| **Flexibility** | Good | Excellent |
| **ML integration** | No | Yes |
| **Best for** | Analysts | Engineers |

**Recommendation:** Use what your team prefers; equivalent SQL and DataFrame queries go through the same Catalyst optimizer, so they perform the same.

---

### **This Lab's Approach**

**Why PySpark?**
* ✅ Auto Loader is more powerful than COPY INTO
* ✅ Better for production pipelines
* ✅ Programmatic control
* ✅ Integrates with ML workflows
* ✅ More flexible transformations

**When to use SQL instead:**
* Simpler syntax for analysts
* Standalone queries
* Ad-hoc analysis
* Dashboard queries

## 📚 Medallion Architecture Best Practices (PySpark)

### **🥉 Bronze Layer**

✅ **Keep raw data as-is** - No transformations  
✅ **Append-only** - Never delete from bronze  
✅ **Add metadata columns** - ingestion_timestamp, _rescued_data  
✅ **Use Auto Loader** - Incremental, scalable file discovery  
✅ **Set checkpoint location** - Ensures idempotency  
✅ **Use trigger(availableNow=True)** - For batch processing  

**Purpose:** Audit trail, reprocessing capability, data lineage

---

### **🥈 Silver Layer**

✅ **Apply data quality rules** - Filter nulls, validate ranges  
✅ **Deduplicate** - Use window functions with row_number()  
✅ **Convert data types** - Use to_date(), cast()  
✅ **Add calculated columns** - Use withColumn()  
✅ **Use Delta merge API** - Programmatic upserts  
✅ **Track record history** - created_at, updated_at audit columns  

**Purpose:** Clean, validated data for analytics

---

### **🥇 Gold Layer**

✅ **Pre-aggregate** - Use groupBy().agg()  
✅ **Business-friendly** - Clear column names with .alias()  
✅ **Optimize for queries** - Denormalize if needed  
✅ **Use mode("overwrite")** - For full refresh  
✅ **Use Delta merge** - For incremental updates  
✅ **Document metrics** - What each column means  

**Purpose:** Fast dashboards, reports, analytics
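
With Delta merge, a gold refresh can recompute only the dates touched by new silver data instead of overwriting the whole table. The plain-Python model below sketches that idea with two metrics; `refresh_gold_dates` and the way `changed_dates` is obtained are assumptions, and in a pipeline the recomputed rows would be merged into the gold table on `order_date`.

```python
# Plain-Python model of an incremental gold refresh (illustration only; names are hypothetical).
# Only order_dates touched by new silver rows are recomputed and upserted, the way a
# Delta merge keyed on order_date would update just those rows.
from builtins import round as py_round, sum as py_sum  # avoid the PySpark sum/round imports
from datetime import date


def refresh_gold_dates(gold, silver_rows, changed_dates):
    """Recompute metrics for each changed date and upsert them into the gold dict."""
    for order_date in changed_dates:
        rows = [r for r in silver_rows if r["order_date"] == order_date]
        if not rows:
            continue  # no silver data for this date: leave gold untouched
        gold[order_date] = {
            "total_orders": len(rows),
            "total_revenue": py_round(py_sum(r["total_amount"] for r in rows), 2),
        }
    return gold


silver_rows = [
    {"order_date": date(2024, 1, 1), "total_amount": 20.0},
    {"order_date": date(2024, 1, 2), "total_amount": 45.0},
]
gold = refresh_gold_dates({}, silver_rows, {date(2024, 1, 1), date(2024, 1, 2)})

# A new order arrives for 2024-01-02: only that date is recomputed
silver_rows.append({"order_date": date(2024, 1, 2), "total_amount": 5.0})
refresh_gold_dates(gold, silver_rows, {date(2024, 1, 2)})
print(gold[date(2024, 1, 2)])  # {'total_orders': 2, 'total_revenue': 50.0}
```

The trade-off: touching fewer rows makes each refresh cheaper, at the cost of having to track which dates changed, which a full `mode("overwrite")` refresh avoids.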

---

### **General Best Practices**

✅ **Separate concerns** - Each layer has clear purpose  
✅ **Idempotent pipelines** - Safe to rerun  
✅ **Incremental processing** - Process only new data  
✅ **Monitor data quality** - Track metrics at each layer  
✅ **Use Delta Lake** - ACID, time travel, performance  
✅ **Schedule appropriately** - Bronze (frequent), Silver (hourly), Gold (daily)  
✅ **Test with small data** - Validate logic before production  