In [17]:
import pandas as pd
import numpy as np


In [3]:
customers = pd.read_csv("raw_data/Customers_raw.csv")
sales = pd.read_csv('raw_data/Sales_raw.csv')
products = pd.read_csv('raw_data/Products_raw.csv')

Customers

In [6]:
# Remove duplicates
customers = customers.drop_duplicates(subset="customer_id")

# Clean names & city
customers['city'] = customers['city'].str.strip().str.title()

# Standardize phone
def format_phone(phone):
    digits = ''.join(filter(str.isdigit, str(phone)))
    if len(digits) >= 10:
        return "+91-" + digits[-10:]
    return np.nan

customers['phone'] = customers['phone'].apply(format_phone)

# Standardize dates
customers['registration_date'] = pd.to_datetime(
    customers['registration_date'], errors='coerce', dayfirst=True
).dt.date

# Add surrogate key
customers.insert(0, "customer_sk", range(1, len(customers) + 1))


In [7]:
customers.head()

Unnamed: 0,customer_sk,customer_id,first_name,last_name,email,phone,city,registration_date
0,1,C001,Rahul,Sharma,rahul.sharma@gmail.com,+91-8765432100,Bangalore,2023-01-15
1,2,C002,Priya,Patel,priya.patel@yahoo.com,+91-9887765640,Mumbai,2023-02-20
2,3,C003,Amit,Kumar,,+91-7654321090,Delhi,2023-03-10
3,4,C004,Sneha,Reddy,sneha.reddy@gmail.com,+91-1234567890,Hyderabad,2023-04-15
4,5,C005,Vikram,Singh,vikram.singh@outlook.com,+91-9881122330,Chennai,2023-05-22


Product_Transform

In [8]:
# Trim text
products['product_name'] = products['product_name'].str.strip()

# Standardize category
products['category'] = (
    products['category']
    .str.strip()
    .str.lower()
    .str.capitalize()
)

# Fill missing stock with 0
products['stock_quantity'] = products['stock_quantity'].fillna(0)

# Fill missing price with median by category
products['price'] = products.groupby('category')['price'] \
                             .transform(lambda x: x.fillna(x.median()))

# Add surrogate key
products.insert(0, "product_sk", range(1, len(products) + 1))


In [9]:
products.head()

Unnamed: 0,product_sk,product_id,product_name,category,price,stock_quantity
0,1,P001,Samsung Galaxy S21,Electronics,45999.0,150.0
1,2,P002,Nike Running Shoes,Fashion,3499.0,80.0
2,3,P003,Apple MacBook Pro,Electronics,32999.0,45.0
3,4,P004,Levi's Jeans,Fashion,2999.0,120.0
4,5,P005,Sony Headphones,Electronics,1999.0,200.0


Sales_Data_Transform

In [10]:
# Remove duplicates
sales = sales.drop_duplicates(subset="transaction_id")

# Drop rows with missing FK
sales = sales.dropna(subset=["customer_id", "product_id"])

# Standardize dates
sales['transaction_date'] = pd.to_datetime(
    sales['transaction_date'], errors='coerce', dayfirst=True
).dt.date

# Add surrogate key
sales.insert(0, "sales_sk", range(1, len(sales) + 1))


In [11]:
sales.head()

Unnamed: 0,sales_sk,transaction_id,customer_id,product_id,quantity,unit_price,transaction_date,status
0,1,T001,C001,P001,1,45999,2024-01-15,Completed
1,2,T002,C002,P004,2,2999,2024-01-16,Completed
2,3,T003,C003,P007,1,52999,2024-01-15,Completed
4,4,T005,C005,P009,3,650,2024-01-20,Completed
5,5,T006,C006,P012,1,12999,NaT,Completed


Insert Data into mysql

In [24]:
import os
import mysql.connector
from dotenv import load_dotenv
load_dotenv()


True

In [33]:
def upload_data_db(df: pd.DataFrame, table_name: str):

    conn = mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME")
    )

    cursor = conn.cursor()

    cols_names = ",".join(df.columns)
    placeholders = ",".join(["%s"] * len(df.columns))

    sql = f"INSERT INTO {table_name} ({cols_names}) VALUES ({placeholders})"


    cursor.executemany(sql, df.values.tolist())
    conn.commit()

    cursor.close()
    conn.close()


In [32]:
print(os.getenv("DB_USER"))


root


Data Quality


In [3]:
import os
from dotenv import load_dotenv
import mysql.connector
from mysql.connector import Error

load_dotenv()

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")

try:
    # Connect without specifying database
    conn = mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD
    )
    cursor = conn.cursor()

    # Create database if not exists
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    print(f"✅ Database '{DB_NAME}' is ready!")

    cursor.close()
    conn.close()

except Error as e:
    print("❌ Error:", e)


✅ Database 'fleximart' is ready!


In [5]:
# etl_pipeline.py

import os
import pandas as pd
import mysql.connector
from dotenv import load_dotenv
from mysql.connector import Error

# ---------------------------------------
# LOAD ENV VARIABLES
# ---------------------------------------
load_dotenv()  # Make sure .env is in the same folder as this script

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")

# ---------------------------------------
# TEST MYSQL CONNECTION
# ---------------------------------------
try:
    conn = mysql.connector.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )
    if conn.is_connected():
        print(f"Connected to MySQL database '{DB_NAME}' as user '{DB_USER}'")
except Error as e:
    print("Error connecting to MySQL:", e)
    exit(1)  # Stop execution if connection fails

cursor = conn.cursor()

# ---------------------------------------
# DATA QUALITY TRACKER
# ---------------------------------------
dq = {
    "customers": {"processed": 0, "duplicates": 0, "missing": 0, "loaded": 0},
    "products": {"processed": 0, "duplicates": 0, "missing": 0, "loaded": 0},
    "sales": {"processed": 0, "duplicates": 0, "missing": 0, "loaded": 0},
}

# ---------------------------------------
# EXTRACT
# ---------------------------------------
customers = pd.read_csv("raw_data/Customers_raw.csv")
products = pd.read_csv("raw_data/Products_raw.csv")
sales = pd.read_csv("raw_data/Sales_raw.csv")

dq["customers"]["processed"] = len(customers)
dq["products"]["processed"] = len(products)
dq["sales"]["processed"] = len(sales)

# ---------------------------------------
# TRANSFORM — CUSTOMERS
# ---------------------------------------
before = len(customers)
customers.drop_duplicates(subset="customer_id", inplace=True)
dq["customers"]["duplicates"] = before - len(customers)

missing_email = customers["email"].isna().sum()
customers.dropna(subset=["email"], inplace=True)
dq["customers"]["missing"] += missing_email

def format_phone(phone):
    digits = ''.join(filter(str.isdigit, str(phone)))
    return "+91-" + digits[-10:] if len(digits) >= 10 else None

customers["phone"] = customers["phone"].apply(format_phone)
customers["city"] = customers["city"].str.strip().str.title()
customers["registration_date"] = pd.to_datetime(
    customers["registration_date"], errors="coerce", dayfirst=True
).dt.date

customers_final = customers[[
    "first_name", "last_name", "email", "phone", "city", "registration_date"
]]

# ---------------------------------------
# LOAD — CUSTOMERS
# ---------------------------------------
cust_sql = """
INSERT INTO customers
(first_name, last_name, email, phone, city, registration_date)
VALUES (%s, %s, %s, %s, %s, %s)
"""

try:
    cursor.executemany(cust_sql, customers_final.values.tolist())
    conn.commit()
    dq["customers"]["loaded"] = cursor.rowcount
    print(f"{dq['customers']['loaded']} customers loaded successfully.")
except Error as e:
    print("Error inserting customers:", e)
    conn.rollback()

# Fetch customer_id mapping
cursor.execute("SELECT customer_id, email FROM customers")
customer_map = dict(cursor.fetchall())

# ---------------------------------------
# TRANSFORM & LOAD — PRODUCTS
# ---------------------------------------
products["product_name"] = products["product_name"].str.strip()
products["category"] = products["category"].str.lower().str.capitalize()

missing_stock = products["stock_quantity"].isna().sum()
products["stock_quantity"].fillna(0, inplace=True)
dq["products"]["missing"] += missing_stock

missing_price = products["price"].isna().sum()
products["price"] = products.groupby("category")["price"].transform(lambda x: x.fillna(x.median()))
dq["products"]["missing"] += missing_price

products_final = products[[
    "product_name", "category", "price", "stock_quantity"
]]

prod_sql = """
INSERT INTO products
(product_name, category, price, stock_quantity)
VALUES (%s, %s, %s, %s)
"""

try:
    cursor.executemany(prod_sql, products_final.values.tolist())
    conn.commit()
    dq["products"]["loaded"] = cursor.rowcount
    print(f"{dq['products']['loaded']} products loaded successfully.")
except Error as e:
    print("Error inserting products:", e)
    conn.rollback()

cursor.execute("SELECT product_id, product_name FROM products")
product_map = dict(cursor.fetchall())

# ---------------------------------------
# TRANSFORM & LOAD — SALES (ORDERS & ORDER_ITEMS)
# ---------------------------------------
orders_loaded = 0

before = len(sales)
sales.drop_duplicates(subset="transaction_id", inplace=True)
dq["sales"]["duplicates"] = before - len(sales)

missing_fk = sales[sales["customer_id"].isna() | sales["product_id"].isna()].shape[0]
sales.dropna(subset=["customer_id", "product_id"], inplace=True)
dq["sales"]["missing"] += missing_fk

sales["transaction_date"] = pd.to_datetime(sales["transaction_date"], errors="coerce", dayfirst=True).dt.date

for _, row in sales.iterrows():
    email = customers.loc[customers["customer_id"] == row["customer_id"], "email"]
    if email.empty:
        continue
    cust_id = customer_map.get(email.values[0])
    if not cust_id:
        continue

    subtotal = row["quantity"] * row["unit_price"]

    # Insert order
    try:
        cursor.execute(
            "INSERT INTO orders (customer_id, order_date, total_amount, status) VALUES (%s,%s,%s,%s)",
            (cust_id, row["transaction_date"], subtotal, row["status"])
        )
        conn.commit()
        order_id = cursor.lastrowid
    except Error as e:
        print("Error inserting order:", e)
        conn.rollback()
        continue

    prod_id = product_map.get(row["product_id"])
    if not prod_id:
        continue

    # Insert order_item
    try:
        cursor.execute(
            "INSERT INTO order_items (order_id, product_id, quantity, unit_price, subtotal) VALUES (%s,%s,%s,%s,%s)",
            (order_id, prod_id, row["quantity"], row["unit_price"], subtotal)
        )
        conn.commit()
        orders_loaded += 1
    except Error as e:
        print("Error inserting order_item:", e)
        conn.rollback()

dq["sales"]["loaded"] = orders_loaded

# ---------------------------------------
# DATA QUALITY REPORT
# ---------------------------------------
with open("data_quality_report.txt", "w") as f:
    for table, stats in dq.items():
        f.write(f"Table: {table}\n")
        f.write(f"Records Processed: {stats['processed']}\n")
        f.write(f"Duplicates Removed: {stats['duplicates']}\n")
        f.write(f"Missing Values Handled: {stats['missing']}\n")
        f.write(f"Records Loaded: {stats['loaded']}\n\n")

cursor.close()
conn.close()
print("ETL Pipeline completed successfully.")


Connected to MySQL database 'fleximart' as user 'root'
20 customers loaded successfully.
20 products loaded successfully.
ETL Pipeline completed successfully.
