# FRE 521D: Data Analytics in Climate, Food and Environment
## Lecture 8: ETL Tools - BigQuery and Python Automation

**Date:** Wednesday, January 28, 2026  
**Instructor:** Asif Ahmed Neloy  
**Program:** UBC Master of Food and Resource Economics

---

### Today's Agenda

1. Introduction to Cloud Data Warehouses
2. Google BigQuery Fundamentals
3. Connecting to BigQuery from Python
4. Loading Large Datasets Efficiently
5. Python Automation Scripts for ETL
6. Scheduling and Task Automation
7. Error Handling and Logging in Production
8. Monitoring Pipeline Health
9. Complete Automated ETL Example

---

## Learning Objectives

By the end of this lecture, you will be able to:

1. Explain the difference between traditional databases and cloud data warehouses
2. Connect to Google BigQuery from Python and execute queries
3. Load large datasets efficiently using batch operations
4. Write automated ETL scripts with proper logging
5. Implement error handling with retry logic
6. Design monitoring strategies for production pipelines
7. Schedule ETL jobs to run automatically

---

## Setting Up

In [1]:
# Standard imports
import pandas as pd
import numpy as np
import os
import json
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_rows', 60)

print(f"Pandas version: {pd.__version__}")
print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("Ready for ETL Tools lecture!")

Pandas version: 2.3.3
Current time: 2026-01-27 21:54:13
Ready for ETL Tools lecture!


The cell above imports the essential libraries we need for this lecture. We use `pandas` for data manipulation, `logging` for tracking pipeline execution, and `datetime` for timestamp operations. The display settings ensure we can see all columns in our DataFrames without truncation.

---

## 1. Introduction to Cloud Data Warehouses

### What is a Data Warehouse?

A **data warehouse** is a centralized repository designed for analytical queries on large volumes of data. Unlike transactional databases (OLTP) that handle many small read/write operations, data warehouses (OLAP) are optimized for complex queries that scan millions of rows.

```
┌─────────────────────────────────────────────────────────────────┐
│                    DATA WAREHOUSE vs DATABASE                   │
├─────────────────────────────────┬───────────────────────────────┤
│      TRANSACTIONAL (OLTP)       │      ANALYTICAL (OLAP)        │
├─────────────────────────────────┼───────────────────────────────┤
│  MySQL, PostgreSQL, SQL Server  │  BigQuery, Snowflake, Redshift│
│  Many small read/write ops      │  Few large read-heavy queries │
│  Row-oriented storage           │  Column-oriented storage      │
│  Normalized schemas             │  Denormalized/star schemas    │
│  Real-time operations           │  Batch processing             │
│  Gigabytes of data              │  Terabytes to Petabytes       │
└─────────────────────────────────┴───────────────────────────────┘
```

### Why Cloud Data Warehouses?

Traditional on-premise data warehouses require:
- Expensive hardware
- Dedicated IT staff
- Capacity planning months in advance
- Manual scaling

Cloud data warehouses provide:
- **Pay-per-query pricing**: Only pay for what you use
- **Automatic scaling**: Handle any data volume
- **No infrastructure management**: Focus on analysis, not servers
- **Built-in redundancy**: Data is replicated automatically

### Major Cloud Data Warehouses

| Platform | Provider | Key Feature |
|----------|----------|-------------|
| BigQuery | Google | Serverless, pay-per-query |
| Snowflake | Independent | Multi-cloud, data sharing |
| Redshift | Amazon | Tight AWS integration |
| Azure Synapse | Microsoft | Unified analytics platform |

In this course, we focus on **Google BigQuery** because:
1. Generous free tier (1 TB queries/month, 10 GB storage)
2. No cluster management required
3. Excellent Python integration
4. Widely used in industry

---

## 2. Google BigQuery Fundamentals

### BigQuery Architecture

BigQuery separates storage and compute, which is the key to its scalability:

```
┌─────────────────────────────────────────────────────────────────┐
│                      BIGQUERY ARCHITECTURE                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐      ┌─────────────┐      ┌─────────────┐   │
│   │   Client    │ ───► │   Dremel    │ ───► │   Colossus  │   │
│   │  (Python)   │      │  (Compute)  │      │  (Storage)  │   │
│   └─────────────┘      └─────────────┘      └─────────────┘   │
│                                                                 │
│   You send SQL         Thousands of         Distributed        │
│   queries here         workers process      column-store       │
│                        your query           file system        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

### BigQuery Hierarchy

```
Project (billing unit)
    └── Dataset (like a schema/database)
            └── Table (your data)
                    └── Columns and Rows
```

- **Project**: Top-level container, linked to billing account
- **Dataset**: Collection of tables, controls access permissions
- **Table**: Actual data, can be native or external

### BigQuery Pricing

Two pricing models:

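1. **On-demand** (pay per query)
   - $6.25 per TiB of data scanned (US list price at the time of writing; check the current pricing page)
   - First 1 TiB/month free
   - Good for ad-hoc analysis

2. **Capacity-based** (BigQuery Editions, formerly flat-rate)
   - Pay for reserved or autoscaled slots rather than bytes scanned
   - Better for predictable, heavy workloads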

**Storage costs:**
- Active storage: $0.02/GB/month
- Long-term storage (>90 days untouched): $0.01/GB/month
- First 10 GB/month free
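
To make the on-demand arithmetic concrete, here is a minimal sketch that converts bytes scanned into an estimated cost. The helper name `estimate_on_demand_cost` and its parameters are hypothetical (they are not part of any BigQuery library), and the rate is passed in explicitly because list prices change over time.

```python
TIB = 1024 ** 4  # bytes in one tebibyte


def estimate_on_demand_cost(bytes_scanned, price_per_tib, free_tib_remaining=0.0):
    """Estimate the on-demand cost of one query (hypothetical helper).

    bytes_scanned      : bytes the query would read
    price_per_tib      : USD per TiB, taken from the current pricing page
    free_tib_remaining : unused part of the monthly free allowance, in TiB
    """
    tib_scanned = bytes_scanned / TIB
    billable_tib = max(tib_scanned - free_tib_remaining, 0.0)
    return round(billable_tib * price_per_tib, 4)


# Illustrative numbers only: a 250 GiB scan at an assumed rate.
scan_bytes = 250 * 1024 ** 3
assumed_rate = 6.25  # assumption: USD per TiB; confirm against current pricing

print("No free allowance left:", estimate_on_demand_cost(scan_bytes, assumed_rate))
print("Full free TiB available:", estimate_on_demand_cost(scan_bytes, assumed_rate, 1.0))
```

With the real client, a dry run (`bigquery.QueryJobConfig(dry_run=True)`) reports `total_bytes_processed` without executing the query, which is one way to obtain `bytes_scanned` before paying for anything.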

---

## 3. Connecting to BigQuery from Python

### Installation

First, install the required libraries:

```bash
pip install google-cloud-bigquery pandas-gbq db-dtypes
```

### Authentication Options

BigQuery requires authentication. There are several methods:

1. **Service Account Key** (common for automation outside Google Cloud)
   - Download JSON key from Google Cloud Console
   - Set environment variable: `GOOGLE_APPLICATION_CREDENTIALS`

2. **User Account** (for interactive use)
   - Run `gcloud auth application-default login`
   - Opens browser for OAuth

3. **Default Credentials** (on Google Cloud VMs)
   - Automatic if running on GCP

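For this lecture, we will simulate BigQuery operations using SQLite to demonstrate the concepts without requiring cloud credentials. The workflow patterns (query, load, inspect) carry over to BigQuery, although SQLite's SQL dialect and type system differ from BigQuery's.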

In [2]:
# Since we may not have BigQuery credentials, let's create a simulation
# that demonstrates the same patterns using SQLite

import sqlite3
from contextlib import contextmanager

class BigQuerySimulator:
    """
    A class that simulates BigQuery operations using SQLite.
    This allows us to learn BigQuery patterns without cloud credentials.
    The methods mirror the actual google-cloud-bigquery API.
    """
    
    def __init__(self, database_path=':memory:'):
        """
        Initialize the simulator with a SQLite database.
        
        Parameters:
        -----------
        database_path : str
            Path to SQLite database file, or ':memory:' for in-memory
        """
        self.database_path = database_path
        self.connection = sqlite3.connect(database_path)
        print(f"Connected to simulated BigQuery (SQLite: {database_path})")
    
    def query(self, sql):
        """
        Execute a SQL query and return results as DataFrame.
        Mirrors: client.query(sql).to_dataframe()
        """
        return pd.read_sql_query(sql, self.connection)
    
    def load_table_from_dataframe(self, dataframe, table_name, if_exists='replace'):
        """
        Load a DataFrame into a table.
        Mirrors: client.load_table_from_dataframe(df, table_ref)
        """
        dataframe.to_sql(table_name, self.connection, if_exists=if_exists, index=False)
        row_count = len(dataframe)
        print(f"Loaded {row_count:,} rows into {table_name}")
        return row_count
    
    def list_tables(self):
        """
        List all tables in the database.
        Mirrors: client.list_tables(dataset)
        """
        query = "SELECT name FROM sqlite_master WHERE type='table'"
        return pd.read_sql_query(query, self.connection)['name'].tolist()
    
    def get_table_info(self, table_name):
        """
        Get schema information for a table.
        Mirrors: client.get_table(table_ref)
        """
        query = f"PRAGMA table_info({table_name})"
        return pd.read_sql_query(query, self.connection)
    
    def close(self):
        """Close the database connection."""
        self.connection.close()
        print("Connection closed.")


# Create our simulated BigQuery client
bq_client = BigQuerySimulator('weather_warehouse.db')

Connected to simulated BigQuery (SQLite: weather_warehouse.db)


The `BigQuerySimulator` class above creates a local simulation of BigQuery using SQLite. This is valuable for learning because:

1. **No cloud credentials required** - You can run this code immediately
2. **Same patterns** - The method names and usage mirror the real BigQuery API
3. **Fast iteration** - No network latency during development

In production, you would replace this with the actual BigQuery client:

```python
from google.cloud import bigquery
bq_client = bigquery.Client(project='your-project-id')
```
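
For comparison with the simulator's `load_table_from_dataframe`, here is a hedged sketch of the same step against the real service. It assumes `google-cloud-bigquery` and `pyarrow` are installed and that credentials are already configured. The helper names and the project, dataset, and table IDs are placeholders for illustration. The import sits inside the function so the cell can be defined without the library present.

```python
def fully_qualified_table_id(project, dataset, table):
    """Build the 'project.dataset.table' identifier BigQuery expects."""
    return f"{project}.{dataset}.{table}"


def load_dataframe_to_bigquery(df, table_id, replace=True):
    """Load a DataFrame into BigQuery and return the table's row count.

    Sketch only: requires google-cloud-bigquery, pyarrow, and valid credentials.
    """
    from google.cloud import bigquery  # imported lazily on purpose

    client = bigquery.Client()
    # WRITE_TRUNCATE plays the role of if_exists='replace',
    # WRITE_APPEND the role of if_exists='append'.
    disposition = (
        bigquery.WriteDisposition.WRITE_TRUNCATE
        if replace
        else bigquery.WriteDisposition.WRITE_APPEND
    )
    job_config = bigquery.LoadJobConfig(write_disposition=disposition)

    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # block until the load job finishes (raises on failure)
    return client.get_table(table_id).num_rows


# Placeholder identifiers, not real resources
table_id = fully_qualified_table_id("your-project-id", "weather", "global_weather")
print(table_id)
```

Unlike the SQLite simulator, a BigQuery load is an asynchronous job, so waiting on `job.result()` is what surfaces errors.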

---

## 4. Loading Large Datasets Efficiently

### The Challenge with Large Data

Our `GlobalWeatherRepository.csv` has nearly 44,000 rows. When loading large datasets, we face several challenges:

1. **Memory constraints**: Loading everything at once may exceed RAM
2. **Network timeouts**: Large uploads can fail partway through
3. **Type inference**: Pandas may guess wrong types on large files
4. **Progress tracking**: Need to know how much is done

### Strategy: Chunked Loading

The solution is to load data in **chunks** - smaller pieces that are manageable:

```
┌─────────────────────────────────────────────────────────────────┐
│                     CHUNKED LOADING STRATEGY                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Large CSV File                    Database Table              │
│   ┌──────────────┐                  ┌──────────────┐            │
│   │ Chunk 1 (10K)│ ───────────────► │ Rows 1-10K   │            │
│   ├──────────────┤                  ├──────────────┤            │
│   │ Chunk 2 (10K)│ ───────────────► │ Rows 10K-20K │            │
│   ├──────────────┤                  ├──────────────┤            │
│   │ Chunk 3 (10K)│ ───────────────► │ Rows 20K-30K │            │
│   ├──────────────┤                  ├──────────────┤            │
│   │ Chunk 4 (10K)│ ───────────────► │ Rows 30K-40K │            │
│   ├──────────────┤                  ├──────────────┤            │
│   │ Chunk 5 (~4K)│ ───────────────► │ Rows 40K-44K │            │
│   └──────────────┘                  └──────────────┘            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

In [3]:
# First, let's examine our dataset
DATA_PATH = '../../Datasets/GlobalWeatherRepository.csv'

# Check file size
file_size = os.path.getsize(DATA_PATH)
print(f"File size: {file_size / (1024*1024):.2f} MB")

# Count rows without loading entire file
with open(DATA_PATH, 'r', encoding='utf-8') as f:
    row_count = sum(1 for line in f) - 1  # subtract header
print(f"Total rows: {row_count:,}")

# Preview the structure
df_preview = pd.read_csv(DATA_PATH, nrows=5)
print(f"\nColumns ({len(df_preview.columns)}):")
for i, col in enumerate(df_preview.columns):
    print(f"  {i+1:2d}. {col}")

File size: 10.95 MB
Total rows: 43,884

Columns (41):
   1. country
   2. location_name
   3. latitude
   4. longitude
   5. timezone
   6. last_updated_epoch
   7. last_updated
   8. temperature_celsius
   9. temperature_fahrenheit
  10. condition_text
  11. wind_mph
  12. wind_kph
  13. wind_degree
  14. wind_direction
  15. pressure_mb
  16. pressure_in
  17. precip_mm
  18. precip_in
  19. humidity
  20. cloud
  21. feels_like_celsius
  22. feels_like_fahrenheit
  23. visibility_km
  24. visibility_miles
  25. uv_index
  26. gust_mph
  27. gust_kph
  28. air_quality_Carbon_Monoxide
  29. air_quality_Ozone
  30. air_quality_Nitrogen_dioxide
  31. air_quality_Sulphur_dioxide
  32. air_quality_PM2.5
  33. air_quality_PM10
  34. air_quality_us-epa-index
  35. air_quality_gb-defra-index
  36. sunrise
  37. sunset
  38. moonrise
  39. moonset
  40. moon_phase
  41. moon_illumination


The code above examines our dataset without loading it entirely into memory. We learn:
- The file size in megabytes
- The exact row count
- All column names

This reconnaissance step is essential before designing your ETL pipeline.

In [4]:
# Define the schema explicitly for consistent data types
# This prevents pandas from guessing and ensures consistency across chunks

WEATHER_SCHEMA = {
    'country': 'string',
    'location_name': 'string',
    'latitude': 'float64',
    'longitude': 'float64',
    'timezone': 'string',
    'last_updated_epoch': 'int64',
    'last_updated': 'string',
    'temperature_celsius': 'float64',
    'temperature_fahrenheit': 'float64',
    'condition_text': 'string',
    'wind_mph': 'float64',
    'wind_kph': 'float64',
    'wind_degree': 'float64',
    'wind_direction': 'string',
    'pressure_mb': 'float64',
    'pressure_in': 'float64',
    'precip_mm': 'float64',
    'precip_in': 'float64',
    'humidity': 'float64',
    'cloud': 'float64',
    'feels_like_celsius': 'float64',
    'feels_like_fahrenheit': 'float64',
    'visibility_km': 'float64',
    'visibility_miles': 'float64',
    'uv_index': 'float64',
    'gust_mph': 'float64',
    'gust_kph': 'float64',
    'air_quality_Carbon_Monoxide': 'float64',
    'air_quality_Ozone': 'float64',
    'air_quality_Nitrogen_dioxide': 'float64',
    'air_quality_Sulphur_dioxide': 'float64',
    'air_quality_PM2.5': 'float64',
    'air_quality_PM10': 'float64',
    'air_quality_us-epa-index': 'float64',
    'air_quality_gb-defra-index': 'float64',
    'sunrise': 'string',
    'sunset': 'string',
    'moonrise': 'string',
    'moonset': 'string',
    'moon_phase': 'string',
    'moon_illumination': 'float64'
}

print(f"Schema defined with {len(WEATHER_SCHEMA)} columns")
print("\nData types distribution:")
type_counts = pd.Series(WEATHER_SCHEMA.values()).value_counts()
for dtype, count in type_counts.items():
    print(f"  {dtype}: {count} columns")

Schema defined with 41 columns

Data types distribution:
  float64: 29 columns
  string: 11 columns
  int64: 1 columns


Defining the schema explicitly is a best practice for several reasons:

1. **Consistency**: Every chunk uses the same types, preventing type mismatches when chunks are appended to the same table
2. **Memory efficiency**: Proper types use less memory than object types
3. **Error detection**: Type mismatches surface immediately
4. **Documentation**: The schema documents what data you expect
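
The consistency point is easy to demonstrate on a toy file. In this minimal sketch the CSV text and the helper `chunk_dtypes` are made up for illustration. Without a schema, pandas infers types chunk by chunk, so a column can change type when a later chunk contains a missing value.

```python
import io

import pandas as pd

# Tiny made-up CSV: the last row has a missing humidity value
csv_text = "city,humidity\nKabul,50\nTirana,60\nAlgiers,70\nLuanda,\n"


def chunk_dtypes(text, chunk_size, dtype=None):
    """Return the dtype pandas assigns to 'humidity' in each chunk."""
    reader = pd.read_csv(io.StringIO(text), chunksize=chunk_size, dtype=dtype)
    return [str(chunk["humidity"].dtype) for chunk in reader]


inferred = chunk_dtypes(csv_text, chunk_size=2)
explicit = chunk_dtypes(csv_text, chunk_size=2, dtype={"humidity": "float64"})

print("Inferred per chunk:", inferred)
print("Explicit schema:   ", explicit)
```

With inference, the first chunk holds only whole numbers and the second contains a gap, so the two chunks end up with different dtypes. The explicit schema gives every chunk the same one.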

In [5]:
def load_csv_in_chunks(filepath, table_name, client, chunk_size=10000, schema=None):
    """
    Load a large CSV file into a database table in chunks.
    
    Parameters:
    -----------
    filepath : str
        Path to the CSV file
    table_name : str
        Name of the destination table
    client : BigQuerySimulator
        Database client for loading data
    chunk_size : int
        Number of rows per chunk (default: 10,000)
    schema : dict
        Column name to dtype mapping
    
    Returns:
    --------
    dict : Statistics about the load operation
    """
    start_time = time.time()
    total_rows = 0
    chunk_count = 0
    
    # Create a chunk iterator
    chunks = pd.read_csv(
        filepath,
        chunksize=chunk_size,
        dtype=schema,
        encoding='utf-8'
    )
    
    print(f"Loading {filepath} into {table_name}")
    print(f"Chunk size: {chunk_size:,} rows")
    print("-" * 50)
    
    for chunk in chunks:
        chunk_count += 1
        rows_in_chunk = len(chunk)
        total_rows += rows_in_chunk
        
        # First chunk: create/replace table
        # Subsequent chunks: append to table
        if_exists = 'replace' if chunk_count == 1 else 'append'
        
        # Load the chunk
        chunk.to_sql(table_name, client.connection, if_exists=if_exists, index=False)
        
        # Progress update
        elapsed = time.time() - start_time
        rate = total_rows / elapsed if elapsed > 0 else 0
        print(f"  Chunk {chunk_count}: {rows_in_chunk:,} rows | Total: {total_rows:,} | Rate: {rate:,.0f} rows/sec")
    
    # Final statistics
    elapsed_time = time.time() - start_time
    
    stats = {
        'table_name': table_name,
        'total_rows': total_rows,
        'chunk_count': chunk_count,
        'elapsed_seconds': round(elapsed_time, 2),
        'rows_per_second': round(total_rows / elapsed_time, 0) if elapsed_time > 0 else 0
    }
    
    print("-" * 50)
    print(f"Complete! {total_rows:,} rows loaded in {elapsed_time:.2f} seconds")
    
    return stats

The `load_csv_in_chunks` function above implements the chunked loading strategy. Key design decisions:

1. **`pd.read_csv` with `chunksize`**: Returns an iterator instead of a DataFrame, so only one chunk is in memory at a time

2. **First chunk replaces, subsequent chunks append**: This ensures we start fresh but don't lose data from previous chunks

3. **Progress tracking**: Real-time feedback on loading speed helps identify problems

4. **Statistics return**: The function returns metrics that can be logged or monitored

In [6]:
# Load our weather data using the chunked approach
load_stats = load_csv_in_chunks(
    filepath=DATA_PATH,
    table_name='global_weather',
    client=bq_client,
    chunk_size=10000,
    schema=WEATHER_SCHEMA
)

Loading ../../Datasets/GlobalWeatherRepository.csv into global_weather
Chunk size: 10,000 rows
--------------------------------------------------
  Chunk 1: 10,000 rows | Total: 10,000 | Rate: 40,070 rows/sec
  Chunk 2: 10,000 rows | Total: 20,000 | Rate: 40,293 rows/sec
  Chunk 3: 10,000 rows | Total: 30,000 | Rate: 40,403 rows/sec
  Chunk 4: 10,000 rows | Total: 40,000 | Rate: 40,396 rows/sec
  Chunk 5: 3,884 rows | Total: 43,884 | Rate: 39,705 rows/sec
--------------------------------------------------
Complete! 43,884 rows loaded in 1.11 seconds


The output shows each chunk being processed with running totals. This visibility is important in production because:
- You can estimate completion time
- You can identify slow chunks that might indicate data issues
- If the process fails, you know where it stopped
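
Estimating completion time from the progress output is simple arithmetic: remaining rows divided by the observed rate. A minimal sketch follows; the helper name `estimate_remaining_seconds` is hypothetical and the numbers are rounded from the run above.

```python
def estimate_remaining_seconds(rows_done, total_rows, elapsed_seconds):
    """Estimate seconds left, assuming the average rate so far continues."""
    if rows_done <= 0 or elapsed_seconds <= 0:
        return None  # no rate information yet
    rate = rows_done / elapsed_seconds          # rows per second so far
    remaining_rows = max(total_rows - rows_done, 0)
    return remaining_rows / rate


# Roughly the state after chunk 2 in the run above
eta = estimate_remaining_seconds(rows_done=20_000, total_rows=43_884, elapsed_seconds=0.5)
print(f"Estimated time remaining: {eta:.2f} s")
```

The estimate is only as good as the assumption that later chunks load at the same speed, which is why a sudden change in the reported rate is worth investigating.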

In [7]:
# Verify the load was successful
print("Verifying loaded data...")
print("\n1. Row count:")
result = bq_client.query("SELECT COUNT(*) as row_count FROM global_weather")
print(f"   {result['row_count'].iloc[0]:,} rows in table")

print("\n2. Sample data:")
sample = bq_client.query("SELECT country, location_name, temperature_celsius, condition_text FROM global_weather LIMIT 5")
print(sample.to_string(index=False))

print("\n3. Countries in dataset:")
countries = bq_client.query("SELECT COUNT(DISTINCT country) as country_count FROM global_weather")
print(f"   {countries['country_count'].iloc[0]} unique countries")

Verifying loaded data...

1. Row count:
   43,884 rows in table

2. Sample data:
    country    location_name  temperature_celsius condition_text
Afghanistan            Kabul                 26.6  Partly Cloudy
    Albania           Tirana                 19.0  Partly cloudy
    Algeria          Algiers                 23.0          Sunny
    Andorra Andorra La Vella                  6.3  Light drizzle
     Angola           Luanda                 26.0  Partly cloudy

3. Countries in dataset:
   210 unique countries


Always verify your data after loading. The three checks above confirm:
1. The expected number of rows arrived
2. The data looks correct (spot check)
3. Key dimensions are reasonable (country count)
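
In an automated pipeline these checks should fail loudly rather than rely on someone reading the output. The sketch below shows one way to do that, using a throwaway in-memory SQLite table. The function `verify_load` and the `demo_weather` table are illustrative names, not part of the lecture's pipeline.

```python
import sqlite3


def verify_load(connection, table_name, expected_rows, key_column):
    """Raise ValueError if the row count is off or the key column has NULLs."""
    cur = connection.cursor()
    actual_rows = cur.execute(
        f'SELECT COUNT(*) FROM "{table_name}"'
    ).fetchone()[0]
    null_keys = cur.execute(
        f'SELECT COUNT(*) FROM "{table_name}" WHERE "{key_column}" IS NULL'
    ).fetchone()[0]

    problems = []
    if actual_rows != expected_rows:
        problems.append(f"expected {expected_rows} rows, found {actual_rows}")
    if null_keys:
        problems.append(f"{null_keys} rows have NULL {key_column}")
    if problems:
        raise ValueError("; ".join(problems))
    return {"rows": actual_rows, "null_keys": null_keys}


# Small demonstration table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo_weather (country TEXT, temperature_celsius REAL)")
conn.executemany(
    "INSERT INTO demo_weather VALUES (?, ?)",
    [("Canada", -5.0), ("Kenya", 24.0)],
)
check = verify_load(conn, "demo_weather", expected_rows=2, key_column="country")
print(check)
```

The expected row count can come from the reconnaissance step (counting lines in the source file), so the check compares two independent measurements.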

---

## 5. Python Automation Scripts for ETL

### From Notebook to Script

Notebooks are great for exploration, but production ETL needs:
- **Standalone scripts** that run without Jupyter
- **Command-line arguments** for flexibility
- **Configuration files** for environment-specific settings
- **Proper logging** instead of print statements

### Script Structure

A well-organized ETL script follows this pattern:

```
┌─────────────────────────────────────────────────────────────────┐
│                    ETL SCRIPT STRUCTURE                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. IMPORTS           - Libraries and modules                   │
│  2. CONFIGURATION     - Load settings from file/environment     │
│  3. LOGGING SETUP     - Configure logging handlers              │
│  4. HELPER FUNCTIONS  - Reusable utility functions              │
│  5. ETL FUNCTIONS     - Extract, Transform, Load                │
│  6. MAIN FUNCTION     - Orchestrates the pipeline               │
│  7. ENTRY POINT       - if __name__ == '__main__'               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
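
As a rough illustration of that layout, here is a compressed skeleton with command-line arguments. The argument names and the logger name are assumptions chosen for this sketch. The extract, transform, and load steps are left as a comment.

```python
import argparse
import logging


def parse_args(argv=None):
    """Parse command-line options (argv=None means use sys.argv)."""
    parser = argparse.ArgumentParser(description="Weather ETL pipeline (sketch)")
    parser.add_argument("--source", default="GlobalWeatherRepository.csv",
                        help="Path to the input CSV")
    parser.add_argument("--table", default="global_weather",
                        help="Destination table name")
    parser.add_argument("--chunk-size", type=int, default=10000,
                        help="Rows per chunk")
    return parser.parse_args(argv)


def main(argv=None):
    """Orchestrate the pipeline and return a process exit code."""
    args = parse_args(argv)
    logging.basicConfig(level=logging.INFO, format="%(levelname)-8s | %(message)s")
    log = logging.getLogger("etl_script_sketch")
    log.info("Would load %s into %s in chunks of %d",
             args.source, args.table, args.chunk_size)
    # extract(), transform(), and load() calls would go here
    return 0


# In a standalone script the entry point would be:
#     if __name__ == '__main__':
#         sys.exit(main())
# Here we call main() directly with example arguments.
exit_code = main(["--table", "demo_table", "--chunk-size", "5000"])
print("Exit code:", exit_code)
```

Returning an exit code from `main` matters for scheduling: cron and similar tools use a non-zero exit status to detect failure.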

In [8]:
# Configuration management using a simple class

class ETLConfig:
    """
    Configuration class for ETL pipelines.
    Loads settings from environment variables or defaults.
    """
    
    def __init__(self):
        # Data source settings
        self.source_path = os.getenv('ETL_SOURCE_PATH', '../../Datasets/GlobalWeatherRepository.csv')
        
        # Database settings
        self.database_path = os.getenv('ETL_DATABASE', 'weather_warehouse.db')
        self.table_name = os.getenv('ETL_TABLE', 'global_weather')
        
        # Processing settings
        self.chunk_size = int(os.getenv('ETL_CHUNK_SIZE', '10000'))
        self.max_retries = int(os.getenv('ETL_MAX_RETRIES', '3'))
        self.retry_delay = int(os.getenv('ETL_RETRY_DELAY', '5'))
        
        # Output settings
        self.log_level = os.getenv('ETL_LOG_LEVEL', 'INFO')
        self.log_file = os.getenv('ETL_LOG_FILE', 'etl_pipeline.log')
    
    def to_dict(self):
        """Return configuration as dictionary for logging."""
        return {
            'source_path': self.source_path,
            'database_path': self.database_path,
            'table_name': self.table_name,
            'chunk_size': self.chunk_size,
            'max_retries': self.max_retries,
            'retry_delay': self.retry_delay,
            'log_level': self.log_level
        }
    
    def __repr__(self):
        return f"ETLConfig({self.to_dict()})"


# Create and display configuration
config = ETLConfig()
print("ETL Configuration:")
for key, value in config.to_dict().items():
    print(f"  {key}: {value}")

ETL Configuration:
  source_path: ../../Datasets/GlobalWeatherRepository.csv
  database_path: weather_warehouse.db
  table_name: global_weather
  chunk_size: 10000
  max_retries: 3
  retry_delay: 5
  log_level: INFO


The `ETLConfig` class above centralizes all configuration in one place. Benefits of this approach:

1. **Environment-aware**: Uses `os.getenv()` to read from environment variables, with sensible defaults
2. **Easy to modify**: Change settings without touching code
3. **Self-documenting**: All settings are visible in one place
4. **Testable**: Can create different configs for testing vs production

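In production, you might load settings from a YAML or JSON file instead. The YAML version requires the third-party PyYAML package:
```python
import yaml  # third-party: pip install pyyaml

with open('config.yaml') as f:
    config = yaml.safe_load(f)
```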
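
Whatever the source, settings read from the environment arrive as strings, and `ETLConfig` converts them with a bare `int(...)`. The sketch below adds validation so a bad value produces a clear message. The helper `read_int_setting` and the variable name `ETL_DEMO_CHUNK_SIZE` are hypothetical.

```python
import os


def read_int_setting(name, default, minimum=1):
    """Read an integer setting from the environment with validation."""
    raw = os.getenv(name)
    if raw is None:
        return default  # variable not set: fall back to the default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value


# Override via the environment, then fall back to the default
os.environ["ETL_DEMO_CHUNK_SIZE"] = "25000"
overridden = read_int_setting("ETL_DEMO_CHUNK_SIZE", default=10000)
del os.environ["ETL_DEMO_CHUNK_SIZE"]
fallback = read_int_setting("ETL_DEMO_CHUNK_SIZE", default=10000)

print("With override:", overridden)
print("Without override:", fallback)
```

Failing at startup with a message that names the offending variable is usually easier to debug than a pipeline that starts with a nonsensical chunk size.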

---

## 6. Scheduling and Task Automation

### Why Schedule ETL Jobs?

Data pipelines need to run automatically:
- **Daily updates**: Refresh data every morning before business hours
- **Hourly ingestion**: Pick up near-real-time data in small batches
- **Weekly aggregations**: Compute summary tables on weekends
- **Event-triggered**: Run when new data arrives

### Scheduling Options

| Method | Platform | Best For |
|--------|----------|----------|
| Cron | Linux/Mac | Simple recurring jobs |
| Task Scheduler | Windows | Simple Windows jobs |
| Apache Airflow | Any | Complex workflows with dependencies |
| Cloud Scheduler | GCP | Triggering cloud functions |
| AWS EventBridge | AWS | Triggering Lambda functions |

### Cron Syntax

Cron uses a 5-field syntax to specify when jobs run:

```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6, Sunday = 0)
│ │ │ │ │
* * * * *  command to execute
```

**Examples:**
```bash
# Run every day at 6 AM
0 6 * * * python /path/to/etl_script.py

# Run every hour
0 * * * * python /path/to/etl_script.py

# Run Monday at midnight
0 0 * * 1 python /path/to/etl_script.py

# Run every 15 minutes
*/15 * * * * python /path/to/etl_script.py
```
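
To check your reading of a crontab line, it can help to split it into its named fields. The following is a small sketch with a hypothetical helper `split_cron_line`. It only separates the fields and does not validate ranges or compute run times.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron_line(line):
    """Split a crontab line into its five schedule fields plus the command."""
    parts = line.split(None, 5)  # at most 5 splits: 5 fields + the rest
    if len(parts) < 6:
        raise ValueError("Expected 5 schedule fields followed by a command")
    fields = dict(zip(CRON_FIELDS, parts[:5]))
    fields["command"] = parts[5]
    return fields


entry = split_cron_line("0 0 * * 1 python /path/to/etl_script.py")
for name, value in entry.items():
    print(f"{name:>12}: {value}")
```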

In [9]:
# Function to generate cron schedule recommendations

def recommend_schedule(data_freshness_hours, processing_time_minutes):
    """
    Recommend a cron schedule based on data requirements.
    
    Parameters:
    -----------
    data_freshness_hours : int
        Maximum acceptable age of data in hours
    processing_time_minutes : int
        Estimated time to run the ETL job
    
    Returns:
    --------
    str : Recommended cron expression
    """
    # Subtract processing time from the freshness budget
    effective_interval = data_freshness_hours - (processing_time_minutes / 60)
    
    if effective_interval <= 0:
        return "ERROR: Processing time exceeds freshness requirement"
    
    if effective_interval <= 1:
        return "0 * * * *  # Every hour"
    elif effective_interval <= 4:
        return "0 */4 * * *  # Every 4 hours"
    elif effective_interval <= 12:
        return "0 6,18 * * *  # Twice daily (6 AM and 6 PM)"
    elif effective_interval <= 24:
        return "0 6 * * *  # Daily at 6 AM"
    else:
        return "0 6 * * 1  # Weekly on Monday at 6 AM"


# Example: Data must be no more than 24 hours old, job takes 30 minutes
print("Schedule Recommendations:\n")

scenarios = [
    (24, 30, "Daily dashboard refresh"),
    (4, 15, "Near-real-time monitoring"),
    (168, 60, "Weekly aggregation report"),
    (1, 5, "Hourly data feed"),
]

for freshness, processing, description in scenarios:
    schedule = recommend_schedule(freshness, processing)
    print(f"{description}:")
    print(f"  Freshness: {freshness}h, Processing: {processing}min")
    print(f"  Schedule: {schedule}\n")

Schedule Recommendations:

Daily dashboard refresh:
  Freshness: 24h, Processing: 30min
  Schedule: 0 6 * * *  # Daily at 6 AM

Near-real-time monitoring:
  Freshness: 4h, Processing: 15min
  Schedule: 0 */4 * * *  # Every 4 hours

Weekly aggregation report:
  Freshness: 168h, Processing: 60min
  Schedule: 0 6 * * 1  # Weekly on Monday at 6 AM

Hourly data feed:
  Freshness: 1h, Processing: 5min
  Schedule: 0 * * * *  # Every hour



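The `recommend_schedule` function maps a freshness requirement to a standard run frequency. Two constraints drive the choice: each job must finish before the next scheduled run starts, and the data must stay fresh enough for business needs. Because the function snaps to a fixed set of cadences, treat its output as a starting point: for a hard freshness limit, confirm that the run interval plus the processing time does not exceed the limit.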
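
One way to test a candidate cadence against a hard freshness limit is to compute the worst-case data age. The sketch below assumes each run captures the source as of its start time and publishes when it finishes, so just before a run completes the visible data is one interval plus one processing time old. Both helper names are hypothetical.

```python
def worst_case_age_hours(interval_hours, processing_minutes):
    """Oldest the published data can get under the stated assumption."""
    return interval_hours + processing_minutes / 60


def meets_freshness(interval_hours, processing_minutes, freshness_hours):
    """True if the worst-case age stays within the freshness limit."""
    return worst_case_age_hours(interval_hours, processing_minutes) <= freshness_hours


# A 4-hour limit with a 15-minute job: compare two candidate cadences
for interval in (4, 1):
    ok = meets_freshness(interval, processing_minutes=15, freshness_hours=4)
    age = worst_case_age_hours(interval, 15)
    print(f"Every {interval}h -> worst-case age {age:.2f}h, meets 4h limit: {ok}")
```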

---

## 7. Error Handling and Logging in Production

### Why Logging Matters

Print statements work in notebooks, but production ETL needs proper logging:

| Print Statements | Proper Logging |
|-----------------|----------------|
| Goes to stdout only | Can go to files, databases, cloud services |
| No severity levels | DEBUG, INFO, WARNING, ERROR, CRITICAL |
| No timestamps | Automatic timestamps |
| Lost when terminal closes | Persisted for analysis |
| Hard to filter | Easy to filter by level |

### Logging Levels

```
┌─────────────────────────────────────────────────────────────────┐
│                      LOGGING LEVELS                             │
├───────────┬─────────────────────────────────────────────────────┤
│  DEBUG    │  Detailed diagnostic info (variable values, etc.)  │
├───────────┼─────────────────────────────────────────────────────┤
│  INFO     │  Confirmation that things work as expected         │
├───────────┼─────────────────────────────────────────────────────┤
│  WARNING  │  Something unexpected, but not an error            │
├───────────┼─────────────────────────────────────────────────────┤
│  ERROR    │  Serious problem, some functionality failed        │
├───────────┼─────────────────────────────────────────────────────┤
│  CRITICAL │  Program cannot continue                           │
└───────────┴─────────────────────────────────────────────────────┘
```

In [10]:
# Set up production-grade logging

def setup_logging(log_file='etl_pipeline.log', level=logging.INFO):
    """
    Configure logging for ETL pipeline.
    
    Creates two handlers:
    1. File handler: Writes all logs to file
    2. Console handler: Shows INFO and above in terminal
    
    Parameters:
    -----------
    log_file : str
        Path to log file
    level : int
        Logging level (e.g., logging.INFO)
    
    Returns:
    --------
    logging.Logger : Configured logger
    """
    # Create logger; keep it at DEBUG so each handler applies its own threshold.
    # (A logger set to INFO would drop DEBUG records before the file handler sees them.)
    logger = logging.getLogger('etl_pipeline')
    logger.setLevel(logging.DEBUG)
    
    # Clear any existing handlers (important for Jupyter)
    logger.handlers = []
    
    # Create formatters
    detailed_format = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(funcName)s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    simple_format = logging.Formatter(
        '%(levelname)-8s | %(message)s'
    )
    
    # File handler - captures everything (DEBUG and above)
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(detailed_format)
    logger.addHandler(file_handler)
    
    # Console handler - shows `level` and above (INFO by default)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    console_handler.setFormatter(simple_format)
    logger.addHandler(console_handler)
    
    return logger


# Create our logger
logger = setup_logging()

# Demonstrate logging levels
logger.debug("This is a DEBUG message - only in file")
logger.info("This is an INFO message - visible in console")
logger.warning("This is a WARNING message")
logger.error("This is an ERROR message")

INFO     | This is an INFO message - visible in console
WARNING  | This is a WARNING message
ERROR    | This is an ERROR message


The `setup_logging` function creates a professional logging configuration:

1. **Two handlers**: File gets everything (for debugging), console shows INFO and above (for monitoring)
2. **Detailed timestamps**: Essential for debugging "when did this happen?"
3. **Function names**: Helps locate where messages originate
4. **Level padding**: Aligned output for readability
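
One detail worth checking in any two-handler setup: a record must pass the logger's own level before any handler sees it. The sketch below uses throwaway loggers and an in-memory stream (all names are illustrative) to show that a handler set to DEBUG still receives nothing at DEBUG if the logger itself is set to INFO.

```python
import io
import logging


def captured_levels(logger_level):
    """Log at three levels and return the level names that reach the handler."""
    stream = io.StringIO()
    demo = logging.getLogger(f"level_demo_{logger_level}")
    demo.handlers = []          # start clean if the cell is re-run
    demo.propagate = False      # keep the demo out of the root logger
    demo.setLevel(logger_level)

    handler = logging.StreamHandler(stream)
    handler.setLevel(logging.DEBUG)  # the handler itself accepts everything
    handler.setFormatter(logging.Formatter("%(levelname)s"))
    demo.addHandler(handler)

    demo.debug("debug message")
    demo.info("info message")
    demo.warning("warning message")
    return stream.getvalue().split()


info_logger = captured_levels(logging.INFO)
debug_logger = captured_levels(logging.DEBUG)
print("Logger at INFO: ", info_logger)
print("Logger at DEBUG:", debug_logger)
```

For a file handler to capture DEBUG output, the logger must therefore be at DEBUG as well, with the console handler set higher to keep the terminal quiet.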

In [11]:
# Retry decorator with exponential backoff

import functools

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """
    Decorator that retries a function with exponential backoff.
    
    Exponential backoff means each retry waits longer:
    - Attempt 1: immediate
    - Attempt 2: wait base_delay seconds
    - Attempt 3: wait base_delay * 2 seconds
    - Attempt 4: wait base_delay * 4 seconds
    - etc.
    
    Parameters:
    -----------
    max_retries : int
        Maximum number of retry attempts
    base_delay : int
        Initial delay in seconds
    max_delay : int
        Maximum delay between retries
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    
                    if attempt < max_retries:
                        # Calculate delay with exponential backoff
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        logger.warning(
                            f"Attempt {attempt + 1} failed: {e}. "
                            f"Retrying in {delay} seconds..."
                        )
                        time.sleep(delay)
                    else:
                        logger.error(
                            f"All {max_retries + 1} attempts failed. "
                            f"Last error: {e}"
                        )
            
            raise last_exception
        return wrapper
    return decorator


# Example: A function that might fail
@retry_with_backoff(max_retries=3, base_delay=1)
def unreliable_api_call(success_probability=0.3):
    """
    Simulates an unreliable API that fails randomly.
    """
    import random
    if random.random() > success_probability:
        raise ConnectionError("API server unavailable")
    return {"status": "success", "data": [1, 2, 3]}


# Test the retry mechanism
print("Testing retry mechanism with unreliable function...")
try:
    result = unreliable_api_call(success_probability=0.7)
    print(f"Success: {result}")
except Exception as e:
    print(f"Final failure: {e}")

Testing retry mechanism with unreliable function...
Success: {'status': 'success', 'data': [1, 2, 3]}


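The `retry_with_backoff` decorator wraps any function so that a failure triggers another attempt after a growing delay. Note that it catches every `Exception`, so permanent errors (for example, a missing file) are retried along with transient ones. Exponential backoff is useful because: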

1. **Transient failures recover**: Network glitches often resolve in seconds
2. **Don't overwhelm failing systems**: Increasing delays give servers time to recover
3. **Automatic recovery**: The pipeline self-heals without manual intervention
4. **Clean code**: Error handling logic is separated from business logic
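
To make the waiting times concrete, the sketch below computes the delay schedule using the same formula as the decorator, `min(base_delay * 2 ** attempt, max_delay)`. The helper `backoff_delays` and its optional `jitter` parameter are additions for illustration only. Jitter (a small random extra wait) is a common refinement that keeps many clients from retrying at the same instant; the decorator above does not use it.

```python
import random


def backoff_delays(max_retries=3, base_delay=1, max_delay=60, jitter=0.0, rng=None):
    """List the wait (in seconds) before each retry.

    jitter is a fraction: 0.5 adds up to 50% random extra wait
    before the max_delay cap is applied.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        raw = base_delay * (2 ** attempt)
        if jitter:
            raw += rng.uniform(0, jitter * raw)
        delays.append(min(raw, max_delay))
    return delays


print(backoff_delays(max_retries=3, base_delay=1))
# [1, 2, 4]
print(backoff_delays(max_retries=8, base_delay=1))
# [1, 2, 4, 8, 16, 32, 60, 60]  (the cap applies from the 7th retry on)
```

With the defaults used in the example above (`max_retries=3`, `base_delay=1`), a call that never succeeds waits 1 + 2 + 4 = 7 seconds in total before the final error is raised.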

---

## 8. Monitoring Pipeline Health

### What to Monitor

A healthy ETL pipeline should track:

1. **Execution metrics**: Runtime, rows processed, success/failure
2. **Data quality metrics**: Null rates, duplicates, range violations
3. **Resource metrics**: Memory usage, disk space, API quotas
4. **Business metrics**: Data freshness, completeness, accuracy
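
As a rough illustration of the data quality metrics in item 2, the sketch below computes a null rate, a duplicate count, and range violations with pandas. The function name `data_quality_metrics`, its parameters, and the four sample rows are made up for this example; the column names follow the weather dataset used in this lecture.

```python
import pandas as pd


def data_quality_metrics(df, key_cols, range_checks):
    """Compute simple data quality metrics for one DataFrame.

    key_cols     : columns that together should identify a unique record
    range_checks : {column: (low, high)} inclusive bounds for valid values
    """
    total_cells = df.shape[0] * df.shape[1]
    metrics = {
        "row_count": len(df),
        "null_rate": float(df.isna().sum().sum() / total_cells) if total_cells else 0.0,
        "duplicate_rows": int(df.duplicated(subset=key_cols).sum()),
    }
    for col, (low, high) in range_checks.items():
        # Missing values compare as False, so they are counted under
        # null_rate rather than as range violations
        out_of_range = (df[col] < low) | (df[col] > high)
        metrics[f"{col}_range_violations"] = int(out_of_range.sum())
    return metrics


# Made-up rows: one duplicate, one missing temperature, one impossible value
sample = pd.DataFrame({
    "location_name": ["Kabul", "Kabul", "Tirana", "Algiers"],
    "last_updated": ["2024-05-16 13:15", "2024-05-16 13:15",
                     "2024-05-16 10:45", "2024-05-16 09:45"],
    "temperature_celsius": [26.6, 26.6, None, 75.0],
})

quality = data_quality_metrics(
    sample,
    key_cols=["location_name", "last_updated"],
    range_checks={"temperature_celsius": (-90, 60)},
)
print(quality)
```

Each value in the returned dictionary is a plain number, so it can be passed straight to a metric recorder or written to JSON.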

### Simple Monitoring Framework

In [12]:
class PipelineMonitor:
    """
    Monitors ETL pipeline health and collects metrics.
    """
    
    def __init__(self, pipeline_name):
        self.pipeline_name = pipeline_name
        self.metrics = []
        self.start_time = None
        self.current_run = {}
    
    def start_run(self):
        """Mark the beginning of a pipeline run."""
        self.start_time = datetime.now()
        self.current_run = {
            'pipeline': self.pipeline_name,
            'start_time': self.start_time.isoformat(),
            'status': 'running'
        }
        logger.info(f"Pipeline '{self.pipeline_name}' started")
    
    def record_metric(self, name, value):
        """Record a metric for the current run."""
        self.current_run[name] = value
        logger.debug(f"Metric recorded: {name} = {value}")
    
    def end_run(self, status='success', error_message=None):
        """Mark the end of a pipeline run."""
        end_time = datetime.now()
        duration = (end_time - self.start_time).total_seconds()
        
        self.current_run.update({
            'end_time': end_time.isoformat(),
            'duration_seconds': round(duration, 2),
            'status': status
        })
        
        if error_message:
            self.current_run['error'] = error_message
        
        self.metrics.append(self.current_run.copy())
        
        if status == 'success':
            logger.info(f"Pipeline completed successfully in {duration:.2f}s")
        else:
            logger.error(f"Pipeline failed after {duration:.2f}s: {error_message}")
    
    def get_summary(self):
        """Get summary statistics across all runs (always returns a dict)."""
        if not self.metrics:
            return {'total_runs': 0}
        
        df = pd.DataFrame(self.metrics)
        
        summary = {
            'total_runs': len(df),
            'successful_runs': len(df[df['status'] == 'success']),
            'failed_runs': len(df[df['status'] == 'failed']),
            'avg_duration_seconds': df['duration_seconds'].mean(),
            'max_duration_seconds': df['duration_seconds'].max(),
            'success_rate': len(df[df['status'] == 'success']) / len(df) * 100
        }
        
        return summary
    
    def export_metrics(self, filepath):
        """Export metrics to JSON file."""
        with open(filepath, 'w') as f:
            json.dump(self.metrics, f, indent=2)
        logger.info(f"Metrics exported to {filepath}")


# Demonstrate the monitor
monitor = PipelineMonitor('weather_etl')

# Simulate a few runs
for i in range(3):
    monitor.start_run()
    monitor.record_metric('rows_processed', 43884)
    monitor.record_metric('null_rate', 0.02)
    time.sleep(0.1)  # Simulate work
    monitor.end_run(status='success')

print("\nPipeline Summary:")
for key, value in monitor.get_summary().items():
    print(f"  {key}: {value}")

INFO     | Pipeline 'weather_etl' started
INFO     | Pipeline completed successfully in 0.11s
INFO     | Pipeline 'weather_etl' started
INFO     | Pipeline completed successfully in 0.11s
INFO     | Pipeline 'weather_etl' started
INFO     | Pipeline completed successfully in 0.11s



Pipeline Summary:
  total_runs: 3
  successful_runs: 3
  failed_runs: 0
  avg_duration_seconds: 0.11
  max_duration_seconds: 0.11
  success_rate: 100.0


The `PipelineMonitor` class provides a foundation for monitoring. In production, you would extend this to:

1. **Send alerts**: Email or Slack when failures occur
2. **Store metrics**: Write to a database for historical analysis
3. **Create dashboards**: Visualize trends over time
4. **Set thresholds**: Auto-alert when metrics exceed limits
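
The threshold idea in item 4 can be sketched in a few lines. The function `check_thresholds` and the limits shown are hypothetical; the run dictionary mimics the shape of the entries stored in `monitor.metrics`. A real deployment would forward the returned messages to email or Slack rather than printing them.

```python
def check_thresholds(run, thresholds):
    """Return alert messages for a run that failed or exceeded a limit.

    run        : dict of metrics for one pipeline run
    thresholds : {metric_name: maximum_allowed_value}
    """
    name = run.get("pipeline", "unknown")
    alerts = []
    if run.get("status") != "success":
        alerts.append(f"{name}: run status is {run.get('status')}")
    for metric, limit in thresholds.items():
        value = run.get(metric)
        if value is not None and value > limit:
            alerts.append(f"{name}: {metric}={value} exceeds limit {limit}")
    return alerts


# Example run record (values are illustrative)
example_run = {
    "pipeline": "weather_etl",
    "status": "success",
    "duration_seconds": 95.2,
    "null_rate": 0.02,
}
# Hypothetical limits: alert above 60 s runtime or 5% nulls
limits = {"duration_seconds": 60, "null_rate": 0.05}

for message in check_thresholds(example_run, limits):
    print("ALERT:", message)
# ALERT: weather_etl: duration_seconds=95.2 exceeds limit 60
```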

---

## 9. Complete Automated ETL Example

Now let's put everything together into a production-ready ETL pipeline.

In [13]:
class WeatherETLPipeline:
    """
    Complete ETL pipeline for weather data.
    
    This class demonstrates production-ready patterns:
    - Configuration management
    - Logging throughout
    - Error handling (failures are recorded, then re-raised)
    - Monitoring and metrics
    - Data validation
    """
    
    def __init__(self, config):
        """
        Initialize the pipeline with configuration.
        
        Parameters:
        -----------
        config : ETLConfig
            Configuration object with all settings
        """
        self.config = config
        self.monitor = PipelineMonitor('weather_etl')
        self.client = None
        
        logger.info(f"Pipeline initialized with config: {config.to_dict()}")
    
    def connect(self):
        """Establish database connection."""
        logger.info("Connecting to database...")
        self.client = BigQuerySimulator(self.config.database_path)
    
    def extract(self):
        """
        Extract phase: Read and validate source data.
        
        Returns:
        --------
        pd.DataFrame : Extracted data
        """
        logger.info(f"Extracting data from {self.config.source_path}")
        
        # Verify source file exists
        if not os.path.exists(self.config.source_path):
            raise FileNotFoundError(f"Source file not found: {self.config.source_path}")
        
        # Read the data
        df = pd.read_csv(self.config.source_path, dtype=WEATHER_SCHEMA)
        
        logger.info(f"Extracted {len(df):,} rows, {len(df.columns)} columns")
        self.monitor.record_metric('extracted_rows', len(df))
        
        return df
    
    def transform(self, df):
        """
        Transform phase: Clean and enhance data.
        
        Parameters:
        -----------
        df : pd.DataFrame
            Raw extracted data
        
        Returns:
        --------
        pd.DataFrame : Transformed data
        """
        logger.info("Starting transformation...")
        
        # Make a copy to avoid modifying original
        df_clean = df.copy()
        
        # 1. Parse datetime
        logger.debug("Parsing datetime column")
        df_clean['last_updated_dt'] = pd.to_datetime(df_clean['last_updated'])
        
        # 2. Extract date components
        df_clean['update_date'] = df_clean['last_updated_dt'].dt.date
        df_clean['update_hour'] = df_clean['last_updated_dt'].dt.hour
        
        # 3. Calculate derived metrics
        logger.debug("Calculating derived metrics")
        df_clean['temp_feels_diff'] = (
            df_clean['temperature_celsius'] - df_clean['feels_like_celsius']
        ).round(2)
        
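        # 4. Categorize wind speed
        # include_lowest=True keeps readings of exactly 0 kph in 'Calm';
        # without it, pd.cut leaves them uncategorized (NaN)
        df_clean['wind_category'] = pd.cut(
            df_clean['wind_kph'],
            bins=[0, 10, 30, 50, 100, float('inf')],
            labels=['Calm', 'Light', 'Moderate', 'Strong', 'Severe'],
            include_lowest=True
        )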
        
        # 5. Add ETL metadata
        df_clean['_etl_timestamp'] = datetime.now().isoformat()
        df_clean['_etl_source'] = self.config.source_path
        
        # Record data quality metrics
        null_rate = df_clean.isnull().sum().sum() / (len(df_clean) * len(df_clean.columns))
        self.monitor.record_metric('null_rate', round(null_rate, 4))
        self.monitor.record_metric('transformed_rows', len(df_clean))
        
        logger.info(f"Transformation complete. Null rate: {null_rate:.2%}")
        
        return df_clean
    
    def validate(self, df):
        """
        Validate data quality before loading.
        
        Parameters:
        -----------
        df : pd.DataFrame
            Transformed data to validate
        
        Returns:
        --------
        bool : True if validation passes
        """
        logger.info("Running validation checks...")
        
        issues = []
        
        # Check 1: Row count
        if len(df) == 0:
            issues.append("ERROR: DataFrame is empty")
        
        # Check 2: Required columns exist
        required_cols = ['country', 'location_name', 'temperature_celsius']
        missing_cols = [c for c in required_cols if c not in df.columns]
        if missing_cols:
            issues.append(f"ERROR: Missing required columns: {missing_cols}")
        
        # Check 3: Temperature range (reasonable bounds)
        # Skipped if the column is missing; Check 2 already reports that
        if 'temperature_celsius' in df.columns:
            temp_min = df['temperature_celsius'].min()
            temp_max = df['temperature_celsius'].max()
            if temp_min < -90 or temp_max > 60:
                issues.append(f"WARNING: Temperature out of range: [{temp_min}, {temp_max}]")
        
        # Check 4: No duplicate location-time combinations
        key_cols = ['country', 'location_name', 'last_updated']
        if all(c in df.columns for c in key_cols):
            dup_count = df.duplicated(subset=key_cols).sum()
            if dup_count > 0:
                issues.append(f"WARNING: {dup_count} duplicate records found")
        
        # Report issues
        for issue in issues:
            if issue.startswith('ERROR'):
                logger.error(issue)
            else:
                logger.warning(issue)
        
        # Only fail on errors, not warnings
        errors = [i for i in issues if i.startswith('ERROR')]
        
        self.monitor.record_metric('validation_issues', len(issues))
        self.monitor.record_metric('validation_errors', len(errors))
        
        return len(errors) == 0
    
    def load(self, df):
        """
        Load phase: Write data to destination.
        
        Parameters:
        -----------
        df : pd.DataFrame
            Validated data to load
        """
        logger.info(f"Loading {len(df):,} rows to {self.config.table_name}")
        
        # Passes the full DataFrame in a single call;
        # config.chunk_size is not applied in this method
        self.client.load_table_from_dataframe(df, self.config.table_name)
        
        self.monitor.record_metric('loaded_rows', len(df))
        logger.info("Load complete")
    
    def run(self):
        """
        Execute the complete ETL pipeline.
        
        Returns:
        --------
        dict : Run statistics
        """
        self.monitor.start_run()
        
        try:
            # Connect
            self.connect()
            
            # Extract
            df_raw = self.extract()
            
            # Transform
            df_transformed = self.transform(df_raw)
            
            # Validate
            if not self.validate(df_transformed):
                raise ValueError("Data validation failed")
            
            # Load
            self.load(df_transformed)
            
            # Success
            self.monitor.end_run(status='success')
            
        except Exception as e:
            self.monitor.end_run(status='failed', error_message=str(e))
            raise
        
        finally:
            if self.client:
                self.client.close()
        
        return self.monitor.current_run

The `WeatherETLPipeline` class brings together all the concepts from this lecture:

1. **Configuration**: Uses `ETLConfig` for all settings
2. **Logging**: Every step is logged with appropriate levels
3. **Monitoring**: Metrics are recorded throughout
4. **Validation**: Data quality is checked before loading
5. **Error handling**: Failures are caught and reported
6. **Clean structure**: Extract, Transform, Load are separate methods
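
The pipeline methods call `connect`, `extract`, and `load` directly, without a retry decorator. One possible way to add retries is sketched below, with a twist: only exception types that are plausibly transient are retried, so a permanent problem such as a missing source file fails immediately. The decorator `retry_on` and the class `FlakyConnection` are hypothetical stand-ins written for this example; they are not part of the lecture's code.

```python
import functools
import time


def retry_on(exceptions, max_retries=3, base_delay=0.01):
    """Retry only when one of the given exception types is raised."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise  # out of retries: surface the last error
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator


class FlakyConnection:
    """Stand-in for a warehouse client that fails a few times, then works."""

    def __init__(self, failures_before_success):
        self.remaining_failures = failures_before_success
        self.calls = 0

    @retry_on((ConnectionError, TimeoutError))
    def connect(self):
        self.calls += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            raise ConnectionError("warehouse unavailable")
        return "connected"


conn = FlakyConnection(failures_before_success=2)
print(conn.connect(), conn.calls)  # connected 3

# A permanent error is not in the retry list, so it is raised on the first call
missing_calls = {"count": 0}

@retry_on((ConnectionError, TimeoutError))
def read_missing_source():
    missing_calls["count"] += 1
    raise FileNotFoundError("source file not found")

try:
    read_missing_source()
except FileNotFoundError:
    print(missing_calls["count"])  # 1
```

Retrying `load` needs more care than retrying `connect`: if a load partially succeeds before failing, a second attempt could write duplicate rows unless the load is idempotent.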

In [14]:
# Run the complete pipeline
print("="*60)
print("EXECUTING WEATHER ETL PIPELINE")
print("="*60)

# Create configuration
config = ETLConfig()

# Create and run pipeline
pipeline = WeatherETLPipeline(config)
result = pipeline.run()

print("\n" + "="*60)
print("PIPELINE RESULT")
print("="*60)
for key, value in result.items():
    print(f"  {key}: {value}")

INFO     | Pipeline initialized with config: {'source_path': '../../Datasets/GlobalWeatherRepository.csv', 'database_path': 'weather_warehouse.db', 'table_name': 'global_weather', 'chunk_size': 10000, 'max_retries': 3, 'retry_delay': 5, 'log_level': 'INFO'}
INFO     | Pipeline 'weather_etl' started
INFO     | Connecting to database...
INFO     | Extracting data from ../../Datasets/GlobalWeatherRepository.csv


EXECUTING WEATHER ETL PIPELINE
Connected to simulated BigQuery (SQLite: weather_warehouse.db)


INFO     | Extracted 43,884 rows, 41 columns
INFO     | Starting transformation...
INFO     | Transformation complete. Null rate: 0.00%
INFO     | Running validation checks...
INFO     | Loading 43,884 rows to global_weather
INFO     | Load complete
INFO     | Pipeline completed successfully in 1.41s


Loaded 43,884 rows into global_weather
Connection closed.

PIPELINE RESULT
  pipeline: weather_etl
  start_time: 2026-01-27T21:58:56.475142
  status: success
  extracted_rows: 43884
  null_rate: 0.0
  transformed_rows: 43884
  validation_issues: 0
  validation_errors: 0
  loaded_rows: 43884
  end_time: 2026-01-27T21:58:57.885971
  duration_seconds: 1.41


The pipeline output shows each phase executing with proper logging. In production, this output would go to log files and monitoring dashboards.
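
For the log-file side of that, the standard library's `RotatingFileHandler` keeps log files from growing without bound. The sketch below is a minimal setup under assumed values: the logger name `weather_etl_prod`, the temporary directory, the 1 MB size limit, and the five backups are all illustrative choices, not settings from this lecture.

```python
import logging
import logging.handlers
import os
import tempfile

# Hypothetical log location; a real deployment would use a fixed directory
log_dir = tempfile.mkdtemp()
log_path = os.path.join(log_dir, "weather_etl.log")

# Start a new file after roughly 1 MB and keep the five most recent backups
rotating_handler = logging.handlers.RotatingFileHandler(
    log_path, maxBytes=1_000_000, backupCount=5
)
rotating_handler.setFormatter(logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(funcName)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
))

prod_logger = logging.getLogger("weather_etl_prod")
prod_logger.setLevel(logging.INFO)
prod_logger.addHandler(rotating_handler)
prod_logger.propagate = False  # keep these messages out of the root logger

prod_logger.info("Pipeline completed successfully")

with open(log_path) as f:
    print(f.read())
```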

In [15]:
# Verify the loaded data
print("Verifying loaded data...\n")

# Reconnect to check data
verify_client = BigQuerySimulator(config.database_path)

# Check row count
count_result = verify_client.query(f"SELECT COUNT(*) as cnt FROM {config.table_name}")
print(f"Total rows: {count_result['cnt'].iloc[0]:,}")

# Check sample with new columns
print("\nSample data with derived columns:")
sample = verify_client.query(f"""
    SELECT 
        country,
        location_name,
        temperature_celsius,
        feels_like_celsius,
        temp_feels_diff,
        wind_category
    FROM {config.table_name}
    LIMIT 10
""")
print(sample.to_string(index=False))

# Check wind category distribution
print("\nWind category distribution:")
wind_dist = verify_client.query(f"""
    SELECT 
        wind_category,
        COUNT(*) as count
    FROM {config.table_name}
    GROUP BY wind_category
    ORDER BY count DESC
""")
print(wind_dist.to_string(index=False))

verify_client.close()

Verifying loaded data...

Connected to simulated BigQuery (SQLite: weather_warehouse.db)
Total rows: 43,884

Sample data with derived columns:
            country    location_name  temperature_celsius  feels_like_celsius  temp_feels_diff wind_category
        Afghanistan            Kabul                 26.6                25.3              1.3         Light
            Albania           Tirana                 19.0                19.0              0.0         Light
            Algeria          Algiers                 23.0                24.6             -1.6         Light
            Andorra Andorra La Vella                  6.3                 3.8              2.5         Light
             Angola           Luanda                 26.0                28.7             -2.7         Light
Antigua and Barbuda     Saint John's                 26.0                28.2             -2.2          Calm
          Argentina     Buenos Aires                  8.0                 7.1              0.9

The verification confirms that the load succeeded: the table holds all 43,884 extracted rows, and the derived columns (`temp_feels_diff`, `wind_category`) are present in the sample.
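
Checks like these can also be automated, so that a run fails loudly when the loaded table does not match expectations. The sketch below uses the standard `sqlite3` module directly against an in-memory database. The function `verify_load` is hypothetical, and the three rows are copied from the sample output above purely as test data.

```python
import sqlite3


def verify_load(conn, table_name, expected_rows, required_columns):
    """Compare a loaded table against what the pipeline expected to write."""
    # table_name is assumed to come from trusted configuration, not user input
    actual_rows = conn.execute(
        f'SELECT COUNT(*) FROM "{table_name}"'
    ).fetchone()[0]
    # PRAGMA table_info returns one row per column; index 1 is the column name
    existing = {row[1] for row in conn.execute(f'PRAGMA table_info("{table_name}")')}
    missing = [c for c in required_columns if c not in existing]
    return {
        "expected_rows": expected_rows,
        "actual_rows": actual_rows,
        "row_count_ok": actual_rows == expected_rows,
        "missing_columns": missing,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE global_weather (country TEXT, temp_feels_diff REAL)")
conn.executemany(
    "INSERT INTO global_weather VALUES (?, ?)",
    [("Afghanistan", 1.3), ("Albania", 0.0), ("Algeria", -1.6)],
)

report = verify_load(
    conn,
    "global_weather",
    expected_rows=3,
    required_columns=["temp_feels_diff", "wind_category"],
)
print(report)
conn.close()
```

In this toy table the row count matches, but `wind_category` is reported as missing, which is the kind of silent schema drift an automated check is meant to catch.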

---

## Summary: Key Takeaways

### 1. Cloud Data Warehouses
- Separate storage and compute for scalability
- Pay-per-query pricing model
- BigQuery: serverless, no infrastructure to manage

### 2. Loading Large Datasets
- Use chunked loading for memory efficiency
- Define schemas explicitly for consistency
- Always verify after loading

### 3. Automation Scripts
- Use configuration classes for flexibility
- Separate Extract, Transform, Load phases
- Keep business logic testable

### 4. Scheduling
- Cron for simple recurring jobs
- Consider processing time vs freshness requirements
- Use Airflow for complex dependencies

### 5. Error Handling
- Retry with exponential backoff
- Proper logging (not print statements)
- Validate data before loading

### 6. Monitoring
- Track execution metrics
- Record data quality metrics
- Export for dashboards and alerting

---

## References

### Books
- Densmore, J. (2021). *Data Pipelines Pocket Reference*. O'Reilly Media.
- Reis, J., & Housley, M. (2022). *Fundamentals of Data Engineering*. O'Reilly Media.
- Kleppmann, M. (2017). *Designing Data-Intensive Applications*. O'Reilly Media.

### Documentation
- [Google BigQuery Documentation](https://cloud.google.com/bigquery/docs)
- [Python logging module](https://docs.python.org/3/library/logging.html)
- [pandas read_csv chunking](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html)

### Best Practices
- [Google Cloud Architecture: ETL best practices](https://cloud.google.com/architecture/building-production-ready-data-pipelines-using-dataflow)
- [AWS: ETL best practices](https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/etl-best-practices.html)

---

## Practice Exercises

### Exercise 1: Add Email Notifications
Extend the `PipelineMonitor` class to send an email when a pipeline fails. Use Python's `smtplib` library.

### Exercise 2: Incremental Loading
Modify the pipeline to only load records newer than the most recent record in the database (incremental load vs full refresh).

### Exercise 3: Data Quality Dashboard
Create a function that reads the metrics JSON file and generates a summary report with:
- Success rate over time
- Average runtime trend
- Most common failure reasons

### Exercise 4: Multi-Source Pipeline
Extend the pipeline to extract from multiple CSV files and merge them before loading.

---

## Next Class: Data Cleaning I

In Lecture 9, we will cover:
- Understanding dirty data patterns
- Data type standardization
- Date and time parsing
- String cleaning and normalization
- Deduplication strategies

We will continue using the GlobalWeatherRepository and introduce the AirQualityUCI dataset, which has intentional data quality issues for us to fix.

---

*End of Lecture 8*