# FRE 521D: Data Analytics in Climate, Food and Environment
## Lab 4: ETL Tools - Building an Automated Pipeline

**Program:** UBC Master of Food and Resource Economics  
**Instructor:** Asif Ahmed Neloy

---

<div style="background-color: #FFF3CD; border-left: 4px solid #E6A23C; padding: 15px; margin: 15px 0;">
    <h3 style="margin-top: 0; color: #856404;">Submission Deadline</h3>
    <p style="margin-bottom: 0; font-size: 1.2em;"><strong>End of Day: Tuesday, Feb 03, 2026</strong></p>
</div>

---

## Lab Objectives

In this lab, you will build an automated ETL pipeline with professional features. You will:

1. Create a configuration class for pipeline settings
2. Implement proper logging instead of print statements
3. Build functions for Extract, Transform, and Load phases
4. Add retry logic with exponential backoff
5. Generate a pipeline execution report

---

## Dataset

We will use the `GlobalWeatherRepository.csv` dataset, which contains weather observations from locations around the world.

---

## Setup: Import Libraries

In [None]:
# Import libraries
import pandas as pd
import numpy as np
import os
import logging
import time
import sqlite3
import functools
from datetime import datetime

# Display settings
pd.set_option('display.max_columns', 15)
pd.set_option('display.width', None)

print("Setup complete!")
print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

---
---

# Question 1: Configuration Class (15 points)

## Task: Create a Pipeline Configuration Class

As covered in Lecture 8, configuration should be centralized in a class that can read from environment variables with sensible defaults.

**Complete the class below** to include:
- `source_path`: Path to the CSV file (default: `'../../Datasets/GlobalWeatherRepository.csv'`)
- `database_path`: Path to SQLite database (default: `'weather_pipeline.db'`)
- `table_name`: Name of destination table (default: `'weather_data'`)
- `chunk_size`: Rows per chunk for loading (default: `10000`)
- `log_file`: Path to log file (default: `'pipeline.log'`)
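The environment-variable-with-default pattern can be illustrated on a separate, hypothetical settings class. `DemoSettings`, `DEMO_OUTPUT_DIR` and `DEMO_BATCH_SIZE` are invented for this sketch and are not the names the lab expects. The example shows two points. First, `os.getenv(name, default)` falls back to the default when the variable is unset. Second, environment variables are always strings, so numeric settings need an explicit cast.

```python
import os

class DemoSettings:
    """Hypothetical settings class illustrating env-var overrides with defaults."""

    def __init__(self):
        # os.getenv(name, default) returns the default when the variable is unset
        self.output_dir = os.getenv("DEMO_OUTPUT_DIR", "output")
        # Environment variables are always strings, so numeric settings need a cast
        self.batch_size = int(os.getenv("DEMO_BATCH_SIZE", "500"))

os.environ.pop("DEMO_BATCH_SIZE", None)   # make sure the variable is unset
print(DemoSettings().batch_size)          # 500 (default)

os.environ["DEMO_BATCH_SIZE"] = "2000"    # simulate `export DEMO_BATCH_SIZE=2000`
print(DemoSettings().batch_size)          # 2000 (override, converted to int)
```

The same two calls are needed whether the value comes from the shell, a `.env` loader, or a scheduler's job settings.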

---

In [None]:
class PipelineConfig:
    """
    Configuration class for the ETL pipeline.
    Reads from environment variables with sensible defaults.
    """
    
    def __init__(self):
        # ============================================
        # YOUR CODE HERE: Define the 5 configuration attributes
        # Use os.getenv() to allow environment variable overrides
        # ============================================
        
        # 1. source_path - path to source CSV file
        self.source_path = None  # Replace with os.getenv(...)
        
        # 2. database_path - path to SQLite database
        self.database_path = None  # Replace with os.getenv(...)
        
        # 3. table_name - destination table name
        self.table_name = None  # Replace with os.getenv(...)
        
        # 4. chunk_size - rows per chunk (convert to int!)
        self.chunk_size = None  # Replace with int(os.getenv(...))
        
        # 5. log_file - path to log file
        self.log_file = None  # Replace with os.getenv(...)
        
        # ============================================
    
    def to_dict(self):
        """Return configuration as dictionary."""
        return {
            'source_path': self.source_path,
            'database_path': self.database_path,
            'table_name': self.table_name,
            'chunk_size': self.chunk_size,
            'log_file': self.log_file
        }

In [None]:
# Test your configuration class
config = PipelineConfig()

print("Pipeline Configuration:")
print("-" * 40)
for key, value in config.to_dict().items():
    print(f"  {key}: {value}")

# Verify all values are set
assert config.source_path is not None, "source_path not set!"
assert config.database_path is not None, "database_path not set!"
assert config.table_name is not None, "table_name not set!"
assert config.chunk_size is not None, "chunk_size not set!"
assert config.log_file is not None, "log_file not set!"
print("\nAll configuration values set correctly!")

---
---

# Question 2: Logging Setup (15 points)

## Task: Create a Logging Configuration Function

Production pipelines use logging instead of print statements. As covered in Lecture 8, we need both file and console handlers.

**Complete the function below** to:
1. Create a logger with the name 'etl_pipeline'
2. Add a file handler that writes to the specified log file
3. Add a console handler that shows INFO level and above
4. Use this format: `'%(asctime)s | %(levelname)-8s | %(message)s'`
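The sketch below shows how the required format string renders and how a handler-level threshold filters records independently of the logger's own level. It uses a hypothetical logger named `format_demo` and writes to an in-memory `io.StringIO` stream instead of the console or a file, so the output can be inspected. The `-8` in `%(levelname)-8s` left-aligns the level name in an 8-character column, which keeps the `|` separators lined up.

```python
import io
import logging

# Hypothetical demo logger; the lab's own logger is named 'etl_pipeline'
demo = logging.getLogger("format_demo")
demo.setLevel(logging.DEBUG)      # the logger itself lets every record through
demo.propagate = False            # don't also forward records to the root logger
demo.handlers.clear()

stream = io.StringIO()            # stands in for a console or file destination
handler = logging.StreamHandler(stream)
handler.setLevel(logging.INFO)    # this handler drops DEBUG records
handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s"))
demo.addHandler(handler)

demo.debug("not shown")
demo.info("extract started")
demo.warning("slow response")

print(stream.getvalue())
```

Each line starts with a timestamp, followed by `| INFO     | extract started` and `| WARNING  | slow response`. The DEBUG message is filtered out by the handler.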

---

In [None]:
def setup_logging(log_file, level=logging.INFO):
    """
    Configure logging for the ETL pipeline.
    
    Parameters:
    -----------
    log_file : str
        Path to the log file
    level : int
        Logging level (default: logging.INFO)
    
    Returns:
    --------
    logging.Logger : Configured logger
    """
    # ============================================
    # YOUR CODE HERE
    # ============================================
    
    # Step 1: Create logger
    logger = logging.getLogger('etl_pipeline')
    logger.setLevel(level)
    
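    # Remove handlers left over from earlier runs (important in Jupyter,
    # where re-running this cell would otherwise duplicate every message)
    for handler in list(logger.handlers):
        handler.close()
        logger.removeHandler(handler)
    # Don't also pass records up to the root logger
    logger.propagate = False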
    
    # Step 2: Create formatter
    # Use format: '%(asctime)s | %(levelname)-8s | %(message)s'
    formatter = None  # YOUR CODE HERE
    
    # Step 3: Create and add file handler
    # YOUR CODE HERE
    
    # Step 4: Create and add console handler
    # YOUR CODE HERE
    
    # ============================================
    
    return logger

In [None]:
# Test your logging function
logger = setup_logging(config.log_file)

logger.info("Testing INFO message")
logger.warning("Testing WARNING message")
logger.error("Testing ERROR message")

print(f"\nLog file created: {config.log_file}")
print("Check the log file to verify messages were written.")

---
---

# Question 3: Retry Decorator (20 points)

## Task: Implement Retry Logic with Exponential Backoff

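Network operations can fail temporarily (timeouts, dropped connections). Retry logic with exponential backoff retries the operation and doubles the wait after each failed attempt, giving the remote service time to recover.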

**Complete the decorator below** to:
1. Attempt the function up to `max_retries + 1` times (one initial attempt plus at most `max_retries` retries)
2. If an attempt fails, wait `base_delay * (2 ** attempt)` seconds before retrying, where `attempt` counts from 0
3. Log each retry attempt using the logger
4. If all attempts fail, raise the last exception

---

In [None]:
def retry_with_backoff(max_retries=3, base_delay=1):
    """
    Decorator that retries a function with exponential backoff.
    
    Parameters:
    -----------
    max_retries : int
        Maximum number of retry attempts
    base_delay : int
        Base delay in seconds (doubles each retry)
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            # ============================================
            # YOUR CODE HERE
            # ============================================
            
            # Loop through attempts (0 to max_retries)
            for attempt in range(max_retries + 1):
                try:
                    # Try to execute the function
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    
                    if attempt < max_retries:
                        # Calculate delay: base_delay * (2 ** attempt)
                        delay = None  # YOUR CODE HERE
                        
                        # Log the retry (use logger.warning)
                        # YOUR CODE HERE
                        
                        # Wait before retrying
                        time.sleep(delay)
                    else:
                        # All attempts failed - log error
                        # YOUR CODE HERE
                        pass
            
            # ============================================
            
            # Raise the last exception
            raise last_exception
        return wrapper
    return decorator
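`retry_with_backoff` has three nested layers: a factory that takes the settings, a decorator that takes the function, and a wrapper that runs on every call. The sketch below shows the same shape on a much simpler, hypothetical `log_calls` decorator. It also shows what `functools.wraps` preserves.

```python
import functools

calls = []  # collected messages, for illustration only

def log_calls(prefix):
    """Hypothetical decorator factory with the same three layers as retry_with_backoff."""
    def decorator(func):                 # receives the function being decorated
        @functools.wraps(func)           # keep func's __name__ and __doc__ on the wrapper
        def wrapper(*args, **kwargs):    # runs on every call
            calls.append(f"{prefix}: calling {func.__name__}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@log_calls(prefix="demo")    # equivalent to: add = log_calls("demo")(add)
def add(a, b):
    """Add two numbers."""
    return a + b

print(add(2, 3))        # 5
print(add.__name__)     # add (it would be 'wrapper' without functools.wraps)
print(calls)            # ['demo: calling add']
```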

In [None]:
# Test the retry decorator

# This function fails 2 times then succeeds
fail_count = 0

@retry_with_backoff(max_retries=3, base_delay=1)
def unreliable_function():
    global fail_count
    fail_count += 1
    if fail_count <= 2:
        raise ConnectionError(f"Simulated failure #{fail_count}")
    return "Success!"

print("Testing retry decorator...")
result = unreliable_function()
print(f"Result: {result}")
print(f"Total attempts: {fail_count}")

---
---

# Question 4: ETL Functions (30 points)

## Task: Implement Extract, Transform, and Load Functions

Now we'll build the core ETL functions. Each function should:
- Log its progress using the logger
- Return its result: `extract` and `transform` return a DataFrame, and `load` returns the number of rows in the destination table

---

In [None]:
def extract(source_path):
    """
    Extract data from CSV file.
    
    Parameters:
    -----------
    source_path : str
        Path to the CSV file
    
    Returns:
    --------
    pd.DataFrame : Extracted data
    """
    # ============================================
    # YOUR CODE HERE
    # ============================================
    
    # 1. Log that extraction is starting
    # YOUR CODE HERE
    
    # 2. Check if file exists, raise FileNotFoundError if not
    # YOUR CODE HERE
    
    # 3. Read the CSV file
    df = None  # YOUR CODE HERE
    
    # 4. Log how many rows were extracted
    # YOUR CODE HERE
    
    # ============================================
    
    return df

In [None]:
def transform(df):
    """
    Transform the weather data.
    
    Transformations:
    1. Parse last_updated to datetime
    2. Add temperature category column
    3. Add ETL metadata columns
    
    Parameters:
    -----------
    df : pd.DataFrame
        Raw extracted data
    
    Returns:
    --------
    pd.DataFrame : Transformed data
    """
    # ============================================
    # YOUR CODE HERE
    # ============================================
    
    logger.info("Starting transformation...")
    
    # Make a copy
    df_clean = df.copy()
    
    # 1. Parse last_updated to datetime
    df_clean['last_updated_dt'] = pd.to_datetime(df_clean['last_updated'], errors='coerce')
    
    # 2. Add temperature category based on temperature_celsius
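    # Categories: 'Cold' (<10), 'Mild' (10-25), 'Hot' (>25)
    # Use pd.cut() - YOUR CODE HERE
    # Note: pd.cut's default right=True makes each bin include its upper edge,
    # so exactly 10 falls in 'Cold' and exactly 25 in 'Mild'; set `right` deliberately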
    df_clean['temp_category'] = None  # YOUR CODE HERE
    
    # 3. Add ETL metadata
    # _etl_timestamp: current datetime as ISO string
    # _etl_source: the source file path from config
    df_clean['_etl_timestamp'] = None  # YOUR CODE HERE
    df_clean['_etl_source'] = None  # YOUR CODE HERE
    
    # 4. Log transformation complete
    # YOUR CODE HERE
    
    # ============================================
    
    return df_clean
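How `pd.cut` handles bin edges matters for the temperature categories, because a reading of exactly 10 or 25 could fall into either neighboring category. The sketch below uses toy temperatures that are not from the dataset to compare the default `right=True` with `right=False`. It is meant to show how the edges are treated, not to settle which convention the lab expects.

```python
import numpy as np
import pandas as pd

temps = pd.Series([5.0, 10.0, 18.0, 25.0, 31.0])  # toy values, not from the dataset
bins = [-np.inf, 10, 25, np.inf]
labels = ["Cold", "Mild", "Hot"]

# Default right=True: intervals are (-inf, 10], (10, 25], (25, inf)
right_closed = pd.cut(temps, bins=bins, labels=labels)
# right=False: intervals are [-inf, 10), [10, 25), [25, inf)
left_closed = pd.cut(temps, bins=bins, labels=labels, right=False)

print(pd.DataFrame({"temp": temps, "right=True": right_closed, "right=False": left_closed}))
```

Only the edge values 10 and 25 change category between the two settings. Values outside all bins (or missing temperatures) become `NaN`, which is why the open-ended `-np.inf`/`np.inf` edges are used.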

In [None]:
def load(df, database_path, table_name, chunk_size=10000):
    """
    Load data into SQLite database.
    
    Parameters:
    -----------
    df : pd.DataFrame
        Data to load
    database_path : str
        Path to SQLite database
    table_name : str
        Destination table name
    chunk_size : int
        Rows per chunk
    
    Returns:
    --------
    int : Number of rows loaded
    """
    # ============================================
    # YOUR CODE HERE
    # ============================================
    
    # 1. Log that loading is starting
    # YOUR CODE HERE
    
    # 2. Create database connection
    conn = sqlite3.connect(database_path)
    try:
        # 3. Load data in chunks of chunk_size rows, replacing any existing table
        df.to_sql(table_name, conn, if_exists='replace', index=False, chunksize=chunk_size)
        
        # 4. Verify the load by counting rows (identifier quoted so unusual names still work)
        cursor = conn.cursor()
        cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
        row_count = cursor.fetchone()[0]
    finally:
        # 5. Close connection, even if the load fails
        conn.close()
    
    # 6. Log how many rows were loaded
    # YOUR CODE HERE
    
    # ============================================
    
    return row_count
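The load step relies on two `to_sql` behaviors. `chunksize` controls how many rows are written per batch, and `if_exists='replace'` drops and recreates the table, which makes re-running the pipeline idempotent. The sketch below checks both against an in-memory SQLite database. The table name `demo_weather` and its values are hypothetical.

```python
import sqlite3
import pandas as pd

# Toy frame standing in for the transformed weather data (hypothetical values)
df = pd.DataFrame({"city": ["A", "B", "C", "D", "E"], "temp_c": [3.5, 12.0, 18.2, 26.1, 30.4]})

conn = sqlite3.connect(":memory:")  # in-memory database, discarded on close
try:
    # chunksize=2 writes the 5 rows in batches of 2, 2 and 1
    df.to_sql("demo_weather", conn, if_exists="replace", index=False, chunksize=2)
    # Running it again with if_exists='replace' drops and recreates the table
    df.to_sql("demo_weather", conn, if_exists="replace", index=False, chunksize=2)
    count = conn.execute('SELECT COUNT(*) FROM "demo_weather"').fetchone()[0]
finally:
    conn.close()

print(count)  # 5, not 10
```

With `if_exists='append'`, the second call would instead leave 10 rows. That is why the verification count is a useful check after each run.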

In [None]:
# Test the ETL functions
print("Testing ETL Functions")
print("=" * 50)

# Extract
df_raw = extract(config.source_path)
print(f"\nExtracted shape: {df_raw.shape}")

# Transform
df_transformed = transform(df_raw)
print(f"Transformed shape: {df_transformed.shape}")
print(f"New columns: {[c for c in df_transformed.columns if c not in df_raw.columns]}")

# Load
rows_loaded = load(df_transformed, config.database_path, config.table_name, config.chunk_size)
print(f"Rows loaded: {rows_loaded:,}")

---
---

# Question 5: Pipeline Execution Report (20 points)

## Task: Create a Function to Generate Execution Reports

After running the pipeline, we need a report that summarizes what happened.

**Complete the function below** to generate a report including:
- Pipeline name and execution timestamp
- Duration in seconds
- Rows extracted, transformed, loaded
- Status (success/failed)

---

In [None]:
def generate_report(pipeline_name, start_time, end_time, metrics, status='success', error=None):
    """
    Generate a pipeline execution report.
    
    Parameters:
    -----------
    pipeline_name : str
        Name of the pipeline
    start_time : datetime
        When the pipeline started
    end_time : datetime
        When the pipeline ended
    metrics : dict
        Dictionary of metrics (rows_extracted, rows_transformed, rows_loaded)
    status : str
        'success' or 'failed'
    error : str
        Error message if failed
    
    Returns:
    --------
    str : Formatted report
    """
    # ============================================
    # YOUR CODE HERE
    # ============================================
    
    # Calculate duration
    duration = (end_time - start_time).total_seconds()
    
    # Build the report string
    report = f"""
{'='*60}
PIPELINE EXECUTION REPORT
{'='*60}

Pipeline: {pipeline_name}
Status: {status.upper()}

Timing:
  Start: {start_time.isoformat()}
  End: {end_time.isoformat()}
  Duration: {duration:.2f} seconds

Metrics:
"""
    
    # Add metrics to report
    for key, value in metrics.items():
        report += f"  {key}: {value:,}\n"  # thousands separator, e.g. 12,345
    
    # Add error if present
    if error:
        report += f"\nError: {error}\n"
    
    report += f"\n{'='*60}\n"
    
    # ============================================
    
    return report

In [None]:
# Run complete pipeline and generate report

print("Running Complete ETL Pipeline")
print("=" * 50)

# Record start time
start_time = datetime.now()
metrics = {}

try:
    # Extract
    df_raw = extract(config.source_path)
    metrics['rows_extracted'] = len(df_raw)
    
    # Transform
    df_transformed = transform(df_raw)
    metrics['rows_transformed'] = len(df_transformed)
    
    # Load
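    rows_loaded = load(df_transformed, config.database_path, config.table_name, config.chunk_size)
    metrics['rows_loaded'] = rows_loaded
    
    status = 'success'
    error = None
    
except Exception as e:
    # Record the full traceback in the log, then mark the run as failed
    logger.exception("Pipeline failed")
    status = 'failed'
    error = str(e)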

# Record end time
end_time = datetime.now()

# Generate and print report
report = generate_report('weather_etl', start_time, end_time, metrics, status, error)
print(report)

---

## Verify Your Work

Run this cell to verify the pipeline worked correctly.

In [None]:
# Verification
print("Pipeline Verification")
print("=" * 50)

# Check database was created
print(f"\n1. Database exists: {os.path.exists(config.database_path)}")

# Check table contents
conn = sqlite3.connect(config.database_path)
cursor = conn.cursor()

cursor.execute(f"SELECT COUNT(*) FROM {config.table_name}")
row_count = cursor.fetchone()[0]
print(f"2. Rows in table: {row_count:,}")

# Check for new columns
sample = pd.read_sql_query(f"SELECT * FROM {config.table_name} LIMIT 5", conn)
print(f"3. temp_category column exists: {'temp_category' in sample.columns}")
print(f"4. _etl_timestamp column exists: {'_etl_timestamp' in sample.columns}")

# Check log file
print(f"5. Log file exists: {os.path.exists(config.log_file)}")

conn.close()

print("\n" + "=" * 50)
print("Verification complete!")

---

## Submission Checklist

Before submitting, make sure:

- [ ] **Question 1**: PipelineConfig class has all 5 attributes with defaults
- [ ] **Question 2**: Logging function creates both file and console handlers
- [ ] **Question 3**: Retry decorator implements exponential backoff correctly
- [ ] **Question 4**: All three ETL functions work and log their progress
- [ ] **Question 5**: Report function generates complete execution summary
- [ ] All verification checks pass

### How to Submit

1. Save this notebook
2. Submit via Canvas (Lab 4 Submission) by **end of day Tuesday, Feb 03, 2026**

---