In [41]:
import os
import pandas as pd
import numpy as np
import mysql.connector
import logging
from datetime import datetime


In [42]:
# Send INFO-and-above records to etl_log.log (append mode, the FileHandler
# default). The handler is created explicitly so the file encoding can be
# pinned to UTF-8: several log messages in this notebook contain emoji, which
# cannot be encoded by non-UTF-8 locale encodings such as Windows cp1252.
# delay=True defers opening the file until the first record is written, so
# re-running this cell (where basicConfig is a no-op) does not leak a handle.
logging.basicConfig(
    handlers=[logging.FileHandler('etl_log.log', encoding='utf-8', delay=True)],
    level=logging.INFO,
    format='%(asctime)s %(message)s',
)


In [43]:
# Connection settings passed as keyword arguments to mysql.connector.connect().
# Values come from environment variables so that no secret is stored in the
# notebook; the non-secret settings fall back to their previous defaults.
# Set RETAIL_DB_PASSWORD before running the load step.
db_config = {
    'user': os.environ.get('RETAIL_DB_USER', 'root'),
    'password': os.environ.get('RETAIL_DB_PASSWORD', ''),
    'host': os.environ.get('RETAIL_DB_HOST', 'localhost'),
    'database': os.environ.get('RETAIL_DB_NAME', 'retail_db'),
}

In [44]:
# Folder that extract_data() joins with each CSV_FILES entry to locate the input files.
DATA_FOLDER = 'data/'  # Relative path, so it is resolved against the current working directory


In [45]:
# Per-store input files: store_001_sales.csv through store_005_sales.csv.
CSV_FILES = [
    f'store_{store_number:03d}_sales.csv'
    for store_number in range(1, 6)
]

In [46]:
def extract_data(data_folder=None, csv_files=None):
    """Read the store CSV files and stack them into a single DataFrame.

    Args:
        data_folder: Folder containing the CSV files. Defaults to DATA_FOLDER.
        csv_files: Iterable of file names inside ``data_folder``. Defaults to
            CSV_FILES.

    Returns:
        One DataFrame holding the rows of every file, in the order the files
        are listed, with a fresh 0..n-1 index.

    Raises:
        ValueError: If the list of files is empty.
        FileNotFoundError: If any listed file does not exist; the message
            names every missing path, not just the first one.
    """
    folder = DATA_FOLDER if data_folder is None else data_folder
    file_names = CSV_FILES if csv_files is None else list(csv_files)
    file_paths = [os.path.join(folder, name) for name in file_names]

    if not file_paths:
        # pd.concat([]) would raise a less helpful ValueError of its own.
        raise ValueError("No CSV files were given to extract.")

    # Check up front so nothing is parsed when part of the input is absent.
    missing = [path for path in file_paths if not os.path.isfile(path)]
    if missing:
        raise FileNotFoundError(f"Missing input CSV file(s): {', '.join(missing)}")

    frames = [pd.read_csv(path) for path in file_paths]
    df = pd.concat(frames, ignore_index=True)
    logging.info(f"Extracted {len(df)} rows from {len(file_paths)} files.")
    return df

In [47]:
def transform_data(df):
    """Clean raw sales rows and add the derived value and category columns.

    The input frame is not modified; all work happens on a copy.

    Steps, in order:
      1. Strip and lower-case the column names.
      2. Parse ``date``; unparseable values become NaT.
      3. Drop rows missing ``store_id``, ``date`` or ``product_id``.
      4. Fill missing ``quantity_sold`` / ``unit_price`` / ``discount_percent``
         with 0 and cast them to int / float / float.
      5. Compute ``total_sale_value`` = quantity * price * (1 - discount/100).
      6. Keep only the first row per (store_id, date, product_id).
      7. Label each row ``High`` (>= 1000), ``Medium`` (>= 500) or ``Low``.

    Args:
        df: Raw sales DataFrame whose normalised column names include the
            six columns listed above.

    Returns:
        A new, cleaned DataFrame. The original row index labels are kept, so
        the index may contain gaps where rows were removed.

    Raises:
        KeyError: If a required column is missing after name normalisation.
    """
    cleaned = df.copy()
    cleaned.columns = cleaned.columns.str.strip().str.lower()
    cleaned['date'] = pd.to_datetime(cleaned['date'], errors='coerce')
    cleaned = cleaned.dropna(subset=['store_id', 'date', 'product_id'])

    # assign() returns a new frame, which avoids SettingWithCopyWarning on the
    # filtered result of dropna().
    cleaned = cleaned.assign(
        quantity_sold=cleaned['quantity_sold'].fillna(0).astype(int),
        unit_price=cleaned['unit_price'].fillna(0.0).astype(float),
        discount_percent=cleaned['discount_percent'].fillna(0.0).astype(float),
    )
    cleaned = cleaned.assign(
        total_sale_value=(
            cleaned['quantity_sold']
            * cleaned['unit_price']
            * (1 - cleaned['discount_percent'] / 100)
        )
    )

    # NOTE(review): only the first row per key survives, so repeat sales of the
    # same product in the same store on the same day are discarded rather than
    # summed. Confirm that this is the intended business rule.
    cleaned = cleaned.drop_duplicates(subset=['store_id', 'date', 'product_id'])

    cleaned = cleaned.assign(
        sale_category=np.where(
            cleaned['total_sale_value'] >= 1000, 'High',
            np.where(cleaned['total_sale_value'] >= 500, 'Medium', 'Low'),
        )
    )

    logging.info(f"Transformed data. Rows after cleaning: {len(cleaned)}")
    return cleaned


In [65]:
def create_mysql_table(cursor):
    """Create the ``sales_data`` table through ``cursor`` if it is absent.

    The table is keyed on (store_id, date, product_id). Nothing is returned
    and nothing is committed here.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS sales_data (
            store_id VARCHAR(10),
            date DATE,
            product_id VARCHAR(20),
            quantity_sold INT,
            unit_price FLOAT,
            discount_percent FLOAT,
            total_sale_value FLOAT,
            sale_category VARCHAR(10),
            PRIMARY KEY (store_id, date, product_id)
        )
    """
    cursor.execute(ddl)


In [66]:
def load_to_mysql(df):
    """Write the transformed rows into the ``sales_data`` MySQL table.

    Connects with ``db_config``, makes sure the table exists, then issues a
    REPLACE for every row so an existing row with the same primary key is
    overwritten. Row writes are committed together; if anything fails they
    are rolled back and the exception is re-raised. The cursor and the
    connection are always closed.

    Args:
        df: DataFrame containing at least the eight columns written to
            ``sales_data`` (see ``columns`` below).

    Raises:
        KeyError: If ``df`` lacks one of the required columns (raised before
            any connection is opened).
        mysql.connector.Error: On connection or statement failure.
    """
    columns = [
        'store_id', 'date', 'product_id', 'quantity_sold',
        'unit_price', 'discount_percent', 'total_sale_value', 'sale_category',
    ]
    insert_query = """
        REPLACE INTO sales_data (
            store_id, date, product_id, quantity_sold,
            unit_price, discount_percent, total_sale_value, sale_category
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    # Plain tuples in column order; built before connecting so that a schema
    # problem in df surfaces without touching the database.
    rows = list(df[columns].itertuples(index=False, name=None))

    conn = mysql.connector.connect(**db_config)
    try:
        cursor = conn.cursor()
        try:
            create_mysql_table(cursor)  # This must run before inserting
            if rows:
                cursor.executemany(insert_query, rows)
            conn.commit()
        except Exception:
            # Undo the uncommitted row writes. CREATE TABLE is DDL and is
            # committed implicitly by MySQL, so it is not affected.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

    logging.info(f"✅ Loaded {len(df)} rows into MySQL.")



In [50]:
def analyze_and_export(df, output_dir='.'):
    """Aggregate sales and write three report files.

    Reports written into ``output_dir``:
      * ``store_sales_summary.csv`` - total sale value per store.
      * ``top_5_products.csv`` - the five product names with the highest
        total sale value.
      * ``daily_sales_trend.xlsx`` - total sale value per store and date.

    Args:
        df: DataFrame with ``store_id``, ``date``, ``product_name`` and
            ``total_sale_value`` columns.
        output_dir: Folder to write the reports to; created if missing.
            Defaults to the current working directory, where the reports were
            always written before this parameter existed.
    """
    os.makedirs(output_dir, exist_ok=True)

    store_totals = df.groupby('store_id')['total_sale_value'].sum().reset_index()
    store_totals.to_csv(os.path.join(output_dir, 'store_sales_summary.csv'), index=False)

    top_products = (
        df.groupby('product_name')['total_sale_value'].sum().nlargest(5).reset_index()
    )
    top_products.to_csv(os.path.join(output_dir, 'top_5_products.csv'), index=False)

    # Writing .xlsx needs an Excel writer engine to be installed for pandas.
    daily_totals = df.groupby(['store_id', 'date'])['total_sale_value'].sum().reset_index()
    daily_totals.to_excel(os.path.join(output_dir, 'daily_sales_trend.xlsx'), index=False)

    logging.info("Analysis reports generated and saved.")


In [51]:
def main():
    """Run the pipeline end to end: extract, transform, load, then export reports."""
    logging.info("ETL Process Started.")
    transformed = transform_data(extract_data())
    load_to_mysql(transformed)
    analyze_and_export(transformed)
    logging.info("ETL Process Completed.")

In [52]:
import os
import random
from datetime import datetime, timedelta

import pandas as pd

# Create the `data` folder if it does not exist
os.makedirs("data", exist_ok=True)

# Single-row example file. The loop below regenerates store_001_sales.csv,
# so this version is overwritten by the end of the cell.
data = {
    'Store_ID': ['S001'],
    'Date': ['2025-07-18'],
    'Product_ID': ['P001'],
    'Product_Name': ['Shampoo'],
    'Quantity_Sold': [10],
    'Unit_Price': [100.0],
    'Discount_Percent': [10.0],
    'Payment_Mode': ['Card']
}
df = pd.DataFrame(data)
df.to_csv('data/store_001_sales.csv', index=False)
print("Sample file created.")

# Dedicated, seeded generator: the same products, quantities, prices,
# discounts and day offsets are produced on every run. Dates are still
# offsets from the day the cell is executed.
rng = random.Random(42)

# Sample data for 5 stores
stores = ['store_001', 'store_002', 'store_003', 'store_004', 'store_005']
products = [
    {'Product_ID': 'P001', 'Product_Name': 'Shampoo'},
    {'Product_ID': 'P002', 'Product_Name': 'Soap'},
    {'Product_ID': 'P003', 'Product_Name': 'Toothpaste'},
    {'Product_ID': 'P004', 'Product_Name': 'Lotion'},
    {'Product_ID': 'P005', 'Product_Name': 'Face Wash'},
]
payment_modes = ['Cash', 'Card', 'UPI', 'Wallet']

for store in stores:
    records = []
    for _ in range(50):  # 50 rows per store
        product = rng.choice(products)
        row = {
            'Store_ID': store,
            'Date': (datetime.now() - timedelta(days=rng.randint(0, 10))).strftime('%Y-%m-%d'),
            'Product_ID': product['Product_ID'],
            'Product_Name': product['Product_Name'],
            'Quantity_Sold': rng.randint(1, 10),
            'Unit_Price': round(rng.uniform(10, 100), 2),
            'Discount_Percent': round(rng.uniform(0, 30), 2),
            'Payment_Mode': rng.choice(payment_modes)
        }
        records.append(row)

    # `df` is left holding the last store's frame once the loop finishes.
    df = pd.DataFrame(records)
    file_path = f"data/{store}_sales.csv"
    df.to_csv(file_path, index=False)

print("✅ 5 sample CSV files created in `data/` folder.")


Sample file created.
✅ 5 sample CSV files created in `data/` folder.


In [53]:
# Temporarily lift pandas' row and column display limits, then render df in full.
with pd.option_context(
    'display.max_columns', None,
    'display.max_rows', None,
):
    display(df)


Unnamed: 0,Store_ID,Date,Product_ID,Product_Name,Quantity_Sold,Unit_Price,Discount_Percent,Payment_Mode
0,store_005,2025-07-15,P002,Soap,3,43.69,28.85,Card
1,store_005,2025-07-15,P003,Toothpaste,3,48.65,6.0,UPI
2,store_005,2025-07-14,P001,Shampoo,9,47.53,1.64,Card
3,store_005,2025-07-13,P003,Toothpaste,4,32.2,19.03,Cash
4,store_005,2025-07-13,P003,Toothpaste,2,79.39,3.9,Card
5,store_005,2025-07-11,P004,Lotion,4,32.94,20.93,Card
6,store_005,2025-07-10,P002,Soap,4,15.49,3.26,UPI
7,store_005,2025-07-16,P002,Soap,7,93.11,8.47,Card
8,store_005,2025-07-09,P004,Lotion,3,28.42,2.26,Card
9,store_005,2025-07-11,P001,Shampoo,1,68.67,22.41,Wallet


In [54]:
# Load every store file into one frame and preview the first five rows.
df = extract_data()
df.head(5)


Unnamed: 0,Store_ID,Date,Product_ID,Product_Name,Quantity_Sold,Unit_Price,Discount_Percent,Payment_Mode
0,store_001,2025-07-13,P001,Shampoo,3,68.33,28.0,Card
1,store_001,2025-07-15,P001,Shampoo,8,70.65,14.78,Card
2,store_001,2025-07-11,P005,Face Wash,10,63.18,29.84,Wallet
3,store_001,2025-07-13,P003,Toothpaste,10,37.9,16.76,Card
4,store_001,2025-07-09,P002,Soap,5,90.39,3.33,UPI


In [56]:
# Replace df with its cleaned version and preview the first five rows.
df = transform_data(df)
df.head(5)


Unnamed: 0,store_id,date,product_id,product_name,quantity_sold,unit_price,discount_percent,payment_mode,total_sale_value,sale_category
0,store_001,2025-07-13,P001,Shampoo,3,68.33,28.0,Card,147.5928,Low
1,store_001,2025-07-15,P001,Shampoo,8,70.65,14.78,Card,481.66344,Low
2,store_001,2025-07-11,P005,Face Wash,10,63.18,29.84,Wallet,443.27088,Low
3,store_001,2025-07-13,P003,Toothpaste,10,37.9,16.76,Card,315.4796,Low
4,store_001,2025-07-09,P002,Soap,5,90.39,3.33,UPI,436.900065,Low


In [58]:
import os

# Make sure the 'output' folder is present before writing into it.
os.makedirs("output", exist_ok=True)

# Persist the cleaned frame without its index column.
df.to_csv("output/cleaned_data.csv", index=False)

# Preview the first 10 rows.
df.iloc[:10]



Unnamed: 0,store_id,date,product_id,product_name,quantity_sold,unit_price,discount_percent,payment_mode,total_sale_value,sale_category
0,store_001,2025-07-13,P001,Shampoo,3,68.33,28.0,Card,147.5928,Low
1,store_001,2025-07-15,P001,Shampoo,8,70.65,14.78,Card,481.66344,Low
2,store_001,2025-07-11,P005,Face Wash,10,63.18,29.84,Wallet,443.27088,Low
3,store_001,2025-07-13,P003,Toothpaste,10,37.9,16.76,Card,315.4796,Low
4,store_001,2025-07-09,P002,Soap,5,90.39,3.33,UPI,436.900065,Low
5,store_001,2025-07-10,P005,Face Wash,5,97.61,16.66,Card,406.74087,Low
6,store_001,2025-07-09,P003,Toothpaste,7,96.59,25.88,UPI,501.147556,Medium
8,store_001,2025-07-10,P001,Shampoo,4,55.65,25.36,Card,166.14864,Low
9,store_001,2025-07-08,P005,Face Wash,8,94.66,1.56,Cash,745.466432,Medium
10,store_001,2025-07-11,P002,Soap,6,60.3,29.45,Cash,255.2499,Low


In [67]:
if __name__ == "__main__":
    # Driver: extract -> transform -> load, logging an error and stopping at
    # the first stage that hands back None.
    logging.info("🚀 ETL Process Started.")

    df = extract_data()
    if df is None:
        logging.error("⚠️ Extraction failed.")
    else:
        df = transform_data(df)
        if df is None:
            logging.error("⚠️ Transformation returned None.")
        else:
            load_to_mysql(df)
            logging.info("🏁 ETL Process Completed.")


In [68]:
# Open a connection with db_config and list the tables in that database.
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
table_rows = cursor.fetchall()
print(table_rows)  # One tuple per table; ('sales_data',) should be among them


[('retail_sales',), ('sales_data',)]


In [69]:
# Count the rows currently stored in sales_data, using the cursor opened earlier.
cursor.execute("SELECT COUNT(*) FROM sales_data")
count_row = cursor.fetchone()
print("🧾 Total rows:", count_row)


🧾 Total rows: (169,)
