# Data Engineering Fundamentals Crash Course for Data Science Assessments

**Last Updated:** 25 January 2026

This notebook covers data engineering concepts that data scientists should understand. We focus on ETL pipelines, data quality, orchestration, and the fundamentals of building reliable data systems.

## Table of Contents

1. [Introduction and Setup](#1-introduction-and-setup)
2. [ETL vs ELT](#2-etl-vs-elt)
3. [Data Pipeline Concepts](#3-data-pipeline-concepts)
4. [Batch vs Streaming Processing](#4-batch-vs-streaming-processing)
5. [Data Quality and Validation](#5-data-quality-and-validation)
6. [Data Formats and Storage](#6-data-formats-and-storage)
7. [Orchestration Concepts](#7-orchestration-concepts)
8. [Data Modelling for Analytics](#8-data-modelling-for-analytics)
9. [Error Handling and Monitoring](#9-error-handling-and-monitoring)
10. [Scalability Concepts](#10-scalability-concepts)
11. [Practice Questions](#11-practice-questions)
12. [Summary](#12-summary)

---

## 1. Introduction and Setup

**Data engineering** builds the infrastructure and pipelines that enable data science. Understanding these concepts helps data scientists:

- Work effectively with data engineers
- Build reliable data pipelines
- Debug data quality issues
- Understand data system trade-offs

In [None]:
import pandas as pd
import numpy as np
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Callable, Optional, Tuple
from dataclasses import dataclass
import hashlib
import time

import warnings
warnings.filterwarnings('ignore')

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

np.random.seed(42)

print("All imports successful!")

---

## 2. ETL vs ELT

### ETL (Extract, Transform, Load)

**Traditional approach**: Transform data before loading into target.

```
Source -> [Extract] -> [Transform] -> [Load] -> Data Warehouse
```

### ELT (Extract, Load, Transform)

**Modern approach**: Load raw data into the target platform first (a data lake or raw warehouse tables), then transform it there using the platform's own compute.

```
Source -> [Extract] -> [Load] -> Data Lake -> [Transform] -> Data Warehouse
```

### Comparison

| Aspect | ETL | ELT |
|--------|-----|-----|
| Where transforms run | Separate ETL server or staging area | Inside the target warehouse/lake |
| Raw data kept in target | No (only transformed output) | Yes |
| Transform compute | ETL tool | Warehouse engine (e.g., BigQuery) |
| Schema | Schema-on-write (defined upfront) | Applied at transform/query time (schema-on-read) |
| Best for | Structured data, known requirements | Flexible, exploratory analysis |

In [None]:
class ETLPipeline:
    """Simple ETL pipeline demonstration."""
    
    def extract(self, source: Dict[str, Any]) -> pd.DataFrame:
        """Extract data from source.
        
        Args:
            source: Source configuration.
        
        Returns:
            Raw data as DataFrame.
        """
        logger.info(f"Extracting from {source['type']}")
        return pd.DataFrame(source['data'])
    
    def transform(self, df: pd.DataFrame, transformations: List[Callable]) -> pd.DataFrame:
        """Apply transformations to data.
        
        Args:
            df: Input DataFrame.
            transformations: List of transformation functions.
        
        Returns:
            Transformed DataFrame.
        """
        result = df.copy()
        for transform_func in transformations:
            logger.info(f"Applying transformation: {transform_func.__name__}")
            result = transform_func(result)
        return result
    
    def load(self, df: pd.DataFrame, target: Dict[str, Any]) -> bool:
        """Load data to target.
        
        Args:
            df: Data to load.
            target: Target configuration.
        
        Returns:
            Success status.
        """
        logger.info(f"Loading {len(df)} rows to {target['type']}")
        return True
    
    def run(self, source: Dict, transformations: List[Callable], target: Dict) -> pd.DataFrame:
        """Run the complete ETL pipeline.
        
        Args:
            source: Source configuration.
            transformations: List of transformations.
            target: Target configuration.
        
        Returns:
            Transformed data.
        """
        raw_data = self.extract(source)
        transformed_data = self.transform(raw_data, transformations)
        self.load(transformed_data, target)
        return transformed_data

In [None]:
def clean_names(df: pd.DataFrame) -> pd.DataFrame:
    """Standardise column names."""
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df


def add_timestamp(df: pd.DataFrame) -> pd.DataFrame:
    """Add processing timestamp."""
    df['processed_at'] = datetime.now()
    return df


source = {
    'type': 'csv',
    'data': {
        'Customer Name': ['Alice', 'Bob', 'Charlie'],
        'Order Total': [100.50, 200.75, 150.25]
    }
}

target = {'type': 'database', 'table': 'orders'}

pipeline = ETLPipeline()
result = pipeline.run(source, [clean_names, add_timestamp], target)
print("\nTransformed data:")
print(result)

---

## 3. Data Pipeline Concepts

### Pipeline Characteristics

| Property | Description | Example |
|----------|-------------|--------|
| Idempotent | Running it more than once has the same effect as running it once | Re-running doesn't create duplicates |
| Reproducible | Can recreate any historical state | Versioned code + data |
| Atomic | All-or-nothing execution | Transaction commits |
| Incremental | Process only new/changed data | `WHERE updated_at > last_watermark` |

In [None]:
@dataclass
class PipelineConfig:
    """Configuration for a data pipeline."""
    name: str
    source: str
    target: str
    schedule: str
    incremental: bool = True
    watermark_column: Optional[str] = None


class IncrementalPipeline:
    """Pipeline that processes only new data."""
    
    def __init__(self, config: PipelineConfig):
        """Initialise pipeline with configuration.
        
        Args:
            config: Pipeline configuration.
        """
        self.config = config
        self.last_watermark: Optional[datetime] = None
    
    def get_watermark(self) -> Optional[datetime]:
        """Get the last processed timestamp.
        
        Returns:
            Last watermark or None if first run.
        """
        return self.last_watermark
    
    def set_watermark(self, value: datetime) -> None:
        """Update the watermark after successful processing.
        
        Args:
            value: New watermark value.
        """
        self.last_watermark = value
        logger.info(f"Watermark updated to {value}")
    
    def extract_incremental(self, df: pd.DataFrame) -> pd.DataFrame:
        """Extract only records after watermark.
        
        Args:
            df: Full source data.
        
        Returns:
            Filtered DataFrame with only new records.
        """
        if self.last_watermark is None:
            logger.info("First run - extracting all data")
            return df
        
        col = self.config.watermark_column
        filtered = df[df[col] > self.last_watermark]  # type: ignore[index]
        logger.info(f"Incremental: {len(filtered)} new records (after {self.last_watermark})")
        return filtered  # type: ignore[return-value]

In [None]:
config = PipelineConfig(
    name='daily_orders',
    source='orders_db',
    target='warehouse',
    schedule='0 2 * * *',
    incremental=True,
    watermark_column='created_at'
)

pipeline = IncrementalPipeline(config)

data = pd.DataFrame({
    'order_id': [1, 2, 3, 4, 5],
    'created_at': pd.to_datetime([
        '2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04', '2024-01-05'
    ])
})

print("Run 1 (full load):")
batch1 = pipeline.extract_incremental(data)
print(f"  Extracted {len(batch1)} records")
pipeline.set_watermark(batch1['created_at'].max())  # type: ignore[arg-type]

data_new = pd.concat([data, pd.DataFrame({
    'order_id': [6, 7],
    'created_at': pd.to_datetime(['2024-01-06', '2024-01-07'])
})])

print("\nRun 2 (incremental):")
batch2 = pipeline.extract_incremental(data_new)
print(f"  Extracted {len(batch2)} records")
print(batch2)
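
The watermark decides *what* to extract; idempotency depends on *how* the batch is loaded. A minimal pandas sketch, assuming the target is a DataFrame keyed on `order_id` (both helpers are hypothetical, not library functions): an append-only load duplicates rows when a batch is re-run, whereas an upsert that keeps the latest version per key gives the same result however many times it runs.

```python
import pandas as pd

def append_load(target: pd.DataFrame, batch: pd.DataFrame) -> pd.DataFrame:
    """Append-only load: NOT idempotent (re-runs duplicate rows)."""
    return pd.concat([target, batch], ignore_index=True)

def upsert(target: pd.DataFrame, batch: pd.DataFrame, key: str) -> pd.DataFrame:
    """Insert new keys and overwrite existing ones: idempotent on re-run."""
    combined = pd.concat([target, batch], ignore_index=True)
    # keep='last' means the batch's version wins for keys already in target
    return (combined.drop_duplicates(subset=[key], keep="last")
                    .sort_values(key)
                    .reset_index(drop=True))

target = pd.DataFrame({"order_id": [1, 2], "status": ["new", "new"]})
batch = pd.DataFrame({"order_id": [2, 3], "status": ["shipped", "new"]})

# Simulate the same batch being loaded twice (e.g. a retried run)
appended_twice = append_load(append_load(target, batch), batch)
upserted_twice = upsert(upsert(target, batch, "order_id"), batch, "order_id")

print(len(appended_twice))  # 6 rows, with duplicates for orders 2 and 3
print(upserted_twice)       # 3 rows, one per order_id
```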

---

## 4. Batch vs Streaming Processing

| Aspect | Batch | Streaming |
|--------|-------|----------|
| Processing | Scheduled intervals | Continuous |
| Latency | Hours/days | Seconds/minutes |
| Complexity | Lower | Higher |
| Use case | Reports, ML training | Real-time dashboards, alerts |
| Tools | Spark, SQL engines (scheduled by Airflow) | Kafka (event transport), Flink, Spark Structured Streaming |

In [None]:
class BatchProcessor:
    """Process data in batches."""
    
    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        """Process entire batch at once.
        
        Args:
            data: Full batch of data.
        
        Returns:
            Processed results.
        """
        logger.info(f"Processing batch of {len(data)} records")
        result = data.groupby('category').agg({
            'amount': ['sum', 'mean', 'count']
        })
        return result


class StreamProcessor:
    """Process data record by record (simplified streaming simulation)."""
    
    def __init__(self):
        self.state = {}
    
    def process_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Process single record and update state.
        
        Args:
            record: Single data record.
        
        Returns:
            Current aggregated state.
        """
        category = record['category']
        amount = record['amount']
        
        if category not in self.state:
            self.state[category] = {'sum': 0, 'count': 0}
        
        self.state[category]['sum'] += amount
        self.state[category]['count'] += 1
        self.state[category]['avg'] = (
            self.state[category]['sum'] / self.state[category]['count']
        )
        
        # Return a copy so callers get a snapshot, not the live state dict
        return dict(self.state[category])

In [None]:
data = pd.DataFrame({
    'category': ['A', 'B', 'A', 'B', 'A'],
    'amount': [100, 200, 150, 250, 120]
})

print("Batch Processing:")
batch = BatchProcessor()
print(batch.process(data))

print("\nStream Processing (record by record):")
stream = StreamProcessor()
for _, row in data.iterrows():
    result = stream.process_record(row.to_dict())
    print(f"  After {row['category']}:{row['amount']} -> {result}")
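
The `StreamProcessor` above keeps running totals forever; streaming jobs usually aggregate over time windows instead. A minimal sketch of a *tumbling* (fixed-size, non-overlapping) window, assuming each event carries its own timestamp and arrives in order. Late events, watermarks and state expiry, which engines such as Flink handle, are deliberately left out, and the class name is illustrative.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Tuple

EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime, size: timedelta) -> datetime:
    """Floor a timestamp to the start of the tumbling window containing it."""
    return ts - (ts - EPOCH) % size

class TumblingWindowSum:
    """Sum `amount` per (window_start, category) over fixed-size windows."""

    def __init__(self, size: timedelta):
        self.size = size
        self.totals: Dict[Tuple[datetime, str], float] = defaultdict(float)

    def process(self, ts: datetime, category: str, amount: float) -> None:
        self.totals[(window_start(ts, self.size), category)] += amount

events = [
    (datetime(2024, 1, 1, 10, 0, 5), "A", 100),
    (datetime(2024, 1, 1, 10, 0, 40), "A", 150),
    (datetime(2024, 1, 1, 10, 1, 10), "A", 120),
    (datetime(2024, 1, 1, 10, 1, 30), "B", 200),
]

agg = TumblingWindowSum(size=timedelta(minutes=1))
for ts, category, amount in events:
    agg.process(ts, category, amount)

for (start, category), total in sorted(agg.totals.items()):
    print(start.strftime("%H:%M"), category, total)
# 10:00 A 250.0
# 10:01 A 120.0
# 10:01 B 200.0
```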

### Lambda vs Kappa Architecture

| Architecture | Description | Pros | Cons |
|--------------|-------------|------|------|
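| Lambda | Batch + streaming layers | Accurate batch results, low-latency streaming view | Same logic maintained in two codebases |
| Kappa | Streaming only | Single codebase | Reprocessing means replaying the event log (needs long retention, high replay throughput) |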

---

## 5. Data Quality and Validation

### Data Quality Dimensions

| Dimension | Description | Example Check |
|-----------|-------------|---------------|
| Completeness | No missing values | `df.isnull().sum()` |
| Accuracy | Values are correct | Range checks, lookup validation |
| Consistency | Same format across records | Date formats, units |
| Timeliness | Data is up-to-date | Max date within expected range |
| Uniqueness | No duplicates | `df.duplicated().sum()` |

In [None]:
@dataclass
class ValidationResult:
    """Result of a data validation check."""
    check_name: str
    passed: bool
    message: str
    details: Optional[Dict[str, Any]] = None


class DataValidator:
    """Validate data quality."""
    
    def __init__(self):
        self.results: List[ValidationResult] = []
    
    def check_not_null(self, df: pd.DataFrame, columns: List[str]) -> ValidationResult:
        """Check that specified columns have no null values.
        
        Args:
            df: DataFrame to validate.
            columns: Columns to check.
        
        Returns:
            Validation result.
        """
        null_counts = df[columns].isnull().sum()
        has_nulls = null_counts.sum() > 0
        
        result = ValidationResult(
            check_name='not_null',
            passed=not has_nulls,
            message='No null values found' if not has_nulls else f'Found nulls: {null_counts.to_dict()}',
            details={'null_counts': null_counts.to_dict()}
        )
        self.results.append(result)
        return result
    
    def check_unique(self, df: pd.DataFrame, columns: List[str]) -> ValidationResult:
        """Check that specified columns have unique values.
        
        Args:
            df: DataFrame to validate.
            columns: Columns to check for uniqueness.
        
        Returns:
            Validation result.
        """
        duplicates = df.duplicated(subset=columns, keep=False).sum()
        
        result = ValidationResult(
            check_name='unique',
            passed=duplicates == 0,
            message='No duplicates' if duplicates == 0 else f'{duplicates} rows share a duplicated key',
            details={'duplicate_count': int(duplicates)}
        )
        self.results.append(result)
        return result
    
    def check_range(self, df: pd.DataFrame, column: str, min_val: float, max_val: float) -> ValidationResult:
        """Check that values are within expected range.
        
        Args:
            df: DataFrame to validate.
            column: Column to check.
            min_val: Minimum allowed value.
            max_val: Maximum allowed value.
        
        Returns:
            Validation result.
        """
        out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()
        
        result = ValidationResult(
            check_name='range',
            passed=out_of_range == 0,
            message=f'All values in range [{min_val}, {max_val}]' if out_of_range == 0 else f'{out_of_range} values out of range',
            details={'out_of_range_count': out_of_range, 'actual_min': df[column].min(), 'actual_max': df[column].max()}
        )
        self.results.append(result)
        return result
    
    def check_row_count(self, df: pd.DataFrame, min_rows: int) -> ValidationResult:
        """Check that DataFrame has minimum number of rows.
        
        Args:
            df: DataFrame to validate.
            min_rows: Minimum expected rows.
        
        Returns:
            Validation result.
        """
        row_count = len(df)
        
        result = ValidationResult(
            check_name='row_count',
            passed=row_count >= min_rows,
            message=f'Row count {row_count} >= {min_rows}' if row_count >= min_rows else f'Only {row_count} rows (expected >= {min_rows})',
            details={'actual_rows': row_count, 'expected_min': min_rows}
        )
        self.results.append(result)
        return result
    
    def summary(self) -> pd.DataFrame:
        """Get summary of all validation results.
        
        Returns:
            DataFrame with validation summary.
        """
        return pd.DataFrame([
            {'check': r.check_name, 'passed': r.passed, 'message': r.message}
            for r in self.results
        ])

In [None]:
data = pd.DataFrame({
    'id': [1, 2, 3, 4, 4],
    'name': ['Alice', 'Bob', None, 'David', 'Eve'],
    'age': [25, 30, 35, 150, 28]
})

validator = DataValidator()

validator.check_not_null(data, ['id', 'name'])
validator.check_unique(data, ['id'])
validator.check_range(data, 'age', 0, 120)
validator.check_row_count(data, 5)

print("Validation Results:")
print(validator.summary())
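
The validator above has no check for the *timeliness* dimension from the table. A minimal freshness check, written as a standalone hypothetical function rather than a `DataValidator` method, compares the newest timestamp against a maximum allowed age. `now` is passed in explicitly (an assumption of this sketch) so the check is deterministic and testable.

```python
from datetime import datetime, timedelta
import pandas as pd

def check_freshness(df: pd.DataFrame, ts_column: str, max_age: timedelta, now: datetime) -> dict:
    """Pass if the most recent record is no older than `max_age`."""
    latest = pd.to_datetime(df[ts_column]).max()
    if pd.isna(latest):  # empty table or all-null column: cannot be fresh
        return {"passed": False, "latest": None, "age": None}
    age = now - latest
    return {"passed": bool(age <= max_age), "latest": latest, "age": age}

events = pd.DataFrame({"loaded_at": ["2024-01-05 01:00", "2024-01-05 02:30"]})
now = datetime(2024, 1, 5, 8, 0)

print(check_freshness(events, "loaded_at", timedelta(hours=12), now))  # passed: True
print(check_freshness(events, "loaded_at", timedelta(hours=2), now))   # passed: False
```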

---

## 6. Data Formats and Storage

### Common Data Formats

| Format | Type | Best For | Pros | Cons |
|--------|------|----------|------|------|
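| CSV | Row-based (text) | Small data, interchange | Human readable, widely supported | No types, slow to parse |
| JSON | Document (text) | APIs, nested data | Flexible schema | Verbose, slow to parse |
| Parquet | Columnar (binary) | Analytics | Reads only needed columns, strong compression | Not human readable, inefficient for row-level updates |
| Avro | Row-based (binary) | Streaming, message serialisation | Schema evolution, compact | Not human readable, needs schema management |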

In [None]:
print("Row-based vs Columnar Storage:")
print()
print("ROW-BASED (CSV, JSON, Avro):")
print("  Storage: [id:1, name:Alice, age:25], [id:2, name:Bob, age:30], ...")
print("  Good for: Inserting rows, reading full records")
print("  Bad for: Aggregating single column (must scan all data)")
print()
print("COLUMNAR (Parquet, ORC):")
print("  Storage: [id: 1,2,3,...], [name: Alice,Bob,...], [age: 25,30,...]")
print("  Good for: Analytics (SUM, AVG on one column)")
print("  Bad for: Full record retrieval, frequent inserts")
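
The same contrast can be made concrete in plain Python. This is only an in-memory analogy (real formats such as Parquet add encoding, compression and metadata): a row layout stores one record per entry, while a columnar layout stores one list per column, so an aggregate over `age` touches a single list.

```python
rows = [
    {"id": 1, "name": "Alice", "age": 25},
    {"id": 2, "name": "Bob", "age": 30},
    {"id": 3, "name": "Charlie", "age": 35},
]

# Columnar layout: pivot the records into one list per column
columns = {col: [row[col] for row in rows] for col in rows[0]}

# Row layout: the aggregate must visit every record
row_avg = sum(row["age"] for row in rows) / len(rows)

# Columnar layout: the aggregate reads only the "age" column
col_avg = sum(columns["age"]) / len(columns["age"])

# Full-record retrieval is the reverse: trivial for rows, a gather across columns
record_2 = {col: values[1] for col, values in columns.items()}

print(row_avg, col_avg)  # 30.0 30.0
print(record_2)          # {'id': 2, 'name': 'Bob', 'age': 30}
```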

In [None]:
def estimate_storage_savings(df: pd.DataFrame, column: str) -> Dict[str, Any]:
    """Estimate storage savings from columnar compression.
    
    Args:
        df: DataFrame.
        column: Column to analyse.
    
    Returns:
        Dictionary with compression estimates.
    """
    unique_values = df[column].nunique()
    total_values = len(df)
    
    cardinality_ratio = unique_values / total_values
    
    if cardinality_ratio < 0.1:
        compression = 'Excellent (dictionary encoding effective)'
    elif cardinality_ratio < 0.5:
        compression = 'Good'
    else:
        compression = 'Limited (high cardinality)'
    
    return {
        'column': column,
        'unique_values': unique_values,
        'total_values': total_values,
        'cardinality_ratio': cardinality_ratio,
        'compression_potential': compression
    }


data = pd.DataFrame({
    'user_id': range(1000),
    'country': np.random.choice(['US', 'UK', 'DE', 'FR'], 1000),
    'timestamp': pd.date_range('2024-01-01', periods=1000, freq='h')
})

print("Compression Potential by Column:")
for col in ['user_id', 'country']:
    result = estimate_storage_savings(data, col)
    print(f"  {col}: {result['compression_potential']} (cardinality: {result['cardinality_ratio']:.2%})")
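
Dictionary encoding, which the low-cardinality branch refers to, can be approximated in pandas with the `category` dtype: each distinct value is stored once, plus a small integer code per row. This is an in-memory analogy rather than Parquet's on-disk encoding, and exact byte counts depend on the pandas version.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
countries = pd.Series(rng.choice(["US", "UK", "DE", "FR"], 1000))

as_object = countries.astype(object)
as_category = countries.astype("category")

# deep=True counts the Python string objects, not just the pointers to them
object_bytes = as_object.memory_usage(deep=True)
category_bytes = as_category.memory_usage(deep=True)

print(f"object:   {object_bytes:,} bytes")
print(f"category: {category_bytes:,} bytes")
print(list(as_category.cat.categories))       # the 'dictionary' of distinct values
print(as_category.cat.codes.head().tolist())  # per-row integer codes
```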

---

## 7. Orchestration Concepts

**Orchestration** schedules pipeline runs, enforces the order of dependent tasks, and handles retries and backfills.

### Key Concepts

| Concept | Description | Example |
|---------|-------------|--------|
| DAG | Directed Acyclic Graph of tasks | Task B depends on Task A |
| Schedule | When to run | Cron: `0 2 * * *` (2 AM daily) |
| Dependency | Task ordering | Extract -> Transform -> Load |
| Backfill | Process historical data | Rerun for past dates |
| Idempotency | Safe to re-run | Same result on retry |

In [None]:
@dataclass
class Task:
    """A task in a DAG."""
    name: str
    func: Callable
    dependencies: List[str]
    retries: int = 3


class SimpleDAG:
    """Simple DAG executor (like a minimal Airflow)."""
    
    def __init__(self, name: str):
        """Initialise DAG.
        
        Args:
            name: DAG name.
        """
        self.name = name
        self.tasks: Dict[str, Task] = {}
        self.results: Dict[str, Any] = {}
    
    def add_task(self, task: Task) -> None:
        """Add a task to the DAG.
        
        Args:
            task: Task to add.
        """
        self.tasks[task.name] = task
    
    def _get_execution_order(self) -> List[str]:
        """Topological sort to get execution order.
        
        Returns:
            List of task names in execution order.
        """
        in_degree = {name: 0 for name in self.tasks}
        for task in self.tasks.values():
            for dep in task.dependencies:
                if dep in in_degree:
                    in_degree[task.name] += 1
        
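        queue = [name for name, degree in in_degree.items() if degree == 0]
        result = []
        
        while queue:
            current = queue.pop(0)
            result.append(current)
            
            # Decrement the in-degree of every task that depends on `current`
            for name, task in self.tasks.items():
                if current in task.dependencies:
                    in_degree[name] -= 1
                    if in_degree[name] == 0:
                        queue.append(name)
        
        # Tasks that never reached in-degree 0 are part of a cycle
        if len(result) != len(self.tasks):
            unresolved = sorted(set(self.tasks) - set(result))
            raise ValueError(f"Cycle detected among tasks: {unresolved}")
        
        return result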
    
    def run(self) -> Dict[str, Any]:
        """Execute the DAG.
        
        Returns:
            Dictionary of task results.
        """
        order = self._get_execution_order()
        logger.info(f"Execution order: {order}")
        
        for task_name in order:
            task = self.tasks[task_name]
            logger.info(f"Running task: {task_name}")
            
            dep_results = {dep: self.results.get(dep) for dep in task.dependencies}
            
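            # `retries` counts re-attempts after the first failure, so a task
            # runs at least once and at most retries + 1 times
            max_attempts = task.retries + 1
            for attempt in range(max_attempts):
                try:
                    self.results[task_name] = task.func(dep_results)
                    logger.info(f"Task {task_name} completed")
                    break
                except Exception as e:
                    logger.warning(f"Task {task_name} failed (attempt {attempt + 1}/{max_attempts}): {e}")
                    if attempt == max_attempts - 1:
                        raise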
        
        return self.results

In [None]:
def extract_task(deps):
    return {'data': [1, 2, 3, 4, 5]}


def transform_task(deps):
    data = deps['extract']['data']
    return {'data': [x * 2 for x in data]}


def load_task(deps):
    data = deps['transform']['data']
    return {'rows_loaded': len(data)}


dag = SimpleDAG('etl_pipeline')
dag.add_task(Task('extract', extract_task, []))
dag.add_task(Task('transform', transform_task, ['extract']))
dag.add_task(Task('load', load_task, ['transform']))

results = dag.run()
print("\nFinal Results:")
for task, result in results.items():
    print(f"  {task}: {result}")
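
A backfill, from the table above, amounts to running the same idempotent task once per historical *logical date*. A minimal sketch, where `run_for_date` is a hypothetical stand-in for a daily partition job (orchestrators such as Airflow provide this through their own backfill tooling):

```python
from datetime import date, timedelta
from typing import Callable, Dict, List

def backfill(start: date, end: date, run_for_date: Callable[[date], int]) -> Dict[date, int]:
    """Run `run_for_date` once for every day in [start, end], oldest first."""
    if end < start:
        raise ValueError("end must not be before start")
    results: Dict[date, int] = {}
    day = start
    while day <= end:
        results[day] = run_for_date(day)
        day += timedelta(days=1)
    return results

processed: List[date] = []

def run_for_date(logical_date: date) -> int:
    """Hypothetical daily job: processes only that day's partition."""
    processed.append(logical_date)
    return 100  # pretend 100 rows were loaded

results = backfill(date(2024, 1, 1), date(2024, 1, 3), run_for_date)
print([d.isoformat() for d in results])  # ['2024-01-01', '2024-01-02', '2024-01-03']
```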

### Cron Schedule Syntax

```
* * * * *
| | | | |
| | | | +-- Day of week (0-7, Sunday=0 or 7)
| | | +---- Month (1-12)
| | +------ Day of month (1-31)
| +-------- Hour (0-23)
+---------- Minute (0-59)
```

| Schedule | Cron Expression |
|----------|----------------|
| Every hour | `0 * * * *` |
| Daily at 2 AM | `0 2 * * *` |
| Monday at 9 AM | `0 9 * * 1` |
| First of month | `0 0 1 * *` |
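
To make the five fields concrete, here is a minimal matcher. It is a sketch that supports only `*` and single integers (real cron also accepts ranges, lists and steps such as `*/15`), and it follows the classic cron rule that when both day-of-month and day-of-week are restricted, a match on either is enough.

```python
from datetime import datetime

def _field_matches(field: str, value: int) -> bool:
    """Match a single cron field: '*' or one integer."""
    return field == "*" or int(field) == value

def cron_matches(expr: str, dt: datetime) -> bool:
    """Return True if `dt` (to the minute) fires the cron expression `expr`."""
    minute, hour, dom, month, dow = expr.split()
    # Python: Monday=0..Sunday=6; cron: Sunday=0 (or 7), Monday=1..Saturday=6
    cron_dow = (dt.weekday() + 1) % 7
    dom_ok = _field_matches(dom, dt.day)
    dow_ok = dow == "*" or int(dow) % 7 == cron_dow
    if dom != "*" and dow != "*":
        day_ok = dom_ok or dow_ok   # both restricted: either may match
    else:
        day_ok = dom_ok and dow_ok
    return (
        _field_matches(minute, dt.minute)
        and _field_matches(hour, dt.hour)
        and _field_matches(month, dt.month)
        and day_ok
    )

print(cron_matches("0 2 * * *", datetime(2024, 1, 10, 2, 0)))  # True
print(cron_matches("0 9 * * 1", datetime(2024, 1, 1, 9, 0)))   # True (a Monday)
print(cron_matches("0 9 * * 1", datetime(2024, 1, 2, 9, 0)))   # False (a Tuesday)
```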

---

## 8. Data Modelling for Analytics

### Slowly Changing Dimensions (SCD)

How to handle changes to dimension data over time.

| Type | Strategy | Example |
|------|----------|--------|
| SCD 0 | Never update | Static reference data |
| SCD 1 | Overwrite | Only current value matters |
| SCD 2 | Add new row | Full history with date ranges |
| SCD 3 | Add column | Current + previous value |

In [None]:
print("SCD Type 2 Example (Customer Address Changes):")
print()

scd2_example = pd.DataFrame({
    'customer_key': [1, 2, 3],
    'customer_id': [100, 100, 101],
    'name': ['Alice', 'Alice', 'Bob'],
    'city': ['London', 'Manchester', 'Leeds'],
    'valid_from': ['2020-01-01', '2023-06-15', '2021-03-01'],
    'valid_to': ['2023-06-14', '9999-12-31', '9999-12-31'],
    'is_current': [False, True, True]
})

print(scd2_example)
print()
print("Note: customer_id 100 (Alice) has two rows - old address and current address")
print("      Use 'is_current = True' for current state, or date range for point-in-time")

In [None]:
class SCD2Handler:
    """Handle SCD Type 2 updates."""
    
    def __init__(self, key_column: str, tracked_columns: List[str]):
        """Initialise SCD2 handler.
        
        Args:
            key_column: Natural key column.
            tracked_columns: Columns to track for changes.
        """
        self.key_column = key_column
        self.tracked_columns = tracked_columns
    
    def detect_changes(
        self,
        current: pd.DataFrame,
        incoming: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Detect inserts, updates, and unchanged records.
        
        Args:
            current: Current dimension rows, one per key (e.g. only the is_current rows of an SCD2 table).
            incoming: New data.
        
        Returns:
            Tuple of (new records, changed records, unchanged records).
        """
        current_keys = set(current[self.key_column])
        incoming_keys = set(incoming[self.key_column])
        
        new_keys = incoming_keys - current_keys
        new_records = incoming[incoming[self.key_column].isin(list(new_keys))]  # type: ignore[arg-type]
        
        existing_keys = incoming_keys & current_keys
        
        changed_records = []
        unchanged_records = []
        
        for key in existing_keys:
            current_row = current[current[self.key_column] == key][self.tracked_columns].iloc[0]  # type: ignore[union-attr]
            incoming_row = incoming[incoming[self.key_column] == key][self.tracked_columns].iloc[0]  # type: ignore[union-attr]
            
            if not current_row.equals(incoming_row):
                changed_records.append(incoming[incoming[self.key_column] == key])
            else:
                unchanged_records.append(incoming[incoming[self.key_column] == key])
        
        changed_df = pd.concat(changed_records) if changed_records else pd.DataFrame()
        unchanged_df = pd.concat(unchanged_records) if unchanged_records else pd.DataFrame()
        
        return new_records, changed_df, unchanged_df  # type: ignore[return-value]

In [None]:
current_dim = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'city': ['London', 'Manchester', 'Leeds']
})

incoming_data = pd.DataFrame({
    'customer_id': [1, 2, 4],
    'name': ['Alice', 'Bob', 'David'],
    'city': ['London', 'Liverpool', 'Bristol']
})

handler = SCD2Handler(key_column='customer_id', tracked_columns=['name', 'city'])
new, changed, unchanged = handler.detect_changes(current_dim, incoming_data)

print("Change Detection Results:")
print(f"\nNew records ({len(new)}):")
print(new if len(new) > 0 else "  (none)")
print(f"\nChanged records ({len(changed)}):")
print(changed if len(changed) > 0 else "  (none)")
print(f"\nUnchanged records ({len(unchanged)}):")
print(unchanged if len(unchanged) > 0 else "  (none)")
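
`detect_changes` only classifies rows; it does not produce the updated dimension. A minimal sketch of the apply step, assuming the `valid_from` / `valid_to` / `is_current` columns and ISO date strings from the SCD Type 2 example above, and that keys absent from `incoming` (customer 3 here) are simply left as they are; handling deletions is out of scope. `apply_scd2` is a hypothetical helper. It merges only against `is_current` rows, so a table that already holds history is handled correctly.

```python
from datetime import date, timedelta
from typing import List
import pandas as pd

HIGH_DATE = "9999-12-31"  # open-ended valid_to, matching the example above

def apply_scd2(dim: pd.DataFrame, incoming: pd.DataFrame, key: str,
               tracked: List[str], as_of: date) -> pd.DataFrame:
    """Expire changed current rows and append new versions (SCD Type 2)."""
    dim = dim.copy()
    current = dim[dim["is_current"]]
    merged = incoming.merge(current[[key] + tracked], on=key, how="left",
                            suffixes=("", "_cur"), indicator=True)
    is_new = merged["_merge"] == "left_only"
    differs = pd.Series(False, index=merged.index)
    for col in tracked:
        differs |= merged[col] != merged[f"{col}_cur"]
    is_changed = (merged["_merge"] == "both") & differs

    # 1. Close the current version of every changed key
    changed_keys = merged.loc[is_changed, key]
    expire = dim["is_current"] & dim[key].isin(changed_keys)
    dim.loc[expire, "valid_to"] = (as_of - timedelta(days=1)).isoformat()
    dim.loc[expire, "is_current"] = False

    # 2. Insert a fresh current version for new and changed keys
    insert_keys = merged.loc[is_new | is_changed, key]
    new_rows = incoming[incoming[key].isin(insert_keys)].copy()
    new_rows["valid_from"] = as_of.isoformat()
    new_rows["valid_to"] = HIGH_DATE
    new_rows["is_current"] = True
    return pd.concat([dim, new_rows], ignore_index=True)

dim = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "city": ["London", "Manchester", "Leeds"],
    "valid_from": ["2020-01-01"] * 3,
    "valid_to": [HIGH_DATE] * 3,
    "is_current": [True] * 3,
})
incoming = pd.DataFrame({
    "customer_id": [1, 2, 4],
    "name": ["Alice", "Bob", "David"],
    "city": ["London", "Liverpool", "Bristol"],
})

updated = apply_scd2(dim, incoming, "customer_id", ["name", "city"], date(2024, 1, 15))
print(updated.sort_values(["customer_id", "valid_from"]).to_string(index=False))
```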

---

## 9. Error Handling and Monitoring

### Pipeline Failure Modes

| Failure Type | Cause | Handling |
|--------------|-------|----------|
| Source unavailable | Network, permissions | Retry with backoff |
| Data quality | Invalid data | Quarantine bad records |
| Transform error | Code bug, edge case | Log, alert, fail fast |
| Target full | Disk space, limits | Alert, pause pipeline |

In [None]:
def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    exponential: bool = True
) -> Any:
    """Retry a function with exponential backoff.
    
    Args:
        func: Function to retry.
        max_retries: Maximum number of attempts, including the first call.
        base_delay: Initial delay in seconds.
        exponential: Use exponential backoff.
    
    Returns:
        Function result.
    
    Raises:
        Exception: If all retries fail.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = base_delay * (2 ** attempt) if exponential else base_delay
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            time.sleep(delay)


class DeadLetterQueue:
    """Store failed records for later processing."""
    
    def __init__(self):
        self.failed_records: List[Dict[str, Any]] = []
    
    def add(self, record: Dict[str, Any], error: str) -> None:
        """Add a failed record.
        
        Args:
            record: The failed record.
            error: Error message.
        """
        self.failed_records.append({
            'record': record,
            'error': error,
            'timestamp': datetime.now().isoformat()
        })
    
    def get_all(self) -> List[Dict[str, Any]]:
        """Get all failed records.
        
        Returns:
            List of failed records with metadata.
        """
        return self.failed_records

In [None]:
def validate_and_process(record: Dict, dlq: DeadLetterQueue) -> Optional[Dict]:
    """Validate and process a record, sending failures to DLQ.
    
    Args:
        record: Record to process.
        dlq: Dead letter queue for failures.
    
    Returns:
        Processed record or None if failed.
    """
    try:
        if record.get('amount', 0) < 0:
            raise ValueError("Negative amount not allowed")
        
        record['processed'] = True
        return record
        
    except Exception as e:
        dlq.add(record, str(e))
        return None


records = [
    {'id': 1, 'amount': 100},
    {'id': 2, 'amount': -50},
    {'id': 3, 'amount': 200},
]

dlq = DeadLetterQueue()
processed = []

for record in records:
    result = validate_and_process(record, dlq)
    if result:
        processed.append(result)

print(f"Successfully processed: {len(processed)} records")
print(f"Failed (in DLQ): {len(dlq.get_all())} records")
print(f"\nDLQ contents:")
for item in dlq.get_all():
    print(f"  Record {item['record']['id']}: {item['error']}")
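
`retry_with_backoff` above is never exercised in the notebook. A variant worth knowing adds *full jitter*: each wait is drawn uniformly from `[0, cap]`, where the cap grows exponentially, so many clients that failed together do not retry in lockstep. A sketch under stated assumptions: the `sleep` parameter is an injectable hook (defaulting to `time.sleep`) so the demo records delays instead of waiting, and `flaky_source` is a hypothetical function that fails twice before succeeding.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def retry_with_full_jitter(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying on exception with capped, jittered exponential waits."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")

calls = 0

def flaky_source() -> str:
    """Hypothetical source that fails on its first two calls."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("source unavailable")
    return "payload"

delays: List[float] = []
result = retry_with_full_jitter(flaky_source, sleep=delays.append)
print(result, calls, [round(d, 3) for d in delays])
```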

---

## 10. Scalability Concepts

### Scaling Strategies

| Strategy | Description | Example |
|----------|-------------|--------|
| Vertical | Bigger machine | More RAM, CPUs |
| Horizontal | More machines | Distribute across nodes |
| Partitioning | Split data | By date, region |
| Caching | Store computed results | Redis, Memcached |

In [None]:
def partition_by_date(df: pd.DataFrame, date_column: str) -> Dict[str, pd.DataFrame]:
    """Partition DataFrame by date.
    
    Args:
        df: Input DataFrame.
        date_column: Column containing dates.
    
    Returns:
        Dictionary mapping date strings to DataFrames.
    """
    df = df.copy()  # avoid mutating the caller's DataFrame
    df[date_column] = pd.to_datetime(df[date_column])
    df['partition_key'] = df[date_column].dt.strftime('%Y-%m-%d')
    
    partitions = {}
    for key, group in df.groupby('partition_key'):
        partitions[key] = group.drop(columns=['partition_key'])
    
    return partitions

In [None]:
data = pd.DataFrame({
    'event_id': range(10),
    'event_date': pd.date_range('2024-01-01', periods=10, freq='12h'),
    'value': np.random.randint(1, 100, 10)
})

partitions = partition_by_date(data, 'event_date')

print("Partitioned Data:")
for partition_key, partition_data in partitions.items():
    print(f"\nPartition: {partition_key} ({len(partition_data)} rows)")
    print(partition_data.head(2))
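
On disk, date partitions are often laid out as Hive-style directories (`event_date=2024-01-01/...`), which lets a reader skip whole directories when a query filters on the partition column (partition pruning). A minimal sketch using CSV files in a temporary directory; the directory naming follows the Hive convention, while `write_partitions` and `read_partitions` are hypothetical helpers (Spark and pyarrow provide real implementations).

```python
import tempfile
from pathlib import Path
from typing import List

import pandas as pd

def write_partitions(df: pd.DataFrame, root: Path, partition_col: str) -> None:
    """Write one CSV per partition value under root/<col>=<value>/."""
    for value, group in df.groupby(partition_col):
        part_dir = root / f"{partition_col}={value}"
        part_dir.mkdir(parents=True, exist_ok=True)
        group.drop(columns=[partition_col]).to_csv(part_dir / "part-0.csv", index=False)

def read_partitions(root: Path, partition_col: str, wanted: List[str]) -> pd.DataFrame:
    """Read only the requested partitions; other directories are never opened."""
    frames = []
    for value in wanted:
        path = root / f"{partition_col}={value}" / "part-0.csv"
        if path.exists():
            part = pd.read_csv(path)
            part[partition_col] = value  # restore the column from the path
            frames.append(part)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

events = pd.DataFrame({
    "event_id": range(6),
    "event_date": ["2024-01-01", "2024-01-01", "2024-01-02",
                   "2024-01-02", "2024-01-03", "2024-01-03"],
    "value": [10, 20, 30, 40, 50, 60],
})

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    write_partitions(events, root, "event_date")
    layout = sorted(p.name for p in root.iterdir())
    jan_2 = read_partitions(root, "event_date", ["2024-01-02"])

print(layout)  # ['event_date=2024-01-01', 'event_date=2024-01-02', 'event_date=2024-01-03']
print(jan_2)
```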

In [None]:
class SimpleCache:
    """Simple in-memory cache with TTL."""
    
    def __init__(self, ttl_seconds: int = 300):
        """Initialise cache.
        
        Args:
            ttl_seconds: Time to live in seconds.
        """
        self.cache: Dict[str, Tuple[Any, datetime]] = {}
        self.ttl = timedelta(seconds=ttl_seconds)
    
    def _make_key(self, *args, **kwargs) -> str:
        """Create cache key from arguments."""
        key_data = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache.
        
        Args:
            key: Cache key.
        
        Returns:
            Cached value or None if not found/expired.
        """
        if key in self.cache:
            value, timestamp = self.cache[key]
            if datetime.now() - timestamp < self.ttl:
                return value
            del self.cache[key]
        return None
    
    def set(self, key: str, value: Any) -> None:
        """Set value in cache.
        
        Args:
            key: Cache key.
            value: Value to cache.
        """
        self.cache[key] = (value, datetime.now())
    
    def cached(self, func: Callable) -> Callable:
        """Decorator to cache function results.
        
        Args:
            func: Function to cache.
        
        Returns:
            Wrapped function.
        """
        def wrapper(*args, **kwargs):
            key = self._make_key(*args, **kwargs)
            cached_value = self.get(key)
            if cached_value is not None:
                logger.info(f"Cache hit for {func.__name__}")
                return cached_value
            
            logger.info(f"Cache miss for {func.__name__}")
            result = func(*args, **kwargs)
            self.set(key, result)
            return result
        return wrapper

In [None]:
cache = SimpleCache(ttl_seconds=60)


@cache.cached
def expensive_computation(x: int) -> int:
    """Simulate expensive computation."""
    time.sleep(0.1)
    return x * x


print("First call (cache miss):")
result1 = expensive_computation(5)
print(f"Result: {result1}")

print("\nSecond call (cache hit):")
result2 = expensive_computation(5)
print(f"Result: {result2}")
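
`SimpleCache.get` returns `None` both for a miss and for a cached `None`, so a function that legitimately returns `None` is recomputed on every call. `datetime.now()` can also jump when the system clock is adjusted. The sketch below shows one way around both problems. It assumes hashable positional arguments only (no keyword arguments), and `ttl_cache` is a hypothetical helper, not a library API. A sentinel object marks missing entries, `time.monotonic()` measures age, and `functools.wraps` keeps the wrapped function's name. The standard library's `functools.lru_cache` has no expiry, which is why the TTL check is written by hand here.

```python
import functools
import time
from typing import Any, Callable, Dict, Hashable, Tuple

_MISSING = object()  # sentinel: tells "not cached" apart from a cached None


def ttl_cache(ttl_seconds: float) -> Callable:
    """Cache results per positional-argument tuple for ttl_seconds (hypothetical helper)."""
    def decorator(func: Callable) -> Callable:
        store: Dict[Tuple[Hashable, ...], Tuple[Any, float]] = {}

        @functools.wraps(func)  # keep func.__name__ and __doc__ on the wrapper
        def wrapper(*args: Hashable) -> Any:
            entry = store.get(args, _MISSING)
            # time.monotonic() never jumps when the system clock is adjusted
            if entry is not _MISSING and time.monotonic() - entry[1] < ttl_seconds:
                return entry[0]  # hit, even when the cached value is None
            result = func(*args)
            store[args] = (result, time.monotonic())
            return result

        return wrapper
    return decorator


calls = []


@ttl_cache(ttl_seconds=60)
def find_user(user_id: int):
    """Pretend lookup that finds nothing."""
    calls.append(user_id)
    return None


find_user(7)
find_user(7)
print(len(calls))  # 1: the None result was served from the cache
```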

---

## 11. Practice Questions

Test your understanding with these interview-style questions. Try to solve each question in the empty code cell before revealing the answer.

### Question 1: ETL vs ELT Decision

When would you choose ETL over ELT, and vice versa?

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**Choose ETL when:**
- Data needs cleaning before storage (compliance, PII removal)
- The target system has limited compute (e.g. a traditional on-premises data warehouse)
- The schema must be strictly defined up front
- Data volume is small enough for a dedicated ETL server to handle

**Choose ELT when:**
- Using a cloud data warehouse (BigQuery, Snowflake, Redshift)
- Want to preserve raw data for future analysis
- Schema may evolve (schema-on-read)
- Transformations are complex and benefit from warehouse compute
- Need flexibility to re-transform historical data
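
The ELT pattern is easiest to see on a toy example. Here SQLite stands in for a cloud warehouse, which is an assumption made only for portability, and the table names are hypothetical. Raw records are loaded untouched. The transformation is then a SQL statement run inside the database, so it can be changed and re-run over the preserved raw table later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Extract + Load: land source records as-is, including a bad amount
conn.execute("CREATE TABLE raw_orders (order_id INTEGER, amount TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, "10.50", "uk"), (2, "n/a", "UK"), (3, "7.25", "fr")],
)

# Transform: done in SQL inside the "warehouse"; raw_orders stays intact
conn.execute("""
    CREATE TABLE clean_orders AS
    SELECT order_id,
           CAST(amount AS REAL) AS amount,
           UPPER(country)       AS country
    FROM raw_orders
    WHERE amount GLOB '[0-9]*'
""")
print(conn.execute("SELECT * FROM clean_orders ORDER BY order_id").fetchall())
# [(1, 10.5, 'UK'), (3, 7.25, 'FR')]
```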

</details>


---

### Question 2: Incremental Load Strategy

Design an incremental load strategy for a table with `updated_at` timestamp.

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

```python
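import sqlite3

import pandas as pd


def incremental_load(source_table: str, target_table: str, conn: sqlite3.Connection) -> int:
    """Incremental load using an updated_at high-watermark.

    Assumes both tables have the same columns, including `id` (PRIMARY KEY or
    UNIQUE in the target) and `updated_at`. Table names are interpolated, so
    they must come from trusted config, never from user input.
    """
    # 1. Get current watermark (max updated_at already in the target)
    watermark = conn.execute(f"SELECT MAX(updated_at) FROM {target_table}").fetchone()[0]

    # 2. Extract only new/updated records; the watermark is a bound parameter.
    #    '>=' re-reads rows sharing the watermark timestamp so late-arriving rows
    #    with the same updated_at are not skipped; the upsert makes this harmless.
    if watermark is not None:
        new_data = pd.read_sql(
            f"SELECT * FROM {source_table} WHERE updated_at >= ?", conn, params=(watermark,)
        )
    else:
        # First run - full load
        new_data = pd.read_sql(f"SELECT * FROM {source_table}", conn)

    if new_data.empty:
        return 0

    # 3. Batch upsert: insert new ids, update existing ones
    #    (SQLite 3.24+ INSERT ... ON CONFLICT syntax)
    cols = list(new_data.columns)
    placeholders = ", ".join("?" for _ in cols)
    updates = ", ".join(f"{c} = excluded.{c}" for c in cols if c != "id")
    upsert_query = (
        f"INSERT INTO {target_table} ({', '.join(cols)}) VALUES ({placeholders}) "
        f"ON CONFLICT (id) DO UPDATE SET {updates}"
    )
    conn.executemany(upsert_query, new_data.itertuples(index=False, name=None))
    conn.commit()
    return len(new_data)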
```

</details>


---

### Question 3: Data Quality Checks

What data quality checks would you implement for a daily sales pipeline?

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

```python
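from datetime import datetime, timedelta
from typing import List

import pandas as pd


def validate_sales_data(df: pd.DataFrame, min_rows: int = 100) -> List[str]:
    """Validate daily sales data; an empty list means all checks passed."""
    errors = []

    # 0. Schema: stop early if required columns are missing
    required_cols = ['order_id', 'customer_id', 'amount', 'order_date']
    missing = [col for col in required_cols if col not in df.columns]
    if missing:
        return [f"Missing columns: {missing}"]

    # 1. Completeness
    for col in required_cols:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            errors.append(f"{col} has {null_count} null values")

    # 2. Uniqueness
    dup_orders = df['order_id'].duplicated().sum()
    if dup_orders > 0:
        errors.append(f"{dup_orders} duplicate order_ids")

    # 3. Accuracy (range checks)
    negative_amounts = (df['amount'] < 0).sum()
    if negative_amounts > 0:
        errors.append(f"{negative_amounts} negative amounts")

    # 4. Timeliness: convert to datetime so the comparison works whether
    #    order_date holds strings, dates or Timestamps
    max_date = pd.to_datetime(df['order_date'], errors='coerce').max()
    expected_date = datetime.now().date()
    if pd.isna(max_date) or max_date.date() < expected_date - timedelta(days=1):
        errors.append(f"Data is stale (max date: {max_date})")

    # 5. Volume check against an expected minimum
    if len(df) < min_rows:
        errors.append(f"Low volume: only {len(df)} records")

    return errors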
```

</details>


---

### Question 4: Batch vs Streaming

For each use case, would you use batch or streaming processing?

a) Daily sales report
b) Fraud detection for credit cards
c) ML model training
d) Real-time recommendation engine

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**a) Daily sales report: BATCH**
- No real-time requirement
- Aggregates historical data
- Simpler, cheaper

**b) Fraud detection: STREAMING**
- Must detect fraud in real-time to block transactions
- Batch latency (minutes to hours) is too slow: the decision must be made before the transaction is approved

**c) ML model training: BATCH**
- Needs historical data
- Training is computationally intensive
- Doesn't need to be real-time

**d) Real-time recommendations: STREAMING**
- Must respond to user actions immediately
- "Users who viewed this also viewed..."
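
The batch/streaming distinction for case (b) can be made concrete with a minimal sketch of a stateful streaming rule. It flags a card that makes more than `max_txns` transactions within a sliding time window, deciding as each event arrives rather than at the end of a day. The rule, the thresholds and the `(card_id, timestamp)` event shape are illustrative assumptions. A production system would run logic like this in a stream processor such as Flink or Kafka Streams, not in a Python generator.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Iterable, Iterator, Tuple


def velocity_alerts(
    events: Iterable[Tuple[str, float]],  # (card_id, unix_timestamp), in time order
    max_txns: int = 3,
    window_seconds: float = 60.0,
) -> Iterator[Tuple[str, float]]:
    """Yield (card_id, timestamp) whenever a card exceeds max_txns in the window."""
    recent: Dict[str, Deque[float]] = defaultdict(deque)  # per-card state
    for card_id, ts in events:
        window = recent[card_id]
        window.append(ts)
        # Evict timestamps that have slid out of the window
        while window and ts - window[0] >= window_seconds:
            window.popleft()
        if len(window) > max_txns:
            yield card_id, ts  # decided per event, while the transaction is pending


events = [("A", 0), ("A", 10), ("A", 20), ("B", 25), ("A", 30), ("A", 100)]
print(list(velocity_alerts(events)))  # [('A', 30)]
```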

</details>


---

### Question 5: SCD Type 2 Implementation

Write SQL to update an SCD Type 2 dimension when a customer changes address.

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

```sql
-- Run both steps in one transaction so the customer never has zero or two current rows
BEGIN;

-- Step 1: Close the current record
UPDATE dim_customer
SET 
    valid_to = CURRENT_DATE - INTERVAL '1 day',
    is_current = FALSE
WHERE 
    customer_id = 123
    AND is_current = TRUE;

-- Step 2: Insert new record
INSERT INTO dim_customer (
    customer_id,
    name,
    address,
    city,
    valid_from,
    valid_to,
    is_current
)
VALUES (
    123,
    'Alice Smith',
    '456 New Street',
    'Manchester',
    CURRENT_DATE,
    '9999-12-31',
    TRUE
);

COMMIT;
```
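
The same two-step logic can be sketched in pandas for a small dimension held in memory. The function name, the sample rows and the column layout (which mirrors the SQL above) are hypothetical. Unlike the SQL, the sketch also skips versioning when none of the tracked attributes changed. The SQL version leaves that check to the caller.

```python
from datetime import date, timedelta

import pandas as pd

OPEN_END = date(9999, 12, 31)  # "no end date yet", as in the SQL above


def apply_scd2_change(dim: pd.DataFrame, customer_id: int,
                      new_attrs: dict, change_date: date) -> pd.DataFrame:
    """Return a copy of dim with customer_id's current row versioned (SCD Type 2).

    Assumes exactly one current row per customer_id.
    """
    dim = dim.copy()
    current = (dim["customer_id"] == customer_id) & dim["is_current"]
    old = dim.loc[current].iloc[0]
    if all(old[key] == value for key, value in new_attrs.items()):
        return dim  # nothing changed: no new version
    # Step 1: close the current version on the day before the change
    dim.loc[current, "valid_to"] = change_date - timedelta(days=1)
    dim.loc[current, "is_current"] = False
    # Step 2: append the new version, open-ended
    new_row = {**old.to_dict(), **new_attrs,
               "valid_from": change_date, "valid_to": OPEN_END, "is_current": True}
    return pd.concat([dim, pd.DataFrame([new_row])], ignore_index=True)


dim = pd.DataFrame([{
    "customer_id": 123, "name": "Alice Smith", "address": "1 Old Road", "city": "Leeds",
    "valid_from": date(2020, 1, 1), "valid_to": OPEN_END, "is_current": True,
}])
dim = apply_scd2_change(dim, 123, {"address": "456 New Street", "city": "Manchester"},
                        date(2024, 1, 15))
print(dim[["city", "valid_from", "valid_to", "is_current"]])
```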

</details>


---

### Question 6: Pipeline Idempotency

How would you make a pipeline idempotent (safe to re-run)?

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**Strategies for idempotency:**

1. **Delete and replace:** Delete existing data for the time period, then insert, in a single transaction so a failed run cannot leave the period half-loaded
```sql
BEGIN;
DELETE FROM fact_sales WHERE date = '2024-01-15';
INSERT INTO fact_sales SELECT ... WHERE date = '2024-01-15';
COMMIT;
```

2. **Upsert (MERGE):** Insert or update based on key
```sql
MERGE INTO target USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
```

3. **Partitioned overwrite (Hive/Spark SQL):** Replace the entire partition in one statement
```sql
INSERT OVERWRITE TABLE sales PARTITION (date='2024-01-15')
SELECT ... FROM staging;
```

4. **Deduplication (anti-join):** Insert only rows whose key is not already in the target (existing rows are not updated)
```sql
INSERT INTO target
SELECT * FROM source
WHERE NOT EXISTS (SELECT 1 FROM target WHERE target.id = source.id);
```
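
Strategy 1 can be illustrated with a runnable example using SQLite. SQLite is chosen only for portability, and the `fact_sales` schema and the `load_day` helper are hypothetical. Running the load twice for the same day leaves exactly the same rows. That property is what makes retries and backfills safe.

```python
import sqlite3


def load_day(conn: sqlite3.Connection, day: str, rows: list) -> None:
    """Delete-and-replace one day's data inside a single transaction."""
    with conn:  # commits on success, rolls back if any statement fails
        conn.execute("DELETE FROM fact_sales WHERE sale_date = ?", (day,))
        conn.executemany(
            "INSERT INTO fact_sales (sale_date, order_id, amount) VALUES (?, ?, ?)",
            [(day, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_sales (sale_date TEXT, order_id INTEGER, amount REAL)")

batch = [(1, 9.99), (2, 20.00)]
load_day(conn, "2024-01-15", batch)
load_day(conn, "2024-01-15", batch)  # re-run: same end state, no duplicates
print(conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0])  # 2
```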

</details>


---

### Question 7: Error Handling Strategy

Design an error handling strategy for a pipeline processing customer orders.

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

```python
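# DeadLetterQueue, PipelineMetrics, retry_with_backoff, ValidationError and
# PipelineError are assumed to be defined elsewhere; validate, transform, load
# and alert_team are pipeline-specific methods omitted here.
class OrderPipeline: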
    def __init__(self):
        self.dlq = DeadLetterQueue()  # For failed records
        self.metrics = PipelineMetrics()  # For monitoring
    
    def process_order(self, order: dict) -> Optional[dict]:
        try:
            # Validate
            self.validate(order)
            
            # Transform
            transformed = self.transform(order)
            
            # Load with retry
            retry_with_backoff(
                lambda: self.load(transformed),
                max_retries=3
            )
            
            self.metrics.record_success()
            return transformed
            
        except ValidationError as e:
            # Bad data - send to DLQ, continue processing
            self.dlq.add(order, str(e))
            self.metrics.record_validation_error()
            return None
            
        except (ConnectionError, TimeoutError) as e:
            # Transient error still failing after the retries above - stop the run
            self.metrics.record_system_error()
            raise PipelineError(f"Failed after retries: {e}") from e
            
        except Exception as e:
            # Unexpected error - alert and stop
            self.alert_team(f"Unexpected error: {e}")
            raise
```
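
The class above relies on helpers it does not define. The same policy can be written as a self-contained sketch, with a plain list standing in for the dead letter queue and a hypothetical `retry` helper. Validation failures are parked and processing continues. Transient `ConnectionError`/`TimeoutError` failures are retried with exponential backoff, and anything still failing after the retries propagates and stops the run. The validation rule and the simulated loader are illustrative assumptions.

```python
import time
from typing import Callable, Dict, List, Tuple


class ValidationError(Exception):
    """Bad input record: retrying will not help."""


def retry(fn: Callable[[], object], max_retries: int = 3, base_delay: float = 0.01) -> object:
    """Call fn, retrying transient errors with exponential backoff, then re-raise."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError):
            if attempt == max_retries:
                raise  # still failing after all retries: let the caller stop the run
            time.sleep(base_delay * 2 ** attempt)  # 0.01s, 0.02s, 0.04s


def process_orders(orders: List[Dict],
                   load: Callable[[Dict], None]) -> Tuple[int, List[Tuple[Dict, str]]]:
    """Validate and load each order; invalid ones go to a dead letter list."""
    dead_letters: List[Tuple[Dict, str]] = []
    loaded = 0
    for order in orders:
        try:
            if order.get("amount") is None or order["amount"] < 0:
                raise ValidationError("amount missing or negative")
            retry(lambda: load(order))  # transient errors are retried here
            loaded += 1
        except ValidationError as e:
            dead_letters.append((order, str(e)))  # park the record, keep going
    return loaded, dead_letters


attempts = {"n": 0}
stored = []


def flaky_load(order: Dict) -> None:
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ConnectionError("simulated network blip")  # first call fails once
    stored.append(order["id"])


orders = [{"id": 1, "amount": 5.0}, {"id": 2, "amount": -1.0}, {"id": 3, "amount": 2.5}]
loaded, dlq = process_orders(orders, flaky_load)
print(loaded, stored, [o["id"] for o, _ in dlq])  # 2 [1, 3] [2]
```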

</details>


---

### Question 8: Partitioning Strategy

Design a partitioning strategy for a fact table with 1 billion rows per year.

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**Strategy: Partition by date + cluster by frequently filtered columns**

```sql
-- Spark SQL (Parquet data source table)
CREATE TABLE fact_events (
    event_id BIGINT,
    event_date DATE,
    user_id BIGINT,
    event_type STRING,
    amount DECIMAL(10, 2)
)
USING PARQUET
PARTITIONED BY (event_date)
CLUSTERED BY (user_id) INTO 256 BUCKETS;
```

**Reasoning:**
1. **Partition by date:** Most queries filter by date range
   - ~2.7M rows per day (1B / 365)
   - Queries only scan relevant partitions

2. **Cluster by user_id:** Common to filter/join on user
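   - The bucket count trades parallelism against file count: at ~2.7M rows/day, 256 buckets is roughly 10.7k rows per bucket per day, so check that files do not become too small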

3. **Partition pruning:** `WHERE event_date = '2024-01-15'` only reads one partition

4. **Data retention:** Easy to drop old partitions
   ```sql
   ALTER TABLE fact_events DROP PARTITION (event_date < '2022-01-01');
   ```
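
To see what partition pruning means on disk, here is a small sketch that writes Hive-style `event_date=YYYY-MM-DD` directories with pandas. It uses CSV, and the helper names are hypothetical, so that the example stays dependency-free. A real table would use Parquet, and the query engine would do the pruning. A date filter is decided from the directory names alone, so non-matching partitions are never opened.

```python
import os
import tempfile
from typing import Tuple

import pandas as pd


def write_partitioned(df: pd.DataFrame, root: str, col: str) -> None:
    """Write one file per value of `col` under Hive-style dirs: root/col=value/."""
    for value, part in df.groupby(col):
        part_dir = os.path.join(root, f"{col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        # The partition value lives in the directory name, not in the file
        part.drop(columns=col).to_csv(os.path.join(part_dir, "part-0.csv"), index=False)


def read_range(root: str, col: str, start: str, end: str) -> Tuple[pd.DataFrame, int]:
    """Read partitions with start <= value <= end; return (data, partitions scanned)."""
    frames = []
    for name in sorted(os.listdir(root)):
        value = name.split("=", 1)[1]
        if start <= value <= end:  # ISO dates compare correctly as strings
            part = pd.read_csv(os.path.join(root, name, "part-0.csv"))
            frames.append(part.assign(**{col: value}))
    data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    return data, len(frames)


root = tempfile.mkdtemp()
events = pd.DataFrame({
    "event_date": ["2024-01-14", "2024-01-15", "2024-01-15", "2024-01-16"],
    "user_id": [1, 2, 3, 4],
    "amount": [5.0, 7.5, 1.0, 3.0],
})
write_partitioned(events, root, "event_date")
jan15, scanned = read_range(root, "event_date", "2024-01-15", "2024-01-15")
print(scanned, sorted(jan15["user_id"].tolist()))  # 1 [2, 3]
```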

</details>


---

### Question 9: Data Format Selection

Which data format would you choose for each scenario?

a) Archive for compliance (rarely accessed)
b) Analytics queries (aggregate columns)
c) API responses
d) Streaming with schema evolution

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**a) Archive: Parquet with compression**
- Excellent compression ratio
- Columnar for efficient storage
- Self-describing schema

**b) Analytics: Parquet**
- Columnar layout well suited to aggregations
- Reads only the columns a query needs
- Predicate pushdown for filtering

**c) API responses: JSON**
- Human readable
- Widely supported
- Flexible schema

**d) Streaming with schema evolution: Avro**
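- Schema always available to the reader (embedded in Avro container files, or referenced from a schema registry)
- Forward/backward compatibility
- Compact binary format
- Common choice for Kafka topics (Kafka itself is format-agnostic)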

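The compatibility point for (d) rests on Avro's schema resolution rules. If the reader's schema has a field that the writer's data lacks, the reader's default is used. Fields present only in the writer's data are ignored. The sketch below is a plain-Python imitation of just those two rules. It is not the Avro library, and it skips type checking, type promotion, unions and aliases. The field names are hypothetical.

```python
from typing import Any, Dict, List

# Reader schema v2: "channel" was added later, with a default value
READER_FIELDS: List[Dict[str, Any]] = [
    {"name": "order_id", "type": "long"},
    {"name": "amount", "type": "double"},
    {"name": "channel", "type": "string", "default": "web"},
]


def resolve(record: Dict[str, Any], reader_fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Project a writer's record onto the reader's fields (types are not checked)."""
    out = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]  # older data, newer field: use the default
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return out  # writer-only fields never reach the output


old_record = {"order_id": 1, "amount": 9.5}                                   # written before v2
new_record = {"order_id": 2, "amount": 1.0, "channel": "app", "coupon": "X"}  # newer writer
print(resolve(old_record, READER_FIELDS))  # {'order_id': 1, 'amount': 9.5, 'channel': 'web'}
print(resolve(new_record, READER_FIELDS))  # {'order_id': 2, 'amount': 1.0, 'channel': 'app'}
```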
</details>


---

### Question 10: DAG Design

Design a DAG for an ETL pipeline that:
1. Extracts from 3 sources in parallel
2. Validates each source
3. Joins the validated data
4. Loads to warehouse

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

```
DAG Structure:

extract_source_a ─→ validate_a ─┐
                                │
extract_source_b ─→ validate_b ─┼─→ join_data ─→ load_warehouse
                                │
extract_source_c ─→ validate_c ─┘
```

```python
dag = SimpleDAG('multi_source_etl')

# Parallel extraction (no dependencies)
dag.add_task(Task('extract_a', extract_source_a, []))
dag.add_task(Task('extract_b', extract_source_b, []))
dag.add_task(Task('extract_c', extract_source_c, []))

# Validation (depends on respective extraction)
dag.add_task(Task('validate_a', validate_data, ['extract_a']))
dag.add_task(Task('validate_b', validate_data, ['extract_b']))
dag.add_task(Task('validate_c', validate_data, ['extract_c']))

# Join (depends on all validations)
dag.add_task(Task('join_data', join_sources, 
                  ['validate_a', 'validate_b', 'validate_c']))

# Load (depends on join)
dag.add_task(Task('load_warehouse', load_to_warehouse, ['join_data']))
```
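
Executing this structure needs a topological ordering of the tasks. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) in place of the notebook's `SimpleDAG`, and the `run_dag` helper is hypothetical. It runs each ready batch sequentially. Every task in a batch already has all its upstream tasks finished, so the batch could be submitted to a thread pool instead.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List


def run_dag(tasks: Dict[str, Callable[[], None]],
            deps: Dict[str, List[str]]) -> List[List[str]]:
    """Run tasks in dependency order; return the batches that were ready together."""
    sorter = TopologicalSorter(deps)  # deps maps task -> upstream tasks
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every upstream task has finished
        for name in ready:  # sequential here; these could run concurrently
            tasks[name]()
            sorter.done(name)
        batches.append(ready)
    return batches


log: List[str] = []
names = ["extract_a", "extract_b", "extract_c", "validate_a", "validate_b",
         "validate_c", "join_data", "load_warehouse"]
tasks = {name: (lambda name=name: log.append(name)) for name in names}
deps = {
    "validate_a": ["extract_a"],
    "validate_b": ["extract_b"],
    "validate_c": ["extract_c"],
    "join_data": ["validate_a", "validate_b", "validate_c"],
    "load_warehouse": ["join_data"],
}
batches = run_dag(tasks, deps)
for batch in batches:
    print(batch)
```

The four printed batches correspond to the four stages of the diagram: the three extracts, the three validations, the join and the load.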

</details>


---

### Question 11: Monitoring Metrics

What metrics would you monitor for a data pipeline?

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**Pipeline Health Metrics:**
1. **Run status:** Success/failure rate
2. **Duration:** Execution time, SLA compliance
3. **Lag:** Time since last successful run

**Data Quality Metrics:**
1. **Row counts:** Expected vs actual
2. **Null rates:** Per column
3. **Duplicate rates:** Primary key violations
4. **Schema changes:** Column additions/removals

**Volume Metrics:**
1. **Records processed:** Per run
2. **Data size:** GB processed
3. **Throughput:** Records per second

**Error Metrics:**
1. **Error rate:** Failed records / total
2. **DLQ size:** Unprocessed failures
3. **Error types:** Categorised failures
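
Most of these metrics come down to comparing the latest value with a baseline. The sketch below covers the row-count check only. It assumes a simple relative-tolerance rule against the mean of recent runs, and both the 30% threshold and the function name are hypothetical choices for illustration.

```python
from statistics import mean
from typing import List, Optional


def row_count_alert(today: int, history: List[int], tolerance: float = 0.3) -> Optional[str]:
    """Return an alert message if today's row count deviates from the recent mean."""
    if not history:
        return None  # nothing to compare against yet
    expected = mean(history)
    if expected == 0:
        return None if today == 0 else f"Row count {today} where none are expected"
    deviation = (today - expected) / expected
    if abs(deviation) > tolerance:
        return f"Row count {today} is {deviation:+.0%} vs recent mean {expected:.0f}"
    return None


history = [1000, 1100, 900]  # row counts from recent runs
print(row_count_alert(1050, history))  # None
print(row_count_alert(400, history))   # Row count 400 is -60% vs recent mean 1000
```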

</details>


---

### Question 12: Pipeline Backfill

How would you backfill data for a new pipeline that needs historical data from the past year?

In [None]:
# Write your solution here

<details>
<summary>Click to reveal answer</summary>

**Strategy:**

1. **Partition the backfill:** Process by date chunks
```python
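from datetime import date, timedelta

def backfill_pipeline(start_date: date, end_date: date, chunk_days: int = 7) -> None:
    """Backfill the half-open range [start_date, end_date) in chunks, oldest first."""
    current = start_date
    while current < end_date:
        chunk_end = min(current + timedelta(days=chunk_days), end_date)
        process_date_range(current, chunk_end)  # user-supplied: processes [current, chunk_end)
        current = chunk_end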
```

2. **Process oldest first:** Maintains chronological order

3. **Idempotent design:** Safe to re-run if failures occur

4. **Monitor and throttle:** Don't overload source systems

5. **Progress tracking:** Log completed chunks
```python
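from datetime import datetime

def mark_chunk_complete(conn, chunk_start):
    # conn: a DB-API connection using '?' placeholders (e.g. sqlite3)
    conn.execute(
        "INSERT INTO backfill_progress (date, completed_at) VALUES (?, ?)",
        (chunk_start, datetime.now().isoformat())
    )
    conn.commit()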
```

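6. **Parallel processing:** Only if the source can handle the extra load
```python
from concurrent.futures import ThreadPoolExecutor

# date_chunks: list of (start, end) tuples. list() consumes the results so that
# an exception raised in any worker is re-raised here instead of being lost.
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(lambda chunk: process_date_range(*chunk), date_chunks))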
```
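
Points 1, 3 and 5 combine naturally. If each chunk is recorded once it succeeds, a re-run after a failure can skip the chunks that are already finished. The sketch below uses hypothetical helper names and keeps progress in an in-memory set. A real backfill would persist it, for example in the `backfill_progress` table above.

```python
from datetime import date, timedelta
from typing import Callable, List, Set, Tuple

Chunk = Tuple[date, date]  # half-open range [start, end)


def date_chunks(start: date, end: date, chunk_days: int = 7) -> List[Chunk]:
    """Split [start, end) into consecutive chunks, oldest first."""
    chunks, current = [], start
    while current < end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        chunks.append((current, chunk_end))
        current = chunk_end
    return chunks


def resume_backfill(chunks: List[Chunk], process: Callable[[date, date], None],
                    completed: Set[Chunk]) -> int:
    """Process chunks not yet in `completed`; return how many ran this time."""
    ran = 0
    for chunk in chunks:
        if chunk in completed:
            continue  # finished in an earlier (possibly failed) run
        process(*chunk)
        completed.add(chunk)  # persist this in practice
        ran += 1
    return ran


chunks = date_chunks(date(2024, 1, 1), date(2024, 1, 20), chunk_days=7)
print(chunks[-1])  # (datetime.date(2024, 1, 15), datetime.date(2024, 1, 20))

completed: Set[Chunk] = set()


def fails_on_second_chunk(start: date, end: date) -> None:
    if start == date(2024, 1, 8):
        raise RuntimeError("simulated source timeout")


try:
    resume_backfill(chunks, fails_on_second_chunk, completed)
except RuntimeError:
    pass  # run 1 dies part-way; the first chunk is already recorded
print(resume_backfill(chunks, lambda start, end: None, completed))  # 2
```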

</details>


---

## 12. Summary

This notebook covered data engineering fundamentals:

1. **ETL vs ELT**: Transform before or after loading; ELT for cloud warehouses
2. **Pipeline Concepts**: Idempotency, incremental loads, watermarks
3. **Batch vs Streaming**: Scheduled vs continuous processing
4. **Data Quality**: Validation checks, completeness, accuracy
5. **Data Formats**: Row-based (CSV, JSON) vs columnar (Parquet)
6. **Orchestration**: DAGs, dependencies, scheduling (cron)
7. **Data Modelling**: SCD Type 2 for historical tracking
8. **Error Handling**: Retries, dead letter queues, monitoring
9. **Scalability**: Partitioning, caching, horizontal scaling

---

### Key Interview Tips

- **Know ETL vs ELT trade-offs**: Cloud warehouses favour ELT
- **Idempotency is essential**: Pipelines must be safe to re-run
- **Data quality first**: Validate before loading, use DLQs for failures
- **Understand partitioning**: Key for performance at scale
- **Monitor everything**: Track row counts, latency, error rates
- **Design for failure**: Retries, backfill capability, alerting