### Connection of database and data warehouse

In [1]:
import mysql.connector
from mysql.connector import Error
import pandas as pd

# Database configurations
config = {
    "source": {
        "host": "localhost",
        "user": "root",
        "password": "",
        "database": "gravity_books"
    },
    "target": {
        "host": "localhost",
        "user": "root",
        "password": "",
        "database": "storebooks_dw"
    }
}

def create_connection(config_key):
    """Create database connection"""
    try:
        conn = mysql.connector.connect(**config[config_key])
        print(f"✅ Successfully connected to {config_key} database!")
        return conn
    except Error as e:
        print(f"❌ Error connecting to {config_key} database:", e)
        return None

# Create connections
source_conn = create_connection("source")
target_conn = create_connection("target")

✅ Successfully connected to source database!
✅ Successfully connected to target database!


### Test connection

In [2]:
def test_connection(conn, db_name):
    """Test database connection with a simple query"""
    if conn:
        try:
            cursor = conn.cursor()
            cursor.execute(f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{db_name}';")
            count = cursor.fetchone()[0]
            print(f"📊 {db_name} contains {count} tables")
            return True
        except Error as e:
            print(f"Query failed on {db_name}:", e)
            return False
    return False

# Test connections
source_ok = test_connection(source_conn, config["source"]["database"])
target_ok = test_connection(target_conn, config["target"]["database"])

if not (source_ok and target_ok):
    print("\n⚠️ Please fix connection issues before proceeding!")
else:
    print("\n🚀 All connections working! Ready for ETL.")

📊 gravity_books contains 15 tables
📊 storebooks_dw contains 5 tables

🚀 All connections working! Ready for ETL.


### Extract customers for dimcustomer

In [3]:
def extract_customers(source_conn):
    """Extract customer data with addresses"""
    query = """
    SELECT 
        c.customer_id,
        c.first_name,
        c.last_name,
        c.email,
        co.country_name,
        CONCAT(a.street_number, ' ', a.street_name) AS street_name,
        ast.address_status
    FROM gravity_books.customer c
    JOIN gravity_books.customer_address ca ON c.customer_id = ca.customer_id
    JOIN gravity_books.address a ON ca.address_id = a.address_id
    JOIN gravity_books.country co ON a.country_id = co.country_id
    JOIN gravity_books.address_status ast ON ca.status_id = ast.status_id
    """
    return pd.read_sql(query, source_conn)

customer_df = extract_customers(source_conn)
print(f"📊 Extracted {len(customer_df)} customer records")
display(customer_df.head())

  return pd.read_sql(query, source_conn)


📊 Extracted 3350 customer records


Unnamed: 0,customer_id,first_name,last_name,email,country_name,street_name,address_status
0,678,Raeann,Doole,rdooleit@wired.com,Afghanistan,2 Sachtjen Place,Active
1,679,Benetta,Bussell,bbusselliu@wp.com,Afghanistan,2 Sachtjen Place,Active
2,734,Harriot,Korpolak,hkorpolakkd@nih.gov,Afghanistan,2 Sachtjen Place,Active
3,1861,Federica,Moulding,fmouldingnw@samsung.com,Afghanistan,2 Sachtjen Place,Active
4,528,Laurel,Brassington,lbrassingtonen@smh.com.au,Afghanistan,336 Tennessee Parkway,Active


### Extract Books for dimbook

In [4]:
def extract_books(source_conn):
    """Extract book data with authors and publishers"""
    query = """
    SELECT 
        b.book_id,
        b.title,
        b.isbn13,
        bl.language_name,
        b.num_pages,
        b.publication_date,
        p.publisher_name,
        GROUP_CONCAT(DISTINCT a.author_name SEPARATOR ', ') AS author_name
    FROM gravity_books.book b
    JOIN gravity_books.publisher p ON b.publisher_id = p.publisher_id
    JOIN gravity_books.book_language bl ON b.language_id = bl.language_id
    LEFT JOIN gravity_books.book_author ba ON b.book_id = ba.book_id
    LEFT JOIN gravity_books.author a ON ba.author_id = a.author_id
    GROUP BY b.book_id
    """
    return pd.read_sql(query, source_conn)

book_df = extract_books(source_conn)
print(f"📚 Extracted {len(book_df)} book records")
display(book_df.head())

  return pd.read_sql(query, source_conn)


📚 Extracted 11127 book records


Unnamed: 0,book_id,title,isbn13,language_name,num_pages,publication_date,publisher_name,author_name
0,1,The World's First Love: Mary Mother of God,8987059752,United States English,276,1996-09-01,Ignatius Press,Fulton J. Sheen
1,2,The Illuminati,20049130001,English,352,2004-10-04,Thomas Nelson,Larry Burkett
2,3,The Servant Leader,23755004321,English,128,2003-03-11,Thomas Nelson,Kenneth H. Blanchard
3,4,What Life Was Like in the Jewel in the Crown: ...,34406054602,English,168,1999-09-01,Time Life Medical,Time-Life Books
4,5,Cliffs Notes on Aristophanes' Lysistrata The ...,49086007763,English,80,1983-12-29,Cliffs Notes,W. John Campbell


### Extract for dimdate

In [6]:
# Corrected date dimension extraction
date_query = """
SELECT DISTINCT
    DATE(o.order_date) AS full_date,
    DAY(o.order_date) AS day,
    MONTH(o.order_date) AS month,
    MONTHNAME(o.order_date) AS month_name,
    QUARTER(o.order_date) AS quarter,
    YEAR(o.order_date) AS year
FROM gravity_books.cust_order o
WHERE DATE(o.order_date) NOT IN (
    SELECT full_date FROM storebooks_dw.dimdate
)
"""

date_df = pd.read_sql(date_query, source_conn)

# Add SCD fields
date_df['is_current'] = 1
print(f"✅ Loaded {len(date_df)} date records")

✅ Loaded 1096 date records


  date_df = pd.read_sql(date_query, source_conn)


### Extract for dimordermethod

In [8]:
from sqlalchemy import Integer, String, DateTime, Boolean, SmallInteger
# Corrected query with proper database reference
method_query = """
SELECT 
    method_id AS method_id_BK,
    method_name,
    1 AS source_system_code  # Adding this at extraction since it's required
FROM gravity_books.shipping_method
WHERE method_id NOT IN (
    SELECT method_id_BK FROM storebooks_dw.dimordermethod
)
"""

method_df = pd.read_sql(method_query, source_conn)

  method_df = pd.read_sql(method_query, source_conn)


### Extract Fact Data from Source (Fact Table)

In [27]:
# Extract order data with all required fields
fact_query = """
SELECT 
    o.order_id,
    o.customer_id,
    ol.book_id,
    o.shipping_method_id,
    DATE(o.order_date) AS order_date,
    os.status_value AS order_status,
    ol.price,
    sm.cost AS shipping_cost,
    1 AS quantity  # Assuming 1 quantity per line item
FROM gravity_books.cust_order o
JOIN gravity_books.order_line ol ON o.order_id = ol.order_id
JOIN gravity_books.order_history oh ON o.order_id = oh.order_id
JOIN gravity_books.order_status os ON oh.status_id = os.status_id
JOIN gravity_books.shipping_method sm ON o.shipping_method_id = sm.method_id
WHERE oh.status_date = (
    SELECT MAX(status_date) 
    FROM gravity_books.order_history 
    WHERE order_id = oh.order_id
)
"""

fact_df = pd.read_sql(fact_query, source_conn)
print(f"📊 Extracted {len(fact_df)} order records")

  fact_df = pd.read_sql(fact_query, source_conn)


📊 Extracted 30777 order records


 ### Transform Data

#### Add data warehouse metadata

In [11]:
def add_warehouse_metadata(df):
    """Add SCD and source system fields"""
    df['source_system_code'] = 1  # Assuming source system 1
    df['start_date'] = pd.to_datetime('now')
    df['end_date'] = None
    df['is_current'] = 1
    return df

# Transform customers
customer_df = add_warehouse_metadata(customer_df)
customer_df = customer_df.rename(columns={'customer_id': 'customer_id_BK'})

# Transform books
book_df = add_warehouse_metadata(book_df)
book_df = book_df.rename(columns={'book_id': 'book_id_BK'})

 #### Transform data to dimcustomer and dimbook

In [17]:
from sqlalchemy import create_engine
import pandas as pd

# Create SQLAlchemy engine for MySQL
target_engine = create_engine(
    f"mysql+mysqlconnector://{config['target']['user']}:{config['target']['password']}@"
    f"{config['target']['host']}/{config['target']['database']}"
)

In [28]:
customer_df['source_system_code'] = 1
customer_df['start_date'] = pd.to_datetime('now')
customer_df['end_date'] = None
customer_df['is_current'] = 1

##### Get Dimension Mappings

In [29]:
# Get all dimension mappings from data warehouse
dim_maps = {
    'customer': pd.read_sql("SELECT customer_id_SK, customer_id_BK FROM dimcustomer", target_engine),
    'book': pd.read_sql("SELECT book_id_SK, book_id_BK FROM dimbook", target_engine),
    'method': pd.read_sql("SELECT method_id_SK, method_id_BK FROM dimordermethod", target_engine),
    'date': pd.read_sql("SELECT date_id_SK, full_date FROM dimdate", target_engine)
}

#### Transform fact table data

In [30]:
# Merge with dimension tables to get surrogate keys
fact_df = fact_df.merge(dim_maps['customer'], left_on='customer_id', right_on='customer_id_BK')
fact_df = fact_df.merge(dim_maps['book'], left_on='book_id', right_on='book_id_BK')
fact_df = fact_df.merge(dim_maps['method'], left_on='shipping_method_id', right_on='method_id_BK')
fact_df = fact_df.merge(dim_maps['date'], left_on='order_date', right_on='full_date')

# Select and rename columns for fact table
fact_df = fact_df[[
    'order_id', 'customer_id_SK', 'book_id_SK',
    'method_id_SK', 'date_id_SK', 'order_status',
    'price', 'shipping_cost', 'quantity'
]]

fact_df.columns = [
    'order_id_BK', 'customer_id_SK', 'book_id_SK',
    'method_id_SK', 'order_date_SK', 'order_status',
    'price', 'shipping_cost', 'quantity'
]

# Add metadata
fact_df['source_system_code'] = 1
fact_df['created_at'] = pd.to_datetime('now')

print("Transformed fact table columns:", fact_df.columns.tolist())

Transformed fact table columns: ['order_id_BK', 'customer_id_SK', 'book_id_SK', 'method_id_SK', 'order_date_SK', 'order_status', 'price', 'shipping_cost', 'quantity', 'source_system_code', 'created_at']


## Load Data into Warehouse

#### Load Customers to dimcustomer

In [21]:
# Load with SQLAlchemy engine
customer_df.to_sql(
    name='dimcustomer',
    con=target_engine,
    if_exists='append',
    index=False,
    chunksize=1000  # Better for large datasets
)
print(f"✅ Loaded {len(customer_df)} customers")

✅ Loaded 3350 customers


#### Loading to dimbook

In [23]:
book_df.to_sql(
    name='dimbook',
    con=target_engine,
    if_exists='append',
    index=False,
    chunksize=1000
)
print(f"✅ Loaded {len(book_df)} books")

✅ Loaded 11127 books


#### Loading to dimdate

In [24]:
# Load to data warehouse
date_df.to_sql(
    name='dimdate',
    con=target_engine,
    if_exists='append',
    index=False
)
print(f"✅ Loaded {len(date_df)} date records")

✅ Loaded 1096 date records


#### Loading to dimordermethod

In [25]:
# Load with SQLAlchemy types
method_df.to_sql(
    name='dimordermethod',
    con=target_engine,
    if_exists='append',
    index=False,
    chunksize=1000,
    dtype={
        'method_id_BK': Integer(),
        'method_name': String(100),
        'source_system_code': SmallInteger(),
        'start_date': DateTime(),
        'end_date': DateTime(),
        'is_current': SmallInteger()
    }
)
print(f"✅ Successfully loaded {len(method_df)} order method records")

✅ Successfully loaded 4 order method records


#### Loading to dimfactorder

In [31]:
from sqlalchemy import Integer, Numeric, String, DateTime, SmallInteger

# Load fact table with proper data types
fact_df.to_sql(
    name='factorder',
    con=target_engine,
    if_exists='append',
    index=False,
    chunksize=1000,
    dtype={
        'order_id_BK': Integer(),
        'customer_id_SK': Integer(),
        'book_id_SK': Integer(),
        'method_id_SK': Integer(),
        'order_date_SK': Integer(),
        'order_status': String(50),
        'price': Numeric(10,2),
        'shipping_cost': Numeric(10,2),
        'quantity': Integer(),
        'source_system_code': SmallInteger(),
        'created_at': DateTime()
    }
)
print(f"✅ Successfully loaded {len(fact_df)} fact records")

✅ Successfully loaded 63801 fact records
