# EXTRACT DATA FROM DB AND LOAD IT INTO DWH

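Steps:
1. Create a connection to the source database.
2. Read the data from the source tables.
3. Load it into pandas DataFrames (variables).
4. Clean and transform the data.
5. Create a connection to the destination data warehouse (DWH) with a SQLAlchemy engine.
6. Load the data into the DWH tables with `to_sql`.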

# Install MySQL connector for Python

In [None]:
pip install mysql-connector-python 

# Install pyodbc (the Python ODBC interface, e.g. for SQL Server; the ODBC driver itself is installed separately)

In [None]:
pip install pyodbc 

# Install SQLAlchemy for database interaction

In [None]:
pip install sqlalchemy 

In [None]:
## for accessing and reading data from sql db
import pyodbc
import pandas as pd
import os
import mysql.connector

In [None]:
# Show the names of the ODBC drivers pyodbc can see (handy when building a
# pyodbc connection string).
installed_odbc_drivers = pyodbc.drivers()
installed_odbc_drivers

In [None]:
#### Source connection: MySQL (the same pattern applies to SQL Server, Oracle, PostgreSQL, SQLite, MariaDB, etc.)
from mysql.connector import Error

# Connection settings come from the environment so credentials do not have to
# live in the notebook; the defaults reproduce the original local setup.
SOURCE_DB_CONFIG = {
    'host': os.environ.get('MYSQL_HOST', 'localhost'),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD', ''),
    'database': os.environ.get('MYSQL_DATABASE', 'sales_management'),
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
}

try:
    conn = mysql.connector.connect(**SOURCE_DB_CONFIG)
    print("Connection successful")

except Error as e:
    print(f"Erreur de connexion MySQL: {e}")
    # Re-raise: every later cell uses `conn`, so continuing would only turn
    # this error into a confusing NameError further down the notebook.
    raise

In [None]:
# Quick sanity check: print every raw row of the orders table.
cursor = conn.cursor()
try:
    cursor.execute("SELECT * FROM orders")
    for row in cursor.fetchall():
        print(row)
finally:
    # Release the cursor even if the query fails.
    cursor.close()

In [None]:
# Pull the complete `orders` table from the source database into a DataFrame.
# NOTE(review): pandas officially supports SQLAlchemy connectables (and sqlite3)
# for read_sql; a raw mysql.connector connection works but may emit a UserWarning.
query = "SELECT * FROM orders"
orders = pd.read_sql(sql=query, con=conn)
orders  # rendered as the cell output

In [None]:
# First five rows of orders: a quick look at columns and values.
orders.head(n=5)

In [None]:
# Pull the complete `customers` table from the source database into a DataFrame.
query = "SELECT * FROM customers"
customers = pd.read_sql(sql=query, con=conn)
customers  # rendered as the cell output

In [None]:
# First five rows of customers: a quick look at columns and values.
customers.head(n=5)

In [None]:
# Pull the complete `payments` table from the source database into a DataFrame.
query = "SELECT * FROM payments"
payments = pd.read_sql(sql=query, con=conn)
payments  # rendered as the cell output

In [None]:
# First five rows of payments: a quick look at columns and values.
payments.head(n=5)

In [None]:
# Transformation
try:

    # 1. Cleaning: drop every row that contains a NaN, then parse order_date.
    # .copy() makes orders_clean an independent DataFrame, so the column
    # assignment below does not trigger SettingWithCopyWarning.
    orders_clean = orders.dropna().copy()
    orders_clean['order_date'] = pd.to_datetime(orders_clean['order_date'])  # Convert to datetime
    print(f"Orders nettoyées: {len(orders_clean)} lignes")

    # 2. Aggregation: sum / count / mean of total_amount per customer_id.
    # Named aggregation yields flat column names (total_amount_sum, ...)
    # instead of MultiIndex tuples, which to_sql would store as
    # stringified-tuple column names. customer_id stays the index.
    sales_summary = orders_clean.groupby('customer_id').agg(
        total_amount_sum=('total_amount', 'sum'),
        total_amount_count=('total_amount', 'count'),
        total_amount_mean=('total_amount', 'mean'),
    ).round(2)
    print("Agrégation sales_summary créée")

    # 3. Left join: keep every cleaned order and attach its customer's columns.
    orders_with_customers = orders_clean.merge(
        customers,
        on='customer_id',
        how='left'
    )
    print(f"Jointure réussie: {len(orders_with_customers)} lignes")

    # 4. Temporal enrichment: year and month of each order.
    orders_with_customers['order_year'] = orders_with_customers['order_date'].dt.year
    orders_with_customers['order_month'] = orders_with_customers['order_date'].dt.month

except Exception as e:
    print(f" Erreur : {e}")
    # Show the available columns to help diagnose a KeyError on a column name.
    print("Orders:", orders.columns.tolist() if 'orders' in locals() else "Not defined")
    print("Customers:", customers.columns.tolist() if 'customers' in locals() else "Not defined")
    # Re-raise: the load cells need orders_clean / sales_summary /
    # orders_with_customers, so continuing would only fail later with a NameError.
    raise

In [None]:
from sqlalchemy import create_engine, text

# NOTE(review): the connection URLs below hard-code user root with an empty
# password; consider reading credentials from the environment instead.

# First, connect to the MySQL server (no database selected) and create the
# DWH database if it doesn't exist.
engine_tmp = create_engine('mysql+mysqlconnector://root:@localhost:3306/')
try:
    # begin() opens a transaction and commits it when the block exits cleanly.
    with engine_tmp.begin() as conn_tmp:
        conn_tmp.execute(text("CREATE DATABASE IF NOT EXISTS dwh_sales_management"))
finally:
    # The bootstrap engine is no longer needed; release its pooled connections.
    engine_tmp.dispose()

# Now connect to the DWH database
engine_dwh = create_engine('mysql+mysqlconnector://root:@localhost:3306/dwh_sales_management')
# Opening a connection here verifies that the DWH is reachable.
# NOTE(review): conn_dwh stays checked out until it is closed explicitly.
conn_dwh = engine_dwh.connect()
print("Connexion DWH réussie")

In [None]:
# Load into the DWH
# Rows per multi-row INSERT statement; bounding it keeps large tables from
# producing one huge statement (which could exceed the server's packet limit).
CHUNK_SIZE = 1000

try:
    # Raw source tables -> dim_* tables, fully replaced on every run.
    for frame, table_name in (
        (orders, 'dim_orders'),
        (customers, 'dim_customers'),
        (payments, 'dim_payments'),
    ):
        frame.to_sql(
            name=table_name,
            con=engine_dwh,
            if_exists='replace',
            index=False,
            method='multi',
            chunksize=CHUNK_SIZE,
        )

    # customer_id is sales_summary's index (from the groupby); reset_index
    # turns it into an ordinary column so the key is always stored explicitly.
    sales_summary.reset_index().to_sql('fact_sales_summary', engine_dwh, if_exists='replace', index=False)
    # NOTE(review): the default index=True also stores the merge-generated
    # row index as an extra `index` column.
    orders_with_customers.to_sql('fact_orders_enriched', engine_dwh, if_exists='replace')

    print("Chargement DWH terminé:")
    print("   - dim_orders créée")
    print("   - dim_customers créée")
    print("   - dim_payments créée")
    print("   - fact_sales_summary créée")
    print("   - fact_orders_enriched créée")

except Exception as e:
    print(f"Erreur : {e}")
    # Re-raise so a failed load is not followed by a "success" report.
    raise

In [None]:
# FINAL EXPORT AND VERIFICATION
try:
    # Recreate the DWH engine if the connection cell was not run in this session.
    if 'engine_dwh' not in locals():
        engine_dwh = create_engine('mysql+mysqlconnector://root:@localhost:3306/dwh_sales_management')

    # Export transformed data to DWH
    if 'sales_summary' in locals():
        # customer_id is the groupby index: materialise it as a column so each
        # aggregate row keeps its key (index=False alone would drop it).
        sales_summary.reset_index().to_sql('sales_summary', engine_dwh, if_exists='replace', index=False)
        print("sales_summary exported to DWH")

    if 'orders_clean' in locals():
        orders_clean.to_sql('orders_clean', engine_dwh, if_exists='replace', index=False)
        print("orders_clean exported to DWH")

        # Export CSV
        orders_clean.to_csv('orders_clean.csv', index=False)
        print("orders_clean.csv created")

    # Final DWH verification: list every table with its row count.
    with engine_dwh.connect() as conn_check:
        tables = [row[0] for row in conn_check.execute(text("SHOW TABLES"))]

        print(f"\nDWH created successfully! Available tables ({len(tables)}):")
        for table in tables:
            # Backtick-quote the identifier (doubling embedded backticks) so
            # reserved words or special characters do not break the statement.
            quoted_table = '`' + table.replace('`', '``') + '`'
            count = conn_check.execute(text(f"SELECT COUNT(*) FROM {quoted_table}")).scalar()
            print(f"   - {table}: {count} rows")

    print("\nETL PROCESS COMPLETED SUCCESSFULLY!")

except Exception as e:
    print(f"Final export error: {e}")
    import traceback
    traceback.print_exc()

finally:
    # Proper connection cleanup, run even when the export above fails.
    if 'conn_dwh' in locals():
        # Opened by the DWH connection cell; dispose() does not close
        # connections that are still checked out.
        conn_dwh.close()

    if 'engine_dwh' in locals():
        engine_dwh.dispose()
        print("\nDWH connection closed")

    if 'conn' in locals() and conn.is_connected():
        conn.close()
        print("Source connection closed")