# Machine Learning Programming: Data Collection and Pre-Processing Lab

**Author:** Emmanuel (Chooks)  
**Course:** PROG8245 - Machine Learning Programming  
**Date:** January 2025

---

## Lab Overview

This notebook demonstrates a complete **15-step Data Engineering roadmap** applied to an e-commerce dataset. We will:
- Load and explore raw transactional data
- Design appropriate data structures using Python classes
- Clean, transform, and engineer features
- Serialize data in multiple formats
- Generate analytical insights

**Dataset:** Synthetic e-commerce sales data with 550 transactions, including intentional data quality issues for cleaning exercises.

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
from collections import namedtuple
from typing import Dict, List, Optional, Any
import warnings
warnings.filterwarnings('ignore')

print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")

---

## Step 1: Hello, Data!

We begin by loading the raw e-commerce CSV file and examining its structure. This initial exploration helps us understand the data's shape, column types, and general characteristics before any processing.

In [None]:
# Load the raw CSV data
raw_df = pd.read_csv('data/ecommerce_sales.csv')

# Display basic info about the dataset
print(f"Dataset Shape: {raw_df.shape[0]} rows × {raw_df.shape[1]} columns")
print(f"\nColumn Names: {list(raw_df.columns)}")
print("\n" + "="*60)
print("First 3 Rows:")
print("="*60)
raw_df.head(3)

In [None]:
# Quick look at data types and non-null counts
print("Data Types and Non-Null Counts:")
print("-" * 40)
raw_df.info()

---

## Step 2: Pick the Right Container

### Design Decision: Data Structure Selection

For this e-commerce data engineering task, I need to choose between several Python data structures:

| Structure | Pros | Cons | Best Use Case |
|-----------|------|------|---------------|
| `dict` | Flexible, mutable, key-value access | No attribute access, less structured | Quick lookups, aggregations |
| `namedtuple` | Immutable, memory-efficient, attribute access | Cannot modify after creation | Read-only records |
| `set` | Fast membership testing, unique values only | Unordered, no duplicates | Deduplication, unique counts |
| `class` | Full control, methods, validation | More code to write | Complex objects with behavior |

### My Choice: **Custom Class + Dictionary for Aggregation**

I'll implement a `SalesRecord` class because:
1. **Encapsulation**: I can bundle data with methods like `clean()` and `total()`
2. **Validation**: The class can validate data during instantiation
3. **Readability**: Attribute access (`record.price`) is cleaner than dict keys (`record['price']`)
4. **Extensibility**: Easy to add new methods as requirements evolve

I'll also use `set` for tracking unique values (cities, products) and `dict` for aggregations (revenue by city).

---

## Step 3: Implement Functions and Data Structure

Here we implement a `SalesRecord` class that encapsulates individual sales transactions with built-in cleaning and calculation methods.

In [None]:
class SalesRecord:
    """
    Represents a single e-commerce sales transaction.
    
    Attributes:
        order_id: Unique identifier for the order
        order_date: Date of the transaction
        customer_id: Customer identifier
        product: Name of the product purchased
        category: Product category
        price: Unit price of the product
        quantity: Number of units purchased
        coupon_code: Applied discount code (if any)
        shipping_city: Destination city for shipping
        is_cleaned: Flag indicating if record has been cleaned
    """
    
    # Coupon discount mapping (class-level constant)
    COUPON_DISCOUNTS = {
        'SAVE10': 0.10, 'SAVE15': 0.15, 'SAVE20': 0.20, 'SAVE25': 0.25,
        'WELCOME5': 0.05, 'FLASH30': 0.30, 'VIP40': 0.40, 'HOLIDAY15': 0.15,
        'SUMMER10': 0.10, 'WINTER20': 0.20, 'FREESHIP': 0.00, 'LOYALTY25': 0.25
    }
    
    def __init__(self, order_id: str, order_date: str, customer_id: str,
                 product: str, category: str, price: float, quantity: int,
                 coupon_code: Optional[str], shipping_city: str):
        """Initialize a SalesRecord with transaction data."""
        self.order_id = order_id
        self.order_date = order_date
        self.customer_id = customer_id
        self.product = product
        self.category = category
        self.price = price
        self.quantity = quantity
        self.coupon_code = coupon_code
        self.shipping_city = shipping_city
        self.is_cleaned = False
        
        # Track data quality issues found
        self.quality_issues = []
    
    def clean(self) -> 'SalesRecord':
        """
        Clean and standardize the record data.
        
        Cleaning operations:
        - Strip whitespace from string fields
        - Standardize city names to title case
        - Handle negative prices (convert to absolute value)
        - Ensure quantity is at least 1
        - Validate and standardize coupon codes
        
        Returns:
            self: The cleaned SalesRecord instance (for method chaining)
        """
        # Clean shipping city: strip whitespace and title case
        if self.shipping_city:
            original_city = self.shipping_city
            self.shipping_city = self.shipping_city.strip().title()
            if original_city != self.shipping_city:
                self.quality_issues.append(f"City standardized: '{original_city}' -> '{self.shipping_city}'")
        
        # Handle negative prices
        if self.price is not None and self.price < 0:
            self.quality_issues.append(f"Negative price corrected: {self.price} -> {abs(self.price)}")
            self.price = abs(self.price)
        
        # Ensure quantity is at least 1
        if self.quantity is not None and self.quantity < 1:
            self.quality_issues.append(f"Invalid quantity corrected: {self.quantity} -> 1")
            self.quantity = 1
        
        # Standardize coupon code to uppercase
        if self.coupon_code and isinstance(self.coupon_code, str):
            self.coupon_code = self.coupon_code.strip().upper()
        
        # Clean customer_id
        if self.customer_id:
            self.customer_id = str(self.customer_id).strip()
        
        self.is_cleaned = True
        return self
    
    def total(self) -> float:
        """
        Calculate the total amount for this transaction.
        
        Applies coupon discount if a valid coupon code is present.
        
        Returns:
            float: Total transaction amount after discount
        """
        if self.price is None or self.quantity is None:
            return 0.0
        
        subtotal = self.price * self.quantity
        
        # Apply discount if coupon exists
        discount = self.get_discount_rate()
        return round(subtotal * (1 - discount), 2)
    
    def get_discount_rate(self) -> float:
        """
        Get the discount rate for the applied coupon.
        
        Returns:
            float: Discount rate (0.0 to 1.0), or 0.0 if no valid coupon
        """
        if self.coupon_code and self.coupon_code in self.COUPON_DISCOUNTS:
            return self.COUPON_DISCOUNTS[self.coupon_code]
        return 0.0
    
    def is_valid(self) -> bool:
        """
        Check if the record has all required fields populated.
        
        Returns:
            bool: True if record has no critical missing values
        """
        required_fields = [
            self.order_id,
            self.order_date,
            self.product,
            self.price is not None,
            self.quantity is not None
        ]
        return all(required_fields)
    
    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the record to a dictionary for serialization.
        
        Returns:
            dict: Dictionary representation of the record
        """
        return {
            'order_id': self.order_id,
            'order_date': self.order_date,
            'customer_id': self.customer_id,
            'product': self.product,
            'category': self.category,
            'price': self.price,
            'quantity': self.quantity,
            'coupon_code': self.coupon_code,
            'shipping_city': self.shipping_city,
            'total_amount': self.total(),
            'is_cleaned': self.is_cleaned
        }
    
    def __repr__(self) -> str:
        """String representation for debugging."""
        return f"SalesRecord({self.order_id}, {self.product}, ${self.price}, qty={self.quantity})"


# Demonstrate the class with a sample record
sample = SalesRecord(
    order_id="ORD000001",
    order_date="2024-06-15",
    customer_id="CUST00042",
    product="Wireless Earbuds",
    category="Electronics",
    price=79.99,
    quantity=2,
    coupon_code="SAVE10",
    shipping_city="  new york  "  # Intentionally dirty
)

print("Before cleaning:")
print(f"  City: '{sample.shipping_city}'")
print(f"  Total: ${sample.total()}")

sample.clean()

print("\nAfter cleaning:")
print(f"  City: '{sample.shipping_city}'")
print(f"  Total: ${sample.total()} (after 10% discount)")
print(f"  Is Valid: {sample.is_valid()}")
print(f"  Quality Issues Found: {sample.quality_issues}")

---

## Step 4: Bulk Loaded

Now we convert the entire DataFrame into a collection of `SalesRecord` objects. This demonstrates mapping data from pandas DataFrames to custom data structures, enabling us to leverage our class methods for data processing.

In [None]:
def load_records_from_dataframe(df: pd.DataFrame) -> List[SalesRecord]:
    """
    Convert a pandas DataFrame to a list of SalesRecord objects.
    
    Args:
        df: DataFrame with e-commerce transaction data
        
    Returns:
        List of SalesRecord objects
    """
    records = []
    
    for _, row in df.iterrows():
        # Handle NaN values appropriately
        record = SalesRecord(
            order_id=row['order_id'],
            order_date=row['order_date'] if pd.notna(row['order_date']) else None,
            customer_id=row['customer_id'] if pd.notna(row['customer_id']) else None,
            product=row['product'] if pd.notna(row['product']) else None,
            category=row['category'] if pd.notna(row['category']) else None,
            price=float(row['price']) if pd.notna(row['price']) else None,
            quantity=int(row['quantity']) if pd.notna(row['quantity']) else None,
            coupon_code=row['coupon_code'] if pd.notna(row['coupon_code']) else None,
            shipping_city=row['shipping_city'] if pd.notna(row['shipping_city']) else None
        )
        records.append(record)
    
    return records


# Load all records from the raw DataFrame
all_records = load_records_from_dataframe(raw_df)

print(f"Loaded {len(all_records)} SalesRecord objects")
print(f"\nFirst 3 records:")
for i, rec in enumerate(all_records[:3], 1):
    print(f"  {i}. {rec}")

In [None]:
# Also create a dictionary mapping order_id to records for O(1) lookups
records_dict: Dict[str, SalesRecord] = {rec.order_id: rec for rec in all_records}

print(f"Created dictionary with {len(records_dict)} entries")
print(f"\nExample lookup: records_dict['ORD000005'] = {records_dict['ORD000005']}")

---

## Step 5: Quick Profiling

Before diving into cleaning, we need to understand our data's statistical properties. This profiling step reveals the range of values, identifies outliers, and helps us understand what "normal" looks like for this dataset.

In [None]:
def profile_numeric_field(records: List[SalesRecord], field: str) -> Dict[str, float]:
    """
    Calculate min, mean, max, and std for a numeric field across all records.
    
    Args:
        records: List of SalesRecord objects
        field: Name of the numeric attribute to profile
        
    Returns:
        Dictionary with statistical measures
    """
    values = [getattr(rec, field) for rec in records if getattr(rec, field) is not None]
    
    if not values:
        return {'min': None, 'mean': None, 'max': None, 'std': None, 'count': 0}
    
    return {
        'min': round(min(values), 2),
        'mean': round(sum(values) / len(values), 2),
        'max': round(max(values), 2),
        'std': round(np.std(values), 2),
        'count': len(values)
    }


# Profile price field
price_stats = profile_numeric_field(all_records, 'price')
print("PRICE Statistics:")
print(f"  Min:    ${price_stats['min']}")
print(f"  Mean:   ${price_stats['mean']}")
print(f"  Max:    ${price_stats['max']}")
print(f"  Std:    ${price_stats['std']}")
print(f"  Count:  {price_stats['count']} non-null values")

# Profile quantity field
qty_stats = profile_numeric_field(all_records, 'quantity')
print("\nQUANTITY Statistics:")
print(f"  Min:    {qty_stats['min']}")
print(f"  Mean:   {qty_stats['mean']}")
print(f"  Max:    {qty_stats['max']}")
print(f"  Count:  {qty_stats['count']} non-null values")

In [None]:
# Use SET for unique value counts - perfect use case for sets!
unique_cities: set = {rec.shipping_city for rec in all_records if rec.shipping_city}
unique_products: set = {rec.product for rec in all_records if rec.product}
unique_categories: set = {rec.category for rec in all_records if rec.category}
unique_coupons: set = {rec.coupon_code for rec in all_records if rec.coupon_code}

print("UNIQUE VALUE COUNTS (using sets):")
print(f"  Unique Cities:     {len(unique_cities)}")
print(f"  Unique Products:   {len(unique_products)}")
print(f"  Unique Categories: {len(unique_categories)}")
print(f"  Unique Coupons:    {len(unique_coupons)}")

# Show the unique categories
print(f"\nCategories: {sorted(unique_categories)}")

In [None]:
# Look at city distribution - some might have inconsistent casing
print("City values (first 20 unique):")
for city in sorted(unique_cities)[:20]:
    print(f"  - '{city}'")

---

## Step 6: Spot the Grime

Data quality issues are inevitable in real-world datasets. In this step, we systematically identify "dirty" data that needs cleaning. We look for:

1. **Missing values** - NULL or empty fields
2. **Invalid values** - Negative prices, zero quantities
3. **Inconsistent formatting** - Different cases, extra whitespace
4. **Data type issues** - Wrong types or malformed data

In [None]:
def identify_data_quality_issues(records: List[SalesRecord]) -> Dict[str, List]:
    """
    Scan records and categorize all data quality issues found.
    
    Returns:
        Dictionary mapping issue type to list of affected order_ids
    """
    issues = {
        'missing_date': [],
        'missing_customer_id': [],
        'missing_product': [],
        'missing_price': [],
        'negative_price': [],
        'zero_quantity': [],
        'inconsistent_city_case': [],
        'whitespace_in_city': []
    }
    
    for rec in records:
        # Check for missing values
        if not rec.order_date:
            issues['missing_date'].append(rec.order_id)
        if not rec.customer_id or rec.customer_id == '':
            issues['missing_customer_id'].append(rec.order_id)
        if not rec.product:
            issues['missing_product'].append(rec.order_id)
        if rec.price is None:
            issues['missing_price'].append(rec.order_id)
            
        # Check for invalid values
        if rec.price is not None and rec.price < 0:
            issues['negative_price'].append(rec.order_id)
        if rec.quantity is not None and rec.quantity == 0:
            issues['zero_quantity'].append(rec.order_id)
            
        # Check for formatting issues
        if rec.shipping_city:
            if rec.shipping_city != rec.shipping_city.title():
                issues['inconsistent_city_case'].append(rec.order_id)
            if rec.shipping_city != rec.shipping_city.strip():
                issues['whitespace_in_city'].append(rec.order_id)
    
    return issues


# Find all data quality issues
quality_issues = identify_data_quality_issues(all_records)

print("DATA QUALITY ISSUES FOUND:")
print("=" * 50)
total_issues = 0
for issue_type, affected_orders in quality_issues.items():
    count = len(affected_orders)
    total_issues += count
    if count > 0:
        print(f"\n{issue_type.upper().replace('_', ' ')}:")
        print(f"  Count: {count} records")
        print(f"  Examples: {affected_orders[:3]}")

print(f"\n{'='*50}")
print(f"TOTAL ISSUES: {total_issues} across {len(all_records)} records")

In [None]:
# Let's examine some specific dirty records
print("Examples of dirty records:")
print("-" * 60)

# Negative price example
if quality_issues['negative_price']:
    order_id = quality_issues['negative_price'][0]
    rec = records_dict[order_id]
    print(f"\n1. NEGATIVE PRICE ({order_id}):")
    print(f"   Product: {rec.product}")
    print(f"   Price: ${rec.price} ← Invalid!")

# Missing product example
if quality_issues['missing_product']:
    order_id = quality_issues['missing_product'][0]
    rec = records_dict[order_id]
    print(f"\n2. MISSING PRODUCT ({order_id}):")
    print(f"   Product: {rec.product} ← NULL")
    print(f"   Category: {rec.category}")

# Inconsistent city casing example
if quality_issues['inconsistent_city_case']:
    order_id = quality_issues['inconsistent_city_case'][0]
    rec = records_dict[order_id]
    print(f"\n3. INCONSISTENT CITY CASE ({order_id}):")
    print(f"   City: '{rec.shipping_city}' ← Needs standardization")

---

## Step 7: Cleaning Rules

Now we apply our cleaning logic to fix the identified issues. We'll use the `clean()` method we implemented in our `SalesRecord` class and track before/after metrics.

In [None]:
# Capture "before" state
before_issues = identify_data_quality_issues(all_records)
before_counts = {k: len(v) for k, v in before_issues.items()}

print("BEFORE CLEANING:")
print("-" * 40)
for issue_type, count in before_counts.items():
    if count > 0:
        print(f"  {issue_type}: {count}")

In [None]:
# Apply cleaning to all records
all_quality_issues_found = []

for record in all_records:
    record.clean()  # Apply cleaning method
    if record.quality_issues:
        all_quality_issues_found.extend(record.quality_issues)

print(f"Cleaning applied to {len(all_records)} records")
print(f"\nQuality fixes applied ({len(all_quality_issues_found)} total):")
for issue in all_quality_issues_found[:10]:  # Show first 10
    print(f"  • {issue}")
if len(all_quality_issues_found) > 10:
    print(f"  ... and {len(all_quality_issues_found) - 10} more")

In [None]:
# Capture "after" state
after_issues = identify_data_quality_issues(all_records)
after_counts = {k: len(v) for k, v in after_issues.items()}

print("\nAFTER CLEANING:")
print("-" * 40)
for issue_type, count in after_counts.items():
    if count > 0:
        print(f"  {issue_type}: {count}")

# Show before/after comparison
print("\n" + "=" * 50)
print("BEFORE vs AFTER COMPARISON:")
print("=" * 50)
print(f"{'Issue Type':<30} {'Before':>8} {'After':>8} {'Fixed':>8}")
print("-" * 54)
for issue_type in before_counts:
    before = before_counts[issue_type]
    after = after_counts[issue_type]
    fixed = before - after
    if before > 0 or after > 0:
        print(f"{issue_type:<30} {before:>8} {after:>8} {fixed:>8}")

---

## Step 8: Transformations

Transformations convert raw data into more useful formats. Here we'll:
1. Parse `coupon_code` into a numeric `discount_percent` field
2. Calculate `total_amount` with discounts applied
3. Extract date components (year, month, day of week)

In [None]:
def transform_record_to_enriched_dict(record: SalesRecord) -> Dict[str, Any]:
    """
    Transform a SalesRecord into an enriched dictionary with computed fields.
    
    New fields added:
    - discount_percent: Numeric discount from coupon code
    - discount_amount: Dollar amount saved
    - total_amount: Final amount after discount
    - year, month, day_of_week: Date components
    """
    base_dict = record.to_dict()
    
    # Parse coupon_code to numeric discount
    discount_rate = record.get_discount_rate()
    base_dict['discount_percent'] = int(discount_rate * 100)
    
    # Calculate discount amount and total
    subtotal = (record.price or 0) * (record.quantity or 0)
    base_dict['subtotal'] = round(subtotal, 2)
    base_dict['discount_amount'] = round(subtotal * discount_rate, 2)
    
    # Extract date components
    if record.order_date:
        try:
            date_obj = datetime.strptime(record.order_date, '%Y-%m-%d')
            base_dict['year'] = date_obj.year
            base_dict['month'] = date_obj.month
            base_dict['month_name'] = date_obj.strftime('%B')
            base_dict['day_of_week'] = date_obj.strftime('%A')
            base_dict['quarter'] = (date_obj.month - 1) // 3 + 1
        except ValueError:
            base_dict['year'] = None
            base_dict['month'] = None
            base_dict['month_name'] = None
            base_dict['day_of_week'] = None
            base_dict['quarter'] = None
    else:
        base_dict['year'] = None
        base_dict['month'] = None
        base_dict['month_name'] = None
        base_dict['day_of_week'] = None
        base_dict['quarter'] = None
    
    return base_dict


# Transform all records
transformed_data = [transform_record_to_enriched_dict(rec) for rec in all_records]

# Show example transformations
print("TRANSFORMATION EXAMPLES:")
print("=" * 60)

# Find a record with a coupon
coupon_examples = [t for t in transformed_data if t['coupon_code']]
if coupon_examples:
    ex = coupon_examples[0]
    print(f"\nRecord with Coupon ({ex['order_id']}):")
    print(f"  Coupon Code:      {ex['coupon_code']}")
    print(f"  Discount Percent: {ex['discount_percent']}%")
    print(f"  Subtotal:         ${ex['subtotal']}")
    print(f"  Discount Amount:  -${ex['discount_amount']}")
    print(f"  Total Amount:     ${ex['total_amount']}")
    print(f"  Date Components:  {ex['year']}/{ex['month']} ({ex['day_of_week']})")

In [None]:
# Convert to DataFrame for easier viewing
transformed_df = pd.DataFrame(transformed_data)

print("\nTransformed DataFrame - New Columns:")
new_cols = ['order_id', 'coupon_code', 'discount_percent', 'subtotal', 
            'discount_amount', 'total_amount', 'year', 'month', 'day_of_week']
transformed_df[new_cols].head(10)

---

## Step 9: Feature Engineering

Feature engineering creates new variables that can improve analytical insights. We'll add:
1. `days_since_purchase` - Recency measure from today's date
2. `is_high_value` - Flag for transactions above average
3. `customer_segment` - Categorization based on spending patterns

In [None]:
# Reference date for calculating days_since_purchase
REFERENCE_DATE = datetime(2025, 1, 29)  # Today's date

def add_engineered_features(data: List[Dict], reference_date: datetime) -> List[Dict]:
    """
    Add engineered features to transformed records.
    
    New features:
    - days_since_purchase: Days between order and reference date
    - is_high_value: True if total_amount > median
    - order_size_category: 'Small', 'Medium', 'Large' based on quantity
    - revenue_category: 'Low', 'Medium', 'High' based on total_amount
    """
    # First pass: calculate statistics needed for features
    totals = [d['total_amount'] for d in data if d['total_amount']]
    median_total = np.median(totals)
    percentile_33 = np.percentile(totals, 33)
    percentile_67 = np.percentile(totals, 67)
    
    print(f"Total Amount Statistics:")
    print(f"  Median: ${median_total:.2f}")
    print(f"  33rd percentile: ${percentile_33:.2f}")
    print(f"  67th percentile: ${percentile_67:.2f}")
    
    # Second pass: add features
    for record in data:
        # Days since purchase
        if record['order_date']:
            try:
                order_date = datetime.strptime(record['order_date'], '%Y-%m-%d')
                record['days_since_purchase'] = (reference_date - order_date).days
            except ValueError:
                record['days_since_purchase'] = None
        else:
            record['days_since_purchase'] = None
        
        # High value flag
        record['is_high_value'] = record['total_amount'] > median_total if record['total_amount'] else False
        
        # Order size category
        qty = record.get('quantity', 0) or 0
        if qty <= 1:
            record['order_size_category'] = 'Small'
        elif qty <= 3:
            record['order_size_category'] = 'Medium'
        else:
            record['order_size_category'] = 'Large'
        
        # Revenue category
        total = record['total_amount'] or 0
        if total <= percentile_33:
            record['revenue_category'] = 'Low'
        elif total <= percentile_67:
            record['revenue_category'] = 'Medium'
        else:
            record['revenue_category'] = 'High'
    
    return data


# Add engineered features
enriched_data = add_engineered_features(transformed_data, REFERENCE_DATE)

print("\n" + "=" * 60)
print("FEATURE ENGINEERING COMPLETE")

In [None]:
# Convert to DataFrame and show new features
enriched_df = pd.DataFrame(enriched_data)

print("\nNew Engineered Features:")
feature_cols = ['order_id', 'order_date', 'total_amount', 'days_since_purchase', 
                'is_high_value', 'order_size_category', 'revenue_category']
enriched_df[feature_cols].head(10)

In [None]:
# Feature distribution summary
print("\nFeature Distribution Summary:")
print("-" * 40)

print("\nOrder Size Category:")
print(enriched_df['order_size_category'].value_counts())

print("\nRevenue Category:")
print(enriched_df['revenue_category'].value_counts())

print("\nHigh Value Transactions:")
print(enriched_df['is_high_value'].value_counts())

print(f"\nDays Since Purchase Range: {enriched_df['days_since_purchase'].min()} to {enriched_df['days_since_purchase'].max()} days")

---

## Step 10: Mini-Aggregation

Now we aggregate our cleaned and enriched data to extract business insights. We'll calculate revenue by shipping city using both dictionary-based aggregation and pandas `groupby`.

In [None]:
def aggregate_revenue_by_city(records: List[Dict]) -> Dict[str, Dict[str, float]]:
    """
    Aggregate revenue metrics by shipping city using dictionary.
    
    Returns:
        Dictionary mapping city to {total_revenue, order_count, avg_order_value}
    """
    city_stats = {}
    
    for record in records:
        city = record.get('shipping_city')
        if not city:
            continue
            
        if city not in city_stats:
            city_stats[city] = {
                'total_revenue': 0.0,
                'order_count': 0,
                'total_quantity': 0
            }
        
        total = record.get('total_amount') or 0
        qty = record.get('quantity') or 0
        
        city_stats[city]['total_revenue'] += total
        city_stats[city]['order_count'] += 1
        city_stats[city]['total_quantity'] += qty
    
    # Calculate averages
    for city in city_stats:
        stats = city_stats[city]
        stats['total_revenue'] = round(stats['total_revenue'], 2)
        stats['avg_order_value'] = round(stats['total_revenue'] / stats['order_count'], 2)
        stats['avg_quantity'] = round(stats['total_quantity'] / stats['order_count'], 2)
    
    return city_stats


# Aggregate using dictionary method
city_revenue = aggregate_revenue_by_city(enriched_data)

# Sort by total revenue (descending)
sorted_cities = sorted(city_revenue.items(), key=lambda x: x[1]['total_revenue'], reverse=True)

print("REVENUE BY SHIPPING CITY (Dictionary Method):")
print("=" * 70)
print(f"{'City':<20} {'Revenue':>12} {'Orders':>8} {'Avg Order':>12}")
print("-" * 70)
for city, stats in sorted_cities[:15]:  # Top 15
    print(f"{city:<20} ${stats['total_revenue']:>10,.2f} {stats['order_count']:>8} ${stats['avg_order_value']:>10,.2f}")

In [None]:
# Alternative: Using pandas groupby
print("\nREVENUE BY CITY (pandas groupby):")
print("=" * 60)

city_summary = enriched_df.groupby('shipping_city').agg({
    'total_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).round(2)

city_summary.columns = ['Total Revenue', 'Avg Order', 'Order Count', 'Total Qty']
city_summary = city_summary.sort_values('Total Revenue', ascending=False)
city_summary.head(10)

In [None]:
# Additional aggregation: Revenue by Category
print("\nREVENUE BY PRODUCT CATEGORY:")
print("=" * 50)

category_summary = enriched_df.groupby('category').agg({
    'total_amount': ['sum', 'mean'],
    'order_id': 'count'
}).round(2)

category_summary.columns = ['Total Revenue', 'Avg Order', 'Order Count']
category_summary = category_summary.sort_values('Total Revenue', ascending=False)
category_summary

In [None]:
# Aggregation: Monthly Trends
print("\nMONTHLY REVENUE TREND:")
print("=" * 50)

monthly = enriched_df.groupby(['year', 'month', 'month_name']).agg({
    'total_amount': 'sum',
    'order_id': 'count'
}).round(2)

monthly.columns = ['Revenue', 'Orders']
monthly = monthly.reset_index()
monthly = monthly.sort_values(['year', 'month'])
monthly.tail(12)  # Last 12 months

---

## Step 11: Serialization Checkpoint

We'll save our cleaned and enriched data in two formats:
1. **JSON** - For API consumption and flexible schema
2. **CSV** - For spreadsheet analysis and ML pipelines

In [None]:
import os

# Create output directory
output_dir = 'output'
os.makedirs(output_dir, exist_ok=True)

# Serialize to JSON
json_path = os.path.join(output_dir, 'cleaned_sales_data.json')

with open(json_path, 'w') as f:
    json.dump(enriched_data, f, indent=2, default=str)

print(f"JSON Serialization:")
print(f"  File: {json_path}")
print(f"  Records: {len(enriched_data)}")
print(f"  Size: {os.path.getsize(json_path):,} bytes")

In [None]:
# Serialize to CSV
csv_path = os.path.join(output_dir, 'cleaned_sales_data.csv')

enriched_df.to_csv(csv_path, index=False)

print(f"\nCSV Serialization:")
print(f"  File: {csv_path}")
print(f"  Records: {len(enriched_df)}")
print(f"  Columns: {len(enriched_df.columns)}")
print(f"  Size: {os.path.getsize(csv_path):,} bytes")

In [None]:
# Verify JSON can be loaded back
with open(json_path, 'r') as f:
    loaded_json = json.load(f)

print(f"\nJSON Verification:")
print(f"  Records loaded: {len(loaded_json)}")
print(f"  Sample record keys: {list(loaded_json[0].keys())}")

# Verify CSV can be loaded back
loaded_csv = pd.read_csv(csv_path)
print(f"\nCSV Verification:")
print(f"  Shape: {loaded_csv.shape}")
print(f"  Columns match: {list(loaded_csv.columns) == list(enriched_df.columns)}")

---

## Step 12: Soft Interview Reflection

### How Functions and Classes Have Helped in This Project

The `SalesRecord` class was central to this data engineering pipeline. By encapsulating transaction data with methods like `clean()` and `total()`, I achieved **separation of concerns** — the cleaning logic lives with the data it operates on. This made the code more **maintainable**: when I needed to add new cleaning rules (like handling negative prices), I only modified one place.

Functions like `profile_numeric_field()` and `aggregate_revenue_by_city()` promoted **reusability** — I could apply the same profiling logic to any numeric field without duplicating code. The **type hints** and **docstrings** made the code self-documenting, which would help any teammate understand the pipeline quickly.

Most importantly, this structure supports **testing**: each method can be unit-tested in isolation, ensuring data quality at every step of the pipeline.

---

# Data Dictionary

This data dictionary merges field definitions from two sources:
1. **Primary Source**: E-commerce sales CSV (`ecommerce_sales.csv`)
2. **Secondary Source**: Coupon metadata JSON (`coupon_details.json`)

## Original Fields (from Primary Source)

| Field | Type | Description | Source |
|-------|------|-------------|--------|
| `order_id` | string | Unique identifier for each order (format: ORDnnnnnn) | Primary CSV |
| `order_date` | string | Date of transaction in YYYY-MM-DD format | Primary CSV |
| `customer_id` | string | Unique customer identifier (format: CUSTnnnnn) | Primary CSV |
| `product` | string | Name of the product purchased | Primary CSV |
| `category` | string | Product category (Electronics, Home & Kitchen, etc.) | Primary CSV |
| `price` | float | Unit price in USD | Primary CSV |
| `quantity` | integer | Number of units purchased | Primary CSV |
| `coupon_code` | string | Promotional discount code applied (nullable) | Primary CSV |
| `shipping_city` | string | Destination city for order delivery | Primary CSV |

## Transformed Fields (computed from Primary)

| Field | Type | Description | Source |
|-------|------|-------------|--------|
| `subtotal` | float | price × quantity before discounts | Computed: price × quantity |
| `total_amount` | float | Final transaction amount after discount | Computed: subtotal × (1 - discount_rate) |
| `year` | integer | Year extracted from order_date | Computed: from order_date |
| `month` | integer | Month number (1-12) from order_date | Computed: from order_date |
| `month_name` | string | Full month name (January, February, etc.) | Computed: from order_date |
| `day_of_week` | string | Day name (Monday, Tuesday, etc.) | Computed: from order_date |
| `quarter` | integer | Fiscal quarter (1-4) | Computed: from month |
| `is_cleaned` | boolean | Flag indicating record passed through clean() | Computed: after cleaning |

## Enriched Fields (from Secondary Source + Computation)

| Field | Type | Description | Source |
|-------|------|-------------|--------|
| `discount_percent` | integer | Percentage discount from coupon (0-40) | Secondary JSON: mapped from coupon_code |
| `discount_amount` | float | Dollar amount of discount applied | Computed: subtotal × discount_rate |

## Engineered Features (derived for analysis)

| Field | Type | Description | Source |
|-------|------|-------------|--------|
| `days_since_purchase` | integer | Days between order_date and reference date (2025-01-29) | Computed: date difference |
| `is_high_value` | boolean | True if total_amount exceeds median | Computed: comparison to median |
| `order_size_category` | string | Size bucket: 'Small' (qty≤1), 'Medium' (qty≤3), 'Large' (qty>3) | Computed: quantity bins |
| `revenue_category` | string | Revenue bucket: 'Low' (<33%), 'Medium' (33-67%), 'High' (>67%) | Computed: percentile bins |

## Secondary Source: Coupon Metadata Fields

The coupon metadata (`coupon_details.json`) provided the discount mapping:

| Coupon Code | Discount % | Description |
|-------------|------------|-------------|
| SAVE10 | 10 | Standard 10% discount |
| SAVE15 | 15 | Weekend special |
| SAVE20 | 20 | Newsletter subscriber discount |
| SAVE25 | 25 | Seasonal sale |
| WELCOME5 | 5 | New customer welcome |
| FLASH30 | 30 | Flash sale limited time |
| VIP40 | 40 | VIP member exclusive |
| HOLIDAY15 | 15 | Holiday season special |
| SUMMER10 | 10 | Summer clearance |
| WINTER20 | 20 | Winter promotion |
| FREESHIP | 0 | Free shipping (no price discount) |
| LOYALTY25 | 25 | Loyalty program member |

---

## Summary Statistics

Final overview of the processed dataset:

In [None]:
print("FINAL DATASET SUMMARY")
print("=" * 60)
print(f"Total Records: {len(enriched_df)}")
print(f"Total Columns: {len(enriched_df.columns)}")
print(f"\nOriginal Columns: 9")
print(f"Transformed Columns: 7")
print(f"Engineered Features: 4")
print(f"\nTotal Revenue: ${enriched_df['total_amount'].sum():,.2f}")
print(f"Average Order Value: ${enriched_df['total_amount'].mean():.2f}")
print(f"Total Discounts Given: ${enriched_df['discount_amount'].sum():,.2f}")
print(f"\nDate Range: {enriched_df['order_date'].min()} to {enriched_df['order_date'].max()}")
print(f"Unique Customers: {enriched_df['customer_id'].nunique()}")
print(f"Unique Cities: {enriched_df['shipping_city'].nunique()}")
print(f"\nData Quality: {sum(1 for r in all_records if r.is_cleaned)} records cleaned")

In [None]:
# List all columns in final dataset
print("\nALL COLUMNS IN FINAL DATASET:")
print("-" * 40)
for i, col in enumerate(enriched_df.columns, 1):
    dtype = enriched_df[col].dtype
    print(f"{i:2}. {col:<25} ({dtype})")

---

**End of Lab Notebook**

*Data Engineering Pipeline Complete*