
## 1.1 Extract, Transform, Load (ETL)
This notebook walks through a full ETL process, following the best practices we have identified.

Our first step is to create a `utilities.py` module that holds all of our key functions. As before, we follow some familiar principles:
* logging to record all our outcomes;
* a 'purpose', 'INPUT', 'OUTPUT' approach to documenting our functions;
* single-purpose functions with 'clean' names that are easy to recall;
* general functional programming (FP) and cloud-native principles.

In [1]:
%%writefile utilities.py
import logging
import json
from typing import List, Dict

# ==========================================
# 0. LOGGING SETUP
# ==========================================
# We configure a logger that looks like a production service
logger = logging.getLogger("ETL_Service")
logger.setLevel(logging.INFO)

# Create a console handler if one doesn't exist
if not logger.handlers:
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# ==========================================
# 1. EXTRACT (The Source)
# ==========================================
def extract_data_from_source() -> List[Dict]:
    """Simulates fetching data from an external API.
       INPUT: None (as this is a simulation)
       OUTPUT: A dataset (a list of dictionaries, one per transaction)
    """
    logger.info("Connecting to external source...")
    data = [
        {"id": 101, "Product": "Laptop", "price": "$1200.00", "status": "completed", "date": "2023-10-01"},
        {"id": 102, "Product": "Mouse", "price": "$25.50", "status": "refunded", "date": "2023-10-02"},
        {"id": 103, "Product": "Monitor", "price": "300", "status": "completed", "date": "2023-10-03"},
        {"id": 104, "Product": "HDMI Cable", "price": "$15.00", "status": "completed", "date": "2023-10-03"},
        {"id": 105, "Product": "Keyboard", "price": None, "status": "error", "date": "2023-10-04"},
    ]
    logger.info(f"Extraction successful. Received {len(data)} records.")
    return data

# ==========================================
# 2. TRANSFORMATIONS (Pure Functions)
# ==========================================
def normalize_keys(row: Dict) -> Dict:
    ''' transforms dictionary keys to lower case
        INPUT: dictionary
        OUTPUT: dictionary (lower case)
    '''
    return {k.lower(): v for k, v in row.items()}

def clean_price(row: Dict) -> Dict:
    ''' converts the price to a float; missing or unparseable prices default to 0.0
        INPUT: dictionary
        OUTPUT: dictionary (cleaned)
    '''
    new_row = row.copy()
    price_raw = row.get("price")

    try:
        if isinstance(price_raw, str):
            clean_val = float(price_raw.replace("$", ""))
            new_row["price"] = clean_val
            # Log specific data quality fixes (Verbose)
            if "$" in price_raw:
                logger.debug(f"Fixed currency format for ID {row.get('id')}")
        elif price_raw is None:
            new_row["price"] = 0.0
            logger.warning(f"Found NULL price for ID {row.get('id')}. Defaulting to 0.0")
    except ValueError:
        logger.error(f"Failed to parse price for ID {row.get('id')}: {price_raw}")
        new_row["price"] = 0.0

    return new_row

def is_valid_transaction(row: Dict) -> bool:
    ''' validates transaction data
        INPUT: dictionary
        OUTPUT: boolean (valid or not)
    '''
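    price = row.get("price")
    # Guard against missing or non-numeric prices before comparing
    is_valid = (
        row.get("status") == "completed"
        and isinstance(price, (int, float))
        and price > 0
    )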
    if not is_valid:
        logger.info(f"Dropping invalid row ID {row.get('id')} (Status: {row.get('status')})")
    return is_valid

def enrich_tax(row: Dict) -> Dict:
    ''' enrich data with tax
        INPUT: dictionary
        OUTPUT: dictionary (enriched)
    '''
    new_row = row.copy()
    price = new_row.get("price", 0)
    new_row["tax"] = round(price * 0.2, 2)
    new_row["total"] = round(price + new_row["tax"], 2)  # round to avoid float artefacts
    return new_row

# ==========================================
# 3. LOAD / AGGREGATE
# ==========================================
def save_to_json(data: List[Dict], filename: str) -> None:
    """Simulates loading to a specific storage bucket by writing a local JSON file.
       INPUT: A dataset and a filename
       OUTPUT: None (the file is written as a side effect)
    """
    logger.info(f"Writing {len(data)} records to storage: {filename}")
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def calculate_revenue_reducer(acc: float, row: Dict) -> float:
    ''' total revenue aggregator (reduce)
        INPUT: accumulator and dictionary
        OUTPUT: float (total)
    '''
    return acc + row.get("total", 0)

Writing utilities.py
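
One detail worth noting: `clean_price` reports currency fixes with `logger.debug`, but the logger level is set to `INFO`, so those messages do not appear in any of the outputs in this notebook. The sketch below shows how the level controls what is emitted. It is self-contained rather than importing `utilities.py`: the logger name `ETL_Demo_Levels` and the in-memory stream are assumptions made purely for illustration.

```python
import io
import logging

# Hypothetical stand-in for the ETL_Service logger, writing to memory
stream = io.StringIO()
demo_logger = logging.getLogger("ETL_Demo_Levels")
demo_logger.propagate = False          # keep the demo isolated from other handlers
demo_logger.handlers.clear()
demo_logger.addHandler(logging.StreamHandler(stream))

demo_logger.setLevel(logging.INFO)
demo_logger.debug("Fixed currency format for ID 101")  # suppressed at INFO

demo_logger.setLevel(logging.DEBUG)
demo_logger.debug("Fixed currency format for ID 102")  # emitted at DEBUG

captured = stream.getvalue()
print(captured)
```

Only the second message is captured. Lowering the level of `ETL_Service` to `logging.DEBUG` in the same way should surface the per-row fixes when more detail is needed.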


Now we switch roles: we are the Platform. We will run three separate programs (one per cell), each simulating a distinct microservice that reacts to a file appearing.

### Microservice A: The Ingestion Service
A scheduled job (e.g. cron) runs this service, which fetches data and drops it into the "Landing Zone."

Note that we import the functions we created and stored in `utilities.py`.

In [2]:
import utilities as utils
import os

# Create directory structure to simulate S3 buckets
os.makedirs("data/landing_zone", exist_ok=True)
os.makedirs("data/processed", exist_ok=True)

print("SERVICE STARTED: Ingestion")  # illustrative only; a real service would send this to the logger

# 1. Run Extraction
raw_data = utils.extract_data_from_source()

# 2. Emit Event (Save File)
# In a real system, saving this file would trigger an AWS S3 Event Notification
output_path = "data/landing_zone/batch_001.json"
utils.save_to_json(raw_data, output_path)

print(f"EVENT EMITTED: New file created at {output_path}")

2026-02-10 22:35:56,368 - ETL_Service - INFO - Connecting to external source...
INFO:ETL_Service:Connecting to external source...
2026-02-10 22:35:56,371 - ETL_Service - INFO - Extraction successful. Received 5 records.
INFO:ETL_Service:Extraction successful. Received 5 records.
2026-02-10 22:35:56,373 - ETL_Service - INFO - Writing 5 records to storage: data/landing_zone/batch_001.json
INFO:ETL_Service:Writing 5 records to storage: data/landing_zone/batch_001.json


SERVICE STARTED: Ingestion
EVENT EMITTED: New file created at data/landing_zone/batch_001.json
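
Each log record above appears twice: once in our own format and once as `INFO:ETL_Service:...`. A likely cause (an assumption about the notebook environment, not something stated in the source) is that the root logger also has a handler, and records from `ETL_Service` propagate up to it. The sketch below reproduces that behaviour with a hypothetical logger name, `ETL_Demo_Propagate`, and in-memory streams.

```python
import io
import logging

own_stream, root_stream = io.StringIO(), io.StringIO()

# Simulate an environment whose root logger already has a handler
root = logging.getLogger()
root_handler = logging.StreamHandler(root_stream)
root_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
root.addHandler(root_handler)

demo = logging.getLogger("ETL_Demo_Propagate")
demo.setLevel(logging.INFO)
demo.handlers.clear()
demo.addHandler(logging.StreamHandler(own_stream))

demo.propagate = True
demo.info("first")    # handled by our handler AND by the root handler
demo.propagate = False
demo.info("second")   # handled by our handler only

root.removeHandler(root_handler)  # leave the root logger as we found it

own_output = own_stream.getvalue()
root_output = root_stream.getvalue()
print(own_output)
print(root_output)
```

If the duplicates are unwanted, setting `logger.propagate = False` after creating the logger in `utilities.py` is one way to keep a single copy of each line.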


### Microservice B: The Transformation Service
This service detects `batch_001.json` in the Landing Zone, cleans the data and writes a cleaned copy to the "Processed Zone."
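
In a real system the "detect" step would typically be driven by a storage event. Locally, it can be approximated by polling the directory for files that have not been handled yet. The following is a minimal sketch under that assumption: `find_new_batches` and the `seen` set are hypothetical names, and a temporary directory stands in for `data/landing_zone`.

```python
import tempfile
from pathlib import Path
from typing import List, Set

def find_new_batches(landing_zone: Path, seen: Set[str]) -> List[Path]:
    """Return JSON files in landing_zone that are not in seen (hypothetical helper).
       INPUT: directory path and a set of already-processed file names
       OUTPUT: sorted list of paths still to process
    """
    return sorted(p for p in landing_zone.glob("*.json") if p.name not in seen)

# Demonstration in a throwaway directory rather than data/landing_zone
with tempfile.TemporaryDirectory() as tmp:
    zone = Path(tmp)
    (zone / "batch_001.json").write_text("[]")
    (zone / "batch_002.json").write_text("[]")
    seen = {"batch_001.json"}          # pretend batch_001 was already processed
    new_files = [p.name for p in find_new_batches(zone, seen)]

print(new_files)  # ['batch_002.json']
```

The cell below takes the simpler route of checking for a single, known file name.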

In [3]:
import json
import os  # needed for os.path.exists; each service should import what it uses
import utilities as utils

print("SERVICE STARTED: Transformation Worker")

input_path = "data/landing_zone/batch_001.json"
output_path = "data/processed/clean_batch_001.json"

# 1. Listen for Event (Check if file exists)
if not os.path.exists(input_path):
    print("No new events found. Sleeping...")
else:
    print(f"EVENT RECEIVED: Processing {input_path}")

    # 2. Load Data
    with open(input_path, 'r') as f:
        data = json.load(f)

    # 3. Apply Functional Pipeline (Map/Filter)
    # Notice we chain the map/filters here using the utility functions
    normalized = map(utils.normalize_keys, data)
    cleaned = map(utils.clean_price, normalized)
    valid_only = filter(utils.is_valid_transaction, cleaned)
    enriched = map(utils.enrich_tax, valid_only)

    final_dataset = list(enriched)

    # 4. Emit Next Event
    utils.save_to_json(final_dataset, output_path)
    print(f"EVENT EMITTED: Clean data saved to {output_path}")

2026-02-10 22:37:32,247 - ETL_Service - INFO - Dropping invalid row ID 102 (Status: refunded)
INFO:ETL_Service:Dropping invalid row ID 102 (Status: refunded)
2026-02-10 22:37:32,250 - ETL_Service - INFO - Dropping invalid row ID 105 (Status: error)
INFO:ETL_Service:Dropping invalid row ID 105 (Status: error)
2026-02-10 22:37:32,252 - ETL_Service - INFO - Writing 3 records to storage: data/processed/clean_batch_001.json
INFO:ETL_Service:Writing 3 records to storage: data/processed/clean_batch_001.json


SERVICE STARTED: Transformation Worker
EVENT RECEIVED: Processing data/landing_zone/batch_001.json
EVENT EMITTED: Clean data saved to data/processed/clean_batch_001.json
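
Note that `map` and `filter` return lazy iterators in Python 3, so none of the utility functions actually run until `list(enriched)` consumes the chain. The sketch below illustrates this with a hypothetical `double` function and a `calls` list that records when work really happens.

```python
calls = []

def double(x: int) -> int:
    calls.append(x)            # record each time the function actually runs
    return x * 2

lazy = map(double, [1, 2, 3])
calls_before = list(calls)     # still empty: nothing has been evaluated yet
result = list(lazy)            # consuming the iterator triggers the work
calls_after = list(calls)

print(calls_before, result, calls_after)  # [] [2, 4, 6] [1, 2, 3]
```

One consequence is that each row flows through the whole chain one at a time, rather than the full dataset being copied at every step.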


We can see that the service dropped two invalid rows (IDs 102 and 105) and wrote the result to a new file rather than overwriting the raw batch (_immutability_).
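
The same idea applies at row level: each transformation copies its input instead of modifying it. A minimal sketch, using `clean_price_sketch` as a simplified stand-in for `clean_price` (no logging or error handling, an assumption made to keep the example self-contained):

```python
from typing import Dict

def clean_price_sketch(row: Dict) -> Dict:
    """Simplified stand-in for clean_price: no logging or error handling."""
    new_row = row.copy()                   # work on a copy, never the input
    price_raw = row.get("price")
    if isinstance(price_raw, str):
        new_row["price"] = float(price_raw.replace("$", ""))
    return new_row

raw = {"id": 101, "product": "Laptop", "price": "$1200.00"}
cleaned = clean_price_sketch(raw)

print(raw["price"])      # $1200.00 (original untouched)
print(cleaned["price"])  # 1200.0
```

`dict.copy()` is a shallow copy, which is sufficient here because every value in a row is a flat string or number.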

### Microservice C: The Analytics Service
This service detects `clean_batch_001.json` in the Processed Zone, calculates total revenue and prints a report, standing in for a dashboard update.


In [4]:
import functools
import json  # needed for json.load
import os    # needed for os.path.exists
import utilities as utils

print("SERVICE STARTED: Revenue Aggregator")

input_path = "data/processed/clean_batch_001.json"

# 1. Listen for Event
if not os.path.exists(input_path):
    print("No processed data found.")
else:
    print(f"EVENT RECEIVED: Aggregating {input_path}")

    # 2. Load Data
    with open(input_path, 'r') as f:
        clean_data = json.load(f)

    # 3. Reduce (Aggregation)
    total_revenue = functools.reduce(utils.calculate_revenue_reducer, clean_data, 0.0)

    # 4. Final Action
    print("="*40)
    print(f"FINAL REPORT: Total Revenue is ${total_revenue:,.2f}")
    print("="*40)

SERVICE STARTED: Revenue Aggregator
EVENT RECEIVED: Aggregating data/processed/clean_batch_001.json
========================================
FINAL REPORT: Total Revenue is $1,818.00
========================================
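
As a sanity check, the reported figure can be reproduced by hand from the three rows that survive validation (IDs 101, 103 and 104). The sketch below applies the same 20% tax rule as `enrich_tax` and then folds the totals with `functools.reduce`; the inline lambda is a stand-in for `calculate_revenue_reducer`.

```python
import functools

# Prices of the three valid rows: Laptop, Monitor, HDMI Cable
prices = [1200.00, 300.0, 15.00]

# Same rule as enrich_tax: 20% tax rounded to 2 d.p., then price + tax
totals = [p + round(p * 0.2, 2) for p in prices]

# Fold the totals into one number, starting from 0.0
total_revenue = functools.reduce(lambda acc, t: acc + t, totals, 0.0)

print(totals)
print(f"${total_revenue:,.2f}")  # $1,818.00
```

Note that the "revenue" figure includes tax, because the reducer sums the `total` field rather than `price`.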


And that's it! A full ETL process.