**🧵 CASE STUDY: HOME ENERGY MONITORING PIPELINE**

Modern smart homes have electrical panels that record appliance-level energy usage throughout the day.

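You have been given raw telemetry from multiple homes. The data is dirty: some numeric fields arrive as strings, and some readings are invalid (for example, a negative wattage).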

Your job is to clean, validate, transform, enrich, and classify this data to prepare it for analytics.

In [1]:
# Raw telemetry exactly as received. Some rows are deliberately dirty:
# numbers sent as strings (H001 heater "watts", H001 ac "duration_min") and a
# negative wattage (H003 oven) that the UsageEvent validator must reject.
raw_data = [
    dict(
        home_id="H001",
        appliance="heater",
        watts="1500",
        duration_min=45,
        timestamp="2025-02-10T07:30:00Z",
    ),
    dict(
        home_id="H001",
        appliance="ac",
        watts=2200,
        duration_min="60",
        timestamp="2025-02-10T08:45:00Z",
    ),
    dict(
        home_id="H002",
        appliance="fridge",
        watts=180,
        duration_min=120,
        timestamp="2025-02-10T09:00:00Z",
    ),
    dict(
        home_id="H003",
        appliance="oven",
        watts=-900,
        duration_min=30,
        timestamp="2025-02-10T10:00:00Z",
    ),
]


In [85]:
import csv
import json
import logging
import math
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, ValidationError, field_validator

# ============================================================
# INPUT MODEL
# ============================================================

class UsageEvent(BaseModel):
    """One raw appliance-usage reading, validated and type-coerced.

    Pydantic coerces numeric strings (e.g. ``"1500"``) to floats and ISO-8601
    strings to ``datetime``; the validators below add the domain rules.
    """

    home_id: str
    appliance: str  # normalized to lower case by lower_case_appliance
    watts: float  # must be finite and > 0
    duration_min: float  # duration in minutes; must be finite and > 0
    timestamp: datetime

    @field_validator('appliance', mode='before')
    @classmethod
    def lower_case_appliance(cls, v: Any) -> Any:
        """Lower-case string appliance names.

        Non-string values are returned unchanged so pydantic's own ``str``
        validation decides whether to accept them.
        """
        if isinstance(v, str):
            return v.lower()
        return v

    @field_validator('watts', mode='after')
    @classmethod
    def watts_greater_than_0(cls, v: float) -> float:
        """Reject non-positive, NaN and infinite wattages."""
        # pydantic's float accepts "nan"/"inf" by default, and NaN slips past a
        # plain `v <= 0` check because every comparison with NaN is False.
        if not math.isfinite(v) or v <= 0:
            raise ValueError('Watts must be a finite number greater than 0')
        return v

    @field_validator('duration_min', mode='after')
    @classmethod
    def duration_greater_than_0(cls, v: float) -> float:
        """Reject non-positive, NaN and infinite durations."""
        if not math.isfinite(v) or v <= 0:
            raise ValueError('Duration min must be a finite number greater than 0')
        return v


# ============================================================
# VALIDATION STAGE
# ============================================================


def parse_and_validate_batch(raw_data: List[Dict[str, Any]]) -> Tuple[List[UsageEvent], List[Dict]]:
    """Validate raw telemetry rows against ``UsageEvent``.

    Each row is validated on its own, so one bad row never aborts the batch.

    Args:
        raw_data: Raw rows, in input order.

    Returns:
        A ``(valid, invalid)`` pair. ``valid`` holds a ``UsageEvent`` for every
        row that passed. ``invalid`` holds one ``{"input": row, "error": [...]}``
        dict per rejected row, where ``error`` is pydantic's ``errors()`` list.
    """
    accepted: List[UsageEvent] = []
    rejected: List[Dict[str, Any]] = []

    for candidate in raw_data:
        try:
            event = UsageEvent.model_validate(candidate)
        except ValidationError as exc:
            rejected.append({"input": candidate, "error": exc.errors()})
        else:
            accepted.append(event)

    return accepted, rejected


# ============================================================
# TRANSFORMATION MODELS
# ============================================================

class EnergyEvent(BaseModel):
    """A validated usage event enriched with its energy consumption.

    Carries every ``UsageEvent`` field unchanged, plus ``kwh``.
    """
    home_id: str
    appliance: str
    watts: float
    duration_min: float
    timestamp: datetime
    kwh: float  # energy used in kilowatt-hours, computed by compute_energy

class CostEvent(BaseModel):
    """An energy event priced at a per-kWh rate.

    Drops the raw ``watts`` and ``duration_min`` readings and adds ``cost``.
    """
    home_id: str
    appliance: str
    timestamp: datetime
    kwh: float
    cost: float  # kwh * rate, rounded to 4 decimal places by compute_cost


class CategorizedEvent(BaseModel):
    """A priced event labelled with a usage category; the pipeline's final output.

    Field order here determines the key order of ``model_dump()``, and so the
    column order of the CSV written by ``write_csv``.
    """
    home_id: str
    appliance: str
    timestamp: datetime
    kwh: float
    cost: float
    category: str  # "LOW", "MEDIUM" or "HIGH", assigned by categorize

# ============================================================
# SINGLE-ITEM TRANSFORMATIONS
# Job: Transform one object into one new object. No loops. No lists. No batching. Pure logic.
# ============================================================


def compute_energy(event: UsageEvent) -> EnergyEvent:
    """Attach energy consumption (kWh) to a validated usage event.

    kWh = watts * minutes / 60_000 (divide by 60 for watt-hours, then by 1000),
    rounded to 4 decimal places. Every ``UsageEvent`` field is carried over.
    """
    # Spreading model_dump() is safe here: the output model accepts every input
    # field, and only ``kwh`` is new.
    carried = event.model_dump()
    return EnergyEvent(**carried, kwh=round((event.watts * event.duration_min) / 60000, 4))


def compute_cost(event: EnergyEvent, rate: float) -> CostEvent:
    """Price an energy event at a flat per-kWh ``rate``.

    ``cost`` is ``kwh * rate`` rounded to 4 decimal places. The raw ``watts``
    and ``duration_min`` readings are dropped because ``CostEvent`` has no
    fields for them, so only the shared fields are copied across.
    """
    carried = event.model_dump(include={"home_id", "appliance", "timestamp", "kwh"})
    return CostEvent(**carried, cost=round(event.kwh * rate, 4))
        
def categorize(event: CostEvent) -> CategorizedEvent:
    """Label a priced event by how much energy it used.

    Bands (upper bounds exclusive): kwh < 0.1 -> "LOW", kwh < 0.5 -> "MEDIUM",
    anything else -> "HIGH".
    """
    bands = ((0.1, "LOW"), (0.5, "MEDIUM"))
    # Bands are checked in ascending order; the first one whose upper bound
    # exceeds kwh wins, otherwise fall through to HIGH.
    label = next((name for upper, name in bands if event.kwh < upper), "HIGH")
    carried = event.model_dump(include={"home_id", "appliance", "timestamp", "kwh", "cost"})
    return CategorizedEvent(**carried, category=label)

# ============================================================
# BATCH WRAPPERS
# Job: Apply the single-item function over a list. No business logic. No computation. No domain rules. Just iteration.
# ============================================================

def energy_batch(events: List[UsageEvent]) -> List[EnergyEvent]:
    """Apply ``compute_energy`` to every event, preserving input order."""
    return list(map(compute_energy, events))


def cost_batch(events: List[EnergyEvent], rate: float) -> List[CostEvent]:
    """Price every event at the same per-kWh ``rate``, preserving input order."""
    priced: List[CostEvent] = []
    for energy_event in events:
        priced.append(compute_cost(energy_event, rate))
    return priced

def categorize_batch(events: List[CostEvent]) -> List[CategorizedEvent]:
    """Apply ``categorize`` to every event, preserving input order."""
    return list(map(categorize, events))


# ============================================================
# PURE PIPELINE RUNNER (NO IO)
# No I/O here: mixing pipeline logic and I/O in the same function destroys clarity, testability, maintainability, and composability.
# ============================================================

def _utc_sort_key(event: CategorizedEvent) -> datetime:
    """Return the event's timestamp as an aware datetime, treating naive values as UTC."""
    ts = event.timestamp
    return ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)


def process_data(data: List[Dict[str, Any]], rate: float = 0.22) -> Tuple[List[CategorizedEvent], List[Dict[str, Any]]]:
    """Run validation and every transformation stage over ``data`` (no I/O).

    Args:
        data: Raw telemetry rows.
        rate: Price per kWh used by the cost stage.

    Returns:
        ``(events, invalid)``: the categorized events sorted by timestamp, and
        the rejected rows exactly as returned by ``parse_and_validate_batch``.
    """
    valid_records, invalid_records = parse_and_validate_batch(data)
    # Lazy %-args: the message is only formatted if INFO logging is enabled.
    logging.info("Valid: %d, Invalid: %d", len(valid_records), len(invalid_records))

    stage1 = energy_batch(valid_records)
    stage2 = cost_batch(stage1, rate=rate)
    stage3 = categorize_batch(stage2)

    # Ensure deterministic output order. UsageEvent accepts both offset-aware
    # ("...Z") and naive timestamps, and Python refuses to compare the two, so
    # sorting on the raw timestamp would raise TypeError for a mixed batch.
    # Naive values are treated as UTC for ordering only; events are not modified.
    # NOTE(review): confirm naive timestamps really are UTC upstream.
    stage3.sort(key=_utc_sort_key)

    return stage3, invalid_records

# ============================================================
# OUTPUT FUNCTIONS (Serialization / IO)
# ============================================================

def write_csv(path: str, events: List[CategorizedEvent]) -> None:
    """Write categorized events to ``path`` as CSV, one row per event.

    Columns follow the model's field order. Values are serialized with
    ``model_dump(mode="json")``, so timestamps become ISO-8601 strings.
    When ``events`` is empty nothing is written, and an existing file at
    ``path`` is left untouched.
    """
    if not events:
        return
    rows = [event.model_dump(mode="json") for event in events]
    # Explicit UTF-8 so the output doesn't depend on the platform's locale
    # encoding; newline="" is what the csv module requires for correct line endings.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)


# def write_invalid_csv(path: str, invalid_records: List[Dict[str, Any]]) -> None:
#     if not invalid_records:
#         return
    
#     # Flatten records because "error" is a list
#     cleaned = []
#     for item in invalid_records:
#         flat = item["input"].copy()
#         flat["error"] = json.dumps(item["error"])
#         cleaned.append(flat)
    
#     with open(path, "w", newline="") as f:
#         writer = csv.DictWriter(f, fieldnames=cleaned[0].keys())
#         writer.writeheader()
#         writer.writerows(cleaned)

def write_invalid_csv(path: str, invalid_records: List[Dict[str, Any]]) -> None:
    """Write rejected records to ``path`` as a flat, human-readable CSV.

    Each record is an ``{"input": ..., "error": ...}`` dict, as produced by
    ``parse_and_validate_batch``. The output columns are the five raw input
    fields followed by ``error_type`` and ``error_message``. When a record
    carries several pydantic errors, their types and messages are joined with
    ``"; "``. Nothing is written when ``invalid_records`` is empty.
    """
    if not invalid_records:
        return

    input_columns = ("home_id", "appliance", "watts", "duration_min", "timestamp")
    cleaned = []
    for item in invalid_records:
        input_row = item["input"]
        # A row that is not a dict at all (e.g. a stray string or None) is itself
        # a validation failure; write it with blank input columns instead of
        # crashing on `.get`.
        if not isinstance(input_row, dict):
            input_row = {}
        error = item["error"]

        flat = {column: input_row.get(column) for column in input_columns}

        # error may be a list (pydantic ValidationError.errors()) or a raw value
        if isinstance(error, list):
            flat["error_type"] = "; ".join(err["type"] for err in error)
            flat["error_message"] = "; ".join(err["msg"] for err in error)
        else:
            flat["error_type"] = item.get("type", "Exception")
            flat["error_message"] = str(error)

        cleaned.append(flat)

    # Explicit UTF-8 for platform-independent output; newline="" as csv requires.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(cleaned[0]))
        writer.writeheader()
        writer.writerows(cleaned)


# ============================================================
# END-TO-END WRAPPER (optional)
# ============================================================

def run_pipeline(
    data: List[Dict[str, Any]],
    csv_path: str,
    invalid_csv_path: Optional[str] = None,
) -> str:
    """Run the pipeline end to end: process, write CSV output, return JSON.

    Args:
        data: Raw telemetry rows.
        csv_path: Destination for the valid, categorized events.
        invalid_csv_path: Optional destination for rejected rows. When omitted,
            rejected rows are not written anywhere.

    Returns:
        The categorized events as a JSON array string, with timestamps as
        ISO-8601 strings.
    """
    processed, invalid = process_data(data)
    write_csv(csv_path, processed)
    if invalid_csv_path is not None:
        write_invalid_csv(invalid_csv_path, invalid)
    # pydantic's JSON mode turns datetimes into strings that json.dumps can emit.
    return json.dumps([event.model_dump(mode="json") for event in processed], indent=2)




In [77]:
processed, invalid = process_data(data=raw_data)

In [78]:
# Persist both halves of the run: clean events and rejected rows.
write_csv(path="energy_valid.csv", events=processed)
write_invalid_csv(path="energy_invalid.csv", invalid_records=invalid)

In [79]:
for rejected in invalid:
    print(rejected)


{'input': {'home_id': 'H003', 'appliance': 'oven', 'watts': -900, 'duration_min': 30, 'timestamp': '2025-02-10T10:00:00Z'}, 'error': [{'type': 'value_error', 'loc': ('watts',), 'msg': 'Value error, Watts must be atleast 1', 'input': -900, 'ctx': {'error': ValueError('Watts must be atleast 1')}, 'url': 'https://errors.pydantic.dev/2.12/v/value_error'}]}


In [86]:
parse_and_validate_batch(raw_data=raw_data)

([UsageEvent(home_id='H001', appliance='heater', watts=1500.0, duration_min=45.0, timestamp=datetime.datetime(2025, 2, 10, 7, 30, tzinfo=TzInfo(0))),
  UsageEvent(home_id='H001', appliance='ac', watts=2200.0, duration_min=60.0, timestamp=datetime.datetime(2025, 2, 10, 8, 45, tzinfo=TzInfo(0))),
  UsageEvent(home_id='H002', appliance='fridge', watts=180.0, duration_min=120.0, timestamp=datetime.datetime(2025, 2, 10, 9, 0, tzinfo=TzInfo(0)))],
 [{'input': {'home_id': 'H003',
    'appliance': 'oven',
    'watts': -900,
    'duration_min': 30,
    'timestamp': '2025-02-10T10:00:00Z'},
   'error': [{'type': 'value_error',
     'loc': ('watts',),
     'msg': 'Value error, Watts must be atleast 1',
     'input': -900,
     'ctx': {'error': ValueError('Watts must be atleast 1')},
     'url': 'https://errors.pydantic.dev/2.12/v/value_error'}]}])

In [87]:
a, b = parse_and_validate_batch(raw_data=raw_data)

In [95]:
for usage_event in a:
    print(usage_event.watts)

1500.0
2200.0
180.0


In [98]:
energy_batch(events=a)

[EnergyEvent(home_id='H001', appliance='heater', watts=1500.0, duration_min=45.0, timestamp=datetime.datetime(2025, 2, 10, 7, 30, tzinfo=TzInfo(0)), kwh=1.125),
 EnergyEvent(home_id='H001', appliance='ac', watts=2200.0, duration_min=60.0, timestamp=datetime.datetime(2025, 2, 10, 8, 45, tzinfo=TzInfo(0)), kwh=2.2),
 EnergyEvent(home_id='H002', appliance='fridge', watts=180.0, duration_min=120.0, timestamp=datetime.datetime(2025, 2, 10, 9, 0, tzinfo=TzInfo(0)), kwh=0.36)]

In [99]:
b = energy_batch(events=a)

In [102]:
cost_batch(events=b, rate=0.9)

[CostEvent(home_id='H001', appliance='heater', timestamp=datetime.datetime(2025, 2, 10, 7, 30, tzinfo=TzInfo(0)), kwh=1.125, cost=1.0125),
 CostEvent(home_id='H001', appliance='ac', timestamp=datetime.datetime(2025, 2, 10, 8, 45, tzinfo=TzInfo(0)), kwh=2.2, cost=1.98),
 CostEvent(home_id='H002', appliance='fridge', timestamp=datetime.datetime(2025, 2, 10, 9, 0, tzinfo=TzInfo(0)), kwh=0.36, cost=0.324)]

In [104]:
c = cost_batch(events=b, rate=0.9)

In [105]:
categorize_batch(events=c)

[CategorizedEvent(home_id='H001', appliance='heater', timestamp=datetime.datetime(2025, 2, 10, 7, 30, tzinfo=TzInfo(0)), kwh=1.125, cost=1.0125, category='HIGH'),
 CategorizedEvent(home_id='H001', appliance='ac', timestamp=datetime.datetime(2025, 2, 10, 8, 45, tzinfo=TzInfo(0)), kwh=2.2, cost=1.98, category='HIGH'),
 CategorizedEvent(home_id='H002', appliance='fridge', timestamp=datetime.datetime(2025, 2, 10, 9, 0, tzinfo=TzInfo(0)), kwh=0.36, cost=0.324, category='MEDIUM')]