# Data Cleaning and Validation

This notebook performs data cleaning and validation on the raw e-commerce data:
- Handle missing values
- Fix data type issues
- Remove duplicates
- Standardize formats
- Validate data integrity

Clean data is saved to the staging layer.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import os
import warnings
warnings.filterwarnings('ignore')

plt.style.use('default')
sns.set_palette('husl')

## Load Raw Data

In [None]:
# Load raw data
raw_data_path = '../data/raw/'

customers_df = pd.read_csv(f'{raw_data_path}/customers.csv')
products_df = pd.read_csv(f'{raw_data_path}/products.csv')
transactions_df = pd.read_csv(f'{raw_data_path}/transactions.csv')

print("=== RAW DATA LOADED ===")
print(f"Customers: {len(customers_df):,} records")
print(f"Products: {len(products_df):,} records")
print(f"Transactions: {len(transactions_df):,} records")

## Data Quality Assessment

In [None]:
def data_quality_report(df, name):
    print(f"\n=== {name.upper()} DATA QUALITY ===")
    print(f"Shape: {df.shape}")
    print(f"\nMissing values:")
    missing = df.isnull().sum()
    if missing.sum() > 0:
        print(missing[missing > 0])
    else:
        print("No missing values found")
    
    print(f"\nDuplicate rows: {df.duplicated().sum()}")
    print(f"Data types:")
    print(df.dtypes)

data_quality_report(customers_df, 'customers')
data_quality_report(products_df, 'products')
data_quality_report(transactions_df, 'transactions')

## Clean Customer Data

In [None]:
# Clean customer data
customers_clean = customers_df.copy()

# Convert date columns
customers_clean['registration_date'] = pd.to_datetime(customers_clean['registration_date'])

# Standardize text fields
customers_clean['first_name'] = customers_clean['first_name'].str.strip().str.title()
customers_clean['last_name'] = customers_clean['last_name'].str.strip().str.title()
customers_clean['email'] = customers_clean['email'].str.lower().str.strip()
customers_clean['city'] = customers_clean['city'].str.strip().str.title()
customers_clean['state'] = customers_clean['state'].str.strip().str.upper()

# Remove duplicates based on email (keep first)
customers_clean = customers_clean.drop_duplicates(subset=['email'], keep='first')

# Validate age ranges
customers_clean = customers_clean[(customers_clean['age'] >= 18) & (customers_clean['age'] <= 100)]

# Validate registration dates (not in the future)
customers_clean = customers_clean[customers_clean['registration_date'] <= datetime.now()]

print(f"✅ Customer data cleaned: {len(customers_df)} → {len(customers_clean)} records")
print(f"   Removed {len(customers_df) - len(customers_clean)} records")

customers_clean.head()

## Clean Product Data

In [None]:
# Clean product data
products_clean = products_df.copy()

# Convert date columns
products_clean['launch_date'] = pd.to_datetime(products_clean['launch_date'])

# Standardize text fields
products_clean['product_name'] = products_clean['product_name'].str.strip().str.title()
products_clean['brand'] = products_clean['brand'].str.strip().str.title()
products_clean['category'] = products_clean['category'].str.strip().str.title()

# Remove duplicates based on product_name and brand
products_clean = products_clean.drop_duplicates(subset=['product_name', 'brand'], keep='first')

# Validate price and cost
products_clean = products_clean[(products_clean['price'] > 0) & (products_clean['cost'] >= 0)]
products_clean = products_clean[products_clean['cost'] <= products_clean['price']]

# Validate rating
products_clean = products_clean[(products_clean['rating'] >= 1.0) & (products_clean['rating'] <= 5.0)]

# Validate weight
products_clean = products_clean[(products_clean['weight_kg'] > 0) & (products_clean['weight_kg'] <= 50)]

print(f"✅ Product data cleaned: {len(products_df)} → {len(products_clean)} records")
print(f"   Removed {len(products_df) - len(products_clean)} records")

products_clean.head()

## Clean Transaction Data

In [None]:
# Clean transaction data
transactions_clean = transactions_df.copy()

# Convert date columns
transactions_clean['transaction_date'] = pd.to_datetime(transactions_clean['transaction_date'])

# Remove transactions with invalid customer or product IDs
valid_customer_ids = set(customers_clean['customer_id'])
valid_product_ids = set(products_clean['product_id'])

initial_count = len(transactions_clean)
transactions_clean = transactions_clean[
    transactions_clean['customer_id'].isin(valid_customer_ids) &
    transactions_clean['product_id'].isin(valid_product_ids)
]

# Validate quantity
transactions_clean = transactions_clean[(transactions_clean['quantity'] > 0) & (transactions_clean['quantity'] <= 10)]

# Validate monetary values
transactions_clean = transactions_clean[
    (transactions_clean['unit_price'] > 0) &
    (transactions_clean['total_amount'] > 0) &
    (transactions_clean['discount_amount'] >= 0) &
    (transactions_clean['shipping_cost'] >= 0)
]

# Remove duplicates
transactions_clean = transactions_clean.drop_duplicates(subset=['transaction_id'], keep='first')

# Validate transaction dates (not in the future, not before customer registration)
transactions_clean = transactions_clean[transactions_clean['transaction_date'] <= datetime.now()]

# Add customer registration date check
customer_reg_dates = customers_clean.set_index('customer_id')['registration_date'].to_dict()
transactions_clean['customer_reg_date'] = transactions_clean['customer_id'].map(customer_reg_dates)
transactions_clean = transactions_clean[transactions_clean['transaction_date'] >= transactions_clean['customer_reg_date']]
transactions_clean = transactions_clean.drop('customer_reg_date', axis=1)

print(f"✅ Transaction data cleaned: {len(transactions_df)} → {len(transactions_clean)} records")
print(f"   Removed {len(transactions_df) - len(transactions_clean)} records")

transactions_clean.head()

## Data Validation and Integrity Checks

In [None]:
# Integrity checks
print("=== DATA INTEGRITY VALIDATION ===")

# Check for orphaned transactions
orphaned_customers = set(transactions_clean['customer_id']) - set(customers_clean['customer_id'])
orphaned_products = set(transactions_clean['product_id']) - set(products_clean['product_id'])

print(f"Orphaned customer references: {len(orphaned_customers)}")
print(f"Orphaned product references: {len(orphaned_products)}")

# Check unique constraints
print(f"\nUnique customer IDs: {customers_clean['customer_id'].nunique()} (should equal {len(customers_clean)})")
print(f"Unique product IDs: {products_clean['product_id'].nunique()} (should equal {len(products_clean)})")
print(f"Unique transaction IDs: {transactions_clean['transaction_id'].nunique()} (should equal {len(transactions_clean)})")

# Check data ranges
print(f"\nCustomer age range: {customers_clean['age'].min()} - {customers_clean['age'].max()}")
print(f"Product price range: ${products_clean['price'].min():.2f} - ${products_clean['price'].max():.2f}")
print(f"Transaction amount range: ${transactions_clean['total_amount'].min():.2f} - ${transactions_clean['total_amount'].max():.2f}")

# Date ranges
print(f"\nCustomer registration dates: {customers_clean['registration_date'].min().date()} to {customers_clean['registration_date'].max().date()}")
print(f"Transaction dates: {transactions_clean['transaction_date'].min().date()} to {transactions_clean['transaction_date'].max().date()}")


## Data Distribution Visualization

In [None]:
# Create visualizations
fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# Customer age distribution
axes[0, 0].hist(customers_clean['age'], bins=30, alpha=0.7, edgecolor='black')
axes[0, 0].set_title('Customer Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Count')

# Product price distribution
axes[0, 1].hist(products_clean['price'], bins=30, alpha=0.7, edgecolor='black')
axes[0, 1].set_title('Product Price Distribution')
axes[0, 1].set_xlabel('Price ($)')
axes[0, 1].set_ylabel('Count')

# Transaction amount distribution
axes[0, 2].hist(transactions_clean['total_amount'], bins=30, alpha=0.7, edgecolor='black')
axes[0, 2].set_title('Transaction Amount Distribution')
axes[0, 2].set_xlabel('Amount ($)')
axes[0, 2].set_ylabel('Count')

# Gender distribution
gender_counts = customers_clean['gender'].value_counts()
axes[1, 0].pie(gender_counts.values, labels=gender_counts.index, autopct='%1.1f%%')
axes[1, 0].set_title('Customer Gender Distribution')

# Product category distribution
category_counts = products_clean['category'].value_counts()
axes[1, 1].bar(category_counts.index, category_counts.values)
axes[1, 1].set_title('Product Category Distribution')
axes[1, 1].set_xlabel('Category')
axes[1, 1].set_ylabel('Count')
axes[1, 1].tick_params(axis='x', rotation=45)

# Order status distribution
status_counts = transactions_clean['order_status'].value_counts()
axes[1, 2].pie(status_counts.values, labels=status_counts.index, autopct='%1.1f%%')
axes[1, 2].set_title('Order Status Distribution')

plt.tight_layout()
plt.show()

## Save Clean Data to Staging Layer

In [None]:
# Create output directory if it doesn't exist
stage_data_path = '../data/stage/'
os.makedirs(stage_data_path, exist_ok=True)

# Save cleaned datasets
customers_clean.to_csv(f'{stage_data_path}/cleaned_customers.csv', index=False)
products_clean.to_csv(f'{stage_data_path}/cleaned_products.csv', index=False)
transactions_clean.to_csv(f'{stage_data_path}/cleaned_transactions.csv', index=False)

print("✅ Clean data successfully saved to staging layer:")
print(f"   - cleaned_customers.csv: {len(customers_clean):,} records")
print(f"   - cleaned_products.csv: {len(products_clean):,} records")
print(f"   - cleaned_transactions.csv: {len(transactions_clean):,} records")

# Save data quality summary
quality_summary = {
    'cleaned_date': datetime.now().isoformat(),
    'customers': {
        'raw_count': len(customers_df),
        'clean_count': len(customers_clean),
        'removed_count': len(customers_df) - len(customers_clean)
    },
    'products': {
        'raw_count': len(products_df),
        'clean_count': len(products_clean),
        'removed_count': len(products_df) - len(products_clean)
    },
    'transactions': {
        'raw_count': len(transactions_df),
        'clean_count': len(transactions_clean),
        'removed_count': len(transactions_df) - len(transactions_clean)
    }
}

import json
with open(f'{stage_data_path}/data_quality_summary.json', 'w') as f:
    json.dump(quality_summary, f, indent=2)

print("\n📊 Data quality summary saved to data_quality_summary.json")