# In [5]:
# -----------------------------------
# ETL PIPELINE 
# -----------------------------------

import os
import re
from decimal import Decimal

import mysql.connector
import pandas as pd

# -----------------------------------
# STEP 1: CONNECT TO MYSQL
# -----------------------------------
# Connection settings may be overridden through environment variables so the
# password does not have to be edited into (or committed with) the source.
# The defaults reproduce the original local development setup.
db = mysql.connector.connect(
    host=os.environ.get("FLEXIMART_DB_HOST", "localhost"),
    user=os.environ.get("FLEXIMART_DB_USER", "root"),
    password=os.environ.get("FLEXIMART_DB_PASSWORD", "pass123"),
    database=os.environ.get("FLEXIMART_DB_NAME", "fleximart"),
    charset="utf8mb4",
)

cursor = db.cursor()
# Transactions are managed explicitly: changes are persisted with db.commit().
db.autocommit = False

# -----------------------------------
# STEP 2: READ CSV FILES
# -----------------------------------

customers, products, sales = (
    pd.read_csv(csv_name)
    for csv_name in ("customers_raw.csv", "products_raw.csv", "sales_raw.csv")
)

# -----------------------------------
# STEP 3: PROFILE THE RAW DATA
# -----------------------------------
# Every metric below is captured BEFORE any cleaning, so the data quality
# report describes the files exactly as they were received.

# Row counts per file
customers_count, products_count, sales_count = map(len, (customers, products, sales))

# Rows identical in every column to an earlier row (first occurrence not counted)
customers_duplicates, products_duplicates, sales_duplicates = (
    raw_frame.duplicated().sum() for raw_frame in (customers, products, sales)
)

# Per-column number of missing cells (a Series indexed by column name)
customers_missing, products_missing, sales_missing = (
    raw_frame.isna().sum() for raw_frame in (customers, products, sales)
)

# -----------------------------------
# STEP 4: REMOVE DUPLICATES
# -----------------------------------
# Done in place, so the names customers / products / sales keep pointing at
# the same DataFrame objects.
for raw_frame in (customers, products, sales):
    raw_frame.drop_duplicates(inplace=True)
# -----------------------------------
# STEP 5: CLEAN CUSTOMER DATA
# -----------------------------------

def fix_phone(phone):
    """Normalise a raw phone value to ``+91-XXXXXXXXXX``, or return None.

    After removing every non-digit character, these shapes are accepted:

    * 10 digits                      -> used as-is
    * 11 digits with a leading ``0``  -> trunk prefix dropped
    * 12 digits starting with ``91``  -> country code dropped (e.g. "+91 98765 43210")
    * 14 digits starting with ``0091`` -> international prefix dropped

    Parameters
    ----------
    phone : object
        Raw cell value from the CSV (str, int, float or NaN).

    Returns
    -------
    str or None
        ``"+91-"`` followed by the 10-digit number, or None when the value is
        missing or matches none of the accepted shapes.
    """
    # Missing values: real NaN/None, or their textual spellings / blank strings.
    if pd.isna(phone) or str(phone).strip().lower() in ("nan", "none", ""):
        return None

    # Numeric CSV columns arrive as floats (e.g. 9876543210.0); drop the ".0"
    # so it is not mistaken for an extra trailing digit.
    phone_str = str(phone).strip()
    if phone_str.endswith(".0"):
        phone_str = phone_str[:-2]

    digits = re.sub(r"\D", "", phone_str)

    if len(digits) == 10:
        national = digits
    elif len(digits) == 11 and digits.startswith("0"):
        national = digits[1:]
    elif len(digits) == 12 and digits.startswith("91"):
        # Also covers "+91..." because the "+" was stripped with the other non-digits.
        national = digits[2:]
    elif len(digits) == 14 and digits.startswith("0091"):
        # "0091" + 10 digits is 14 digits; strip all four prefix digits.
        national = digits[4:]
    else:
        return None

    return "+91-" + national

# Normalise every phone number; values fix_phone() rejects become None (NULL).
customers = customers.assign(phone=customers["phone"].apply(fix_phone))
#  Convert registration_date to MySQL-compatible DATE format
# - Invalid dates are coerced to NULL
# - Final format: YYYY-MM-DD

def parse_mixed_date(date):
    """Parse a date value that may be written day-first or month-first.

    First tries a day-first parse (e.g. ``15-01-2023`` -> 2023-01-15). If that
    raises, retries with the explicit month-first pattern ``%m-%d-%Y``.

    Note: ``dayfirst=True`` is a preference, not a strict rule. For strings
    such as ``12-25-2023``, pandas may fall back to month-first parsing (and
    emit a warning) instead of raising. Ambiguous values such as
    ``01-02-2023`` are therefore read day-first.

    Parameters
    ----------
    date : object
        Raw cell value (string, NaN, None, ...).

    Returns
    -------
    pandas.Timestamp or None
        The parsed timestamp (NaT/None for missing input), or None when
        neither attempt can parse the value.
    """
    # Only parse failures are swallowed. A bare ``except:`` would also hide
    # KeyboardInterrupt, SystemExit and genuine programming errors.
    try:
        return pd.to_datetime(date, dayfirst=True)
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        return pd.to_datetime(date, format="%m-%d-%Y")
    except (ValueError, TypeError, OverflowError):
        return None

# Parse each registration date and render it as MySQL DATE text (YYYY-MM-DD).
# Values parse_mixed_date() cannot parse end up missing (NaN) after strftime.
# Wrapping the apply() result in pd.to_datetime guarantees a datetime64
# column, so `.dt` still works when no value could be parsed at all. In that
# case the raw apply() result is an object column and `.dt` would raise.
customers["registration_date"] = pd.to_datetime(
    customers["registration_date"].apply(parse_mixed_date)
).dt.strftime("%Y-%m-%d")

# Remove records missing mandatory customer fields
# - Ensures data integrity
# - first_name, last_name and email are required for the customers table;
#   email is also the key later used to look up the generated customer_id
customers = customers.dropna(
    subset=["first_name", "last_name", "email"]
).reset_index(drop=True)

# -----------------------------------
# STEP 6: CLEAN PRODUCT DATA
# -----------------------------------

# Standardize product category values
# - Removes leading/trailing spaces
# - Capitalizes for consistency (e.g., electronics → Electronics); note that
#   str.capitalize() also lower-cases every letter after the first
products["category"] = products["category"].str.strip().str.capitalize()

# Remove invalid product records
# Product name, category, and price are mandatory
products.dropna(
    subset=["product_name", "category", "price"],
    inplace=True,
)

# Ensure stock_quantity is numeric
# - Non-numeric text (which astype(int) alone would crash on) and missing
#   values both default to 0
# - Converted to integer for database compatibility (fractions are truncated)
products["stock_quantity"] = (
    pd.to_numeric(products["stock_quantity"], errors="coerce")
    .fillna(0)
    .astype(int)
)

# -----------------------------------
# STEP 7: CLEAN SALES DATA
# -----------------------------------

# Normalise order_date to MySQL DATE text (YYYY-MM-DD)
# - Uses parse_mixed_date(), defined once in STEP 5
# - Unparseable dates become missing (NaN) instead of raising
# - pd.to_datetime() keeps `.dt` usable even if no date could be parsed
sales["order_date"] = pd.to_datetime(
    sales["order_date"].apply(parse_mixed_date)
).dt.strftime("%Y-%m-%d")

# Remove invalid sales records
# - customer_id & product_id required for foreign key mapping
# - quantity & unit_price required for calculations
sales.dropna(
    subset=["customer_id", "product_id", "quantity", "unit_price"],
    inplace=True,
)



# Empty the tables this run reloads. FOREIGN_KEY_CHECKS is switched off only
# around the TRUNCATE statements, so foreign-key references between these
# tables cannot block them. The customers table is not truncated here.
reset_statements = (
    "SET FOREIGN_KEY_CHECKS = 0;",
    "TRUNCATE TABLE products;",
    "TRUNCATE TABLE orders;",
    "TRUNCATE TABLE order_items;",
    "SET FOREIGN_KEY_CHECKS = 1;",
)
for statement in reset_statements:
    cursor.execute(statement)



# -----------------------------------
# STEP 8: BATCH INSERT CUSTOMERS
# -----------------------------------
# Inserts cleaned customer records into the customers table.
# customer_id is NOT included because it is AUTO-INCREMENTED.
# INSERT IGNORE makes MySQL skip rows it would otherwise reject
# (e.g. duplicate keys) instead of aborting the batch.
customer_insert_query = """
INSERT IGNORE INTO customers
(first_name, last_name, email, phone, city, registration_date)
VALUES (%s, %s, %s, %s, %s, %s)
"""

customer_columns = [
    "first_name", "last_name", "email", "phone", "city", "registration_date",
]
# pandas marks missing cells with float NaN (e.g. an empty city, or a date that
# failed to parse). Depending on the driver version, NaN may not be bound as
# NULL, so every missing cell is converted to None, which is always sent as NULL.
customer_values = customers[customer_columns].astype(object)
customer_values = customer_values.where(customer_values.notna(), None)
customer_data = list(customer_values.itertuples(index=False, name=None))

cursor.executemany(customer_insert_query, customer_data)
db.commit()
# -----------------------------------
# STEP 9: BATCH INSERT PRODUCTS
# -----------------------------------
# Load the cleaned products. MySQL generates product_id (AUTO_INCREMENT), so
# the CSV's own product_id column is not sent.
product_insert_query = """
INSERT IGNORE INTO products
(product_name, category, price, stock_quantity)
VALUES (%s, %s, %s, %s)
"""

product_columns = ["product_name", "category", "price", "stock_quantity"]
# tolist() turns numpy scalars into native Python values the driver can bind.
product_data = products.loc[:, product_columns].to_numpy().tolist()

cursor.executemany(product_insert_query, product_data)
db.commit()


# -----------------------------------
# STEP 10: BUILD SURROGATE KEY MAPS
# -----------------------------------
# The CSVs identify customers/products by their own customer_id / product_id
# values, while MySQL assigns AUTO_INCREMENT ids. Build CSV-key -> DB-key
# lookups so each sales row can be linked in STEP 11.

# Customers: resolve through email, the value each row was inserted with.
cursor.execute("SELECT email, customer_id FROM customers")
db_customer_map = dict(cursor.fetchall())
db_customer_ids = customers["email"].map(db_customer_map)
if db_customer_ids.isna().any():
    raise ValueError("Customer mapping failed — missing keys")

# Capture CSV id -> DB id BEFORE overwriting the column. STEP 11 looks up
# sales["customer_id"] (CSV ids) in `customer_map`.
customer_map = {
    csv_id: int(db_id)
    for csv_id, db_id in zip(customers["customer_id"], db_customer_ids)
}
customers["customer_id"] = db_customer_ids

# Products: the table was truncated before loading, so this pairing relies on
# AUTO_INCREMENT ids following insertion (= DataFrame row) order. That only
# holds if every row was inserted; INSERT IGNORE may skip some silently,
# which would shift every later id. Fail loudly instead of mis-linking.
cursor.execute("SELECT product_id FROM products ORDER BY product_id")
db_products = [r[0] for r in cursor.fetchall()]
if len(db_products) != len(products):
    raise ValueError(
        f"Product mapping failed — {len(products)} rows loaded but "
        f"{len(db_products)} found in products table"
    )
product_map = dict(zip(products["product_id"], db_products))

# -----------------------------------
# STEP 11: INSERT ORDERS & ORDER_ITEMS
# -----------------------------------
# Each sales row becomes one order holding a single order item. CSV keys are
# translated through customer_map / product_map. Rows whose keys cannot be
# resolved (e.g. the customer was dropped during cleaning) are reported and
# skipped.

for _, row in sales.iterrows():

    customer_id = customer_map.get(row["customer_id"])
    product_id = product_map.get(row["product_id"])

    if customer_id is None or product_id is None:
        print("❌ FK FAILED:", row["customer_id"], row["product_id"])
        continue

    quantity = int(row["quantity"])
    # Build the Decimal from the value's text. Decimal(29.99) would carry the
    # float's binary error (29.98999999999999843...) into the subtotal.
    unit_price = Decimal(str(row["unit_price"]))
    subtotal = quantity * unit_price
    # Dates that could not be parsed are NaN here; send NULL rather than a float NaN.
    order_date = None if pd.isna(row["order_date"]) else row["order_date"]

    cursor.execute(
        """
        INSERT INTO orders (customer_id, order_date, total_amount)
        VALUES (%s, %s, %s)
        """,
        (customer_id, order_date, subtotal),
    )

    # AUTO_INCREMENT id generated by the INSERT above, used to link the item row.
    order_id = cursor.lastrowid

    cursor.execute(
        """
        INSERT INTO order_items
        (order_id, product_id, quantity, unit_price, subtotal)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (order_id, product_id, quantity, unit_price, subtotal),
    )

db.commit()

# -----------------------------------
# STEP 12: DATA QUALITY REPORT
# -----------------------------------
# Write the raw-data profile captured in STEP 3 (row counts, exact duplicates,
# per-column missing values) to a plain-text report.
report_lines = [
    "DATA QUALITY REPORT\n\n",
    f"Customers processed: {customers_count}\n",
    f"Products processed : {products_count}\n",
    f"Sales processed    : {sales_count}\n\n",
    "Duplicates Removed\n",
    f"Customers: {customers_duplicates}\n",
    f"Products : {products_duplicates}\n",
    f"Sales    : {sales_duplicates}\n\n",
    "Missing Values (Before Cleaning)\n",
    str(customers_missing) + "\n\n",
    str(products_missing) + "\n\n",
    str(sales_missing) + "\n",
]

with open("data_quality_report.txt", "w") as report_file:
    report_file.writelines(report_lines)

# -----------------------------------
# STEP 13: CLOSE CONNECTION
# -----------------------------------
# Release the cursor first, then the connection it was opened on.
for db_handle in (cursor, db):
    db_handle.close()

print("✅ ETL PIPELINE COMPLETED SUCCESSFULLY")

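# --- Captured output of the notebook cell above (not part of the script) ---
# The first three lines appear to be source-line echoes of warnings raised
# inside parse_mixed_date().
#   return pd.to_datetime(date, dayfirst=True)
#   return pd.to_datetime(date, dayfirst=True)
#   return pd.to_datetime(date, dayfirst=True)
#
#
# ❌ FK FAILED: C003 P007
# ❌ FK FAILED: C007 P005
# ❌ FK FAILED: C012 P003
# ❌ FK FAILED: C018 P020
# ❌ FK FAILED: C020 P010
# ❌ FK FAILED: C021 P017
# ❌ FK FAILED: C003 P019
# ❌ FK FAILED: C007 P002
# ❌ FK FAILED: C012 P016
# ❌ FK FAILED: C018 P020
# ✅ ETL PIPELINE COMPLETED SUCCESSFULLY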
