# Tutorial 05: ETL Pipelines for ML

## Module 3: Data Preparation

---

## Learning Objectives

By the end of this tutorial, you will be able to:

1. **Design and implement ETL pipelines** for ML workflows
2. **Handle data extraction** from multiple sources (batch, streaming, API)
3. **Apply data transformation** and cleaning techniques
4. **Load processed data** to data warehouses and feature stores

---

## Table of Contents

1. [Introduction to ETL for ML](#1-introduction)
2. [Extract Phase](#2-extract)
3. [Transform Phase](#3-transform)
4. [Load Phase](#4-load)
5. [Complete ETL Pipeline Example](#5-complete-pipeline)
6. [Data Quality and Validation](#6-data-quality)
7. [Hands-on Exercise](#7-exercise)
8. [Summary and Key Takeaways](#8-summary)

---

## 1. Introduction to ETL for ML <a id='1-introduction'></a>

### What is ETL?

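**ETL** stands for **Extract, Transform, Load**: a data integration process that pulls data out of source systems (extract), cleans and reshapes it (transform), and writes it to a target store such as a data warehouse or feature store (load).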

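To make the three stages concrete, here is a minimal plain-Python sketch. It assumes the source yields raw dict records and uses an in-memory dict as a stand-in for the target table; the function names `extract`, `transform`, and `load` are illustrative only. Loading by primary key means re-running the load does not create duplicate rows.

```python
from typing import Dict, Iterable, List


def extract() -> List[dict]:
    # Stand-in for reading from a database, file, or API
    return [
        {"id": "1", "amount": " 10.5 "},
        {"id": "2", "amount": None},
        {"id": "1", "amount": " 10.5 "},  # duplicate record
    ]


def transform(records: Iterable[dict]) -> List[dict]:
    # Drop duplicate ids, skip rows with a missing amount, parse strings to floats
    seen = set()
    clean = []
    for rec in records:
        if rec["id"] in seen or rec["amount"] is None:
            continue
        seen.add(rec["id"])
        clean.append({"id": rec["id"], "amount": float(rec["amount"].strip())})
    return clean


def load(records: List[dict], table: Dict[str, dict]) -> int:
    # Write by primary key so re-running the load does not duplicate rows
    for rec in records:
        table[rec["id"]] = rec
    return len(table)


warehouse: Dict[str, dict] = {}
row_count = load(transform(extract()), warehouse)
print(row_count, warehouse)  # 1 {'1': {'id': '1', 'amount': 10.5}}
```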
### Why ETL Matters for ML

- **Data Quality**: Ensures clean, consistent data for training
- **Feature Engineering**: Prepares features for model consumption
- **Reproducibility**: Creates traceable data pipelines
- **Scalability**: Handles growing data volumes

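One way to make a pipeline run traceable is to record a fingerprint of the data it produced, so a trained model can be tied back to the exact dataset it saw. The sketch below assumes a content hash is sufficient for that purpose; `dataset_fingerprint` is a hypothetical helper that combines pandas' per-row hashing with SHA-256 and also hashes the column names, since row hashes alone ignore them.

```python
import hashlib

import pandas as pd


def dataset_fingerprint(df: pd.DataFrame) -> str:
    """Return a SHA-256 hex digest of a DataFrame's contents and column names."""
    # hash_pandas_object returns one uint64 hash per row (index included)
    row_hashes = pd.util.hash_pandas_object(df, index=True).values
    digest = hashlib.sha256(row_hashes.tobytes())
    # Include column names so renaming a column changes the fingerprint too
    digest.update(",".join(map(str, df.columns)).encode("utf-8"))
    return digest.hexdigest()


df_a = pd.DataFrame({"id": [1, 2, 3], "value": [0.5, 1.5, 2.5]})
df_b = df_a.copy()
df_c = df_a.assign(value=[0.5, 1.5, 9.9])

print(dataset_fingerprint(df_a) == dataset_fingerprint(df_b))  # True
print(dataset_fingerprint(df_a) == dataset_fingerprint(df_c))  # False
```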
In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
import hashlib
import logging
import warnings

warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('ETL')

print("Libraries imported successfully!")

---

## 2. Extract Phase <a id='2-extract'></a>

The Extract phase pulls raw data out of source systems. Which method fits depends mostly on how fresh the data needs to be:

| Method | Description | Latency |
|--------|-------------|----------|
| **Batch** | Periodic full/incremental loads | Hours/Days |
| **Streaming** | Real-time data ingestion | Seconds |
| **API** | Pull data from web services | Variable |

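Incremental batch extraction usually tracks a *high-water mark*: the largest `updated_at` value seen in the previous run. A minimal sketch, assuming the source exposes an `updated_at` column and the watermark is kept in a Python variable (a real pipeline would persist it between runs). Using a strict `>` assumes no two writes share a timestamp across runs; sources with coarse timestamps often re-read a small overlap and deduplicate instead.

```python
from datetime import datetime
from typing import Optional, Tuple

import pandas as pd


def extract_incremental(
    source: pd.DataFrame, watermark: Optional[datetime]
) -> Tuple[pd.DataFrame, Optional[datetime]]:
    """Return rows changed since `watermark` and the new watermark."""
    if watermark is None:
        batch = source.copy()  # first run: full extraction
    else:
        batch = source[source["updated_at"] > watermark]
    # Advance the watermark only if something new arrived
    new_watermark = batch["updated_at"].max() if len(batch) else watermark
    return batch, new_watermark


source = pd.DataFrame({
    "id": [1, 2, 3],
    "updated_at": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
})

batch1, wm = extract_incremental(source, None)  # 3 rows
batch2, wm = extract_incremental(source, wm)    # 0 rows: nothing changed

# A later update to row 1 is picked up by the next run
source.loc[source["id"] == 1, "updated_at"] = pd.Timestamp("2024-01-05")
batch3, wm = extract_incremental(source, wm)    # 1 row
print(len(batch1), len(batch2), len(batch3), wm)
```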
In [None]:
# Base Extractor Interface
class BaseExtractor(ABC):
    def __init__(self, source_name: str):
        self.source_name = source_name
        self.extraction_timestamp = None
        self.records_extracted = 0
    
    @abstractmethod
    def extract(self, **kwargs) -> pd.DataFrame:
        pass
    
    def get_metadata(self) -> Dict[str, Any]:
        return {'source': self.source_name, 'timestamp': self.extraction_timestamp, 'records': self.records_extracted}


class BatchExtractor(BaseExtractor):
    def __init__(self, source_name: str, data_generator: Callable):
        super().__init__(source_name)
        self.data_generator = data_generator
    
    def extract(self, mode: str = 'full', since: Optional[datetime] = None) -> pd.DataFrame:
        logger.info(f"Starting {mode} extraction from {self.source_name}")
        self.extraction_timestamp = datetime.now()
        df = self.data_generator()
        
        if mode == 'incremental' and since and 'updated_at' in df.columns:
            df = df[df['updated_at'] > since]
        
        self.records_extracted = len(df)
        logger.info(f"Extracted {self.records_extracted} records")
        return df


class APIExtractor(BaseExtractor):
    def __init__(self, source_name: str, endpoint: str):
        super().__init__(source_name)
        self.endpoint = endpoint
    
    def extract(self, params: Optional[Dict] = None) -> pd.DataFrame:
        logger.info(f"Fetching from API: {self.endpoint}")
        self.extraction_timestamp = datetime.now()
        
        max_pages = params.get('max_pages', 3) if params else 3
        page_size = params.get('page_size', 100) if params else 100
        
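        records = []
        for page in range(1, max_pages + 1):
            # Simulated response: a real extractor would request this page
            # from self.endpoint here. A per-page generator keeps each page
            # deterministic without resetting NumPy's global random state.
            rng = np.random.default_rng(page * 42)
            for i in range(page_size):
                records.append({
                    'id': f"api_{(page-1)*page_size + i}",
                    'value': rng.uniform(0, 100),
                    'category': rng.choice(['A', 'B', 'C'])
                })
        
        df = pd.DataFrame(records)
        self.records_extracted = len(df)
        logger.info(f"Extracted {self.records_extracted} records")
        return df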


class StreamExtractor(BaseExtractor):
    def extract(self, window_seconds: int = 60, events_per_second: int = 10) -> pd.DataFrame:
        logger.info(f"Consuming stream for {window_seconds}s window")
        self.extraction_timestamp = datetime.now()
        
        total_events = window_seconds * events_per_second
        events = []
        for i in range(total_events):
            events.append({
                'event_id': f"evt_{i:08d}",
                'event_type': np.random.choice(['click', 'view', 'purchase'], p=[0.5, 0.35, 0.15]),
                'user_id': f"user_{np.random.randint(1, 1000)}",
                'value': np.random.exponential(10)
            })
        
        df = pd.DataFrame(events)
        self.records_extracted = len(df)
        return df

print("Extractor classes defined!")

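`APIExtractor` only simulates its pages. Against a real web service, the extraction loop typically keeps requesting pages until one comes back empty and retries transient failures with exponential backoff. The sketch below assumes a hypothetical `fetch_page(page)` callable that returns a list of records or raises `ConnectionError` on a transient error; injecting it lets the loop run without network access.

```python
import time
from typing import Callable, List


def fetch_all_pages(
    fetch_page: Callable[[int], List[dict]],
    max_retries: int = 3,
    base_delay: float = 0.1,
) -> List[dict]:
    """Request pages 1, 2, ... until an empty page is returned."""
    records: List[dict] = []
    page = 1
    while True:
        for attempt in range(max_retries + 1):
            try:
                batch = fetch_page(page)
                break
            except ConnectionError:
                if attempt == max_retries:
                    raise  # give up after the last retry
                # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
                time.sleep(base_delay * 2 ** attempt)
        if not batch:
            return records
        records.extend(batch)
        page += 1


# Fake service: 3 pages of 2 records; page 2 fails once before succeeding
calls = {"page2": 0}


def fake_fetch(page: int) -> List[dict]:
    if page == 2 and calls["page2"] == 0:
        calls["page2"] += 1
        raise ConnectionError("transient failure")
    if page > 3:
        return []
    return [{"id": f"p{page}_{i}"} for i in range(2)]


rows = fetch_all_pages(fake_fetch, base_delay=0.01)
print(len(rows))  # 6
```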
In [None]:
# Demo Batch Extraction
def generate_customer_data() -> pd.DataFrame:
    np.random.seed(42)
    n = 500
    return pd.DataFrame({
        'customer_id': [f'CUST_{i:05d}' for i in range(n)],
        'name': [f'Customer_{i}' for i in range(n)],
        'country': np.random.choice(['US', 'UK', 'DE', 'FR', 'JP'], n),
        'created_at': pd.date_range('2023-01-01', periods=n, freq='2h'),
        'updated_at': [datetime.now() - timedelta(days=np.random.randint(0, 30)) for _ in range(n)],
        'lifetime_value': np.random.exponential(500, n).round(2)
    })

print("=" * 60)
print("BATCH EXTRACTION DEMO")
print("=" * 60)

batch_extractor = BatchExtractor('customer_db', generate_customer_data)
df_full = batch_extractor.extract(mode='full')
print(f"Full extraction: {len(df_full)} records")
print(df_full.head())

In [None]:
# Demo API Extraction
print("\n" + "=" * 60)
print("API EXTRACTION DEMO")
print("=" * 60)

api_extractor = APIExtractor('third_party_api', 'https://api.example.com/data')
df_api = api_extractor.extract(params={'max_pages': 2, 'page_size': 50})
print(f"API extraction: {len(df_api)} records")
print(df_api.head())

In [None]:
# Demo Stream Extraction
print("\n" + "=" * 60)
print("STREAM EXTRACTION DEMO")
print("=" * 60)

stream_extractor = StreamExtractor('event_stream')
df_stream = stream_extractor.extract(window_seconds=10, events_per_second=5)
print(f"Stream extraction: {len(df_stream)} events")
print(df_stream['event_type'].value_counts())

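`StreamExtractor` fabricates one fixed window of events. A streaming consumer more commonly buffers incoming events and flushes them as micro-batches, either when a size limit is reached or when the oldest buffered event gets too old. A minimal sketch of that flushing logic, assuming events arrive as `(timestamp_seconds, payload)` pairs; no real broker client is involved. Because the age check runs only when a new event arrives, a quiet stream can hold a partial batch indefinitely, so real consumers usually pair this with a timer.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

Event = Tuple[float, dict]


def micro_batches(
    events: Iterable[Event], max_size: int, max_age_seconds: float
) -> Iterator[List[dict]]:
    """Yield lists of payloads, flushing on batch size or on buffer age."""
    buffer: List[dict] = []
    first_ts: Optional[float] = None
    for ts, payload in events:
        # Flush before adding if the oldest buffered event is too old
        if buffer and ts - first_ts >= max_age_seconds:
            yield buffer
            buffer, first_ts = [], None
        if not buffer:
            first_ts = ts
        buffer.append(payload)
        if len(buffer) >= max_size:
            yield buffer
            buffer, first_ts = [], None
    if buffer:
        yield buffer  # flush the remainder when the stream ends


stream = [(0.0, {"e": 1}), (0.5, {"e": 2}), (3.0, {"e": 3}),
          (3.2, {"e": 4}), (3.4, {"e": 5}), (3.6, {"e": 6})]
sizes = [len(b) for b in micro_batches(stream, max_size=3, max_age_seconds=2.0)]
print(sizes)  # [2, 3, 1]
```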
---

## 3. Transform Phase <a id='3-transform'></a>

The Transform phase turns raw extracted data into clean, validated, model-ready data. Common operations fall into three categories:

| Category | Operations | Purpose |
|----------|------------|----------|
| **Cleaning** | Remove duplicates, handle nulls | Data quality |
| **Validation** | Type checking, range validation | Data integrity |
| **Enrichment** | Add derived fields | Feature creation |

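Type checking during transformation often means coercing raw strings to the expected type and counting the values that fail to parse, rather than letting one bad value crash the run. A minimal sketch with `pd.to_numeric(..., errors='coerce')`; the column contents are made up for illustration.

```python
import pandas as pd

raw = pd.DataFrame({"age": ["34", "  41", "unknown", "29.5", None]})

# Unparseable values become NaN instead of raising
ages = pd.to_numeric(raw["age"].str.strip(), errors="coerce")

was_present = raw["age"].notna()
failed = was_present & ages.isna()  # present in the source but not numeric

print(ages.tolist())      # [34.0, 41.0, nan, 29.5, nan]
print(int(failed.sum()))  # 1
```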
In [None]:
# Data Transformer
@dataclass
class TransformResult:
    data: pd.DataFrame
    rows_before: int
    rows_after: int
    transformations: List[str] = field(default_factory=list)


class DataTransformer:
    def __init__(self, df: pd.DataFrame):
        self.original_df = df.copy()
        self.df = df.copy()
        self.transformations = []
    
    def remove_duplicates(self, subset: Optional[List[str]] = None) -> 'DataTransformer':
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        removed = before - len(self.df)
        if removed > 0:
            self.transformations.append(f"remove_duplicates: {removed} removed")
        return self
    
    def handle_missing(self, strategy: Dict[str, str]) -> 'DataTransformer':
        for column, strat in strategy.items():
            if column not in self.df.columns:
                continue
            null_count = self.df[column].isnull().sum()
            if null_count == 0:
                continue
            
            if strat == 'drop':
                self.df = self.df.dropna(subset=[column])
            elif strat == 'mean':
                self.df[column] = self.df[column].fillna(self.df[column].mean())
            elif strat == 'median':
                self.df[column] = self.df[column].fillna(self.df[column].median())
            elif strat == 'mode':
                self.df[column] = self.df[column].fillna(self.df[column].mode()[0])
            else:
                self.df[column] = self.df[column].fillna(strat)
            
            self.transformations.append(f"handle_missing({column}): {null_count} -> {strat}")
        return self
    
    def remove_outliers(self, columns: List[str], method: str = 'iqr', factor: float = 1.5) -> 'DataTransformer':
        before = len(self.df)
        for col in columns:
            if col not in self.df.columns or not pd.api.types.is_numeric_dtype(self.df[col]):
                continue
            
            if method == 'iqr':
                Q1, Q3 = self.df[col].quantile([0.25, 0.75])
                IQR = Q3 - Q1
                self.df = self.df[(self.df[col] >= Q1 - factor*IQR) & (self.df[col] <= Q3 + factor*IQR)]
            elif method == 'zscore':
                z = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                self.df = self.df[z < factor]
        
        removed = before - len(self.df)
        if removed > 0:
            self.transformations.append(f"remove_outliers: {removed} removed")
        return self
    
    def validate_range(self, column: str, min_val: float = None, max_val: float = None) -> 'DataTransformer':
        if column not in self.df.columns:
            return self
        before = len(self.df)
        if min_val is not None:
            self.df = self.df[self.df[column] >= min_val]
        if max_val is not None:
            self.df = self.df[self.df[column] <= max_val]
        removed = before - len(self.df)
        if removed > 0:
            self.transformations.append(f"validate_range({column}): {removed} removed")
        return self
    
    def add_column(self, name: str, func: Callable) -> 'DataTransformer':
        self.df[name] = func(self.df)
        self.transformations.append(f"add_column: {name}")
        return self
    
    def add_date_features(self, date_column: str) -> 'DataTransformer':
        if date_column not in self.df.columns:
            return self
        self.df[date_column] = pd.to_datetime(self.df[date_column])
        self.df[f'{date_column}_year'] = self.df[date_column].dt.year
        self.df[f'{date_column}_month'] = self.df[date_column].dt.month
        self.df[f'{date_column}_dayofweek'] = self.df[date_column].dt.dayofweek
        self.transformations.append(f"add_date_features: {date_column}")
        return self
    
    def standardize_text(self, columns: List[str]) -> 'DataTransformer':
        for col in columns:
            if col in self.df.columns:
                self.df[col] = self.df[col].str.strip().str.lower()
        self.transformations.append(f"standardize_text: {columns}")
        return self
    
    def get_result(self) -> TransformResult:
        return TransformResult(
            data=self.df,
            rows_before=len(self.original_df),
            rows_after=len(self.df),
            transformations=self.transformations
        )

print("DataTransformer class defined!")

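One ML-specific pitfall: `handle_missing` computes the mean, median, or mode on whatever batch it is given. For training and serving, fill values are usually computed once on the training data and reused for every later batch, so the same raw input always maps to the same feature value. A minimal sketch of that fit/apply split, with hypothetical helper names `fit_fill_values` and `apply_fill_values`. Note that the serving batch alone would have filled `age` with 90.0.

```python
from typing import Dict

import pandas as pd


def fit_fill_values(train: pd.DataFrame, numeric_cols: list, categorical_cols: list) -> Dict[str, object]:
    """Learn fill values from training data only."""
    fill: Dict[str, object] = {col: float(train[col].median()) for col in numeric_cols}
    fill.update({col: train[col].mode()[0] for col in categorical_cols})
    return fill


def apply_fill_values(df: pd.DataFrame, fill: Dict[str, object]) -> pd.DataFrame:
    """Fill missing values in any batch with the stored training statistics."""
    return df.fillna(value=fill)


train = pd.DataFrame({"age": [20.0, 30.0, None, 40.0], "country": ["US", "US", "UK", None]})
serving = pd.DataFrame({"age": [None, 90.0], "country": [None, "DE"]})

fill = fit_fill_values(train, numeric_cols=["age"], categorical_cols=["country"])
print(fill)  # {'age': 30.0, 'country': 'US'}
print(apply_fill_values(serving, fill).to_dict("list"))
# {'age': [30.0, 90.0], 'country': ['US', 'DE']}
```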
In [None]:
# Transform Demo
print("=" * 60)
print("TRANSFORMATION DEMO")
print("=" * 60)

# Create data with issues
np.random.seed(42)
n = 1000
raw_data = pd.DataFrame({
    'customer_id': [f'CUST_{i:05d}' for i in range(n)],
    'name': [f'  Customer {i}  ' if np.random.random() > 0.1 else None for i in range(n)],
    'email': [f'customer{i}@EMAIL.COM' for i in range(n)],
    'age': np.random.normal(40, 15, n),
    'income': np.random.lognormal(10, 1, n),
    'signup_date': pd.date_range('2020-01-01', periods=n, freq='6h'),
    'country': np.random.choice(['US', 'UK', 'DE', 'FR', None], n, p=[0.35, 0.25, 0.2, 0.15, 0.05])
})

# Add duplicates and outliers
raw_data = pd.concat([raw_data, raw_data.iloc[:50]], ignore_index=True)
raw_data.loc[np.random.choice(len(raw_data), 20), 'income'] = np.random.uniform(1e8, 1e9, 20)
raw_data.loc[np.random.choice(len(raw_data), 10), 'age'] = np.random.choice([-5, 150], 10)

print(f"Raw data: {len(raw_data)} rows")
print(f"Missing values: {raw_data.isnull().sum().sum()}")

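The next step removes income outliers with the IQR rule: values outside `[Q1 - factor*IQR, Q3 + factor*IQR]` are dropped. A small worked example using pandas' default linear-interpolation quantiles. Note that in `remove_outliers` the same `factor` argument is an IQR multiplier for `method='iqr'` but a z-score cutoff for `method='zscore'`, where its default of 1.5 is much stricter than the commonly used cutoff of 3.

```python
import pandas as pd

values = pd.Series([1, 2, 3, 4, 5, 6, 7, 8, 100])

q1, q3 = values.quantile([0.25, 0.75])  # 3.0 and 7.0 for these nine values
iqr = q3 - q1                           # 4.0
factor = 1.5
lower, upper = q1 - factor * iqr, q3 + factor * iqr  # -3.0 and 13.0

kept = values[(values >= lower) & (values <= upper)]
print(lower, upper, kept.tolist())  # -3.0 13.0 [1, 2, 3, 4, 5, 6, 7, 8]
```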
In [None]:
# Apply transformations
transformer = DataTransformer(raw_data)

result = (
    transformer
    .remove_duplicates(subset=['customer_id'])
    .handle_missing({'name': 'Unknown', 'country': 'mode', 'age': 'median'})
    .validate_range('age', min_val=0, max_val=120)
    .remove_outliers(['income'], method='iqr', factor=3)
    .standardize_text(['email'])
    .add_date_features('signup_date')
    .add_column('income_category', 
        lambda df: pd.cut(df['income'], bins=[0, 30000, 60000, 100000, float('inf')],
                         labels=['Low', 'Medium', 'High', 'Very High']))
    .get_result()
)

print(f"\nRows before: {result.rows_before}")
print(f"Rows after: {result.rows_after}")
print(f"\nTransformations applied:")
for t in result.transformations:
    print(f"  - {t}")

print("\nTransformed data sample:")
print(result.data.head())

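`pd.cut` uses right-closed intervals by default (`right=True`), so an income of exactly 30000 lands in `'Low'`, not `'Medium'`, and any value outside the bin edges (here, 0 or below) becomes NaN. A quick check with the same bins and labels as `income_category`:

```python
import pandas as pd

incomes = pd.Series([0, 30000, 30000.01, 100000, 250000])
categories = pd.cut(
    incomes,
    bins=[0, 30000, 60000, 100000, float("inf")],
    labels=["Low", "Medium", "High", "Very High"],
)
print(categories.tolist())  # [nan, 'Low', 'Medium', 'High', 'Very High']
```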
In [None]:
# Visualize transformation effects
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

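ax1 = axes[0, 0]
# Plot log10(income): on a linear axis the injected 1e8-1e9 outliers
# squeeze all other values into a single bin
np.log10(raw_data['income']).hist(bins=50, ax=ax1, alpha=0.5, label='Before', color='red')
np.log10(result.data['income']).hist(bins=50, ax=ax1, alpha=0.5, label='After', color='green')
ax1.set_title('log10(Income): Before vs After Outlier Removal')
ax1.legend()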

ax2 = axes[0, 1]
result.data['age'].hist(bins=30, ax=ax2, color='steelblue')
ax2.set_title('Age After Validation')

ax3 = axes[1, 0]
result.data['signup_date_dayofweek'].value_counts().sort_index().plot(kind='bar', ax=ax3, color='coral')
ax3.set_title('Signups by Day of Week')
ax3.set_xlabel('Day (0=Mon)')

ax4 = axes[1, 1]
result.data['income_category'].value_counts().plot(kind='pie', ax=ax4, autopct='%1.1f%%')
ax4.set_title('Income Categories')

plt.tight_layout()
plt.show()

---

## 4. Load Phase <a id='4-load'></a>

The Load phase writes transformed data to target systems such as a data warehouse or feature store. Common strategies:

| Strategy | Description | Use Case |
|----------|-------------|----------|
| **Full** | Replace entire table | Initial load |
| **Incremental** | Add new records | Large tables |
| **Upsert** | Insert or update | Dimension tables |

In [None]:
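In a real warehouse, full and incremental loads map onto ordinary SQL writes. A minimal sketch using pandas' `DataFrame.to_sql` against an in-memory SQLite database, which stands in for a warehouse connection (an assumption: production targets usually go through a SQLAlchemy engine). `if_exists='replace'` drops and recreates the table; `if_exists='append'` inserts rows, so the incremental path filters out keys that already exist first.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for a warehouse connection

initial = pd.DataFrame({"customer_id": ["C1", "C2"], "ltv": [120.0, 80.0]})
new_rows = pd.DataFrame({"customer_id": ["C2", "C3"], "ltv": [80.0, 45.0]})

# Full load: drop and recreate the table with the current snapshot
initial.to_sql("customers", conn, if_exists="replace", index=False)

# Incremental load: append only rows whose key is not already present
existing_ids = set(pd.read_sql("SELECT customer_id FROM customers", conn)["customer_id"])
new_rows[~new_rows["customer_id"].isin(existing_ids)].to_sql(
    "customers", conn, if_exists="append", index=False
)

count = int(pd.read_sql("SELECT COUNT(*) AS n FROM customers", conn)["n"].iloc[0])
print(count)  # 3
conn.close()
```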
# Data Loader
class DataLoader:
    def __init__(self, target_name: str):
        self.target_name = target_name
        self._storage = {}
        self.load_history = []
    
    def full_load(self, table: str, data: pd.DataFrame) -> Dict[str, Any]:
        old_count = len(self._storage.get(table, pd.DataFrame()))
        self._storage[table] = data.copy()
        result = {'operation': 'full_load', 'table': table, 'rows_before': old_count, 'rows_after': len(data)}
        self.load_history.append(result)
        logger.info(f"Full load to {table}: {len(data)} rows")
        return result
    
    def incremental_load(self, table: str, data: pd.DataFrame, key_column: str) -> Dict[str, Any]:
        if table not in self._storage:
            self._storage[table] = pd.DataFrame()
        
        existing = self._storage[table]
        existing_keys = set(existing[key_column]) if len(existing) > 0 else set()
        new_records = data[~data[key_column].isin(existing_keys)]
        self._storage[table] = pd.concat([existing, new_records], ignore_index=True)
        
        result = {'operation': 'incremental', 'table': table, 'rows_new': len(new_records), 'rows_total': len(self._storage[table])}
        self.load_history.append(result)
        logger.info(f"Incremental load to {table}: {len(new_records)} new rows")
        return result
    
    def upsert(self, table: str, data: pd.DataFrame, key_column: str) -> Dict[str, Any]:
        if table not in self._storage:
            self._storage[table] = pd.DataFrame()
        
        existing = self._storage[table]
        
        if len(existing) > 0:
            existing_keys = set(existing[key_column])
            incoming_keys = set(data[key_column])
            update_keys = existing_keys & incoming_keys
            unchanged = existing[~existing[key_column].isin(update_keys)]
            self._storage[table] = pd.concat([unchanged, data], ignore_index=True)
            updates = len(update_keys)
            inserts = len(incoming_keys - existing_keys)
        else:
            self._storage[table] = data.copy()
            updates, inserts = 0, len(data)
        
        result = {'operation': 'upsert', 'table': table, 'updates': updates, 'inserts': inserts, 'rows_total': len(self._storage[table])}
        self.load_history.append(result)
        logger.info(f"Upsert to {table}: {inserts} inserts, {updates} updates")
        return result
    
    def get_table(self, table: str) -> pd.DataFrame:
        return self._storage.get(table, pd.DataFrame())

print("DataLoader class defined!")

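A useful property to test for each load strategy is idempotency: applying the same batch twice should leave the table unchanged. The sketch below contrasts a blind append, which is not idempotent, with a key-based upsert, which is (assuming the batch has unique keys). Both functions are illustrative and not part of `DataLoader`.

```python
import pandas as pd


def append_load(table: pd.DataFrame, batch: pd.DataFrame) -> pd.DataFrame:
    # Blind append: every run adds the batch again
    return pd.concat([table, batch], ignore_index=True)


def upsert_load(table: pd.DataFrame, batch: pd.DataFrame, key: str) -> pd.DataFrame:
    # Key-based write: when a key is in both frames, the batch's row wins
    combined = pd.concat([table, batch], ignore_index=True)
    return combined.drop_duplicates(subset=[key], keep="last").reset_index(drop=True)


table = pd.DataFrame({"id": [1, 2], "value": [10, 20]})
batch = pd.DataFrame({"id": [2, 3], "value": [25, 30]})

once = upsert_load(table, batch, "id")
twice = upsert_load(once, batch, "id")
print(once.equals(twice))  # True: re-running the upsert changes nothing

appended_once = append_load(table, batch)
appended_twice = append_load(appended_once, batch)
print(len(appended_once), len(appended_twice))  # 4 6
```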
In [None]:
# Load Demo
print("=" * 60)
print("DATA LOADING DEMO")
print("=" * 60)

clean_data = result.data.copy()
loader = DataLoader('data_warehouse')

# Full Load
print("\n--- FULL LOAD ---")
full_result = loader.full_load('customers', clean_data)
print(f"Result: {full_result}")

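# Incremental Load
# These rows skip the transform step, so derived columns such as
# signup_date_year and income_category will be NaN for them in the table
print("\n--- INCREMENTAL LOAD ---")
new_customers = pd.DataFrame({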
    'customer_id': [f'CUST_NEW_{i:03d}' for i in range(50)],
    'name': [f'New Customer {i}' for i in range(50)],
    'email': [f'newcust{i}@email.com' for i in range(50)],
    'age': np.random.randint(20, 60, 50).astype(float),
    'income': np.random.lognormal(10, 0.5, 50),
    'signup_date': pd.date_range('2024-01-01', periods=50, freq='D'),
    'country': np.random.choice(['US', 'UK'], 50)
})
incr_result = loader.incremental_load('customers', new_customers, 'customer_id')
print(f"Result: {incr_result}")

# Upsert
print("\n--- UPSERT ---")
updates = clean_data.head(20).copy()
updates['income'] = updates['income'] * 1.1
upsert_result = loader.upsert('customers', updates, 'customer_id')
print(f"Result: {upsert_result}")

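The same upsert can be pushed down to the database. SQLite (3.24 and later) and PostgreSQL both support `INSERT ... ON CONFLICT (...) DO UPDATE`, where `excluded` refers to the incoming row. The sketch below uses Python's built-in `sqlite3` module with a made-up `customers` table; the conflict column must carry a primary key or unique constraint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id TEXT PRIMARY KEY, income REAL)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [("C1", 100.0), ("C2", 200.0)],
)

batch = [("C2", 220.0), ("C3", 300.0)]  # one update, one insert
conn.executemany(
    """
    INSERT INTO customers (customer_id, income) VALUES (?, ?)
    ON CONFLICT (customer_id) DO UPDATE SET income = excluded.income
    """,
    batch,
)
conn.commit()

rows = conn.execute("SELECT customer_id, income FROM customers ORDER BY customer_id").fetchall()
print(rows)  # [('C1', 100.0), ('C2', 220.0), ('C3', 300.0)]
conn.close()
```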
---

## 5. Complete ETL Pipeline Example <a id='5-complete-pipeline'></a>

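Now let's combine an extractor, a transform function, and a loader into a single pipeline that records status, row counts, errors, and duration for each run.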

In [None]:
# Complete ETL Pipeline
@dataclass
class PipelineConfig:
    name: str
    target_table: str
    load_strategy: str = 'full'
    key_column: Optional[str] = None


class ETLPipeline:
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.metrics = {'status': 'pending', 'errors': []}
    
    def run(self, extractor: BaseExtractor, loader: DataLoader, 
            transform_func: Callable[[pd.DataFrame], pd.DataFrame] = None) -> Dict[str, Any]:
        
        # Start each run with fresh metrics so errors from earlier runs don't accumulate
        self.metrics = {'status': 'running', 'errors': [], 'start_time': datetime.now()}
        
        logger.info(f"Starting pipeline: {self.config.name}")
        
        try:
            # EXTRACT
            logger.info("[EXTRACT] Starting...")
            raw_data = extractor.extract()
            self.metrics['extract_records'] = len(raw_data)
            
            # TRANSFORM
            logger.info("[TRANSFORM] Starting...")
            if transform_func:
                clean_data = transform_func(raw_data)
            else:
                clean_data = raw_data
            self.metrics['transform_records'] = len(clean_data)
            
            # LOAD
            logger.info("[LOAD] Starting...")
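            if self.config.load_strategy == 'full':
                load_result = loader.full_load(self.config.target_table, clean_data)
            elif self.config.load_strategy in ('incremental', 'upsert'):
                if not self.config.key_column:
                    raise ValueError(f"load_strategy='{self.config.load_strategy}' requires key_column")
                if self.config.load_strategy == 'incremental':
                    load_result = loader.incremental_load(self.config.target_table, clean_data, self.config.key_column)
                else:
                    load_result = loader.upsert(self.config.target_table, clean_data, self.config.key_column)
            else:
                # Fail with a clear message instead of an unbound load_result below
                raise ValueError(f"Unknown load_strategy: {self.config.load_strategy}")
            
            self.metrics['load_records'] = load_result.get('rows_after', load_result.get('rows_total', 0))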
            self.metrics['status'] = 'success'
            
        except Exception as e:
            self.metrics['status'] = 'failed'
            self.metrics['errors'].append(str(e))
            logger.error(f"Pipeline failed: {e}")
        
        self.metrics['end_time'] = datetime.now()
        self.metrics['duration_seconds'] = (self.metrics['end_time'] - self.metrics['start_time']).total_seconds()
        
        logger.info(f"Pipeline {self.metrics['status']}: {self.metrics['duration_seconds']:.2f}s")
        return self.metrics

print("ETLPipeline class defined!")

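`ETLPipeline.run` loads whatever the transform step returns. A common extension is a quality gate between transform and load that aborts the run when any check fails, so bad data never reaches the target. A minimal sketch with hypothetical names (`run_with_quality_gate`, `QualityGateError`), assuming each check is a function that returns an error message or `None`:

```python
from typing import Callable, List, Optional

import pandas as pd

Check = Callable[[pd.DataFrame], Optional[str]]


class QualityGateError(Exception):
    """Raised when transformed data fails one or more checks."""


def run_with_quality_gate(
    extract: Callable[[], pd.DataFrame],
    transform: Callable[[pd.DataFrame], pd.DataFrame],
    checks: List[Check],
    load: Callable[[pd.DataFrame], None],
) -> int:
    data = transform(extract())
    failures = [msg for check in checks if (msg := check(data)) is not None]
    if failures:
        # Nothing is loaded when any check fails
        raise QualityGateError("; ".join(failures))
    load(data)
    return len(data)


def no_null_ids(df: pd.DataFrame) -> Optional[str]:
    n = int(df["id"].isna().sum())
    return f"{n} null ids" if n else None


target: List[pd.DataFrame] = []
good = pd.DataFrame({"id": [1, 2]})
bad = pd.DataFrame({"id": [1, None]})

print(run_with_quality_gate(lambda: good, lambda df: df, [no_null_ids], target.append))  # 2
try:
    run_with_quality_gate(lambda: bad, lambda df: df, [no_null_ids], target.append)
except QualityGateError as err:
    print("blocked:", err)  # blocked: 1 null ids
print(len(target))  # 1
```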
In [None]:
# Run Complete Pipeline
print("=" * 60)
print("COMPLETE ETL PIPELINE DEMO")
print("=" * 60)

def transform_customers(df: pd.DataFrame) -> pd.DataFrame:
    transformer = DataTransformer(df)
    result = (
        transformer
        .remove_duplicates(subset=['customer_id'])
        .handle_missing({'name': 'Unknown', 'country': 'mode'})
        .remove_outliers(['lifetime_value'], method='iqr', factor=2)
        .add_date_features('created_at')
        .get_result()
    )
    return result.data

# Configure pipeline
config = PipelineConfig(
    name='customer_etl',
    target_table='dim_customers',
    load_strategy='full'
)

# Create components
extractor = BatchExtractor('customer_db', generate_customer_data)
loader = DataLoader('analytics_warehouse')
pipeline = ETLPipeline(config)

# Run pipeline
metrics = pipeline.run(extractor, loader, transform_customers)

print("\n" + "=" * 60)
print("PIPELINE METRICS")
print("=" * 60)
for key, value in metrics.items():
    print(f"  {key}: {value}")

---

## 6. Data Quality and Validation <a id='6-data-quality'></a>

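Data quality is crucial for ML systems: models train on whatever they are given, so unexpected nulls, duplicate keys, or out-of-range values degrade them without raising errors. Automated checks on the loaded table catch these problems before training.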

In [None]:
# Data Quality Validator
class DataQualityValidator:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.results = []
    
    def check_nulls(self, max_null_pct: float = 0.05) -> 'DataQualityValidator':
        for col in self.df.columns:
            null_pct = self.df[col].isnull().mean()
            passed = null_pct <= max_null_pct
            self.results.append({
                'check': 'null_check',
                'column': col,
                'value': f"{null_pct:.2%}",
                'threshold': f"{max_null_pct:.2%}",
                'passed': passed
            })
        return self
    
    def check_unique(self, columns: List[str]) -> 'DataQualityValidator':
        for col in columns:
            if col not in self.df.columns:
                continue
            is_unique = self.df[col].nunique() == len(self.df)
            self.results.append({
                'check': 'uniqueness',
                'column': col,
                'value': f"{self.df[col].nunique()}/{len(self.df)}",
                'threshold': 'all unique',
                'passed': is_unique
            })
        return self
    
    def check_range(self, column: str, min_val: float, max_val: float) -> 'DataQualityValidator':
        if column not in self.df.columns:
            return self
        in_range = ((self.df[column] >= min_val) & (self.df[column] <= max_val)).all()
        actual_min = self.df[column].min()
        actual_max = self.df[column].max()
        self.results.append({
            'check': 'range',
            'column': column,
            'value': f"[{actual_min:.2f}, {actual_max:.2f}]",
            'threshold': f"[{min_val}, {max_val}]",
            'passed': in_range
        })
        return self
    
    def get_report(self) -> pd.DataFrame:
        return pd.DataFrame(self.results)
    
    def all_passed(self) -> bool:
        return all(r['passed'] for r in self.results)


# Run validation
print("=" * 60)
print("DATA QUALITY VALIDATION")
print("=" * 60)

clean_data = loader.get_table('dim_customers')

validator = DataQualityValidator(clean_data)
report = (
    validator
    .check_nulls(max_null_pct=0.05)
    .check_unique(['customer_id'])
    .check_range('lifetime_value', 0, 100000)
    .get_report()
)

print("\nValidation Report:")
print(report.to_string(index=False))
print(f"\nAll checks passed: {validator.all_passed()}")

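`DataQualityValidator` checks values but not types. A schema check that compares each column's dtype against an expected contract catches upstream changes (a numeric column arriving with a different type, a new or missing column) early. A minimal sketch with a hypothetical `check_schema` helper and a hand-written expected schema:

```python
from typing import Dict, List

import pandas as pd


def check_schema(df: pd.DataFrame, expected: Dict[str, str]) -> List[str]:
    """Return a list of schema problems; an empty list means the schema matches."""
    problems = []
    for col, dtype in expected.items():
        if col not in df.columns:
            problems.append(f"missing column: {col}")
        elif str(df[col].dtype) != dtype:
            problems.append(f"{col}: expected {dtype}, got {df[col].dtype}")
    extra = set(df.columns) - set(expected)
    problems.extend(f"unexpected column: {col}" for col in sorted(extra))
    return problems


expected = {"customer_id": "int64", "lifetime_value": "float64"}

ok = pd.DataFrame({"customer_id": [1], "lifetime_value": [10.5]})
drifted = pd.DataFrame({"customer_id": [1], "lifetime_value": [10], "note": [0]})

print(check_schema(ok, expected))  # []
print(check_schema(drifted, expected))
# ['lifetime_value: expected float64, got int64', 'unexpected column: note']
```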
---

## 7. Hands-on Exercise <a id='7-exercise'></a>

Build your own ETL pipeline for transaction data.

In [None]:
# Exercise: Build a Transaction ETL Pipeline

def generate_transactions() -> pd.DataFrame:
    np.random.seed(42)
    n = 2000
    
    df = pd.DataFrame({
        'transaction_id': [f'TXN_{i:08d}' for i in range(n)],
        'customer_id': [f'CUST_{np.random.randint(0, 500):05d}' for _ in range(n)],
        'amount': np.random.lognormal(4, 1, n),
        'currency': np.random.choice(['USD', 'EUR', 'GBP', None], n, p=[0.6, 0.2, 0.15, 0.05]),
        'category': np.random.choice(['electronics', 'clothing', 'food', 'services'], n),
        'timestamp': pd.date_range('2024-01-01', periods=n, freq='30min'),
        'status': np.random.choice(['completed', 'pending', 'failed'], n, p=[0.85, 0.10, 0.05])
    })
    
    # Add duplicates
    df = pd.concat([df, df.iloc[:30]], ignore_index=True)
    # Add outliers
    df.loc[np.random.choice(len(df), 10), 'amount'] = np.random.uniform(100000, 1000000, 10)
    
    return df


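# YOUR TASK: Extend the transform function below. The baseline already
# deduplicates, fills missing currencies, removes outliers, and adds date
# features; `amount_usd` is a placeholder that copies `amount` unconverted.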
def transform_transactions(df: pd.DataFrame) -> pd.DataFrame:
    transformer = DataTransformer(df)
    result = (
        transformer
        .remove_duplicates(subset=['transaction_id'])
        .handle_missing({'currency': 'USD'})
        .remove_outliers(['amount'], method='iqr', factor=3)
        .add_date_features('timestamp')
        .add_column('amount_usd', lambda x: x['amount'])  # Simplified - no conversion
        .get_result()
    )
    return result.data


# Run the pipeline
print("=" * 60)
print("EXERCISE: TRANSACTION ETL PIPELINE")
print("=" * 60)

txn_config = PipelineConfig(
    name='transaction_etl',
    target_table='fact_transactions',
    load_strategy='full'
)

txn_extractor = BatchExtractor('transactions_db', generate_transactions)
txn_loader = DataLoader('analytics_warehouse')
txn_pipeline = ETLPipeline(txn_config)

txn_metrics = txn_pipeline.run(txn_extractor, txn_loader, transform_transactions)

print("\nPipeline Results:")
for key, value in txn_metrics.items():
    print(f"  {key}: {value}")

# Validate
txn_data = txn_loader.get_table('fact_transactions')
print(f"\nFinal data shape: {txn_data.shape}")
print(txn_data.head())

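As a starting point for a real `amount_usd` column, one approach is to map each currency to a rate and multiply, flagging currencies with no known rate. The rates below are **placeholder values for illustration only**, not real exchange rates; a production pipeline would look up the rate for each transaction's date from a rates source.

```python
import pandas as pd

# Placeholder rates for illustration only (not real market data)
USD_PER_UNIT = {"USD": 1.0, "EUR": 1.1, "GBP": 1.25}

txns = pd.DataFrame({
    "amount": [100.0, 200.0, 40.0, 10.0],
    "currency": ["USD", "EUR", "GBP", "JPY"],
})

rates = txns["currency"].map(USD_PER_UNIT)  # unknown currencies map to NaN
txns["amount_usd"] = (txns["amount"] * rates).round(2)

unmapped = sorted(set(txns.loc[rates.isna(), "currency"]))
print(txns["amount_usd"].tolist())  # [100.0, 220.0, 50.0, nan]
print(unmapped)                     # ['JPY']
```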
---

## 8. Summary and Key Takeaways <a id='8-summary'></a>

### Key Concepts

1. **Extract Phase**
   - Batch extraction for periodic loads
   - API extraction for external data
   - Stream extraction for real-time data

2. **Transform Phase**
   - Data cleaning (duplicates, missing values, outliers)
   - Data validation (type checking, range validation)
   - Data enrichment (derived columns, date features)

3. **Load Phase**
   - Full load for complete replacement
   - Incremental load for appending
   - Upsert for insert-or-update

4. **Data Quality**
   - Null checks, uniqueness checks, range validation
   - Automated quality gates

### Best Practices

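- Design for idempotency: re-running a pipeline on the same input should leave the target unchanged (full loads and key-based upserts have this property; blind appends do not)
- Log row counts and durations at every stage so failures can be traced
- Run data quality checks before data reaches model training
- Prefer incremental loads for large tables whose source records carry a reliable change marker such as `updated_at`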

### Next Steps

In Tutorial 06, we'll learn about Feature Engineering Operations.

In [None]:
print("=" * 60)
print("TUTORIAL 05 COMPLETE: ETL Pipelines")
print("=" * 60)
print("\nKey topics covered:")
print("  1. Extract (batch, API, streaming)")
print("  2. Transform (cleaning, validation, enrichment)")
print("  3. Load (full, incremental, upsert)")
print("  4. Data Quality Validation")
print("  5. Complete ETL Pipeline")
print("\nNext: Tutorial 06 - Feature Engineering Operations")