# Day 3 In-Class Exercise: Mini-Pipeline

**Time:** 15 minutes
**Grading:** Completion-based (5% of course grade)
**Goal:** Build a 3-stage pipeline with validations

---

## Your Mission

Build a data pipeline using the mini Olist dataset:
1. **Bronze:** Load raw CSV files
2. **Silver:** Clean, validate (write 2 assertions)
3. **Gold:** Create 2-3 summary metrics
4. **Document:** Write a risk note (assumptions & limitations)

**Dataset:** `../../data/day3/exercise/`
- `mini_orders.csv` (500 orders)
- `mini_customers.csv` (500 customers)
- `mini_order_items.csv` (~540 items)

---

## Submission

- Complete all TODOs
- Ensure notebook runs end-to-end (Restart & Run All)
- Submit on Moodle by end of class

**Let's build!** ðŸš€

---

In [None]:
# Setup (PROVIDED - don't modify)
import pandas as pd
import duckdb
from IPython.display import display
import warnings
warnings.filterwarnings('ignore')

con = duckdb.connect(':memory:')
print("âœ… Setup complete")

---

## Part 1: Bronze Layer (2-3 min)

**TODO:** Load the three CSV files into bronze tables.

**Requirements:**
- Load exactly as-is (no transformations)
- Name tables: `bronze_orders`, `bronze_customers`, `bronze_items`
- First table is PROVIDED as example - follow the pattern!

**Pattern:** `SELECT * FROM '../../data/day3/exercise/filename.csv'`

In [None]:
# BRONZE LAYER

# âœ… EXAMPLE PROVIDED: Load orders (this one is done for you!)
con.execute("""
    CREATE TABLE bronze_orders AS
    SELECT * FROM '../../data/day3/exercise/mini_orders.csv'
""")
print(f"âœ“ Loaded {con.execute('SELECT COUNT(*) FROM bronze_orders').fetchone()[0]} orders")

# TODO: Load customers (follow the pattern above)
con.execute("""
    CREATE TABLE bronze_customers AS
    -- YOUR CODE HERE (hint: mini_customers.csv)
""")
print(f"âœ“ Loaded {con.execute('SELECT COUNT(*) FROM bronze_customers').fetchone()[0]} customers")

# TODO: Load order items (follow the pattern above)
con.execute("""
    CREATE TABLE bronze_items AS
    -- YOUR CODE HERE (hint: mini_order_items.csv)
""")
print(f"âœ“ Loaded {con.execute('SELECT COUNT(*) FROM bronze_items').fetchone()[0]} items")

print("\nâœ… Bronze layer complete!")

---

## Part 2: Silver Layer - Cleaning (4-5 min)

**TODO:** Create clean, typed silver tables.

**Requirements:**
- Fix date types (use `TRY_CAST(...AS TIMESTAMP)`)
- Cast price/freight to `DOUBLE`
- Remove rows with NULL in critical fields
- Most column names are PROVIDED - you fill in the gaps!

**Strategy:** Each query is 80% complete. Read carefully and complete the TODOs.

In [None]:
# SILVER LAYER

# Silver orders - MOSTLY PROVIDED
con.execute("""
    CREATE TABLE silver_orders AS
    SELECT
        order_id,
        customer_id,
        order_status,
        TRY_CAST(order_purchase_timestamp AS TIMESTAMP) as order_date,
        -- TODO: Add order_delivered_customer_date as TIMESTAMP using TRY_CAST
        -- Hint: TRY_CAST(order_delivered_customer_date AS TIMESTAMP) as delivered_date
    FROM bronze_orders
    WHERE order_id IS NOT NULL
""")
print(f"âœ“ Created {con.execute('SELECT COUNT(*) FROM silver_orders').fetchone()[0]} clean orders")

# Silver customers - COMPLETE THE WHERE CLAUSE
con.execute("""
    CREATE TABLE silver_customers AS
    SELECT
        customer_id,
        customer_city,
        customer_state,
        customer_zip_code_prefix
        -- All columns provided!
    FROM bronze_customers
    WHERE -- TODO: Filter out rows where customer_id IS NULL
""")
print(f"âœ“ Created {con.execute('SELECT COUNT(*) FROM silver_customers').fetchone()[0]} clean customers")

# Silver items - ADD THE CASTS
con.execute("""
    CREATE TABLE silver_items AS
    SELECT
        order_id,
        product_id,
        -- TODO: CAST price AS DOUBLE (hint: CAST(price AS DOUBLE) as price)
        
        -- TODO: CAST freight_value AS DOUBLE (hint: same pattern)
        
    FROM bronze_items
    WHERE order_id IS NOT NULL
""")
print(f"âœ“ Created {con.execute('SELECT COUNT(*) FROM silver_items').fetchone()[0]} clean items")

print("\nâœ… Silver layer complete!")

---

## Part 3: Silver Layer - Validation (3-4 min)

**TODO:** Write 2 assertions to validate data quality.

**Requirements:**
- Assertion 1: Check primary key uniqueness (order_id in silver_orders)
- Assertion 2: Check foreign key integrity (all items have valid orders)
- SQL queries are PROVIDED - you write the assertions!

**Pattern:** `assert condition, "Error message"`

In [None]:
# VALIDATIONS

print("=== VALIDATIONS ===\n")

# Validation 1: Primary key uniqueness - QUERIES PROVIDED
print("Check 1: Primary key uniqueness")
total_rows = con.execute("SELECT COUNT(*) FROM silver_orders").fetchone()[0]
unique_rows = con.execute("SELECT COUNT(DISTINCT order_id) FROM silver_orders").fetchone()[0]

print(f"  Total rows: {total_rows}, Unique order_ids: {unique_rows}")

# TODO: Write assertion that total_rows equals unique_rows
# Hint: assert total_rows == unique_rows, "Duplicate order IDs found!"


# Validation 2: Foreign key integrity - QUERY PROVIDED
print("\nCheck 2: Foreign key integrity")
orphan_items = con.execute("""
    SELECT COUNT(*)
    FROM silver_items i
    LEFT JOIN silver_orders o ON i.order_id = o.order_id
    WHERE o.order_id IS NULL
""").fetchone()[0]

print(f"  Orphaned items (no matching order): {orphan_items}")

# TODO: Write assertion that orphan_items equals 0
# Hint: assert orphan_items == 0, "Found items without valid orders!"


print("\nâœ… All validations passed!")

---

## Part 4: Gold Layer (3-4 min)

**TODO:** Create 2 aggregated tables for business reporting.

**Requirements:**
- First table is COMPLETE - study the pattern!
- Second table is a template - fill in the TODOs
- Both use JOINs and GROUP BY

**Strategy:** Understand the complete example, then modify the template for table 2.

In [None]:
# GOLD LAYER

# âœ… Gold Table 1: Daily sales - COMPLETE EXAMPLE PROVIDED
con.execute("""
    CREATE TABLE gold_daily_sales AS
    SELECT
        CAST(order_date AS DATE) as date,
        COUNT(DISTINCT o.order_id) as num_orders,
        ROUND(SUM(i.price + i.freight_value), 2) as total_revenue
    FROM silver_orders o
    INNER JOIN silver_items i ON o.order_id = i.order_id
    WHERE order_date IS NOT NULL
    GROUP BY CAST(order_date AS DATE)
    ORDER BY date
""")

print("Gold Table 1: Daily Sales Summary")
display(con.execute("SELECT * FROM gold_daily_sales LIMIT 5").df())

# TODO: Gold Table 2: State summary - TEMPLATE PROVIDED
con.execute("""
    CREATE TABLE gold_state_summary AS
    SELECT
        customer_state,
        COUNT(DISTINCT c.customer_id) as num_customers,
        -- TODO: Add COUNT(DISTINCT o.order_id) as num_orders
        
        -- TODO: Add COUNT(DISTINCT customer_city) as num_cities
        
    FROM silver_customers c
    INNER JOIN silver_orders o ON c.customer_id = o.customer_id
    -- TODO: Add GROUP BY clause (hint: GROUP BY customer_state)
    
    ORDER BY num_customers DESC
""")

print("\n\nGold Table 2: State Summary")
display(con.execute("SELECT * FROM gold_state_summary LIMIT 5").df())

print("\nâœ… Gold layer complete!")

---

## Part 5: Risk Note (2-3 min)

**TODO:** Write 3-5 sentences documenting assumptions, limitations, and gaps.

**Prompts to guide your thinking:**
- **Data assumptions:** What did you assume about data quality/completeness?
- **What we removed:** How many rows were filtered out and why?
- **Limitations:** What questions can this data NOT answer?
- **Missing context:** What external information would improve this analysis?

Use the bullet prompts below to structure your answer!

### Risk Note

**Write 3-5 sentences addressing these points:**

- **Data assumptions:** What did you assume is true about this data?  
  _(e.g., "We assumed all order_ids are valid completed transactions...")_

- **Data removed:** How much data did you filter out in the Silver layer?  
  _(e.g., "We removed X rows with NULL customer_ids, representing Y% of the data...")_

- **Analysis limitations:** What questions can this data NOT answer?  
  _(e.g., "This data cannot tell us about cancelled or refunded orders...")_

- **Missing context:** What info would make this analysis better?  
  _(e.g., "We don't know the currency, time zone, or product categories...")_

---

**YOUR RISK NOTE (write below):**

_[Double-click to edit this cell and write your 3-5 sentences here. Delete this placeholder text.]_

---

---

## âœ… Submission Checklist

Before submitting, verify:

- [ ] All TODO sections completed
- [ ] Notebook runs end-to-end (Kernel â†’ Restart & Run All)
- [ ] All assertions pass (no errors)
- [ ] Gold tables display results
- [ ] Risk note written (3-5 sentences addressing all bullet points)
- [ ] File saved

**Submit on Moodle by end of class!**

---

## Done Early?

**Challenge 1:** Add a 3rd validation checking that all prices are positive:
```python
negative_prices = con.execute(
    "SELECT COUNT(*) FROM silver_items WHERE price < 0"
).fetchone()[0]
assert negative_prices == 0, f"Found {negative_prices} negative prices!"
```

**Challenge 2:** Create a 3rd gold table showing customer city rankings by order volume.

**Great work!** ðŸŽ‰