In [105]:
import mysql.connector
import pymysql
import pandas as pd
from sqlalchemy import create_engine

In [106]:
import os
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Load environment variables from .env file
load_dotenv()


# Get DB credentials from environment variables
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = int(os.getenv("DB_PORT") or 3306)
DB_NAME = os.getenv("DB_NAME")
print(f"DB_USER='{DB_USER}'")
print(f"DB_PASSWORD='{DB_PASSWORD}'")
print(f"DB_HOST='{DB_HOST}'")
print(f"DB_PORT='{DB_PORT}'")
print(f"DB_NAME='{DB_NAME}'")

from urllib.parse import quote_plus

password_encoded = quote_plus(DB_PASSWORD)
engine = create_engine(f"mysql+pymysql://{DB_USER}:{password_encoded}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

conn = engine.connect()

print("✅ Database connection successful.")

DB_USER='root'
DB_PASSWORD='12345678'
DB_HOST='host.docker.internal'
DB_PORT='3306'
DB_NAME='sakila'
✅ Database connection successful.


In [107]:
def clean_dataframe(df, table_name=None):
    """
    Removes missing values and duplicates from a DataFrame.
    
    Args:
        df (pd.DataFrame): The DataFrame to clean.
        table_name (str): Optional, name of the table for logging purposes.
    
    Returns:
        pd.DataFrame: Cleaned DataFrame.
    """
    original_shape = df.shape

    # Drop missing values
    df = df.dropna()

    # Drop duplicates
    df = df.drop_duplicates()

    cleaned_shape = df.shape

    if table_name:
        print(f"[{table_name}] Cleaned: {original_shape[0] - cleaned_shape[0]} rows removed")

    return df


In [108]:
# 1. dim_staff
dim_staff_query = "SELECT staff_id, first_name, last_name, store_id FROM staff"
dim_staff = pd.read_sql(dim_staff_query, engine)
dim_staff = clean_dataframe(dim_staff, table_name='dim_staff')
dim_staff.to_sql('dim_staff', con=engine, if_exists='append', index=False)
dim_staff


[dim_staff] Cleaned: 0 rows removed


Unnamed: 0,staff_id,first_name,last_name,store_id
0,1,Mike,Hillyer,1
1,2,Jon,Stephens,2


In [109]:
#film
dim_film_query = "SELECT film_id, title, release_year, language_id FROM film"
dim_film = pd.read_sql(dim_film_query, engine)
dim_film = clean_dataframe(dim_film, table_name='dim_film')
dim_film.to_sql('dim_film', con=engine, if_exists='append', index=False)


[dim_film] Cleaned: 0 rows removed


1000

In [None]:
#store
dim_store_query = "SELECT store_id, manager_staff_id, address_id FROM store"
dim_store = pd.read_sql(dim_store_query, engine)
dim_store = clean_dataframe(dim_store, table_name='dim_store')
dim_store.to_sql('dim_store', con=engine, if_exists='append', index=False)


[dim_store] Cleaned: 0 rows removed


2

In [111]:
# 1. dim_date
date_range = pd.date_range(start='2005-01-01', end='2006-12-31', freq='D')
dim_date = pd.DataFrame({
   'date_id': date_range.strftime('%Y%m%d').astype(int),
   'full_date': date_range,
   'month': date_range.month,
   'year': date_range.year,
})
dim_date = clean_dataframe(dim_date, table_name='dim_date')
dim_date.to_sql('dim_date', con=engine, if_exists='append', index=False)


[dim_date] Cleaned: 0 rows removed


730

In [112]:
# First, ensure all rental records are in dim_rental
rentals = pd.read_sql(
    "SELECT rental_id, rental_date, inventory_id, customer_id FROM rental", 
    engine
)
rentals = clean_dataframe(rentals, table_name='dim_rental')
rentals.to_sql('dim_rental', con=engine, if_exists='append', index=False)

# Then insert your fact table data


[dim_rental] Cleaned: 0 rows removed


16044

In [113]:
# Get rental and inventory data
rental_df = pd.read_sql("SELECT rental_id, rental_date, inventory_id FROM rental", engine)
inventory_df = pd.read_sql("SELECT inventory_id, film_id, store_id FROM inventory", engine)

# Merge rental with inventory
rental_inventory = rental_df.merge(inventory_df, on='inventory_id', how='inner')

# Convert rental_date to date_id format (YYYYMMDD)
rental_inventory['date_id'] = pd.to_datetime(rental_inventory['rental_date']).dt.strftime('%Y%m%d').astype(int)

# Group and count rentals per film-store-date as "inventory_count"
fact_daily_inventory_df = rental_inventory.groupby(
    ['date_id', 'film_id', 'store_id']
).size().reset_index(name='inventory_count')
fact_daily_inventory_df = clean_dataframe(fact_daily_inventory_df, table_name='fact_daily_inventory')
# Insert into your existing table
fact_daily_inventory_df.to_sql('fact_daily_inventory', con=engine, if_exists='append', index=False)


[fact_daily_inventory] Cleaned: 0 rows removed


14102

In [114]:

payments = pd.read_sql("SELECT staff_id, rental_id, payment_date, amount FROM payment", engine)
payments['payment_date'] = pd.to_datetime(payments['payment_date'])
payments['year'] = payments['payment_date'].dt.year
payments['month'] = payments['payment_date'].dt.month
payments['date_id'] = (payments['year'] * 10000 + payments['month'] * 100 + 1).astype(int)

# Group by staff, rental, and month
fact_monthly = payments.groupby(['staff_id', 'rental_id', 'date_id']).agg({
   'amount': 'sum'
}).reset_index()
fact_monthly.rename(columns={'amount': 'monthly_payment_total'}, inplace=True)
fact_monthly = clean_dataframe(fact_monthly, table_name='fact_monthly_payment')
fact_monthly.to_sql('fact_monthly_payment', con=engine, if_exists='append', index=False)

[fact_monthly_payment] Cleaned: 0 rows removed


16044