Description: 

# ----------------------------------
# Import required Libraries
# ----------------------------------

In [152]:
import pandas as pd
import mysql.connector
import os
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

True

# ----------------------------------
# Load Environment Variables
# ----------------------------------

In [153]:
DB_CONFIG = {
    "host": os.getenv("DB_HOST"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "port": os.getenv("DB_PORT")
}

# ----------------------------------
# Database Connection
# ----------------------------------

In [168]:
conn = mysql.connector.connect(**DB_CONFIG)
cursor = conn.cursor()

# ----------------------------------
# Extraction
# ----------------------------------

In [155]:
customers = pd.read_csv("../customers_raw.csv")
products = pd.read_csv("../products_raw.csv")
sales = pd.read_csv("../sales_raw.csv")

# ----------------------------------
# Utility Functions
# ----------------------------------

In [156]:
# General function to check and handle missing values in dataframes
def handle_missing_values(df):
    missing_col=[]
    numeric_cols = df.select_dtypes(include=["int64", "float64"]).columns.tolist()
    missing_summary = df.isnull().sum()
    for col, cnt in missing_summary.items():
        if cnt > 0:
            missing_col.append(f"col {col}: {cnt} missing values")
            if col in numeric_cols:
                df[col].fillna(df[col].mean(), inplace=True)
            else:
                df.dropna(subset=[col], inplace=True)

    return df, missing_col

def parse_date(date_str):
    for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%m-%d-%Y", "%m/%d/%Y"):
        try:
            return datetime.strptime(str(date_str), fmt).date()
        except ValueError:
            continue
    return None

def standardize_phone(phone):
    if pd.isna(phone):
        return None
    digits = ''.join(filter(str.isdigit, str(phone)))
    return "+91-" + digits[-10:]

In [157]:
# Transform: Customers
customers.drop_duplicates(inplace=True)
customers, missing_cust_cols = handle_missing_values(customers)
customers['registration_date'] = customers['registration_date'].apply(parse_date)
customers['phone'] = customers['phone'].apply(standardize_phone)

In [158]:
# Transform: Products
products, missing_prod_cols = handle_missing_values(products)
products["category"] = products["category"].str.strip().str.capitalize()
products["stock_quantity"].fillna(0, inplace=True)
products["product_name"] = products["product_name"].str.strip()

In [160]:
# Transform: Sales
sales.drop_duplicates(inplace=True)
sales, missing_sales_cols = handle_missing_values(sales)
sales['transaction_date'] = sales['transaction_date'].apply(parse_date)
sales["subtotal"] = sales["quantity"] * sales["unit_price"]

In [None]:
# Load: Customers

customers_cols_names = ",".join(customers.columns)
customers_values = ",".join(["%s"] * len(customers.columns))
customer_sql = f"INSERT IGNORE INTO fleximart.customers ({customers_cols_names}) VALUES ({customers_values})"
print (customer_sql)
cursor.executemany(customer_sql, customers.values.tolist())
conn.commit()

INSERT IGNORE INTO fleximart.customers (customer_id,first_name,last_name,email,phone,city,registration_date) VALUES (%s,%s,%s,%s,%s,%s,%s)


In [163]:
# Load: Products

products_cols_names = ",".join(products.columns)
products_values = ",".join(["%s"] * len(products.columns))
product_sql = f"INSERT IGNORE INTO fleximart.products ({products_cols_names}) VALUES ({products_values})"
cursor.executemany(product_sql, products.values.tolist())
conn.commit()

In [164]:
# Load: Orders and Order Items
order_sql = """
INSERT IGNORE INTO orders
(order_id, customer_id, order_date, total_amount, status)
VALUES (%s,%s,%s,%s,%s)
"""

for _, row in sales.iterrows():
    cursor.execute(order_sql, (
        row["transaction_id"],
        row["customer_id"],
        row["transaction_date"],
        row["subtotal"],
        row["status"]
    ))

conn.commit()


In [169]:
# Load: Order Items
order_item_sql = """
INSERT IGNORE INTO order_items
(order_id, product_id, quantity, unit_price, subtotal)
VALUES (%s,%s,%s,%s,%s)
"""

for _, row in sales.iterrows():
    cursor.execute(order_item_sql, (
        row["transaction_id"],
        row["product_id"],
        row["quantity"],
        row["unit_price"],
        row["subtotal"]
    ))

conn.commit()

In [170]:
cursor.close()
conn.close()