In [5]:
import pandas as pd

In [76]:
customers = pd.read_csv("../data/raw/customers/customers_raw.csv")
products = pd.read_csv("../data/raw/products/products_raw.csv")
transactions = pd.read_csv("../data/raw/transactions/transactions_2023_01_01.csv")

In [78]:
customers.head()

Unnamed: 0,customer_id,full_name,email,country,signup_date
0,C001,Rahul Sharma,rahul@gmail.com,India,15-01-2022
1,C002,Anita Verma,anita@gmail.com,India,10-03-2022
2,C003,John Smith,john.smith@email.com,USA,05-11-2021
3,C004,Meera Nair,,India,21-07-2022
4,C005,David Lee,david.lee@email.com,UK,15-08-2022


In [80]:
products.head()

Unnamed: 0,product_id,product_name,category,base_price
0,P001,Laptop,Electronics,55000
1,P002,Headphones,Electronics,2000
2,P003,Office Chair,Furniture,7500
3,P004,Water Bottle,Kitchen,500
4,P005,Smartphone,Electronics,30000


In [82]:
transactions.head()

Unnamed: 0,transaction_id,customer_id,product_id,quantity,unit_price,transaction_date
0,T001,C001,P001,1,55000,01-01-2023
1,T002,C002,P002,2,2000,01-01-2023
2,T003,C003,P003,1,7500,01-01-2023
3,T004,C009,P004,3,500,01-01-2023
4,T005,C001,P002,1,2000,01-01-2023


In [84]:
customers.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11 entries, 0 to 10
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   customer_id  11 non-null     object
 1   full_name    11 non-null     object
 2   email        10 non-null     object
 3   country      11 non-null     object
 4   signup_date  11 non-null     object
dtypes: object(5)
memory usage: 572.0+ bytes


In [86]:
products.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   product_id    8 non-null      object
 1   product_name  8 non-null      object
 2   category      8 non-null      object
 3   base_price    8 non-null      int64 
dtypes: int64(1), object(3)
memory usage: 388.0+ bytes


In [88]:
transactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   transaction_id    10 non-null     object
 1   customer_id       10 non-null     object
 2   product_id        10 non-null     object
 3   quantity          10 non-null     int64 
 4   unit_price        10 non-null     int64 
 5   transaction_date  10 non-null     object
dtypes: int64(2), object(4)
memory usage: 612.0+ bytes


In [90]:
customers_clean = customers.copy()

In [115]:
#First: Confirm duplicates exist (never assume)

In [92]:
customers_clean.duplicated().sum()

1

In [None]:
# Apply deduplication

In [94]:
# Sort by signup_date so earliest comes first
customers_clean = customers_clean.sort_values(by="signup_date")

# Drop duplicates based on customer_id
customers_clean = customers_clean.drop_duplicates(
    subset="customer_id",
    keep="first"
)

# Reset index
customers_clean = customers_clean.reset_index(drop=True)


In [None]:
# Verify result

In [96]:
customers_clean.duplicated().sum()

0

In [98]:
customers_clean.shape

(10, 5)

In [117]:
# First: Inspect current country values

In [100]:
customers_clean['country'].value_counts()

country
India     6
USA       2
UK        1
Canada    1
Name: count, dtype: int64

In [119]:
# Apply standardization

In [105]:
# Remove leading/trailing spaces and standardize case
customers_clean['country'] = customers_clean['country'].str.strip()

# Map known country variations
country_mapping = {
    'India': 'India',
    'USA': 'USA',
    'US': 'USA',
    'United States': 'USA',
    'UK': 'UK',
    'United Kingdom': 'UK',
    'Canada': 'Canada'
}

customers_clean['country'] = customers_clean['country'].map(country_mapping)


In [121]:
# Verify result

In [107]:
customers_clean['country'].value_counts()


country
India     6
USA       2
UK        1
Canada    1
Name: count, dtype: int64

In [123]:
# Convert dates safely

In [109]:
customers_clean['signup_date'] = pd.to_datetime(
    customers_clean['signup_date'],
    dayfirst=True,
    errors='coerce'
)

In [125]:
# Verify conversion

In [111]:
customers_clean.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   customer_id  10 non-null     object        
 1   full_name    10 non-null     object        
 2   email        9 non-null      object        
 3   country      10 non-null     object        
 4   signup_date  10 non-null     datetime64[ns]
dtypes: datetime64[ns](1), object(4)
memory usage: 532.0+ bytes


In [113]:
customers_clean[['signup_date']].head()

Unnamed: 0,signup_date
0,2023-03-01
1,2023-01-05
2,2021-11-05
3,2022-03-10
4,2022-10-12


In [129]:
# Check for failed conversions (important)

In [127]:
customers_clean[customers_clean['signup_date'].isna()]

Unnamed: 0,customer_id,full_name,email,country,signup_date


In [131]:
# Handle Missing Emails

In [133]:
customers_clean['email'] = customers_clean['email'].fillna('unknown')

In [137]:
# Verify result

In [135]:
customers_clean.isnull().sum()


customer_id    0
full_name      0
email          0
country        0
signup_date    0
dtype: int64

In [139]:
# Finalize Customer Dimension (Warehouse-Ready)
# Prepare Dim_Customer table
# Ensure clean, consistent schema
# Make it ready to load into a data warehouse

# Select & rename columns

In [141]:
dim_customer = customers_clean[[
    'customer_id',
    'full_name',
    'email',
    'country',
    'signup_date'
]].copy()

In [143]:
# Ensure column names are lowercase:

In [145]:
dim_customer.columns = dim_customer.columns.str.lower()

In [147]:
# Final check

In [149]:
dim_customer.head()

Unnamed: 0,customer_id,full_name,email,country,signup_date
0,C010,Arjun Reddy,arjun.r@gmail.com,India,2023-03-01
1,C008,Rajesh Kumar,rajesh.k@gmail.com,India,2023-01-05
2,C003,John Smith,john.smith@email.com,USA,2021-11-05
3,C002,Anita Verma,anita@gmail.com,India,2022-03-10
4,C007,Sarah Brown,sarah.b@email.com,USA,2022-10-12


In [151]:
dim_customer.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   customer_id  10 non-null     object        
 1   full_name    10 non-null     object        
 2   email        10 non-null     object        
 3   country      10 non-null     object        
 4   signup_date  10 non-null     datetime64[ns]
dtypes: datetime64[ns](1), object(4)
memory usage: 532.0+ bytes


In [153]:
# Product Data Cleaning & Product Dimension Creation
# Clean product master data
# Remove duplicates
# Validate prices
# Create Dim_Product (warehouse-ready)

In [155]:
# Create a working copy

In [157]:
products_clean = products.copy()

In [159]:
# Inspect product data
products_clean.head()

Unnamed: 0,product_id,product_name,category,base_price
0,P001,Laptop,Electronics,55000
1,P002,Headphones,Electronics,2000
2,P003,Office Chair,Furniture,7500
3,P004,Water Bottle,Kitchen,500
4,P005,Smartphone,Electronics,30000


In [161]:
products_clean.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8 entries, 0 to 7
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   product_id    8 non-null      object
 1   product_name  8 non-null      object
 2   category      8 non-null      object
 3   base_price    8 non-null      int64 
dtypes: int64(1), object(3)
memory usage: 388.0+ bytes


In [165]:
products_clean.duplicated().sum()

1

In [167]:
# Deduplicate products

products_clean = products_clean.drop_duplicates(
    subset='product_id',
    keep='first'
).reset_index(drop=True)

In [169]:
products_clean.duplicated().sum()

0

In [171]:
# Convert price to numeric

products_clean['base_price'] = pd.to_numeric(
    products_clean['base_price'],
    errors='coerce'
)

In [173]:
# Check
products_clean.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   product_id    7 non-null      object
 1   product_name  7 non-null      object
 2   category      7 non-null      object
 3   base_price    7 non-null      int64 
dtypes: int64(1), object(3)
memory usage: 356.0+ bytes


In [175]:
# Validate prices (Data Quality)

products_clean[products_clean['base_price'] <= 0]

Unnamed: 0,product_id,product_name,category,base_price


In [177]:
# Finalize Product Dimension

In [181]:
dim_product = products_clean[[
    'product_id',
    'product_name',
    'category',
    'base_price'
]].copy()

dim_product.columns = dim_product.columns.str.lower()


In [183]:
# Final check

dim_product.head()

Unnamed: 0,product_id,product_name,category,base_price
0,P001,Laptop,Electronics,55000
1,P002,Headphones,Electronics,2000
2,P003,Office Chair,Furniture,7500
3,P004,Water Bottle,Kitchen,500
4,P005,Smartphone,Electronics,30000


In [185]:
dim_product.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   product_id    7 non-null      object
 1   product_name  7 non-null      object
 2   category      7 non-null      object
 3   base_price    7 non-null      int64 
dtypes: int64(1), object(3)
memory usage: 356.0+ bytes


In [187]:
# Transaction Data Cleaning & Fact Table Preparation
# By the end of this step, you will have a Fact_Transactions table that:
# Contains only valid transactions
# Links correctly to Dim_Customer and Dim_Product
# Has calculated metrics needed for analytics

In [191]:
# Create a working copy
transactions_clean = transactions.copy()

In [193]:
# Inspect transaction data
transactions_clean.head()

Unnamed: 0,transaction_id,customer_id,product_id,quantity,unit_price,transaction_date
0,T001,C001,P001,1,55000,01-01-2023
1,T002,C002,P002,2,2000,01-01-2023
2,T003,C003,P003,1,7500,01-01-2023
3,T004,C009,P004,3,500,01-01-2023
4,T005,C001,P002,1,2000,01-01-2023


In [195]:
transactions_clean.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   transaction_id    10 non-null     object
 1   customer_id       10 non-null     object
 2   product_id        10 non-null     object
 3   quantity          10 non-null     int64 
 4   unit_price        10 non-null     int64 
 5   transaction_date  10 non-null     object
dtypes: int64(2), object(4)
memory usage: 612.0+ bytes


In [197]:
# Convert numeric columns

transactions_clean['quantity'] = pd.to_numeric(
    transactions_clean['quantity'], errors='coerce'
)

transactions_clean['unit_price'] = pd.to_numeric(
    transactions_clean['unit_price'], errors='coerce'
)

In [199]:
# verify

transactions_clean.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   transaction_id    10 non-null     object
 1   customer_id       10 non-null     object
 2   product_id        10 non-null     object
 3   quantity          10 non-null     int64 
 4   unit_price        10 non-null     int64 
 5   transaction_date  10 non-null     object
dtypes: int64(2), object(4)
memory usage: 612.0+ bytes


In [201]:
# Convert transaction date

transactions_clean['transaction_date'] = pd.to_datetime(
    transactions_clean['transaction_date'],
    dayfirst=True,
    errors='coerce'
)

In [203]:
# check
transactions_clean[['transaction_date']].head()

Unnamed: 0,transaction_date
0,2023-01-01
1,2023-01-01
2,2023-01-01
3,2023-01-01
4,2023-01-01


In [205]:
# Handle invalid customer references (VERY IMPORTANT)
transactions_clean = transactions_clean[
    transactions_clean['customer_id'].isin(dim_customer['customer_id'])
]

In [207]:
# Verify

transactions_clean.shape

(9, 6)

In [209]:
# Handle invalid product references (safety check)

transactions_clean = transactions_clean[
    transactions_clean['product_id'].isin(dim_product['product_id'])
]

In [211]:
# Create transaction amount (KEY METRIC)

In [213]:
transactions_clean['total_amount'] = (
    transactions_clean['quantity'] * transactions_clean['unit_price']
)

In [215]:
# Finalize Fact_Transactions table

In [217]:
fact_transactions = transactions_clean[[
    'transaction_id',
    'customer_id',
    'product_id',
    'transaction_date',
    'quantity',
    'unit_price',
    'total_amount'
]].copy()

fact_transactions.columns = fact_transactions.columns.str.lower()

In [219]:
# Final Verification

fact_transactions.head()

Unnamed: 0,transaction_id,customer_id,product_id,transaction_date,quantity,unit_price,total_amount
0,T001,C001,P001,2023-01-01,1,55000,55000
1,T002,C002,P002,2023-01-01,2,2000,4000
2,T003,C003,P003,2023-01-01,1,7500,7500
3,T004,C009,P004,2023-01-01,3,500,1500
4,T005,C001,P002,2023-01-01,1,2000,2000


In [221]:
fact_transactions.info()

<class 'pandas.core.frame.DataFrame'>
Index: 9 entries, 0 to 8
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   transaction_id    9 non-null      object        
 1   customer_id       9 non-null      object        
 2   product_id        9 non-null      object        
 3   transaction_date  9 non-null      datetime64[ns]
 4   quantity          9 non-null      int64         
 5   unit_price        9 non-null      int64         
 6   total_amount      9 non-null      int64         
dtypes: datetime64[ns](1), int64(3), object(3)
memory usage: 576.0+ bytes


In [223]:
# Load Data from Python into MySQL

# Using SQLAlchemy + PyMySQL:

In [229]:
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:root123@localhost:3306/data_warehouse"
)

dim_customer.to_sql("dim_customer", engine, if_exists="append", index=False)
dim_product.to_sql("dim_product", engine, if_exists="append", index=False)
fact_transactions.to_sql("fact_transactions", engine, if_exists="append", index=False)


9

### REALISTIC DATA SCALING (This makes it stand out)

##### Now we will:

Scale transaction data to 50K–100K rows

Simulate multiple days/months

Reload into MySQL

Show scalability & performance thinking

Scale 9 transactions → ~50,000+ transactions

In [234]:
import pandas as pd
import numpy as np

In [236]:
base_transactions = fact_transactions.copy()

In [254]:
# Define scaling parameters

NUM_DAYS = 6000   # ~16 years of daily transactions
TARGET_ROWS = 50000    # realistic scale

In [256]:
# Generate scaled transactions
# Core idea:
# Repeat transactions
# Shift dates forward
# Modify quantity slightly
# Generate new transaction IDs

In [258]:
base_transactions

Unnamed: 0,transaction_id,customer_id,product_id,transaction_date,quantity,unit_price,total_amount
0,T001,C001,P001,2023-01-01,1,55000,55000
1,T002,C002,P002,2023-01-01,2,2000,4000
2,T003,C003,P003,2023-01-01,1,7500,7500
3,T004,C009,P004,2023-01-01,3,500,1500
4,T005,C001,P002,2023-01-01,1,2000,2000
5,T006,C010,P005,2023-01-01,1,30000,30000
6,T007,C007,P006,2023-01-01,2,1200,2400
7,T008,C008,P007,2023-01-01,10,150,1500
8,T009,C004,P003,2023-01-01,1,7500,7500


In [260]:
scaled_transactions = []

current_id = 1000

for day in range(NUM_DAYS):
    daily = base_transactions.copy()

    # Shift date
    daily['transaction_date'] = daily['transaction_date'] + pd.Timedelta(days=day)

    # Slightly vary quantity (realistic behavior)
    daily['quantity'] = daily['quantity'].apply(
        lambda x: max(1, x + np.random.randint(-1, 2))
    )

    # Recalculate total_amount
    daily['total_amount'] = daily['quantity'] * daily['unit_price']

    # Generate new transaction IDs
    daily['transaction_id'] = [
        f"T{current_id + i}" for i in range(len(daily))
    ]
    current_id += len(daily)

    scaled_transactions.append(daily)

scaled_fact = pd.concat(scaled_transactions, ignore_index=True)


In [261]:
scaled_fact.shape

(54000, 7)

In [264]:
# Control final size

scaled_fact = scaled_fact.sample(
    n=TARGET_ROWS, random_state=42
).reset_index(drop=True)

scaled_fact.shape

(50000, 7)

In [266]:
scaled_fact.head()

Unnamed: 0,transaction_id,customer_id,product_id,transaction_date,quantity,unit_price,total_amount
0,T31246,C007,P006,2032-03-14,1,1200,1200
1,T35164,C001,P001,2033-05-24,1,55000,55000
2,T41692,C009,P004,2035-05-19,3,500,1500
3,T29724,C010,P005,2031-09-27,1,30000,30000
4,T29097,C004,P003,2031-07-19,1,7500,7500


In [270]:
# here trasaction date goes upto 2032 to 2035, for leanring and scalig it is acceptable,
# but for real data -we should fix it
# Anchor dates to a realistic range

In [272]:
from datetime import datetime

start_date = pd.to_datetime("2023-01-01")

scaled_fact['transaction_date'] = (
    start_date +
    pd.to_timedelta(
        np.random.randint(0, 730, size=len(scaled_fact)),
        unit='D'
    )
)

In [274]:
scaled_fact.head()

Unnamed: 0,transaction_id,customer_id,product_id,transaction_date,quantity,unit_price,total_amount
0,T31246,C007,P006,2023-06-12,1,1200,1200
1,T35164,C001,P001,2024-01-11,1,55000,55000
2,T41692,C009,P004,2023-02-04,3,500,1500
3,T29724,C010,P005,2024-01-19,1,30000,30000
4,T29097,C004,P003,2023-03-05,1,7500,7500


In [276]:
scaled_fact.shape

(50000, 7)

In [278]:
# Reload scaled data from Python (THIS IS THE MAIN STEP)

In [280]:
scaled_fact.to_sql(
    "fact_transactions",
    engine,
    if_exists="append",
    index=False,
    chunksize=5000
)

50000

In [284]:
## Save Processed Data as CSV Files

In [282]:
import os

os.makedirs("../data/processed", exist_ok=True)

In [286]:
dim_customer.to_csv(
    "../data/processed/dim_customer.csv",
    index=False
)

In [288]:
dim_product.to_csv(
    "../data/processed/dim_product.csv",
    index=False
)


In [290]:
scaled_fact.to_csv(
    "../data/processed/fact_transactions.csv",
    index=False
)


In [292]:
import pandas as pd

pd.read_csv("../data/processed/dim_customer.csv").head()
pd.read_csv("../data/processed/dim_product.csv").head()
pd.read_csv("../data/processed/fact_transactions.csv").shape

(50000, 7)