# Day 3, Block A: Data Pipelines & Real-World Validation

- **Duration:** 100 minutes (13:30–15:10)
- **Course:** ECBS5294 - Introduction to Data Science: Working with Data

## Learning Objectives

By the end of this session, you will be able to:

1. Explain the **bronze → silver → gold** pipeline pattern and why it matters
2. Design idempotent data transformations
3. Write **assertions** to validate data quality programmatically
4. Identify and handle common real-world data problems (dates, types, nulls)
5. Apply the **pipeline pattern** to a real dataset

---

## 1. Why Data Pipelines?

### The Problem: One-Off Analysis Doesn't Scale

**You've hit the wall when:**
- Data updates regularly
- Multiple people need consistent results
- Stakeholders ask "how did you get this number?"
- Requirements change

**The solution:** A systematic, repeatable pipeline.

---

## 2. The Bronze-Silver-Gold Pattern

> **"Preserve the original, clean incrementally, aggregate deliberately."**

#### **Bronze Layer: Raw Ingestion**
- Preserve original data exactly as received
- No transformations
- Keep everything—even if it looks wrong

#### **Silver Layer: Clean & Validated**
- Analysis-ready data
- Fix types, handle nulls, validate
- Document what was fixed

#### **Gold Layer: Business Metrics**
- Aggregated, joined, ready for reporting
- Pre-computed KPIs

---

In [None]:
# Setup
import pandas as pd
import duckdb
from IPython.display import display
import warnings
warnings.filterwarnings('ignore')

con = duckdb.connect(':memory:')
print("✅ Setup complete")

### Bronze Layer: Raw Ingestion

**Goal:** Load data exactly as received.

In [None]:
# BRONZE: Load raw data
print("=== BRONZE LAYER ===\n")

con.execute("""
    CREATE OR REPLACE TABLE bronze_orders AS
    SELECT * FROM '../../data/day3/teaching/olist_orders_subset.csv'
""")

con.execute("""
    CREATE OR REPLACE TABLE bronze_customers AS
    SELECT * FROM '../../data/day3/teaching/olist_customers_subset.csv'
""")

con.execute("""
    CREATE OR REPLACE TABLE bronze_order_items AS
    SELECT * FROM '../../data/day3/teaching/olist_order_items_subset.csv'
""")

print(f"Loaded {con.execute('SELECT COUNT(*) FROM bronze_orders').fetchone()[0]} orders")
print("✅ Bronze layer complete")

### Silver Layer: Clean & Validate

**Goal:** Transform into analysis-ready format.

In [None]:
# SILVER: Clean and validate
print("=== SILVER LAYER ===\n")

con.execute("""
    CREATE OR REPLACE TABLE silver_orders AS
    SELECT
        order_id,
        customer_id,
        order_status,
        TRY_CAST(order_purchase_timestamp AS TIMESTAMP) as order_date
    FROM bronze_orders
    WHERE order_id IS NOT NULL
""")

con.execute("""
    CREATE OR REPLACE TABLE silver_order_items AS
    SELECT
        order_id,
        product_id,
        CAST(price AS DOUBLE) as price,
        CAST(freight_value AS DOUBLE) as freight
    FROM bronze_order_items
    WHERE order_id IS NOT NULL
""")

print(f"Created {con.execute('SELECT COUNT(*) FROM silver_orders').fetchone()[0]} clean orders")
print("✅ Silver layer complete")

### Validation: Prove Data Quality

In [None]:
# VALIDATION
print("=== VALIDATION ===\n")

# Check 1: Primary key uniqueness
order_count = con.execute("SELECT COUNT(*) FROM silver_orders").fetchone()[0]
order_unique = con.execute("SELECT COUNT(DISTINCT order_id) FROM silver_orders").fetchone()[0]

print(f"✓ Order IDs unique? {order_count == order_unique}")
assert order_count == order_unique, "Duplicate order IDs!"

# Check 2: No null critical fields
null_ids = con.execute("SELECT COUNT(*) FROM silver_orders WHERE order_id IS NULL").fetchone()[0]
print(f"✓ No NULL order IDs? {null_ids == 0}")
assert null_ids == 0, "NULL order IDs found!"

# Check 3: Foreign key integrity
orphans = con.execute("""
    SELECT COUNT(*)
    FROM silver_order_items i
    LEFT JOIN silver_orders o ON i.order_id = o.order_id
    WHERE o.order_id IS NULL
""").fetchone()[0]

print(f"✓ All items have valid orders? {orphans == 0}")
assert orphans == 0, "Orphaned items found!"

print("\n✅ ALL VALIDATIONS PASSED")

### Gold Layer: Business Metrics

In [None]:
# GOLD: Business metrics
print("=== GOLD LAYER ===\n")

con.execute("""
    CREATE OR REPLACE TABLE gold_daily_sales AS
    SELECT
        CAST(o.order_date AS DATE) as date,
        COUNT(DISTINCT o.order_id) as num_orders,
        SUM(i.price + i.freight) as total_revenue
    FROM silver_orders o
    INNER JOIN silver_order_items i ON o.order_id = i.order_id
    WHERE o.order_date IS NOT NULL
    GROUP BY CAST(o.order_date AS DATE)
    ORDER BY date
""")

result = con.execute("SELECT * FROM gold_daily_sales LIMIT 5").df()
print("Daily sales summary:")
display(result)
print("\n✅ Gold layer complete")

## 3. Key Principles

### 1. Idempotency
> **"Running the pipeline twice gives the same result."**

**Good:** Recreate tables from scratch
```python
con.execute("DROP TABLE IF EXISTS gold_daily_sales")
con.execute("CREATE TABLE gold_daily_sales AS SELECT ...")
```

### 2. Fail Fast
> **"If data is bad, stop immediately with a clear error."**

**Good:**  
```python
assert df['price'].min() >= 0, "Negative prices found!"
```
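
A bare `assert` tells you that something is wrong but not how much, and Python skips `assert` statements entirely when run with the `-O` flag. One possible alternative is a small helper that raises with a count and a preview of the offending rows. `require_no_rows` and the sample data below are hypothetical, not part of the course code.

```python
import pandas as pd

def require_no_rows(bad_rows: pd.DataFrame, message: str) -> None:
    """Raise ValueError (with a row count and a short preview) if any bad rows exist."""
    if len(bad_rows) > 0:
        preview = bad_rows.head(3).to_string(index=False)
        raise ValueError(f"{message}: {len(bad_rows)} row(s)\n{preview}")

# Hypothetical sample with one negative price
df = pd.DataFrame({"order_id": ["a1", "a2", "a3"], "price": [19.9, -5.0, 42.0]})

# In a real pipeline you would let the exception propagate and stop the run;
# it is caught here only to show the message.
try:
    require_no_rows(df[df["price"] < 0], "Negative prices found")
except ValueError as err:
    error_message = str(err)
    print(error_message)
```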

### 3. Document Assumptions

Every validation states an assumption about the data in executable form:
```python
assert df['order_id'].is_unique, "Duplicate order IDs"
```

---

## 4. Real-World Data Issues

### Dates
- Always parse explicitly with `TRY_CAST` (SQL) or `pd.to_datetime()` (pandas)
- Standardize on one format (prefer ISO 8601: `YYYY-MM-DD`)
- Validate date ranges (for example, no purchase dates in the future)
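
The three date habits above can be sketched in pandas as follows. The sample values and the range bounds are assumptions chosen for illustration; adjust both to your data.

```python
import pandas as pd

# Hypothetical raw values: two valid timestamps, one garbage string, one missing
raw = pd.Series(["2017-10-02 10:56:33", "2018-07-24 20:41:37", "not a date", None])

# Explicit format; unparseable values become NaT instead of raising
parsed = pd.to_datetime(raw, format="%Y-%m-%d %H:%M:%S", errors="coerce")

# Values that were present in the raw data but failed to parse
n_failed = int(parsed.isna().sum() - raw.isna().sum())

# Range check on the values that did parse (bounds are assumed)
lower, upper = pd.Timestamp("2016-01-01"), pd.Timestamp("2019-01-01")
valid = parsed.dropna()
n_out_of_range = int((~valid.between(lower, upper)).sum())

print(f"Failed to parse: {n_failed}, out of range: {n_out_of_range}")
```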

### Types
- Check `df.dtypes` (pandas) or `INFORMATION_SCHEMA` (SQL) after loading
- Numbers stored as strings? Clean them first, then convert
- Use `TRY_CAST` so that unparseable values become NULL instead of raising an error, then count those NULLs
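
A pandas sketch of "clean, then convert" might look like this. The sample strings are invented, and treating commas as thousands separators is an assumption that would be wrong for data using decimal commas.

```python
import pandas as pd

# Hypothetical price column that arrived as text
raw_price = pd.Series(["29.99", "1,299.90", " 45 ", "N/A"])

# Assumption: commas are thousands separators, not decimal marks
cleaned = raw_price.str.strip().str.replace(",", "", regex=False)

# errors="coerce" turns anything still unparseable (here "N/A") into NaN
price = pd.to_numeric(cleaned, errors="coerce")

print(price.dtype)  # numeric dtype after conversion
print(f"Unparseable values: {int(price.isna().sum())}")
```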

### NULLs
- Understand what NULL means (not applicable? unknown? not yet?)
- Document your handling strategy
- Remember: most aggregate functions (`SUM`, `AVG`, `COUNT(column)`) skip NULLs, while `COUNT(*)` counts every row
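
pandas behaves the same way by default, which makes the effect easy to demonstrate. In this small sketch (sample values invented), the mean changes depending on whether a missing value is skipped or treated as zero:

```python
import pandas as pd

# Hypothetical freight values with one missing entry
freight = pd.Series([10.0, None, 30.0])

n_rows = len(freight)                  # counts every row, like COUNT(*)
n_values = int(freight.count())        # counts non-null values, like COUNT(freight)
mean_skip = freight.mean()             # missing value skipped: (10 + 30) / 2
mean_zero = freight.fillna(0).mean()   # missing treated as 0: (10 + 0 + 30) / 3

print(n_rows, n_values, mean_skip, round(mean_zero, 2))
```

Neither mean is "the right one" in general; which you report depends on what the missing value means, which is why the handling strategy needs to be written down.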

### SQL vs Python
- **Use SQL for:** filtering, joining, grouping, sorting
- **Use Python for:** complex string manipulation, APIs, ML, visualization

---

## Summary

### Key Takeaways

1. **Pipeline pattern:** Bronze (raw) → Silver (clean) → Gold (metrics)
2. **Validations:** Assertions catch problems early and loudly
3. **Idempotency:** Re-running gives the same result
4. **Dates, types, NULLs:** Check and validate early
5. **Work habits:** Restart & Run All, small commits, read docs

**You're not just analyzing data—you're building infrastructure.**

---

## Next: In-Class Exercise

Build a mini-pipeline:
1. Bronze: Load raw data
2. Silver: Clean, validate (2 assertions)
3. Gold: Create 2-3 metrics
4. Document: Risk note

- **Time:** 15 minutes
- **Notebook:** `day3_exercise_mini_pipeline.ipynb`

**Let's build!** 🚀