In [1]:
import logging
import os
from datetime import datetime, timedelta

import mysql.connector

In [2]:
# MySQL connection config.
# Credentials come from the environment so that no secret is stored in the
# notebook. Set DW_DB_PASSWORD (and optionally DW_DB_USER, DW_DB_HOST,
# DW_DB_NAME) before starting the kernel; the non-secret settings fall back
# to the values this notebook used previously.
config = {
    'user': os.environ.get('DW_DB_USER', 'root'),
    'password': os.environ.get('DW_DB_PASSWORD', ''),
    'host': os.environ.get('DW_DB_HOST', 'localhost'),
    'database': os.environ.get('DW_DB_NAME', 'dw_schema'),
    'raise_on_warnings': True
}

In [3]:
# Root logger: INFO and above, each message prefixed with timestamp and level
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

In [4]:
# Names of the source schema and of the data-warehouse schema
SRC_SCHEMA, DW_SCHEMA = 'src_schema', 'dw_schema'

In [30]:
def get_connection():
    """Open a MySQL connection using the module-level ``config`` dict.

    Returns:
        The connection object, or None when mysql.connector reports an
        error while connecting (the error is logged).
    """
    try:
        return mysql.connector.connect(**config)
    except mysql.connector.Error as e:
        logging.error(f"Error connecting to database: {e}")
    return None

## Extract data from the source schema and load it into the data warehouse

In [57]:
def _dim_date_row(day):
    """Return the dim_date column values for one calendar day.

    The tuple order matches the column list of the INSERT issued by
    fill_dim_date: full_date, day, month, month_name, quarter, year,
    day_name, is_weekend.
    """
    return (
        day,
        day.day,
        day.month,
        day.strftime("%B"),
        f"Q{(day.month - 1) // 3 + 1}",
        day.year,
        day.strftime("%A"),
        1 if day.weekday() >= 5 else 0,  # weekday() is 5/6 for Saturday/Sunday
    )


def fill_dim_date(start_date_str, end_date_str):
    """Populate dw_schema.dim_date with one row per calendar day.

    Args:
        start_date_str: First day to load, formatted as YYYY-MM-DD.
        end_date_str: Last day to load (inclusive), formatted as YYYY-MM-DD.

    Returns:
        None. Invalid or reversed date ranges are logged and nothing is
        written. Database errors are logged and the transaction is rolled
        back; they are not re-raised.
    """
    # Validate the input before opening a connection so that bad arguments
    # never touch the database.
    try:
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError) as e:
        logging.error(f"Error filling dim_date: {e}")
        return

    if start_date > end_date:
        # Same guard as fill_fact_daily_inventory; previously this case
        # inserted nothing and still logged success.
        logging.warning("Start date is after end date.")
        return

    conn = get_connection()
    if not conn:
        return

    cursor = None
    try:
        cursor = conn.cursor()

        sql = """
            INSERT IGNORE INTO dw_schema.dim_date
            (full_date, day, month, month_name, quarter, year, day_name, is_weekend)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
        day_count = (end_date - start_date).days + 1
        rows = [_dim_date_row(start_date + timedelta(days=offset)) for offset in range(day_count)]

        # One batched call instead of one round trip per day.
        cursor.executemany(sql, rows)

        conn.commit()
        logging.info(f"Dim_date populated from {start_date_str} to {end_date_str}")

    except Exception as e:
        logging.error(f"Error filling dim_date: {e}")
        conn.rollback()
    finally:
        # The cursor may not exist if conn.cursor() itself failed.
        if cursor is not None:
            cursor.close()
        conn.close()

def fill_dim_staff():
    """Copy staff members from src_schema into dw_schema.dim_staff.

    Reads every staff row together with its joined store and address
    details and upserts one dimension row per source row. All writes are
    committed together; on any error the transaction is rolled back and
    the error is logged. Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Source query: NULL text columns are replaced by placeholder values
    extract_sql = """
        SELECT DISTINCT 
            s.staff_id,
            COALESCE(s.first_name, 'Unknown') as first_name,
            COALESCE(s.last_name, 'Unknown') as last_name,
            COALESCE(s.email, 'unknown@email.com') as email,
            COALESCE(st.store_id, 0) as store_id,
            CASE WHEN s.active = 1 THEN 'Active' ELSE 'Inactive' END as staff_status,
            COALESCE(CONCAT(a.address, ', ', c.city, ', ', co.country), 'Unknown') as address
        FROM src_schema.staff s
        LEFT JOIN src_schema.store st ON s.store_id = st.store_id
        LEFT JOIN src_schema.address a ON s.address_id = a.address_id
        LEFT JOIN src_schema.city c ON a.city_id = c.city_id
        LEFT JOIN src_schema.country co ON c.country_id = co.country_id
        """

    # Upsert statement, reused unchanged for every staff row
    insert_sql = """
            INSERT INTO dw_schema.dim_staff 
            (first_name, last_name, email, store_name, staff_status, address)
            VALUES (%s, %s, %s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
            first_name = new_values.first_name,
            last_name = new_values.last_name,
            email = new_values.email,
            store_name = new_values.store_name,
            staff_status = new_values.staff_status,
            address = new_values.address
            """

    try:
        cursor.execute(extract_sql)
        staff_data = cursor.fetchall()

        for _staff_id, first_name, last_name, email, store_id, staff_status, address in staff_data:
            # store_id 0 is the placeholder the query emits for a missing store
            if store_id > 0:
                store_name = f"Store {store_id}"
            else:
                store_name = "Unknown Store"

            cursor.execute(
                insert_sql,
                (first_name, last_name, email, store_name, staff_status, address),
            )

        conn.commit()
        logging.info(f"Dim_staff populated with {len(staff_data)} records")

    except Exception as e:
        logging.error(f"Error filling dim_staff: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()


In [4]:
# Populate dim_date for every day from 2020-01-01 through 2025-12-31
fill_dim_date(start_date_str='2020-01-01', end_date_str='2025-12-31')

Dim_date populated from 2020-01-01 to 2025-12-31


In [38]:
def fill_dim_staff():
    """Copy staff members from src_schema into dw_schema.dim_staff.

    Reads every staff row together with its joined store and address
    details and upserts one dimension row per source row. All writes are
    committed together; on any error the transaction is rolled back and
    the error is logged. Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Source query: NULL text columns are replaced by placeholder values
    extract_sql = """
        SELECT DISTINCT 
            s.staff_id,
            COALESCE(s.first_name, 'Unknown') as first_name,
            COALESCE(s.last_name, 'Unknown') as last_name,
            COALESCE(s.email, 'unknown@email.com') as email,
            COALESCE(st.store_id, 0) as store_id,
            CASE WHEN s.active = 1 THEN 'Active' ELSE 'Inactive' END as staff_status,
            COALESCE(CONCAT(a.address, ', ', c.city, ', ', co.country), 'Unknown') as address
        FROM src_schema.staff s
        LEFT JOIN src_schema.store st ON s.store_id = st.store_id
        LEFT JOIN src_schema.address a ON s.address_id = a.address_id
        LEFT JOIN src_schema.city c ON a.city_id = c.city_id
        LEFT JOIN src_schema.country co ON c.country_id = co.country_id
        """

    # Upsert statement, reused unchanged for every staff row
    insert_sql = """
            INSERT INTO dw_schema.dim_staff 
            (first_name, last_name, email, store_name, staff_status, address)
            VALUES (%s, %s, %s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
            first_name = new_values.first_name,
            last_name = new_values.last_name,
            email = new_values.email,
            store_name = new_values.store_name,
            staff_status = new_values.staff_status,
            address = new_values.address
            """

    try:
        cursor.execute(extract_sql)
        staff_data = cursor.fetchall()

        for _staff_id, first_name, last_name, email, store_id, staff_status, address in staff_data:
            # store_id 0 is the placeholder the query emits for a missing store
            if store_id > 0:
                store_name = f"Store {store_id}"
            else:
                store_name = "Unknown Store"

            cursor.execute(
                insert_sql,
                (first_name, last_name, email, store_name, staff_status, address),
            )

        conn.commit()
        logging.info(f"Dim_staff populated with {len(staff_data)} records")

    except Exception as e:
        logging.error(f"Error filling dim_staff: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

In [47]:
def fill_dim_rental():
    """Copy rentals from src_schema into dw_schema.dim_rental.

    Each source rental is joined to its customer and film, classified as
    'Not Returned', 'Late Return' or 'Returned On Time' by the query, and
    upserted into the dimension table. All writes are committed together;
    on any error the transaction is rolled back and the error is logged.
    Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Source query: rental dates are truncated to DATE, names fall back to placeholders
    extract_sql = """
        SELECT DISTINCT
            r.rental_id,
            DATE(r.rental_date) as rental_date,
            DATE(r.return_date) as return_date,
            COALESCE(CONCAT(c.first_name, ' ', c.last_name), 'Unknown Customer') as customer_name,
            COALESCE(f.title, 'Unknown Film') as film_title,
            CASE 
                WHEN r.return_date IS NULL THEN 'Not Returned'
                WHEN r.return_date > DATE_ADD(r.rental_date, INTERVAL f.rental_duration DAY) THEN 'Late Return'
                ELSE 'Returned On Time'
            END as rental_status
        FROM src_schema.rental r
        LEFT JOIN src_schema.customer c ON r.customer_id = c.customer_id
        LEFT JOIN src_schema.inventory i ON r.inventory_id = i.inventory_id
        LEFT JOIN src_schema.film f ON i.film_id = f.film_id
        """

    # Upsert statement, reused unchanged for every rental row
    insert_sql = """
            INSERT INTO dw_schema.dim_rental 
            (rental_date, return_date, customer_name, film_title, rental_status)
            VALUES (%s, %s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
            return_date = new_values.return_date,
            customer_name = new_values.customer_name,
            film_title = new_values.film_title,
            rental_status = new_values.rental_status
            """

    try:
        cursor.execute(extract_sql)
        rental_data = cursor.fetchall()

        # The source rental_id is read but not stored in the dimension
        for _rental_id, rental_date, return_date, customer_name, film_title, rental_status in rental_data:
            cursor.execute(
                insert_sql,
                (rental_date, return_date, customer_name, film_title, rental_status),
            )

        conn.commit()
        logging.info(f"Dim_rental populated with {len(rental_data)} records")

    except Exception as e:
        logging.error(f"Error filling dim_rental: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

In [40]:
def fill_dim_film():
    """Copy films from src_schema into dw_schema.dim_film.

    Each film is joined to its language and category, NULL attributes are
    replaced by placeholder values in the query, and the result is
    upserted into the dimension table. All writes are committed together;
    on any error the transaction is rolled back and the error is logged.
    Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Source query: one row per film/category combination
    extract_sql = """
        SELECT DISTINCT
            f.film_id,
            COALESCE(f.title, 'Unknown Title') as title,
            COALESCE(f.description, 'No description available') as description,
            COALESCE(f.release_year, 0) as release_year,
            COALESCE(l.name, 'Unknown') as language,
            COALESCE(f.rating, 'Not Rated') as rating,
            COALESCE(f.special_features, 'None') as special_features,
            COALESCE(c.name, 'Unknown') as category
        FROM src_schema.film f
        LEFT JOIN src_schema.language l ON f.language_id = l.language_id
        LEFT JOIN src_schema.film_category fc ON f.film_id = fc.film_id
        LEFT JOIN src_schema.category c ON fc.category_id = c.category_id
        """

    # Upsert statement, reused unchanged for every film row
    insert_sql = """
            INSERT INTO dw_schema.dim_film 
            (title, description, release_year, language, rating, special_features, category)
            VALUES (%s, %s, %s, %s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
            description = new_values.description,
            release_year = new_values.release_year,
            language = new_values.language,
            rating = new_values.rating,
            special_features = new_values.special_features,
            category = new_values.category
            """

    try:
        cursor.execute(extract_sql)
        film_data = cursor.fetchall()

        # The source film_id is read but not stored in the dimension
        for _film_id, title, description, release_year, language, rating, special_features, category in film_data:
            cursor.execute(
                insert_sql,
                (title, description, release_year, language, rating, special_features, category),
            )

        conn.commit()
        logging.info(f"Dim_film populated with {len(film_data)} records")

    except Exception as e:
        logging.error(f"Error filling dim_film: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

In [41]:
def fill_dim_store():
    """Copy stores from src_schema into dw_schema.dim_store.

    Each store is joined to its address, city, country and manager; the
    store name is generated as 'Store <store_id>' by the query. Rows are
    upserted into the dimension table and committed together; on any
    error the transaction is rolled back and the error is logged.
    Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Source query: NULL location/manager values become 'Unknown ...' placeholders
    extract_sql = """
        SELECT DISTINCT
            s.store_id,
            CONCAT('Store ', s.store_id) as store_name,
            COALESCE(a.address, 'Unknown Address') as address,
            COALESCE(c.city, 'Unknown City') as city,
            COALESCE(co.country, 'Unknown Country') as country,
            COALESCE(CONCAT(st.first_name, ' ', st.last_name), 'Unknown Manager') as manager_name
        FROM src_schema.store s
        LEFT JOIN src_schema.address a ON s.address_id = a.address_id
        LEFT JOIN src_schema.city c ON a.city_id = c.city_id
        LEFT JOIN src_schema.country co ON c.country_id = co.country_id
        LEFT JOIN src_schema.staff st ON s.manager_staff_id = st.staff_id
        """

    # Upsert statement, reused unchanged for every store row
    insert_sql = """
            INSERT INTO dw_schema.dim_store 
            (store_name, address, city, country, manager_name)
            VALUES (%s, %s, %s, %s, %s) AS new_values
            ON DUPLICATE KEY UPDATE
            address = new_values.address,
            city = new_values.city,
            country = new_values.country,
            manager_name = new_values.manager_name
            """

    try:
        cursor.execute(extract_sql)
        store_data = cursor.fetchall()

        # The source store_id is read but not stored in the dimension
        for _store_id, store_name, address, city, country, manager_name in store_data:
            cursor.execute(
                insert_sql,
                (store_name, address, city, country, manager_name),
            )

        conn.commit()
        logging.info(f"Dim_store populated with {len(store_data)} records")

    except Exception as e:
        logging.error(f"Error filling dim_store: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

In [50]:
# Helper functions that look up surrogate keys in the dimension tables
def get_rental_key(cursor, rental_date, customer_name, film_title):
    """Find a rental_key in dim_rental by rental date, customer and film.

    Returns the key of the first matching row, or None when no row
    matches or the lookup raises (the error is logged).
    """
    lookup_params = (rental_date, customer_name, film_title)
    try:
        cursor.execute("""
            SELECT rental_key FROM dw_schema.dim_rental 
            WHERE rental_date = %s AND customer_name = %s AND film_title = %s 
            LIMIT 1
        """, lookup_params)
        result = cursor.fetchone()
        if result:
            return result[0]
        return None
    except Exception as e:
        logging.error(f"Error getting rental key: {e}")
        return None

def get_staff_key(cursor, first_name, last_name):
    """Find a staff_key in dim_staff by first and last name.

    Returns the key of the first matching row, or None when no row
    matches or the lookup raises (the error is logged).
    """
    lookup_params = (first_name, last_name)
    try:
        cursor.execute("""
            SELECT staff_key FROM dw_schema.dim_staff 
            WHERE first_name = %s AND last_name = %s 
            LIMIT 1
        """, lookup_params)
        result = cursor.fetchone()
        if result:
            return result[0]
        return None
    except Exception as e:
        logging.error(f"Error getting staff key: {e}")
        return None

def get_film_key(cursor, title):
    """Find a film_key in dim_film by film title.

    Returns the key of the first matching row, or None when no row
    matches or the lookup raises (the error is logged).
    """
    lookup_params = (title,)
    try:
        cursor.execute("""
            SELECT film_key FROM dw_schema.dim_film 
            WHERE title = %s 
            LIMIT 1
        """, lookup_params)
        result = cursor.fetchone()
        if result:
            return result[0]
        return None
    except Exception as e:
        logging.error(f"Error getting film key: {e}")
        return None

def get_store_key(cursor, store_name):
    """Find a store_key in dim_store by store name.

    Returns the key of the first matching row, or None when no row
    matches or the lookup raises (the error is logged).
    """
    lookup_params = (store_name,)
    try:
        cursor.execute("""
            SELECT store_key FROM dw_schema.dim_store 
            WHERE store_name = %s 
            LIMIT 1
        """, lookup_params)
        result = cursor.fetchone()
        if result:
            return result[0]
        return None
    except Exception as e:
        logging.error(f"Error getting store key: {e}")
        return None

def get_date_key(cursor, date_value):
    """Find a date_key in dim_date by calendar date.

    Returns the key of the first matching row, or None when no row
    matches or the lookup raises (the error is logged).
    """
    lookup_params = (date_value,)
    try:
        cursor.execute(
            "SELECT date_key FROM dw_schema.dim_date WHERE full_date = %s LIMIT 1",
            lookup_params,
        )
        result = cursor.fetchone()
        if result:
            return result[0]
        return None
    except Exception as e:
        logging.error(f"Error getting date key: {e}")
        return None

In [54]:
def fill_fact_monthly_payment():
    """Rebuild dw_schema.fact_monthly_payment from the source payments.

    Existing fact rows are removed, then one fact row is inserted per
    source payment whose rental, staff and date can all be resolved to
    dimension keys. Payments with an unresolved key are skipped and
    reported with a warning.

    The clear and the reload run in a single transaction: on any error
    everything is rolled back, the error is logged and the previous table
    contents are kept. Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    def lookup_key(cache, sql, params):
        """Return the first column of the first match, memoised per params."""
        if params not in cache:
            cursor.execute(sql, params)
            row = cursor.fetchone()
            cache[params] = row[0] if row else None
        return cache[params]

    rental_key_sql = """
        SELECT rental_key FROM dw_schema.dim_rental
        WHERE rental_date = %s AND customer_name = %s AND film_title = %s
        LIMIT 1
    """
    staff_key_sql = """
        SELECT staff_key FROM dw_schema.dim_staff
        WHERE first_name = %s AND last_name = %s
        LIMIT 1
    """
    date_key_sql = """
        SELECT date_key FROM dw_schema.dim_date
        WHERE full_date = %s
        LIMIT 1
    """
    insert_sql = """
        INSERT INTO dw_schema.fact_monthly_payment
        (rental_key, staff_key, date_key, amount, month_name)
        VALUES (%s, %s, %s, %s, %s)
    """

    try:
        # DELETE instead of TRUNCATE: TRUNCATE TABLE causes an implicit
        # commit in MySQL, so the rollback below could not bring the old
        # rows back if the reload failed part-way.
        cursor.execute("DELETE FROM dw_schema.fact_monthly_payment")
        logging.info("Cleared existing fact_monthly_payment data")

        # rental_date is selected so that the rental lookup can tell apart
        # repeat rentals of the same film by the same customer.
        extract_sql = """
        SELECT
            p.payment_id,
            DATE(p.payment_date) as payment_date,
            p.amount,
            MONTHNAME(p.payment_date) as month_name,
            DATE(r.rental_date) as rental_date,
            CONCAT(c.first_name, ' ', c.last_name) as customer_name,
            f.title as film_title,
            s.first_name as staff_first_name,
            s.last_name as staff_last_name
        FROM src_schema.payment p
        JOIN src_schema.rental r ON p.rental_id = r.rental_id
        JOIN src_schema.customer c ON r.customer_id = c.customer_id
        JOIN src_schema.inventory i ON r.inventory_id = i.inventory_id
        JOIN src_schema.film f ON i.film_id = f.film_id
        JOIN src_schema.staff s ON p.staff_id = s.staff_id
        WHERE p.payment_date IS NOT NULL
        ORDER BY p.payment_id
        """

        cursor.execute(extract_sql)
        payment_data = cursor.fetchall()

        # Per-run caches so repeated staff/date/rental lookups hit the
        # database only once.
        rental_keys, staff_keys, date_keys = {}, {}, {}
        successful_inserts = 0

        for payment in payment_data:
            (payment_id, payment_date, amount, month_name, rental_date,
             customer_name, film_title, staff_first_name, staff_last_name) = payment

            rental_key = lookup_key(rental_keys, rental_key_sql, (rental_date, customer_name, film_title))
            staff_key = lookup_key(staff_keys, staff_key_sql, (staff_first_name, staff_last_name))
            date_key = lookup_key(date_keys, date_key_sql, (payment_date,))

            # Compare against None explicitly so a legitimate key of 0 is
            # not mistaken for "not found".
            if rental_key is not None and staff_key is not None and date_key is not None:
                cursor.execute(insert_sql, (rental_key, staff_key, date_key, amount, month_name))
                successful_inserts += 1
            else:
                logging.warning(f"Missing keys for payment {payment_id}: rental_key={rental_key}, staff_key={staff_key}, date_key={date_key}")

        conn.commit()
        logging.info(f"Successfully inserted {successful_inserts} records into fact_monthly_payment")

    except Exception as e:
        logging.error(f"Error filling fact_monthly_payment: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()


In [67]:
def fill_fact_daily_inventory(start_date_str, end_date_str):
    """Rebuild dw_schema.fact_daily_inventory for a date range.

    For every day in the range, one fact row is written per film title
    and store holding the number of copies out on rental that day and the
    number still available. Existing fact rows for the range are deleted
    first.

    Args:
        start_date_str: First day to load, formatted as YYYY-MM-DD.
        end_date_str: Last day to load (inclusive), formatted as YYYY-MM-DD.

    Returns:
        None. Invalid or reversed ranges are logged and nothing is written.
        Database errors are logged and the transaction is rolled back;
        they are not re-raised.
    """
    # Validate the input before opening a connection.
    try:
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError) as e:
        logging.error(f"Error in fill_fact_daily_inventory: {e}")
        return

    if start_date > end_date:
        logging.warning("Start date is after end date.")
        return

    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    def lookup_key(cache, sql, value):
        """Return the first column of the first match, memoised per value."""
        if value not in cache:
            cursor.execute(sql, (value,))
            row = cursor.fetchone()
            cache[value] = row[0] if row else None
        return cache[value]

    # DISTINCT inventory ids: the LEFT JOIN to rental yields one row per
    # rental, so a plain COUNT would count a copy once per time it was ever
    # rented. Rental dates are truncated to DATE so a copy rented during
    # the day counts as out on that day, matching the return_date test.
    inventory_sql = """
        SELECT
            f.title,
            st.store_id,
            COUNT(DISTINCT i.inventory_id) as total_copies,
            COUNT(DISTINCT CASE
                WHEN DATE(r.rental_date) <= %s
                AND (r.return_date IS NULL OR DATE(r.return_date) > %s)
                THEN i.inventory_id
            END) as rented_copies
        FROM src_schema.inventory i
        JOIN src_schema.film f ON i.film_id = f.film_id
        JOIN src_schema.store st ON i.store_id = st.store_id
        LEFT JOIN src_schema.rental r ON i.inventory_id = r.inventory_id
        GROUP BY f.title, st.store_id
    """
    film_key_sql = "SELECT film_key FROM dw_schema.dim_film WHERE title = %s LIMIT 1"
    store_key_sql = "SELECT store_key FROM dw_schema.dim_store WHERE store_name = %s LIMIT 1"
    date_key_sql = "SELECT date_key FROM dw_schema.dim_date WHERE full_date = %s LIMIT 1"
    insert_sql = """
        INSERT INTO dw_schema.fact_daily_inventory
        (date_key, film_key, store_key, available_qty, rented_qty)
        VALUES (%s, %s, %s, %s, %s)
    """

    try:
        # Delete existing records in the selected date range
        delete_sql = """
            DELETE FROM dw_schema.fact_daily_inventory
            WHERE date_key IN (
                SELECT date_key FROM dw_schema.dim_date
                WHERE full_date BETWEEN %s AND %s
            )
        """
        cursor.execute(delete_sql, (start_date, end_date))
        logging.info(f"Deleted existing fact_daily_inventory records from {start_date} to {end_date}")

        # Film and store keys do not change from day to day, so they are
        # looked up once and reused for the whole range.
        film_keys, store_keys = {}, {}
        successful_inserts = 0
        total_days = (end_date - start_date).days + 1

        for offset in range(total_days):
            current_date = start_date + timedelta(days=offset)

            # The date key is the same for every row of this day.
            cursor.execute(date_key_sql, (current_date,))
            date_row = cursor.fetchone()
            date_key = date_row[0] if date_row else None

            if date_key is None:
                logging.warning(f"No dim_date row for {current_date}; skipping that day")
            else:
                cursor.execute(inventory_sql, (current_date, current_date))
                inventory_data = cursor.fetchall()

                for film_title, store_id, total_copies, rented_copies in inventory_data:
                    film_key = lookup_key(film_keys, film_key_sql, film_title)
                    store_key = lookup_key(store_keys, store_key_sql, f"Store {store_id}")

                    if film_key is not None and store_key is not None:
                        available_copies = total_copies - rented_copies
                        cursor.execute(insert_sql, (date_key, film_key, store_key, available_copies, rented_copies))
                        successful_inserts += 1

            # Progress message every 30 days, naming the last day handled.
            if (offset + 1) % 30 == 0:
                logging.info(f"Processed up to {current_date}")

        conn.commit()
        logging.info(f"Inserted {successful_inserts} records into fact_daily_inventory")

    except Exception as e:
        logging.error(f"Error in fill_fact_daily_inventory: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

In [48]:
def run_full_etl(inventory_start_date='2025-01-01', inventory_end_date='2025-12-31'):
    """Run every dimension load followed by both fact loads.

    dim_date is not populated here; it must already cover the payment
    dates and the inventory date range (see fill_dim_date), because the
    fact loaders resolve their date keys against it.

    Args:
        inventory_start_date: First day (YYYY-MM-DD) passed to
            fill_fact_daily_inventory.
        inventory_end_date: Last day (YYYY-MM-DD, inclusive) passed to
            fill_fact_daily_inventory.

    Returns:
        None.
    """
    logging.info("Starting full ETL process...")

    # Fill dimension tables
    fill_dim_staff()
    fill_dim_film()
    fill_dim_store()
    fill_dim_rental()

    # Fill fact tables
    fill_fact_monthly_payment()
    # fill_fact_daily_inventory requires an explicit date range; calling it
    # without arguments raised TypeError.
    fill_fact_daily_inventory(inventory_start_date, inventory_end_date)

    logging.info("ETL process completed successfully!")


In [62]:
# Populate dim_date for every day from 2005-01-01 through 2025-12-31
fill_dim_date(start_date_str='2005-01-01', end_date_str='2025-12-31')


2025-05-25 00:32:05,131 - INFO - Dim_date populated from 2005-01-01 to 2025-12-31


In [49]:
# Run the complete ETL process: all dimension loads first, then both fact loads
run_full_etl()

2025-05-24 23:41:59,135 - INFO - Starting full ETL process...
2025-05-24 23:41:59,145 - INFO - Dim_staff populated with 2 records
2025-05-24 23:41:59,318 - INFO - Dim_film populated with 1000 records
2025-05-24 23:41:59,325 - INFO - Dim_store populated with 2 records
2025-05-24 23:42:01,947 - INFO - Dim_rental populated with 16044 records
2025-05-24 23:42:08,290 - INFO - Fact_monthly_payment populated with 16044 records
2025-05-24 23:45:42,420 - INFO - Fact_daily_inventory populated successfully
2025-05-24 23:45:42,421 - INFO - ETL process completed successfully!


In [None]:
# Reload fact_monthly_payment on its own; its key lookups read dim_rental,
# dim_staff and dim_date, so those tables must already be populated
fill_fact_monthly_payment()

In [68]:
# Reload the daily inventory facts for calendar year 2025
fill_fact_daily_inventory(start_date_str='2025-1-1', end_date_str='2025-12-31')

2025-05-25 00:47:47,356 - INFO - Deleted existing fact_daily_inventory records from 2025-01-01 to 2025-12-31
2025-05-25 00:49:58,501 - INFO - Processed up to 2025-01-31
2025-05-25 00:52:26,159 - INFO - Processed up to 2025-03-02
2025-05-25 00:54:36,934 - INFO - Processed up to 2025-04-01
2025-05-25 00:56:48,549 - INFO - Processed up to 2025-05-01
2025-05-25 00:59:08,733 - INFO - Processed up to 2025-05-31
2025-05-25 01:01:24,568 - INFO - Processed up to 2025-06-30
2025-05-25 01:03:37,699 - INFO - Processed up to 2025-07-30
2025-05-25 01:05:50,199 - INFO - Processed up to 2025-08-29
2025-05-25 01:08:02,535 - INFO - Processed up to 2025-09-28
2025-05-25 01:10:17,700 - INFO - Processed up to 2025-10-28
2025-05-25 01:12:34,367 - INFO - Processed up to 2025-11-27
2025-05-25 01:14:47,804 - INFO - Processed up to 2025-12-27
2025-05-25 01:15:10,986 - INFO - Inserted 555165 records into fact_daily_inventory


In [35]:
def validate_etl():
    """Log basic sanity checks on the loaded warehouse tables.

    Logs the row count of every dimension and fact table, then warns if
    fact_monthly_payment contains rows with a NULL rental, staff or date
    key. Errors are logged, not raised. Returns None.
    """
    conn = get_connection()
    if not conn:
        return

    cursor = conn.cursor()

    # Dimension tables first, then the two fact tables
    tables = (
        'dim_date',
        'dim_staff',
        'dim_rental',
        'dim_film',
        'dim_store',
        'fact_monthly_payment',
        'fact_daily_inventory',
    )

    try:
        # Row count per table (names come from the fixed tuple above)
        for table in tables:
            cursor.execute(f"SELECT COUNT(*) FROM dw_schema.{table}")
            count = cursor.fetchone()[0]
            logging.info(f"{table}: {count} records")

        # Fact rows whose dimension references are missing
        cursor.execute("""
            SELECT COUNT(*) FROM dw_schema.fact_monthly_payment 
            WHERE rental_key IS NULL OR staff_key IS NULL OR date_key IS NULL
        """)
        null_count = cursor.fetchone()[0]
        if not null_count <= 0:
            logging.warning(f"Found {null_count} records with null keys in fact_monthly_payment")

    except Exception as e:
        logging.error(f"Error validating ETL: {e}")
    finally:
        cursor.close()
        conn.close()