In [2]:
import pandas as pd
import numpy as np
import os
import glob
import mysql.connector  # or use sqlalchemy
from datetime import datetime

In [4]:
import logging

In [5]:
logging.basicConfig(filename='etl_log.log', level=logging.INFO, format='%(asctime)s %(message)s')

In [21]:
db_config = {
    'user': 'root',
    'password': 'Priya#321',
    'host': 'localhost',
    'port':'330',
    'database': 'retail_db'
}

In [22]:
DATA_FOLDER = 'data/'  # Your local folder path for CSVs

In [23]:
CSV_FILES = [
    'store_001_sales.csv',
    'store_002_sales.csv',
    'store_003_sales.csv',
    'store_004_sales.csv',
    'store_005_sales.csv'
]

In [24]:
def extract_data():
    file_paths = [os.path.join(DATA_FOLDER, f) for f in CSV_FILES]
    df_list = [pd.read_csv(path) for path in file_paths]
    df = pd.concat(df_list, ignore_index=True)
    logging.info(f"Extracted {len(df)} rows from {len(file_paths)} files.")
    return df

In [25]:
def transform_data(df):
    df.columns = df.columns.str.strip().str.lower()
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    df.dropna(subset=['store_id', 'date', 'product_id'], inplace=True)
    df['quantity_sold'] = df['quantity_sold'].fillna(0).astype(int)
    df['unit_price'] = df['unit_price'].fillna(0.0).astype(float)
    df['discount_percent'] = df['discount_percent'].fillna(0.0).astype(float)
    df['total_sale_value'] = df['quantity_sold'] * df['unit_price'] * (1 - df['discount_percent'] / 100)
    df.drop_duplicates(subset=['store_id', 'date', 'product_id'], inplace=True)
    df['sale_category'] = np.where(df['total_sale_value'] >= 1000, 'High',
                            np.where(df['total_sale_value'] >= 500, 'Medium', 'Low'))
    
    logging.info(f"Transformed data. Rows after cleaning: {len(df)}")
    return df  # ✅ return the transformed DataFrame

In [26]:
def create_mysql_table(cursor):
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sales_data (
            store_id VARCHAR(10),
            date DATE,
            product_id VARCHAR(20),
            quantity_sold INT,
            unit_price FLOAT,
            discount_percent FLOAT,
            total_sale_value FLOAT,
            sale_category VARCHAR(10),
            PRIMARY KEY (store_id, date, product_id)
        )
    """)

In [27]:
def load_to_mysql(df):
    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()
    create_mysql_table(cursor)  # This must run before inserting

    insert_query = """
        REPLACE INTO sales_data (
            store_id, date, product_id, quantity_sold,
            unit_price, discount_percent, total_sale_value, sale_category
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    for _, row in df.iterrows():
        cursor.execute(insert_query, (
            row['store_id'], row['date'], row['product_id'], row['quantity_sold'],
            row['unit_price'], row['discount_percent'], row['total_sale_value'], row['sale_category']
        ))

    conn.commit()
    logging.info(f"✅ Loaded {len(df)} rows into MySQL.")
    cursor.close()
    conn.close()

In [28]:
def analyze_and_export(df):
    df.groupby('store_id')['total_sale_value'].sum().reset_index().to_csv('store_sales_summary.csv', index=False)
    df.groupby('product_name')['total_sale_value'].sum().nlargest(5).reset_index().to_csv('top_5_products.csv', index=False)
    df.groupby(['store_id', 'date'])['total_sale_value'].sum().reset_index().to_excel('daily_sales_trend.xlsx', index=False)
    logging.info("Analysis reports generated and saved.")

In [29]:
def main():
    logging.info("ETL Process Started.")
    df = extract_data()
    df = transform_data(df)
    load_to_mysql(df)
    analyze_and_export(df)
    logging.info("ETL Process Completed.")

In [30]:
import os
import pandas as pd

# Create mock data directory
os.makedirs("data", exist_ok=True)

# Sample data
data = {
    'Store_ID': ['S001'],
    'Date': ['2025-07-18'],
    'Product_ID': ['P001'],
    'Product_Name': ['Shampoo'],
    'Quantity_Sold': [10],
    'Unit_Price': [100.0],
    'Discount_Percent': [10.0],
    'Payment_Mode': ['Card']
}

# Save sample CSV
df = pd.DataFrame(data)
df.to_csv('data/store_001_sales.csv', index=False)
print("Sample file created.")

import pandas as pd
import os
import random
from datetime import datetime, timedelta

# Create the `data` folder if not exists
os.makedirs("data", exist_ok=True)

# Sample data for 5 stores
stores = ['store_001', 'store_002', 'store_003', 'store_004', 'store_005']
products = [
    {'Product_ID': 'P001', 'Product_Name': 'Shampoo'},
    {'Product_ID': 'P002', 'Product_Name': 'Soap'},
    {'Product_ID': 'P003', 'Product_Name': 'Toothpaste'},
    {'Product_ID': 'P004', 'Product_Name': 'Lotion'},
    {'Product_ID': 'P005', 'Product_Name': 'Face Wash'},
]
payment_modes = ['Cash', 'Card', 'UPI', 'Wallet']

for store in stores:
    records = []
    for _ in range(50):  # 50 rows per store
        product = random.choice(products)
        row = {
            'Store_ID': store,
            'Date': (datetime.now() - timedelta(days=random.randint(0, 10))).strftime('%Y-%m-%d'),
            'Product_ID': product['Product_ID'],
            'Product_Name': product['Product_Name'],
            'Quantity_Sold': random.randint(1, 10),
            'Unit_Price': round(random.uniform(10, 100), 2),
            'Discount_Percent': round(random.uniform(0, 30), 2),
            'Payment_Mode': random.choice(payment_modes)
        }
        records.append(row)
    
    df = pd.DataFrame(records)
    file_path = f"data/{store}_sales.csv"
    df.to_csv(file_path, index=False)

print("✅ 5 sample CSV files created in `data/` folder.")

Sample file created.
✅ 5 sample CSV files created in `data/` folder.


In [31]:
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    display(df)

Unnamed: 0,Store_ID,Date,Product_ID,Product_Name,Quantity_Sold,Unit_Price,Discount_Percent,Payment_Mode
0,store_005,2025-07-16,P002,Soap,9,44.75,23.7,Wallet
1,store_005,2025-07-20,P003,Toothpaste,8,44.03,15.57,Wallet
2,store_005,2025-07-15,P005,Face Wash,3,30.78,2.85,Card
3,store_005,2025-07-20,P003,Toothpaste,10,67.12,25.64,Cash
4,store_005,2025-07-12,P005,Face Wash,5,26.72,10.51,Cash
5,store_005,2025-07-16,P002,Soap,10,30.15,25.13,Cash
6,store_005,2025-07-14,P001,Shampoo,5,41.89,2.8,UPI
7,store_005,2025-07-10,P005,Face Wash,8,81.65,19.19,Wallet
8,store_005,2025-07-10,P002,Soap,8,32.17,21.19,Wallet
9,store_005,2025-07-19,P001,Shampoo,10,85.65,7.76,UPI


In [32]:
df = extract_data()
df.head()  # Show first 5 rows

Unnamed: 0,Store_ID,Date,Product_ID,Product_Name,Quantity_Sold,Unit_Price,Discount_Percent,Payment_Mode
0,store_001,2025-07-19,P005,Face Wash,4,35.51,9.83,UPI
1,store_001,2025-07-13,P002,Soap,5,23.71,14.91,Card
2,store_001,2025-07-13,P003,Toothpaste,2,86.08,6.89,Card
3,store_001,2025-07-12,P003,Toothpaste,5,86.01,11.09,Wallet
4,store_001,2025-07-14,P003,Toothpaste,6,29.86,6.06,Wallet


In [33]:
df = transform_data(df)
df.head()

Unnamed: 0,store_id,date,product_id,product_name,quantity_sold,unit_price,discount_percent,payment_mode,total_sale_value,sale_category
0,store_001,2025-07-19,P005,Face Wash,4,35.51,9.83,UPI,128.077468,Low
1,store_001,2025-07-13,P002,Soap,5,23.71,14.91,Card,100.874195,Low
2,store_001,2025-07-13,P003,Toothpaste,2,86.08,6.89,Card,160.298176,Low
3,store_001,2025-07-12,P003,Toothpaste,5,86.01,11.09,Wallet,382.357455,Low
4,store_001,2025-07-14,P003,Toothpaste,6,29.86,6.06,Wallet,168.302904,Low


In [34]:
import os

# Create 'output' directory if it doesn't exist
os.makedirs("output", exist_ok=True)

# Now save the file
df.to_csv("output/cleaned_data.csv", index=False)

# View the first 10 rows
df.head(10)

Unnamed: 0,store_id,date,product_id,product_name,quantity_sold,unit_price,discount_percent,payment_mode,total_sale_value,sale_category
0,store_001,2025-07-19,P005,Face Wash,4,35.51,9.83,UPI,128.077468,Low
1,store_001,2025-07-13,P002,Soap,5,23.71,14.91,Card,100.874195,Low
2,store_001,2025-07-13,P003,Toothpaste,2,86.08,6.89,Card,160.298176,Low
3,store_001,2025-07-12,P003,Toothpaste,5,86.01,11.09,Wallet,382.357455,Low
4,store_001,2025-07-14,P003,Toothpaste,6,29.86,6.06,Wallet,168.302904,Low
5,store_001,2025-07-17,P002,Soap,2,14.13,14.8,Card,24.07752,Low
6,store_001,2025-07-16,P005,Face Wash,9,22.07,3.06,UPI,192.551922,Low
7,store_001,2025-07-10,P003,Toothpaste,7,67.82,18.04,UPI,389.096904,Low
8,store_001,2025-07-13,P005,Face Wash,8,98.17,3.41,Card,758.579224,Medium
9,store_001,2025-07-20,P001,Shampoo,1,22.76,28.22,Cash,16.337128,Low


In [36]:
if __name__ == "__main__":
    logging.info("🚀 ETL Process Started.")

    df = extract_data()
    if df is not None:
        df = transform_data(df)
        if df is not None:
            load_to_mysql(df)
            logging.info("🏁 ETL Process Completed.")
        else:
            logging.error("⚠️ Transformation returned None.")
    else:
        logging.error("⚠️ Extraction failed.")

In [37]:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
print(cursor.fetchall())  # 👉 Should show [('sales_data',)]

[('sales_data',)]


In [38]:
cursor.execute("SELECT COUNT(*) FROM sales_data")
print("🧾 Total rows:", cursor.fetchone())

🧾 Total rows: (166,)
