## Step 1: Hello, Data!


Load the raw CSV data (Sales + Shipping/Promo) to simulate a real-world integration task.


In [None]:
import csv
import pprint

sales_path = '../data/sales_data.csv'
promo_path = '../data/shipping_promo_data.csv'

# Load Sales Data
sales_data = []
with open(sales_path, mode='r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        sales_data.append(row)

# Load Promo/Shipping Data
promo_data = {}
with open(promo_path, mode='r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        # Index by Order ID for fast merging
        promo_data[row['Order ID']] = row

print(f"Loaded {len(sales_data)} sales records.")
print(f"Loaded {len(promo_data)} shipping/promo records.")
pprint.pprint(sales_data[0])
pprint.pprint(promo_data[sales_data[0]['Order ID']])


## Step 2: Pick the Right Container


**Justification:**
1.  **Sales Data**: Dictionary list for sequential processing.
2.  **Promo Data**: **Dictionary of Dictionaries** (Hash Map) using `Order ID` as key. This allows O(1) lookups during the merge phase, which is much faster than looping through lists (O(N^2)).
This design mimics database index lookups.



## Step 3: Implement Functions and Data structure


We update our `Transaction` class to include the merged fields: `shipping_city` and `coupon_code`.


In [None]:
class Transaction:
    def __init__(self, sales_row, promo_row=None):
        # Base Sales Data
        self.region = sales_row.get('Region')
        self.country = sales_row.get('Country')
        self.item_type = sales_row.get('Item Type')
        self.sales_channel = sales_row.get('Sales Channel')
        self.order_date = sales_row.get('Order Date')
        self.order_id = sales_row.get('Order ID')
        self.units_sold = sales_row.get('Units Sold')
        self.unit_price = sales_row.get('Unit Price')
        self.unit_cost = sales_row.get('Unit Cost')
        self.total_revenue = sales_row.get('Total Revenue')
        self.total_cost = sales_row.get('Total Cost')
        self.total_profit = sales_row.get('Total Profit')
        
        # Merged Data (if available)
        if promo_row:
            self.shipping_city = promo_row.get('Shipping City')
            self.coupon_code = promo_row.get('Coupon Code')
        else:
            self.shipping_city = None
            self.coupon_code = None
            
        # Additional fields for processing
        self.discount_amount = 0.0

    def clean(self):
        # Type conversions
        if isinstance(self.units_sold, str):
            try:
                self.units_sold = int(self.units_sold)
            except ValueError:
                self.units_sold = 0
        
        for field in ['unit_price', 'unit_cost', 'total_revenue', 'total_cost', 'total_profit']:
            val = getattr(self, field)
            if isinstance(val, str):
                try:
                    setattr(self, field, float(val))
                except ValueError:
                    setattr(self, field, 0.0)
        
        # City Cleaning: Standardize Title Case
        if self.shipping_city:
            self.shipping_city = self.shipping_city.title().strip()

    def total(self):
        return self.total_revenue
    
    def __repr__(self):
        return f"<Transaction {self.order_id} | City: {self.shipping_city}>"

print("Enhanced Transaction Class defined.")


## Step 4: Bulk Loaded (The Merge)


We loop through sales data and **Merge** with promo data using the dictionary lookup.


In [None]:
transactions = []
for row in sales_data:
    oid = row['Order ID']
    # Merge occurs here: fetching matching promo row via key
    p_row = promo_data.get(oid) 
    
    t = Transaction(row, p_row)
    transactions.append(t)

print(f"Merged and Loaded {len(transactions)} Transaction objects.")
print("Sample Merged Object:", transactions[0])


## Step 5: Quick Profiling


Profiling now includes the new merged fields.


In [None]:
prices = []
for t in transactions:
    try:
        prices.append(float(t.unit_price))
    except:
        pass

if prices:
    min_price, max_price, mean_price = min(prices), max(prices), sum(prices)/len(prices)
else:
    min_price = max_price = mean_price = 0.0

# Count unique shipping cities
unique_cities = {t.shipping_city for t in transactions if t.shipping_city}
unique_coupons = {t.coupon_code for t in transactions if t.coupon_code}

print(f"Price Stats -> Min: {min_price:.2f}, Mean: {mean_price:.2f}")
print(f"Unique Shipping Cities: {len(unique_cities)}")
print(f"Active Coupon Codes: {unique_coupons}")


## Step 6: Spot the Grime


New dirty data identified from the merge:
1.  **Inconsistent Case**: Some cities might be lowercase (e.g., 'rome' vs 'Rome').
2.  **Missing Values**: Some transactions have no coupon code (this is expected, but empty strings need handling).
3.  **Data Types**: Numeric fields are still strings.



## Step 7: Cleaning Rules


Execute `clean()` which now fixes city casing as well.


In [None]:
# Checking dirty city sample
dirty_cities = [t.shipping_city for t in transactions if t.shipping_city and t.shipping_city.islower()]
print(f"Lowercase cities found before cleaning: {len(dirty_cities)}")

for t in transactions:
    t.clean()

dirty_cities_after = [t.shipping_city for t in transactions if t.shipping_city and t.shipping_city.islower()]
print(f"Lowercase cities found after cleaning: {len(dirty_cities_after)}")


## Step 8: Transformations (Coupon Parsing)


Transform `Coupon Code` into a numeric discount. Rule: 'SAVE10' -> 10.0.


In [None]:
from datetime import datetime

# Date Parsing
for t in transactions:
    if isinstance(t.order_date, str):
        try:
            t.order_date = datetime.strptime(t.order_date, '%m/%d/%Y')
        except ValueError:
            t.order_date = None

# Coupon Parsing Logic
def parse_coupon(code):
    if not code: return 0.0
    code = code.upper()
    if 'SAVE' in code:
        return float(code.replace('SAVE', ''))
    if 'WELCOME' in code:
        return float(code.replace('WELCOME', ''))
    if 'SUMMER' in code:
        return float(code.replace('SUMMER', ''))
    if 'WINTER' in code:
        return float(code.replace('WINTER', ''))
    return 0.0

for t in transactions:
    t.discount_amount = parse_coupon(t.coupon_code)

print(f"Sample Coupon: {transactions[0].coupon_code}, Discount: {transactions[0].discount_amount}")


## Step 9: Feature Engineering


Feature: `Net Revenue` = Total Revenue - Discount.


In [None]:
for t in transactions:
    # Applying simple flat discount for demonstration
    t.net_revenue = t.total_revenue - t.discount_amount
    
    if t.total_revenue > 0:
        t.margin_percentage = (t.total_profit / t.total_revenue) * 100
    else:
        t.margin_percentage = 0.0

print(f"Sample Net Revenue: {transactions[0].net_revenue:.2f}")


## Step 10: Mini-Aggregation


Aggregate Revenue by Shipping City.


In [None]:
rev_by_city = {}
for t in transactions:
    city = t.shipping_city
    if city not in rev_by_city:
        rev_by_city[city] = 0.0
    rev_by_city[city] += t.net_revenue

print("Top 5 Cities by Revenue:")
sorted_cities = sorted(rev_by_city.items(), key=lambda x: x[1], reverse=True)[:5]
for city, rev in sorted_cities:
    print(f"{city}: ${rev:,.2f}")


## Step 11: Serialization Checkpoint


Save to JSON and CSV.


In [None]:
import json
import csv

def transaction_serializer(obj):
    if isinstance(obj, datetime):
        return obj.strftime('%Y-%m-%d')
    return obj.__dict__

# JSON
with open('../data/processed_transactions.json', 'w') as f:
    json.dump(transactions, f, default=transaction_serializer, indent=4)

# CSV
if transactions:
    headers = transactions[0].__dict__.keys()
    with open('../data/processed_transactions.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=headers)
        writer.writeheader()
        for t in transactions:
            row = t.__dict__.copy()
            if isinstance(row['order_date'], datetime):
                row['order_date'] = row['order_date'].strftime('%Y-%m-%d')
            writer.writerow(row)

print("Serialization complete.")


## Step 12: Soft Interview Reflection


The whiteboard diagram required a **Merge** step, which I simulated by joining Sales Data with a secondary Shipping/Promo dataset. 
Using a **Hash Map (Dictionary)** for the secondary source allowed me to perform the merge in O(N) time complexity instead of O(N^2), which is critical for scaling data engineering pipelines.
This structure mimics a database JOIN operation, ensuring data integrity across the '1st Normal Form' final dataset.


## Data Dictionary



| Field | Type | Description | Source |
|-------|------|-------------|--------|
| Order ID | Int | Unique key used to merge the two files | Primary (Sales) + Secondary (Promo) |
| Shipping City | String | City to ship to (e.g., "Tripoli Capital City") | Secondary (Generated Promo File) |
| Coupon Code | String | The raw code (e.g., "SUMMER15") | Secondary (Generated Promo File) |
| Discount Amount | Float | The numeric value we extracted (e.g., 15.0) | Transformed (Python Function) |
| Net Revenue | Float | Revenue minus the discount | Feature Engineering (Math) |
| Region | String | Continent/Region | Primary (Sales CSV) |
| Country | String | Country Name | Primary (Sales CSV) |
| Item Type | String | Product Category | Primary (Sales CSV) |
| Sales Channel | String | Online or Offline | Primary (Sales CSV) |
| Order Date | Date | Date the order was placed | Primary (Sales CSV) |
| Units Sold | Int | Quantity | Primary (Sales CSV) |
| Unit Price | Float | Price per item | Primary (Sales CSV) |
| Total Revenue | Float | Unit Price * Units Sold | Primary (Sales CSV) |
| Total Profit | Float | Total Revenue - Total Cost | Primary (Sales CSV) |
| Margin % | Float | Calculated Profit / Revenue | Feature Engineering |

