# Lesson 7: Building End-to-End Data Pipelines

**Module 3: Data & Pipeline Engineering** | **Time**: 4-5 hours | **Difficulty**: Advanced

---

## 🎯 Learning Objectives

✅ Design and build a complete ML data pipeline from raw data to model-ready features  
✅ Implement data validation, transformation, and quality checks  
✅ Understand orchestration patterns (Airflow, Prefect)  
✅ Apply idempotency, observability, and error handling best practices  
✅ Answer 5 interview questions on production data pipelines  

---

## 📚 Table of Contents

1. [What Does “End-to-End” Mean?](#1-e2e)
2. [Pipeline Architecture Patterns](#2-patterns)
3. [Data Validation with Great Expectations](#3-validation)
4. [Hands-On: Complete ML Data Pipeline](#4-hands-on)
5. [Orchestration: Airflow & Prefect](#5-orchestration)
6. [Production Best Practices](#6-best-practices)
7. [Exercises](#7-exercises)
8. [Interview Preparation](#8-interview)

---

## 1. What Does “End-to-End” Mean? <a id='1-e2e'></a>

An end-to-end ML data pipeline covers **everything** from raw data ingestion to model-ready features.

### Complete Pipeline Architecture

```
  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
  │  INGEST  │──▶│ VALIDATE │──▶│TRANSFORM │──▶│ QUALITY  │──▶│  STORE   │
  │          │   │          │   │          │   │  CHECK   │   │          │
  └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘
       │              │              │              │              │
  Read from      Schema         Feature        Drift          Write to
  sources        checks         engineering    detect         Feature Store
  (API, DB,      Null check     Normalize      Anomaly        or Parquet
   files)        Type check     Encode         detect
```

### The 5 Stages of a Production Pipeline

| Stage | Purpose | Tools |
|-------|---------|-------|
| **Ingest** | Pull raw data from sources | APIs, Kafka, databases, file systems |
| **Validate** | Ensure data meets expectations | Great Expectations, Pandera |
| **Transform** | Feature engineering, cleaning | Pandas, Spark, dbt |
| **Quality Check** | Detect drift, anomalies | Custom checks, monitoring |
| **Store** | Save model-ready features | Parquet, Feature Store (Feast) |
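
The five stages compose naturally as plain functions that run in order, with validation able to halt the run before anything is stored. The following is a minimal sketch rather than a production design: the stage functions, the `ValidationError` class, the threshold of 30, and the in-memory list standing in for a feature store are all illustrative assumptions.

```python
from typing import Dict, List

Record = Dict[str, float]


class ValidationError(Exception):
    """Raised when a batch fails a check and the run should stop."""


def ingest() -> List[Record]:
    # Hypothetical source: a real pipeline would read from an API, DB, or files.
    return [{"user_id": 1, "amount": 20.0}, {"user_id": 2, "amount": 35.5}]


def validate(rows: List[Record]) -> List[Record]:
    # Fail fast: stop before any downstream stage sees bad data.
    for row in rows:
        if row["amount"] < 0:
            raise ValidationError(f"negative amount in {row}")
    return rows


def transform(rows: List[Record]) -> List[Record]:
    # Example feature: flag orders above an assumed threshold of 30.
    return [{**row, "is_large_order": float(row["amount"] > 30)} for row in rows]


def quality_check(rows: List[Record]) -> List[Record]:
    # Output validation: every row must carry the engineered feature.
    if not all("is_large_order" in row for row in rows):
        raise ValidationError("missing engineered feature")
    return rows


def run_pipeline(store: List[Record]) -> int:
    """Run the stages in order and return the number of rows stored."""
    rows = ingest()
    for stage in (validate, transform, quality_check):
        rows = stage(rows)
    store.extend(rows)  # Stand-in for a Parquet or feature store write.
    return len(rows)


feature_store: List[Record] = []
stored = run_pipeline(feature_store)
print(f"stored {stored} rows")
```

Keeping each stage a function with the same input and output shape makes it straightforward to unit test stages in isolation and to hand them to an orchestrator later.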

---

## 2. Pipeline Architecture Patterns <a id='2-patterns'></a>

### Pattern 1: Simple Batch Pipeline

```
  ┌─────────┐      ┌──────────┐      ┌─────────┐
  │ Cron /  │  ─▶  │ Python   │  ─▶  │ Parquet │
  │ Airflow │      │ Script   │      │ Output  │
  └─────────┘      └──────────┘      └─────────┘
  Trigger          Process          Store
  (daily/hourly)   (transform)      (feature store)
```

**Best for:** Small-medium teams, daily/hourly batch processing

### Pattern 2: Lambda Architecture

```
                    ┌─────────────────────┐
                    │  BATCH LAYER        │
  Raw Data ───┬───▶ │  (Spark + Parquet)  │───┐
              │     └─────────────────────┘   │   ┌──────────┐
              │                               ├──▶│ SERVING  │
              │     ┌─────────────────────┐   │   │  LAYER   │
              └───▶ │  SPEED LAYER        │───┘   └──────────┘
                    │  (Kafka + Redis)    │
                    └─────────────────────┘
```

**Best for:** Systems needing both historical analysis and real-time features
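
To make the serving layer concrete, here is a minimal sketch of how it might combine the two views for an additive feature such as total spend. The dictionaries stand in for a Parquet-backed batch view and a Redis-backed speed view; the function name and the sample values are assumptions for illustration only.

```python
from typing import Dict


def serve_feature(user_id: int,
                  batch_view: Dict[int, float],
                  speed_view: Dict[int, float]) -> float:
    """Combine the precomputed batch total with the real-time increment."""
    return batch_view.get(user_id, 0.0) + speed_view.get(user_id, 0.0)


# Batch layer: total spend per user as of the last batch job (assumed values).
batch_view = {101: 250.0, 102: 80.0}
# Speed layer: spend observed since that job finished (assumed values).
speed_view = {101: 19.5, 103: 42.0}

for uid in (101, 102, 103):
    print(uid, serve_feature(uid, batch_view, speed_view))
```

Simple addition only works for additive aggregates. Features such as averages or distinct counts need the two layers to expose mergeable state (for example, sum and count) instead of the final value.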

---

## 3. Data Validation with Great Expectations <a id='3-validation'></a>

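**Never trust raw data.** Every pipeline should validate data at ingestion. This lesson implements the checks with a small custom validator so the mechanics stay visible; libraries such as Great Expectations and Pandera package the same kinds of checks as declarative rules.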

### Validation Checks

```
  ┌───────────────────────────────────────────────┐
  │            DATA VALIDATION LAYERS             │
  ├───────────────┬───────────────┬───────────────┤
  │ SCHEMA        │ VALUE         │ STATISTICAL   │
  │               │               │               │
  │ • Correct     │ • Non-null    │ • Mean within │
  │   columns     │ • In range    │   bounds      │
  │ • Correct     │ • Valid enum  │ • Distribution│
  │   dtypes      │ • Unique IDs  │   check       │
  │ • No extra    │ • Positive    │ • Outlier     │
  │   columns     │   values      │   detection   │
  └───────────────┴───────────────┴───────────────┘
```
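
The statistical layer is the least obvious of the three, so here is a minimal sketch of two of its checks: a mean-within-bounds test and outlier detection using Tukey's IQR rule. It uses only the standard library to stay dependency-free; the function names, the bounds, and the sample amounts are illustrative assumptions, and the IQR rule is one common choice rather than the only one.

```python
import statistics
from typing import List, Sequence


def mean_within_bounds(values: Sequence[float], low: float, high: float) -> bool:
    """Return True when the sample mean falls inside [low, high]."""
    return low <= statistics.fmean(values) <= high


def iqr_outliers(values: Sequence[float], k: float = 1.5) -> List[float]:
    """Return values outside [Q1 - k*IQR, Q3 + k*IQR] (Tukey's rule)."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lower or v > upper]


# Assumed sample: seven typical order amounts and one suspicious value.
amounts = [12.0, 15.0, 14.0, 13.0, 16.0, 15.5, 14.5, 400.0]

print("mean in [5, 100]:", mean_within_bounds(amounts, 5, 100))
print("outliers:", iqr_outliers(amounts))
```

Note that the single extreme value leaves the mean inside its bounds while the outlier check still flags it, which is why the two checks are usually run together.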

---

```python
# ============================================================
# Custom Data Validator (production-grade pattern)
# ============================================================
import pandas as pd
from dataclasses import dataclass, field
from typing import List
from datetime import datetime

@dataclass
class ValidationResult:
    """Result of a single validation check."""
    check_name: str
    passed: bool
    details: str
    severity: str  # 'error', 'warning', 'info'

@dataclass
class DataValidator:
    """Production data validation framework."""
    results: List[ValidationResult] = field(default_factory=list)

    def check_schema(self, df: pd.DataFrame, expected_columns: List[str]) -> None:
        """Validate that expected columns exist."""
        missing = set(expected_columns) - set(df.columns)
        extra = set(df.columns) - set(expected_columns)

        self.results.append(ValidationResult(
            check_name='schema_check',
            passed=len(missing) == 0,
            details=f'Missing: {missing}, Extra: {extra}' if missing or extra else 'All columns present',
            severity='error' if missing else 'warning' if extra else 'info'
        ))

    def check_nulls(self, df: pd.DataFrame, max_null_pct: float = 0.05) -> None:
        """Check null percentage per column."""
        null_pcts = df.isnull().mean()
        violations = null_pcts[null_pcts > max_null_pct]

        self.results.append(ValidationResult(
            check_name='null_check',
            passed=len(violations) == 0,
            details=f'Columns exceeding {max_null_pct:.0%} nulls: {dict(violations)}' if len(violations) > 0 else 'All columns within null threshold',
            severity='error' if len(violations) > 0 else 'info'
        ))

    def check_range(self, df: pd.DataFrame, column: str, min_val: float, max_val: float) -> None:
        """Check values are within expected range."""
        out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()

        self.results.append(ValidationResult(
            check_name=f'range_check_{column}',
            passed=out_of_range == 0,
            details=f'{out_of_range} values outside [{min_val}, {max_val}]' if out_of_range > 0 else f'All values in [{min_val}, {max_val}]',
            severity='error' if out_of_range > 0 else 'info'
        ))

    def check_uniqueness(self, df: pd.DataFrame, column: str) -> None:
        """Check that column values are unique (for IDs)."""
        duplicates = df[column].duplicated().sum()

        self.results.append(ValidationResult(
            check_name=f'uniqueness_{column}',
            passed=duplicates == 0,
            details=f'{duplicates} duplicate values found' if duplicates > 0 else 'All values unique',
            severity='error' if duplicates > 0 else 'info'
        ))

    def report(self) -> bool:
        """Print validation report and return overall pass/fail."""
        print("\n" + "="*60)
        print("DATA VALIDATION REPORT")
        print(f"Timestamp: {datetime.now().isoformat()}")
        print("="*60)

        all_passed = True
        for r in self.results:
            icon = '✅' if r.passed else ('⚠️' if r.severity == 'warning' else '❌')
            print(f"  {icon} {r.check_name}: {r.details}")
            if not r.passed and r.severity == 'error':
                all_passed = False

        print("\n" + "-"*60)
        status = '✅ ALL CHECKS PASSED' if all_passed else '❌ VALIDATION FAILED'
        print(f"  Overall: {status}")
        print("="*60)
        return all_passed

print("✅ DataValidator class defined")
```

## 4. Hands-On: Complete ML Data Pipeline <a id='4-hands-on'></a>

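We’ll build a **complete 6-step pipeline** that takes raw e-commerce data and produces model-ready features. It follows the five stages from Section 1, with Transform split into cleaning (Step 3) and feature engineering (Step 4).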

### Pipeline Flow

```
    Step 1       Step 2       Step 3       Step 4       Step 5       Step 6
  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
  │ Ingest  │─▶│Validate │─▶│Transform│─▶│ Feature │─▶│ Quality │─▶│  Store  │
  │         │  │         │  │         │  │  Engg   │  │  Check  │  │         │
  └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘
```

```python
# ============================================================
# STEP 1: INGEST - Simulate raw data from multiple sources
# ============================================================
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)
n = 50_000

print("STEP 1: INGEST")
print("="*60)

# Simulate transactions from an API
transactions = pd.DataFrame({
    'transaction_id': np.arange(n),
    'user_id': np.random.randint(1, 5001, n),
    'product_id': np.random.randint(1, 1001, n),
    'category': np.random.choice(['electronics', 'clothing', 'food', 'books', 'sports'], n),
    'amount': np.round(np.random.exponential(50, n), 2),
    'quantity': np.random.randint(1, 10, n),
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='10min'),
})

# Inject realistic data quality issues
# 1. Missing values (~3%)
null_mask = np.random.random(n) < 0.03
transactions.loc[null_mask, 'amount'] = np.nan

# 2. Negative amounts (data entry errors)
neg_mask = np.random.random(n) < 0.005
transactions.loc[neg_mask, 'amount'] = -abs(transactions.loc[neg_mask, 'amount'])

# 3. Duplicate transactions
duplicates = transactions.sample(n=100, random_state=42)
transactions = pd.concat([transactions, duplicates]).reset_index(drop=True)

# Simulate user profiles from a database
user_profiles = pd.DataFrame({
    'user_id': np.arange(1, 5001),
    'age': np.random.randint(18, 80, 5000),
    'region': np.random.choice(['US_East', 'US_West', 'EU', 'Asia', 'Other'], 5000),
    'account_age_days': np.random.randint(1, 3650, 5000),
    'is_premium': np.random.choice([0, 1], 5000, p=[0.7, 0.3]),
})

print(f"  Transactions: {len(transactions):,} rows ({transactions.isna().sum().sum()} nulls)")
print(f"  User profiles: {len(user_profiles):,} rows")
print(f"  Injected issues: nulls, negatives, duplicates")
print("✅ Raw data ingested")
```

```python
# ============================================================
# STEP 2: VALIDATE - Check data quality before processing
# ============================================================
print("\nSTEP 2: VALIDATE")
print("="*60)

validator = DataValidator()

# Schema checks
validator.check_schema(transactions,
    ['transaction_id', 'user_id', 'product_id', 'category', 'amount', 'quantity', 'timestamp'])

# Null checks
validator.check_nulls(transactions, max_null_pct=0.05)

# Range checks
validator.check_range(transactions, 'amount', min_val=0.01, max_val=10000)
validator.check_range(transactions, 'quantity', min_val=1, max_val=100)

# Uniqueness
validator.check_uniqueness(transactions, 'transaction_id')

is_valid = validator.report()

if not is_valid:
    print("\n>>> Validation failed! Will clean issues in Transform step.")
```

```python
# ============================================================
# STEP 3: TRANSFORM - Clean and fix data quality issues
# ============================================================
print("\nSTEP 3: TRANSFORM (Clean)")
print("="*60)

df = transactions.copy()
initial_count = len(df)

# 1. Remove duplicates
df = df.drop_duplicates(subset=['transaction_id'], keep='first')
print(f"  Removed {initial_count - len(df)} duplicates")

# 2. Handle negative amounts (likely data entry errors)
neg_count = (df['amount'] < 0).sum()
df['amount'] = df['amount'].abs()  # Take absolute value
print(f"  Fixed {neg_count} negative amounts")

# 3. Handle missing values
null_count = df['amount'].isna().sum()
# Impute with median per category (more accurate than global median)
df['amount'] = df.groupby('category')['amount'].transform(
    lambda x: x.fillna(x.median())
)
print(f"  Imputed {null_count} null amounts with category median")

# 4. Add computed columns
df['total_value'] = df['amount'] * df['quantity']
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

print(f"  Added computed columns: total_value, hour, day_of_week, is_weekend")
print(f"  Final shape: {df.shape}")
print("✅ Data cleaned and transformed")
```

```python
# ============================================================
# STEP 4: FEATURE ENGINEERING - Create ML-ready features
# ============================================================
print("\nSTEP 4: FEATURE ENGINEERING")
print("="*60)

# --- User-level aggregation features ---
user_features = df.groupby('user_id').agg(
    total_transactions=('transaction_id', 'count'),
    total_spend=('total_value', 'sum'),
    avg_order_value=('amount', 'mean'),
    max_order_value=('amount', 'max'),
    unique_categories=('category', 'nunique'),
    avg_quantity=('quantity', 'mean'),
    weekend_ratio=('is_weekend', 'mean'),
    first_purchase=('timestamp', 'min'),
    last_purchase=('timestamp', 'max'),
).reset_index()

# Recency feature
max_date = df['timestamp'].max()
user_features['days_since_last_purchase'] = (max_date - user_features['last_purchase']).dt.days

# Purchase frequency
user_features['purchase_span_days'] = (
    (user_features['last_purchase'] - user_features['first_purchase']).dt.days + 1
)
user_features['purchase_frequency'] = (
    user_features['total_transactions'] / user_features['purchase_span_days']
)

# Drop temporal columns (can't use raw dates as features)
user_features = user_features.drop(columns=['first_purchase', 'last_purchase'])

# --- Merge with user profiles ---
features_df = user_features.merge(user_profiles, on='user_id', how='left')

# --- One-hot encode region ---
features_df = pd.get_dummies(features_df, columns=['region'], prefix='region')

print(f"  Created {len(features_df.columns) - 1} features for {len(features_df)} users")
print(f"  Feature list: {list(features_df.columns[:10])}... (+{len(features_df.columns)-10} more)")
print("✅ Features engineered")
features_df.head()
```

```python
# ============================================================
# STEP 5: QUALITY CHECK - Validate output features
# ============================================================
print("\nSTEP 5: QUALITY CHECK (Output Validation)")
print("="*60)

output_validator = DataValidator()

# Check no nulls in final features
output_validator.check_nulls(features_df, max_null_pct=0.0)

# Check reasonable ranges
output_validator.check_range(features_df, 'total_spend', min_val=0, max_val=1_000_000)
output_validator.check_range(features_df, 'avg_order_value', min_val=0, max_val=10_000)
output_validator.check_range(features_df, 'age', min_val=18, max_val=100)

# Check uniqueness of user_id
output_validator.check_uniqueness(features_df, 'user_id')

output_valid = output_validator.report()

if output_valid:
    print("\n>>> Output features pass all quality checks!")
```
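
Step 5 covers nulls, ranges, and uniqueness, but the Quality Check stage also lists drift detection. One common way to quantify drift is the Population Stability Index (PSI), which compares the binned distribution of a feature against a baseline. The sketch below is a plain-Python illustration under stated assumptions: the function names, the bin edges, and the sample values are hypothetical, and the frequently quoted thresholds (roughly 0.1 for moderate and 0.25 for significant drift) are rules of thumb, not fixed standards.

```python
import math
from typing import List, Sequence


def bin_fractions(values: Sequence[float], edges: Sequence[float]) -> List[float]:
    """Share of values in each bin; the last bin includes its right edge.

    Values outside the edges are not counted in any bin.
    """
    counts = [0] * (len(edges) - 1)
    last = len(counts) - 1
    for v in values:
        for i in range(len(counts)):
            upper_ok = v <= edges[i + 1] if i == last else v < edges[i + 1]
            if edges[i] <= v and upper_ok:
                counts[i] += 1
                break
    return [c / len(values) for c in counts]


def population_stability_index(expected: Sequence[float],
                               actual: Sequence[float],
                               edges: Sequence[float],
                               eps: float = 1e-6) -> float:
    """PSI = sum over bins of (actual - expected) * ln(actual / expected)."""
    psi = 0.0
    for e, a in zip(bin_fractions(expected, edges), bin_fractions(actual, edges)):
        e, a = max(e, eps), max(a, eps)  # Avoid log(0) for empty bins.
        psi += (a - e) * math.log(a / e)
    return psi


# Assumed baseline (training-time) and current values for one feature.
baseline = [10, 20, 30, 40, 50, 60, 70, 80]
shifted = [60, 70, 80, 90, 65, 75, 85, 95]
edges = [0, 25, 50, 75, 100]

psi_same = population_stability_index(baseline, baseline, edges)
psi_shifted = population_stability_index(baseline, shifted, edges)
print(f"PSI vs itself:  {psi_same:.3f}")
print(f"PSI vs shifted: {psi_shifted:.3f}")
```

In a real pipeline the baseline fractions would be computed once from the training data and stored, so that each run only bins the current batch.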

```python
# ============================================================
# STEP 6: STORE - Save as Parquet (production format)
# ============================================================
import os
import shutil

print("\nSTEP 6: STORE")
print("="*60)

os.makedirs('pipeline_output', exist_ok=True)

# Save as snappy-compressed Parquet (requires pyarrow or fastparquet).
# Writing to a fixed path overwrites any previous file, so re-runs do not append.
output_path = 'pipeline_output/user_features.parquet'
features_df.to_parquet(output_path, index=False, compression='snappy')

file_size = os.path.getsize(output_path) / 1024

print(f"  Output: {output_path}")
print(f"  Size: {file_size:.1f} KB")
print(f"  Rows: {len(features_df):,}")
print(f"  Columns: {len(features_df.columns)}")
print(f"  Compression: snappy")
print("✅ Features saved to Parquet!")

# Verify we can read it back
verify_df = pd.read_parquet(output_path)
assert verify_df.shape == features_df.shape
print("✅ Read-back verification passed!")

# Cleanup (demo only: a real pipeline keeps its output)
shutil.rmtree('pipeline_output', ignore_errors=True)

print("\n" + "="*60)
print("🎉 END-TO-END PIPELINE COMPLETE!")
print("="*60)
print("  Ingested → Validated → Transformed → Feature Engineered → Quality Checked → Stored")
```

## 5. Orchestration: Airflow & Prefect <a id='5-orchestration'></a>

In production, pipelines need to be **scheduled, monitored, and managed**.

### Orchestration Visual

```
  AIRFLOW DAG:

  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
  │  ingest  │──▶│ validate │──▶│transform │──▶│  store   │
  └──────────┘   └──────────┘   └──────────┘   └──────────┘
     @daily       on_success     on_success     on_success
                                                    │
                                              ┌─────┴─────┐
                                              │  notify   │
                                              │  (Slack)  │
                                              └───────────┘
```
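
At its core, an orchestrator resolves the dependency graph into a valid execution order before it adds scheduling, retries, and monitoring on top. The sketch below illustrates only that ordering step for the DAG in the diagram, using the standard library's `graphlib` (Python 3.9+). It is not how Airflow or Prefect are implemented; the `run_task` function and the task names are stand-ins.

```python
from graphlib import TopologicalSorter
from typing import List

# Each task maps to the set of tasks that must finish before it starts.
dependencies = {
    "validate": {"ingest"},
    "transform": {"validate"},
    "store": {"transform"},
    "notify": {"store"},
}


def run_task(name: str, log: List[str]) -> None:
    # Stand-in for real work; a real task would call the pipeline step.
    log.append(name)


execution_log: List[str] = []
for task_name in TopologicalSorter(dependencies).static_order():
    run_task(task_name, execution_log)

print(" -> ".join(execution_log))
```

`TopologicalSorter` raises `graphlib.CycleError` when the graph contains a cycle, which is the same reason orchestrators reject cyclic task definitions.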

### Airflow vs Prefect Comparison

| Feature | Apache Airflow | Prefect |
|---------|---------------|----------|
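| **Setup** | Heavier (metadata database, scheduler, webserver; Redis or RabbitMQ only with the Celery executor) | Lighter (`pip install prefect`; optional Prefect Cloud or self-hosted server) |
| **Language** | Python DAG definitions | Python functions with `@flow` / `@task` decorators |
| **Testing** | Historically harder to test locally | Easy local testing (flows run as plain Python functions) |
| **Dynamic tasks** | Supported via dynamic task mapping (Airflow 2.3+), more constrained | First-class support |
| **Community** | Massive, mature | Smaller, growing fast |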
| **Best for** | Enterprise, complex DAGs | Smaller teams, rapid iteration |

---

```python
# ============================================================
# Conceptual: Airflow DAG Definition
# ============================================================
# Printed as text so the notebook runs without Airflow installed.
# ingest_data, validate_data, transform_data, and store_features are
# placeholders for the pipeline step functions.
print("Conceptual Airflow DAG definition:")
print("""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml_team',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_feature_pipeline',
    default_args=default_args,
    description='Daily ML feature engineering pipeline',
    schedule_interval='@daily',  # Airflow 2.4+ prefers schedule='@daily'
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

ingest_task    = PythonOperator(task_id='ingest',    python_callable=ingest_data,    dag=dag)
validate_task  = PythonOperator(task_id='validate',  python_callable=validate_data,  dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
store_task     = PythonOperator(task_id='store',     python_callable=store_features, dag=dag)

ingest_task >> validate_task >> transform_task >> store_task
""")
```

## 6. Production Best Practices <a id='6-best-practices'></a>

### The DOs and DON’Ts

| Practice | DO | DON’T |
|----------|-----|-------|
| **Idempotency** | Make pipelines re-runnable safely | Assume one-time execution |
| **Error handling** | Fail loudly, retry gracefully | Silently swallow errors |
| **Schema enforcement** | Validate schema at ingestion | Trust input data blindly |
| **Logging** | Log every step with metrics | Run pipelines silently |
| **Testing** | Unit test each transform | Test only end-to-end |
| **Monitoring** | Alert on failures and drift | Check only when users complain |
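
The "fail loudly, retry gracefully" row can be sketched as a small retry helper with exponential backoff that re-raises the original exception once attempts are exhausted. This is an illustrative helper, not a library API: the `retry` function, its parameters, and the `flaky_ingest` example are assumptions, and orchestrators such as Airflow provide equivalent behavior through task-level retry settings.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(func: Callable[[], T], attempts: int = 3, base_delay: float = 0.01) -> T:
    """Call func, retrying on any exception with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == attempts:
                raise  # Fail loudly: surface the original error to the caller.
            delay = base_delay * 2 ** (attempt - 1)
            print(f"attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)
    raise RuntimeError("unreachable")


# Hypothetical source that fails twice before succeeding.
calls = {"count": 0}


def flaky_ingest() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source unavailable")
    return "ok"


result = retry(flaky_ingest)
print(result, "after", calls["count"], "calls")
```

Retries are only safe when the retried step is idempotent, which is why the two practices appear together in the table.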

### Idempotency Explained

```
  IDEMPOTENT Pipeline (safe to re-run):
  Run 1: Produces output A   ✓
  Run 2: Produces output A   ✓  (same result!)
  Run 3: Produces output A   ✓  (same result!)

  NON-IDEMPOTENT Pipeline (dangerous):
  Run 1: Produces output A   ✓
  Run 2: Produces output A+A ❌  (duplicated!)
  Run 3: Produces output A+A+A ❌  (triple duplicated!)

  How to achieve idempotency:
  • Use INSERT OVERWRITE instead of INSERT INTO
  • Write to dated partitions: output/date=2024-01-15/
  • Delete before write: delete partition, then write
  • Use upserts with unique keys
```
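
The "dated partition" and "delete before write" techniques combine into a few lines. The sketch below writes JSON into a temporary directory to stay dependency-free; the `write_partition` function, the file name, and the sample row are illustrative assumptions, and a real pipeline would typically write Parquet instead.

```python
import json
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List


def write_partition(root: Path, run_date: str, rows: List[Dict]) -> Path:
    """Overwrite the partition for run_date so re-runs never append."""
    partition = root / f"date={run_date}"
    if partition.exists():
        shutil.rmtree(partition)  # Delete before write.
    partition.mkdir(parents=True)
    out_file = partition / "features.json"
    out_file.write_text(json.dumps(rows))
    return out_file


root = Path(tempfile.mkdtemp())
rows = [{"user_id": 1, "total_spend": 120.5}]

# Simulate three runs (for example, two retries) for the same logical date.
for _ in range(3):
    path = write_partition(root, "2024-01-15", rows)

stored_rows = json.loads(path.read_text())
partition_names = sorted(p.name for p in root.iterdir())
print(len(stored_rows), "row(s) in", partition_names)

shutil.rmtree(root)  # Clean up the temporary directory.
```

Delete-then-write is not atomic: a reader could see an empty partition between the two steps. A common refinement is to write to a temporary location and rename it into place.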

---

## 7. Exercises <a id='7-exercises'></a>

### Exercise 1: Add Alerting
Extend the DataValidator to send a Slack/email alert when validation fails. Use a simple webhook simulation.

### Exercise 2: Incremental Pipeline
Modify the pipeline to process only **new data since the last run** (incremental processing) instead of the full dataset.

### Exercise 3: Data Lineage
Add metadata tracking to the pipeline: record which source files were used, what transformations were applied, and output statistics. Save as a lineage JSON file alongside the output.

---

## 8. Interview Preparation <a id='8-interview'></a>

### Q1: "Walk me through designing a production data pipeline for an ML model."

**Answer:**  
"My pipeline would have 5 stages:
1. **Ingest**: Read from source systems (APIs, databases, event streams). Store raw data immutably.
2. **Validate**: Schema checks, null checks, range checks. Fail early if data quality is unacceptable.
3. **Transform**: Clean (dedup, handle nulls, fix types), then feature engineering (aggregations, encoding, scaling).
4. **Quality Check**: Validate output features match expectations. Check for drift vs training distribution.
5. **Store**: Write to Parquet (offline) and feature store (online). Partition by date for incremental processing.

Key properties: idempotent (safe to re-run), observable (logging + alerting), and testable (unit tests per transform)."

---

### Q2: "What is idempotency and why is it critical for data pipelines?"

**Answer:**  
"An idempotent pipeline produces the same output for the same input, no matter how many times it runs. This matters because pipelines fail and get retried; without idempotency, each retry can duplicate data.

I achieve idempotency by: writing to dated partitions (overwrite, don’t append), using upserts with unique keys for databases, and ensuring transforms are deterministic (set random seeds, sort before processing)."
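
If the interviewer asks for specifics on upserts, a short example helps. The sketch below uses the standard library's `sqlite3` with an in-memory database and SQLite's `ON CONFLICT ... DO UPDATE` clause (available in SQLite 3.24 and later); the table and column names are illustrative assumptions, and other databases use different upsert syntax.

```python
import sqlite3
from typing import Iterable, Tuple

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_features (user_id INTEGER PRIMARY KEY, total_spend REAL)"
)


def upsert_features(rows: Iterable[Tuple[int, float]]) -> None:
    """Insert new users and overwrite existing ones, keyed on user_id."""
    conn.executemany(
        """
        INSERT INTO user_features (user_id, total_spend)
        VALUES (?, ?)
        ON CONFLICT(user_id) DO UPDATE SET total_spend = excluded.total_spend
        """,
        rows,
    )
    conn.commit()


batch = [(1, 120.5), (2, 42.0)]
upsert_features(batch)
upsert_features(batch)  # A retry of the same batch must not add rows.

row_count = conn.execute("SELECT COUNT(*) FROM user_features").fetchone()[0]
spend_user_1 = conn.execute(
    "SELECT total_spend FROM user_features WHERE user_id = 1"
).fetchone()[0]
print(row_count, "rows; user 1 spend =", spend_user_1)
```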

---

### Q3: "How do you handle data quality issues in production?"

**Answer:**  
"Three layers of defense:
1. **Prevention**: Schema validation at ingestion. Reject bad data early.
2. **Detection**: Statistical monitors — track null rates, value distributions, row counts. Set thresholds and alert.
3. **Remediation**: Automated fixes for known issues (dedup, type casting). Manual review for unknowns. Dead-letter queue for data that can’t be processed.

Tools: Great Expectations for validation, custom monitors for drift, PagerDuty/Slack for alerts."
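
The dead-letter queue idea from the remediation layer can be sketched in a few lines: records that fail parsing or validation are set aside with the reason instead of stopping the batch or being silently dropped. The `process_batch` function, the record shape, and the in-memory list standing in for a real queue or table are assumptions for illustration.

```python
from typing import Any, Dict, List, Tuple


def process_batch(
    records: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split a batch into processed rows and dead-lettered rows."""
    processed: List[Dict[str, Any]] = []
    dead_letters: List[Dict[str, Any]] = []
    for record in records:
        try:
            amount = float(record["amount"])
            if amount < 0:
                raise ValueError("negative amount")
            processed.append({"user_id": record["user_id"], "amount": amount})
        except (KeyError, TypeError, ValueError) as exc:
            # Keep the raw record and the reason so it can be reviewed later.
            dead_letters.append({"record": record, "error": repr(exc)})
    return processed, dead_letters


raw = [
    {"user_id": 1, "amount": "19.99"},
    {"user_id": 2, "amount": "not-a-number"},
    {"user_id": 3},                     # Missing field.
    {"user_id": 4, "amount": -5},       # Fails the range rule.
]

good, dlq = process_batch(raw)
print(f"processed={len(good)} dead_lettered={len(dlq)}")
for entry in dlq:
    print("  ", entry["error"])
```

Alerting on the size of the dead-letter queue ties this back to the detection layer: a sudden jump usually signals an upstream change.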

---

### Q4: "Compare Airflow and Prefect. When would you choose each?"

**Answer:**  
"**Airflow**: Enterprise standard. Choose when you have complex DAGs with many dependencies, need robust scheduling, and have a platform team to maintain it. Drawbacks: heavy infrastructure (metadata database, scheduler, webserver, plus Redis or RabbitMQ if you use the Celery executor) and a slower local test loop.

**Prefect**: Modern alternative. Choose for smaller teams, rapid iteration, and Pythonic workflows. Easy local development, built-in observability. Drawbacks: smaller community, and production use typically means running Prefect Cloud or a self-hosted Prefect server.

My choice: Airflow for large organizations with existing infra. Prefect for startups and new projects."

---

### Q5: "A pipeline that ran perfectly for months suddenly starts producing bad features. How do you debug it?"

**Answer:**  
"Systematic debugging approach:
1. **Check inputs first**: Has the source data changed? Schema change? New categories? Volume spike?
2. **Check each pipeline stage**: Run validation at each step to find where quality degrades
3. **Check infrastructure**: Did a dependency update? Memory issues? Timeout changes?
4. **Compare distributions**: Compare current output features vs historical baselines. Which features drifted?
5. **Root cause**: Most common causes are upstream schema changes, data source outages, and silent categorical changes (new enum values).

Prevention: add data contracts with upstream teams, monitor feature distributions over time, and version everything."
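
Silent categorical changes are among the easiest causes to check for automatically. A minimal sketch, assuming the set of categories seen at training time is stored alongside the model (the function name and sample values are hypothetical):

```python
from typing import Iterable, Set


def unseen_categories(baseline: Iterable[str], current: Iterable[str]) -> Set[str]:
    """Return category values present now but absent from the baseline."""
    return set(current) - set(baseline)


training_categories = ["electronics", "clothing", "food", "books", "sports"]
todays_categories = ["food", "books", "toys", "electronics", "Toys"]

new_values = unseen_categories(training_categories, todays_categories)
if new_values:
    print("New category values detected:", sorted(new_values))
```

The comparison is case-sensitive on purpose: a casing change upstream produces a new one-hot column just as a genuinely new category would.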

---

## 🎓 Key Takeaways

1. **End-to-end pipelines** have 5 stages: Ingest → Validate → Transform → Quality Check → Store
2. **Never trust raw data** — validate at ingestion and again at output
3. **Idempotency** is non-negotiable for production pipelines
4. **Feature engineering** turns cleaned records into model signal (aggregations, recency, frequency, encodings)
5. **Orchestration** (Airflow/Prefect) handles scheduling, retries, and monitoring
6. **Observability** — if you can’t see it, you can’t fix it

---

🎉 **Module 3 Complete!** You now have a comprehensive understanding of Data & Pipeline Engineering for MLOps.