In [5]:
import pandas as pd
import os
from datetime import datetime

# Directory scanned for raw CSV files (absolute, machine-specific Windows path).
inputFolder = r"C:\Users\jinan\Downloads\raw_data"
# Directory that receives the cleaned CSV files, relative to the working directory.
outputFolder = './clean_data'
# Multiplier applied to prices to produce the *_OMR columns.
# NOTE(review): the source currency is not stated in this file — presumably USD; confirm.
exchangeRatetoOMR= 0.38

def extract_data(file_path):
    """Read the CSV at ``file_path`` and return its contents as a DataFrame."""
    raw_frame = pd.read_csv(filepath_or_buffer=file_path)
    return raw_frame

def transform_data(df, exchange_rate=None):
    """Clean a raw sales frame and add derived price columns.

    The input frame is left untouched; all work happens on a copy.

    Steps performed:
      * ``Qty`` and ``Unit_Price`` are coerced to numbers (unparseable entries
        become missing) and missing entries are replaced by the column mean.
      * Missing ``CustomerID`` entries are replaced by the most frequent ID
        (only when that column exists and has at least one value).
      * ``Qty`` is cast to ``int``, ``Unit_Price`` to ``float`` and
        ``SaleDate`` to datetime (unparseable dates become ``NaT``).
      * ``Total_Price``, ``Unit_Price_OMR`` and ``Total_Price_OMR`` are added.
      * ``ProductName`` (if present) is stripped and title-cased.
      * ``CurrencyType`` (if present) is overwritten with ``'OMR'``.

    Args:
        df: Raw sales data. Must contain ``Qty``, ``Unit_Price`` and
            ``SaleDate``; ``CustomerID``, ``ProductName`` and
            ``CurrencyType`` are optional.
        exchange_rate: Multiplier used for the ``*_OMR`` columns. Defaults to
            the module-level ``exchangeRatetoOMR``.

    Returns:
        A new, cleaned DataFrame.

    Raises:
        KeyError: If a required column is absent.
        ValueError: If ``Qty`` still holds missing values after imputation
            (a non-empty column with no numeric value at all), because such
            a column cannot be cast to ``int``.
    """
    if exchange_rate is None:
        exchange_rate = exchangeRatetoOMR

    # These columns are used unconditionally below, so fail early with a
    # message naming what is missing instead of a bare KeyError mid-way.
    required_columns = ('Qty', 'Unit_Price', 'SaleDate')
    missing = [name for name in required_columns if name not in df.columns]
    if missing:
        raise KeyError(f"Missing required column(s): {', '.join(missing)}")

    # Work on a copy so the caller's raw frame is not modified.
    df = df.copy()

    # Coerce first so text placeholders count as missing instead of breaking
    # the mean computation, then impute with the column mean.
    for column in ('Qty', 'Unit_Price'):
        df[column] = pd.to_numeric(df[column], errors='coerce')
        df[column] = df[column].fillna(df[column].mean())

    if 'CustomerID' in df.columns:
        most_common_ids = df['CustomerID'].mode()
        if not most_common_ids.empty:
            df['CustomerID'] = df['CustomerID'].fillna(most_common_ids.iloc[0])

    # Data type conversion. The int cast truncates a fractional imputed mean.
    df['Qty'] = df['Qty'].astype(int)
    df['Unit_Price'] = df['Unit_Price'].astype(float)
    df['SaleDate'] = pd.to_datetime(df['SaleDate'], errors='coerce')

    # New column: total price
    df['Total_Price'] = df['Qty'] * df['Unit_Price']

    # Currency conversion
    df['Unit_Price_OMR'] = df['Unit_Price'] * exchange_rate
    df['Total_Price_OMR'] = df['Total_Price'] * exchange_rate

    # Normalize text fields
    if 'ProductName' in df.columns:
        df['ProductName'] = df['ProductName'].str.strip().str.title()

    # NOTE(review): only the label is rewritten here; Unit_Price and
    # Total_Price keep their original values — confirm this is intended.
    if 'CurrencyType' in df.columns:
        df['CurrencyType'] = 'OMR'

    return df

def load_data(df, output_path):
    """Write ``df`` to ``output_path`` as CSV, leaving out the index column."""
    df.to_csv(path_or_buf=output_path, index=False)
import mysql.connector

def insert_ignore(cursor, table, column, value):
    """Run ``INSERT IGNORE`` for a single-column row on ``cursor``.

    ``table`` and ``column`` are interpolated into the SQL text, so they must
    be trusted identifiers; ``value`` is passed as a bound parameter.
    """
    params = (value,)
    statement = f"INSERT IGNORE INTO {table} ({column}) VALUES (%s)"
    cursor.execute(statement, params)

def get_id(cursor, table, column, value, id_column='id'):
    """Look up ``id_column`` in ``table`` for the row where ``column`` equals ``value``.

    ``table``, ``column`` and ``id_column`` are interpolated into the SQL
    text; ``value`` is passed as a bound parameter. Returns the first field
    of the first fetched row, or ``None`` when no row is fetched.
    """
    cursor.execute(f"SELECT {id_column} FROM {table} WHERE {column} = %s", (value,))
    first_row = cursor.fetchone()
    if not first_row:
        return None
    return first_row[0]

def load_to_mysql(df, store_name, db_config=None):
    """Load one store's cleaned sales rows into the MySQL sales schema.

    The store, each distinct product and each distinct customer are inserted
    with ``INSERT IGNORE``; every row of ``df`` then becomes one ``Sales``
    row. The whole store is written in a single transaction: it is committed
    once at the end and rolled back if any statement fails, so a failed run
    does not leave a partially loaded store behind.

    Args:
        df: Cleaned sales data with the columns ``SaleDate``, ``Qty``,
            ``Unit_Price``, ``Total_Price``, ``Unit_Price_OMR``,
            ``Total_Price_OMR``, ``ProductName`` and ``CustomerID``.
        store_name: Value stored in ``Stores.StoreName`` for these rows.
        db_config: Optional keyword arguments for
            ``mysql.connector.connect``. Defaults to the local ``SalesDB``
            database with the settings this script has always used.

    Raises:
        Any exception raised by the MySQL driver is re-raised after the
        transaction has been rolled back and the connection closed.
    """
    if db_config is None:
        # NOTE(review): hard-coded root credentials; pass db_config (or load
        # it from the environment) outside of local experiments.
        db_config = {
            'host': 'localhost',
            'user': 'root',
            'password': 'root',
            'database': 'SalesDB',
        }

    sales_query = """
        INSERT INTO Sales (SaleDate, Qty, Unit_Price, Total_Price, Unit_Price_OMR, Total_Price_OMR, ProductID, CustomerID, StoreID)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    conn = mysql.connector.connect(**db_config)
    try:
        cursor = conn.cursor()
        try:
            # Insert store
            insert_ignore(cursor, 'Stores', 'StoreName', store_name)
            store_id = get_id(cursor, 'Stores', 'StoreName', store_name, id_column='StoreID')

            # Remember what this run has already registered so repeated
            # products/customers do not cost extra round trips per row.
            product_ids = {}
            seen_customers = set()

            for _, row in df.iterrows():
                product_name = row['ProductName']
                # CustomerID is used directly as the key, no lookup needed.
                customer_id = row['CustomerID']

                if product_name not in product_ids:
                    insert_ignore(cursor, 'Products', 'ProductName', product_name)
                    product_ids[product_name] = get_id(
                        cursor, 'Products', 'ProductName', product_name,
                        id_column='ProductID',
                    )

                if customer_id not in seen_customers:
                    insert_ignore(cursor, 'Customers', 'CustomerID', customer_id)
                    seen_customers.add(customer_id)

                # A missing date (NaT) is sent as SQL NULL.
                sale_date = row['SaleDate']
                if pd.isna(sale_date):
                    sale_date = None

                # Insert into Sales
                cursor.execute(sales_query, (
                    sale_date, row['Qty'], row['Unit_Price'], row['Total_Price'],
                    row['Unit_Price_OMR'], row['Total_Price_OMR'],
                    product_ids[product_name], customer_id, store_id,
                ))

            conn.commit()
        except Exception:
            # Undo the partial load before propagating the error.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        conn.close()

    print(f"✅ Loaded data into MySQL for store: {store_name}")

def run_etl_pipeline():
    """Run extract, transform and load for every ``.csv`` file in ``inputFolder``.

    For each file the cleaned frame is written to ``outputFolder`` as
    ``cleaned_<original name>`` and then loaded into MySQL, using the file
    name without its extension as the store name. Entries that do not end in
    ``.csv`` are skipped.
    """
    os.makedirs(outputFolder, exist_ok=True)

    for csv_name in os.listdir(inputFolder):
        if not csv_name.endswith('.csv'):
            continue

        # The file name minus its extension doubles as the StoreName.
        store_name, _extension = os.path.splitext(csv_name)
        source_path = os.path.join(inputFolder, csv_name)
        print(f"Processing: {csv_name}")

        cleaned = transform_data(extract_data(source_path))

        destination = os.path.join(outputFolder, f"cleaned_{csv_name}")
        load_data(cleaned, destination)
        print(f"✅ Saved cleaned data to: {destination}")

        load_to_mysql(cleaned, store_name)
        print(f"✅ Finished ETL for store: {store_name}")

# Start the pipeline only when this code runs as the main program,
# not when it is imported as a module.
if __name__ == '__main__':
    run_etl_pipeline()

Processing: store_sales_1.csv
✅ Saved cleaned data to: ./clean_data\cleaned_store_sales_1.csv
✅ Loaded data into MySQL for store: store_sales_1
✅ Finished ETL for store: store_sales_1
Processing: store_sales_2.csv
✅ Saved cleaned data to: ./clean_data\cleaned_store_sales_2.csv
✅ Loaded data into MySQL for store: store_sales_2
✅ Finished ETL for store: store_sales_2
Processing: store_sales_3.csv
✅ Saved cleaned data to: ./clean_data\cleaned_store_sales_3.csv
✅ Loaded data into MySQL for store: store_sales_3
✅ Finished ETL for store: store_sales_3
