In [2]:
1. Prerequisites
Install required packages:

```bash
pip install pandas numpy mysql-connector-python sqlalchemy
```
2. ETL Pipeline Overview
Extract: Read .csv files from a directory.

Transform: Clean and process data using Pandas and NumPy.

Load: Insert the data into a MySQL database.

3. Directory Structure Example
```text
sales_data_etl/
├── data/
│   ├── sales_2025-07-15.csv
│   ├── sales_2025-07-16.csv
├── etl_script.py
```
4. Python ETL Script (etl_script.py)
python
Copy
Edit
import os
from datetime import datetime
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

# CONFIGURATION
# NOTE: the values below are placeholders; replace them before running and
# avoid committing real credentials to version control.
DATA_DIR = './data'
DB_USER = 'your_user'
DB_PASSWORD = 'your_password'
DB_HOST = 'localhost'
DB_PORT = '3306'
DB_NAME = 'sales_db'

# CONNECT TO MYSQL
# The user name and password are URL-encoded so that characters such as
# '@', ':' or '/' inside them cannot be mistaken for URL delimiters.
engine = create_engine(
    f"mysql+mysqlconnector://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

def extract_data(data_dir):
    """Read every ``.csv`` file in ``data_dir`` into a single DataFrame.

    Files are read in sorted filename order so the combined row order is
    reproducible (``os.listdir`` returns entries in arbitrary order).

    Args:
        data_dir: Path of the directory to scan (not recursive).

    Returns:
        A DataFrame holding the rows of all CSV files, concatenated with a
        fresh 0..n-1 index.

    Raises:
        ValueError: If ``data_dir`` contains no ``.csv`` files.
    """
    csv_names = sorted(f for f in os.listdir(data_dir) if f.endswith('.csv'))
    if not csv_names:
        # pd.concat([]) would otherwise fail with an opaque
        # "No objects to concatenate" message.
        raise ValueError(f"No CSV files found in {data_dir!r}")
    frames = [pd.read_csv(os.path.join(data_dir, name)) for name in csv_names]
    return pd.concat(frames, ignore_index=True)

def clean_transform_data(df):
    """Normalize column names, coerce types, fill gaps and add ``total_sales``.

    The frame is modified in place and also returned.

    Steps:
      1. Column names are stripped, lower-cased, and spaces become ``_``.
      2. ``date`` is parsed to datetime; ``price`` and ``quantity`` to
         numbers. Unparsable values become NaT/NaN.
      3. Missing (or unparsable) ``price``/``quantity`` become 0 and a
         missing ``store_id`` becomes ``'unknown'``.
      4. Rows whose ``date`` could not be parsed are dropped.
      5. ``total_sales`` is computed as ``price * quantity``.

    Args:
        df: Raw sales DataFrame. After name normalization it must contain
            ``date``, ``price`` and ``quantity`` columns.

    Returns:
        The same DataFrame object, cleaned.

    Raises:
        KeyError: If ``date``, ``price`` or ``quantity`` is absent.
    """
    # Clean column names
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

    # Convert data types first: coercion turns bad values into NaN/NaT, so it
    # must happen before the fill below or those NaNs would survive and
    # propagate into total_sales.
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    df['price'] = pd.to_numeric(df['price'], errors='coerce')
    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')

    # Handle missing values (including those produced by the coercion above)
    df.fillna({'price': 0, 'quantity': 0, 'store_id': 'unknown'}, inplace=True)

    # Drop rows with invalid dates
    df.dropna(subset=['date'], inplace=True)

    # Add derived fields
    df['total_sales'] = df['price'] * df['quantity']
    return df

def load_to_mysql(df, table_name='sales_data'):
    """Append the rows of ``df`` to ``table_name`` and report the row count.

    Writes through the module-level ``engine``. The DataFrame index is not
    written, and rows are appended if the table already exists.

    Args:
        df: DataFrame to write.
        table_name: Destination table name.
    """
    df.to_sql(name=table_name, con=engine, index=False, if_exists='append')
    row_count = len(df)
    print(f"Loaded {row_count} records into {table_name}.")

def analyze_data(df):
    """Print two quick summaries of ``total_sales``.

    First the five ``store_id`` groups with the highest summed sales
    (descending), then the summed sales for the first five ``date`` groups.

    Args:
        df: DataFrame with ``store_id``, ``date`` and ``total_sales`` columns.
    """
    print("Top 5 Stores by Sales:")
    sales_by_store = df.groupby('store_id')['total_sales'].sum()
    ranked_stores = sales_by_store.sort_values(ascending=False)
    print(ranked_stores.head())

    print("\nDaily Sales Summary:")
    sales_by_day = df.groupby('date')['total_sales'].sum()
    print(sales_by_day.head())

def main():
    """Run the pipeline end to end: extract, transform, load, then analyze.

    Reads CSVs from ``DATA_DIR``, cleans them, appends the result to MySQL
    and prints summary statistics, reporting progress along the way.
    """
    print("Starting ETL process...")

    extracted = extract_data(DATA_DIR)
    print(f"Extracted {len(extracted)} rows.")

    transformed = clean_transform_data(extracted)
    remaining_rows = transformed.shape[0]
    print(f"Cleaned data: {remaining_rows} rows after transformation.")

    load_to_mysql(transformed)
    analyze_data(transformed)


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
5. MySQL Table Schema (Optional for Setup)
If you are not relying on `to_sql` to create the table automatically, run this SQL to create it manually:

sql
Copy
Edit
-- Target table for the rows the ETL script appends (default table name
-- 'sales_data' in load_to_mysql).
CREATE TABLE sales_data (
    date DATE,                   -- parsed sale date; rows with unparsable dates are dropped by the script
    store_id VARCHAR(255),       -- the script fills missing values with 'unknown'
    product_id VARCHAR(255),     -- not touched by the script; presumably passed through from the CSV (verify)
    price DECIMAL(10, 2),        -- the script fills missing values with 0
    quantity INT,                -- the script fills missing values with 0
    total_sales DECIMAL(12, 2)   -- derived by the script as price * quantity
);

SyntaxError: invalid character '├' (U+251C) (3311633672.py, line 20)