In [None]:
## required libraries:
# pip install pandas sqlalchemy PyMySQL

## required data:
# Requirements/FileDB.sql to be imported into the MySQL database

# Develop an ETL to fill the tables in the designed Dimensional model in part 
## required functionality:
# The ETL contains the following tasks. 
# ○ Extract the related data from the related tables to fill the first fact table 
# ○ Make sure to handle (even if the data is clean) 
# ■ Redundant data (if it exists) 
# ■ missing values 
# ○ Load the data into the (facts and dimensions tables) 
# ● Make sure to run the ETL with current data, then try to add some data (not 
# cleaned), to make sure the ETL is working well.
# ● Try to use automation tools like (Airflow) (Bouns)

In [5]:
#library import & check
import pandas as pd
from sqlalchemy import create_engine

In [6]:
# mySQL setup
username='root'
password='rootuser'
host='127.0.0.1'
port='3306'

databaseNameSource='sakila' 
databaseNameETL='rental_film_dw'

# connection string format
connectionStrSource=f"mysql+pymysql://{username}:{password}@{host}:{port}/{databaseNameSource}"
engineSource = create_engine(connectionStrSource)

# connection string format
connectionStrETL=f"mysql+pymysql://{username}:{password}@{host}:{port}/{databaseNameETL}"
engineETL = create_engine(connectionStrETL)

In [None]:
# --- ETL Process ---

print("Starting ETL process...")

# --- 1. Populate Dim_Date ---
print("Populating Dim_Date...")
# Find min and max dates from relevant source tables (payment and rental)
min_payment_date_query = "SELECT MIN(payment_date) FROM payment"
max_payment_date_query = "SELECT MAX(payment_date) FROM payment"
min_rental_date_query = "SELECT MIN(rental_date) FROM rental"
max_rental_date_query = "SELECT MAX(rental_date) FROM rental"

min_payment_date = pd.read_sql(min_payment_date_query, engineSource).iloc[0, 0].date()
max_payment_date = pd.read_sql(max_payment_date_query, engineSource).iloc[0, 0].date()
min_rental_date = pd.read_sql(min_rental_date_query, engineSource).iloc[0, 0].date()
max_rental_date = pd.read_sql(max_rental_date_query, engineSource).iloc[0, 0].date()

start_date = min(min_payment_date, min_rental_date)
end_date = max(max_payment_date, max_rental_date)

# Generate dates
date_range = pd.date_range(start=start_date, end=end_date)
dim_date_df = pd.DataFrame({
    'date_key': date_range.strftime('%Y%m%d').astype(int),
    'date': date_range.date,
    'day_of_week': date_range.dayofweek + 1, # Monday=1, Sunday=7
    'day_name': date_range.day_name(),
    'day_of_month': date_range.day,
    'day_of_year': date_range.dayofyear,
    'week_of_year': date_range.isocalendar().week.astype(int),
    'month': date_range.month,
    'month_name': date_range.month_name(),
    'quarter': date_range.quarter,
    'year': date_range.year,
    'fiscal_year': date_range.year,
    'is_weekend': ((date_range.dayofweek == 5) | (date_range.dayofweek == 6))
})

# Load into Dim_Date
try:
    dim_date_df.to_sql('Dim_Date', engineETL, if_exists='append', index=False)
    print("Dim_Date populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Date: {e}")

Starting ETL process...
Populating Dim_Date...
Dim_Date populated successfully.


  dim_date_df.to_sql('Dim_Date', engineETL, if_exists='append', index=False)


In [None]:
# --- 2. Populate Dim_Staff ---
print("Populating Dim_Staff...")
staff_query = "SELECT staff_id, first_name, last_name, email, username FROM staff"
dim_staff_df = pd.read_sql(staff_query, engineSource)

# Handle missing values if any (email can be NULL)
dim_staff_df['email'] = dim_staff_df['email'].fillna('') # Replace None with empty string

# Load into Dim_Staff
try:
    dim_staff_df.to_sql('Dim_Staff', engineETL, if_exists='append', index=False)
    print("Dim_Staff populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Staff: {e}")

# --- 3. Populate Dim_Film ---
print("Populating Dim_Film...")
film_query = """
SELECT
    film_id,
    title,
    description,
    release_year,
    language_id,
    original_language_id,
    rental_duration,
    rental_rate,
    length,
    replacement_cost,
    rating,
    special_features
FROM film
"""
dim_film_df = pd.read_sql(film_query, engineSource)

# Handle missing values for description and special_features if any
dim_film_df['description'] = dim_film_df['description'].fillna('')
dim_film_df['special_features'] = dim_film_df['special_features'].fillna('')

# Load into Dim_Film
try:
    dim_film_df.to_sql('Dim_Film', engineETL, if_exists='append', index=False)
    print("Dim_Film populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Film: {e}")

Populating Dim_Staff...
Dim_Staff populated successfully.
Populating Dim_Film...
Dim_Film populated successfully.
Populating Dim_Store...
Dim_Store populated successfully.


  dim_staff_df.to_sql('Dim_Staff', engineETL, if_exists='append', index=False)
  dim_film_df.to_sql('Dim_Film', engineETL, if_exists='append', index=False)
  dim_store_df.to_sql('Dim_Store', engineETL, if_exists='append', index=False)


In [None]:
# --- 4. Populate Dim_Store ---
print("Populating Dim_Store...")
store_query = """
SELECT
    s.store_id,
    s.address_id
FROM store s
"""
dim_store_df = pd.read_sql(store_query, engineSource)

# Load into Dim_Store
try:
    dim_store_df.to_sql('Dim_Store', engineETL, if_exists='append', index=False)
    print("Dim_Store populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Store: {e}")

In [None]:
# --- 5. Populate Dim_Rent ---
print("Populating Dim_Rent...")
rent_query = """
SELECT
    rental_id,
    inventory_id,
    customer_id,
    rental_date,
    return_date
FROM rental
"""
dim_rent_df = pd.read_sql(rent_query, engineSource)

# Handle missing values (return_date can be NULL)
dim_rent_df['return_date'] = dim_rent_df['return_date'].fillna(pd.NaT) # Use NaT for missing datetime

# Load into Dim_Rent
try:
    dim_rent_df.to_sql('Dim_Rent', engineETL, if_exists='append', index=False)
    print("Dim_Rent populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Rent: {e}")

print("Fetching surrogate keys...")
# Dim_Date - date_key is explicitly generated and known
dim_date_map = dim_date_df[['date', 'date_key']].set_index('date').to_dict()['date_key']

# Dim_Staff
staff_surrogate_map_query = "SELECT staff_id, staff_key FROM Dim_Staff"
staff_surrogate_map_df = pd.read_sql(staff_surrogate_map_query, engineETL)
staff_surrogate_map = staff_surrogate_map_df.set_index('staff_id')['staff_key'].to_dict()

# Dim_Film
film_surrogate_map_query = "SELECT film_id, film_key FROM Dim_Film"
film_surrogate_map_df = pd.read_sql(film_surrogate_map_query, engineETL)
film_surrogate_map = film_surrogate_map_df.set_index('film_id')['film_key'].to_dict()

# Dim_Store
store_surrogate_map_query = "SELECT store_id, store_key FROM Dim_Store"
store_surrogate_map_df = pd.read_sql(store_surrogate_map_query, engineETL)
store_surrogate_map = store_surrogate_map_df.set_index('store_id')['store_key'].to_dict()

# Dim_Rent
rent_surrogate_map_query = "SELECT rental_id, rent_key FROM Dim_Rent"
rent_surrogate_map_df = pd.read_sql(rent_surrogate_map_query, engineETL)
rent_surrogate_map = rent_surrogate_map_df.set_index('rental_id')['rent_key'].to_dict()

Populating Dim_Rent...
Dim_Rent populated successfully.
Fetching surrogate keys...


  dim_rent_df.to_sql('Dim_Rent', engineETL, if_exists='append', index=False)


In [None]:
# --- 5. Populate Dim_Rent ---
print("Populating Dim_Rent...")
rent_query = """
SELECT
    rental_id,
    inventory_id,
    customer_id,
    rental_date,
    return_date
FROM rental
"""
dim_rent_df = pd.read_sql(rent_query, engineSource)

# Handle missing values (return_date can be NULL)
dim_rent_df['return_date'] = dim_rent_df['return_date'].fillna(pd.NaT) # Use NaT for missing datetime

# Load into Dim_Rent
try:
    dim_rent_df.to_sql('Dim_Rent', engineETL, if_exists='append', index=False)
    print("Dim_Rent populated successfully.")
except Exception as e:
    print(f"Error populating Dim_Rent: {e}")

print("Fetching surrogate keys...")
# Dim_Date - date_key is explicitly generated and known
dim_date_map = dim_date_df[['date', 'date_key']].set_index('date').to_dict()['date_key']

# Dim_Staff
staff_surrogate_map_query = "SELECT staff_id, staff_key FROM Dim_Staff"
staff_surrogate_map_df = pd.read_sql(staff_surrogate_map_query, engineETL)
staff_surrogate_map = staff_surrogate_map_df.set_index('staff_id')['staff_key'].to_dict()

# Dim_Film
film_surrogate_map_query = "SELECT film_id, film_key FROM Dim_Film"
film_surrogate_map_df = pd.read_sql(film_surrogate_map_query, engineETL)
film_surrogate_map = film_surrogate_map_df.set_index('film_id')['film_key'].to_dict()

# Dim_Store
store_surrogate_map_query = "SELECT store_id, store_key FROM Dim_Store"
store_surrogate_map_df = pd.read_sql(store_surrogate_map_query, engineETL)
store_surrogate_map = store_surrogate_map_df.set_index('store_id')['store_key'].to_dict()

# Dim_Rent
rent_surrogate_map_query = "SELECT rental_id, rent_key FROM Dim_Rent"
rent_surrogate_map_df = pd.read_sql(rent_surrogate_map_query, engineETL)
rent_surrogate_map = rent_surrogate_map_df.set_index('rental_id')['rent_key'].to_dict()


Populating Dim_Rent...
Error populating Dim_Rent: (pymysql.err.IntegrityError) (1062, "Duplicate entry '1' for key 'dim_rent.rental_id'")
[SQL: INSERT INTO `Dim_Rent` (rental_id, inventory_id, customer_id, rental_date, return_date) VALUES (%(rental_id)s, %(inventory_id)s, %(customer_id)s, %(rental_date)s, %(return_date)s)]
[parameters: [{'rental_id': 1, 'inventory_id': 367, 'customer_id': 130, 'rental_date': datetime.datetime(2005, 5, 24, 22, 53, 30), 'return_date': datetime.datetime(2005, 5, 26, 22, 4, 30)}, {'rental_id': 2, 'inventory_id': 1525, 'customer_id': 459, 'rental_date': datetime.datetime(2005, 5, 24, 22, 54, 33), 'return_date': datetime.datetime(2005, 5, 28, 19, 40, 33)}, {'rental_id': 3, 'inventory_id': 1711, 'customer_id': 408, 'rental_date': datetime.datetime(2005, 5, 24, 23, 3, 39), 'return_date': datetime.datetime(2005, 6, 1, 22, 12, 39)}, {'rental_id': 4, 'inventory_id': 2452, 'customer_id': 333, 'rental_date': datetime.datetime(2005, 5, 24, 23, 4, 41), 'return_date':

In [None]:
# --- 6. Populate Fact_Monthly_Payment ---
print("Populating Fact_Monthly_Payment...")
payment_fact_query = """
SELECT
    p.payment_date,
    p.staff_id,
    p.rental_id,
    p.amount
FROM payment p
"""
fact_monthly_payment_df = pd.read_sql(payment_fact_query, engineSource)

# Transform: Aggregate and map surrogate keys
# Extract month for aggregation
fact_monthly_payment_df['payment_month_start'] = fact_monthly_payment_df['payment_date'].dt.to_period('M').dt.start_time

# First, ensure all necessary columns are present for grouping
fact_monthly_payment_df['payment_date_only'] = fact_monthly_payment_df['payment_date'].dt.date

fact_monthly_payment_agg = fact_monthly_payment_df.groupby([
    'payment_date_only', 'staff_id', 'rental_id'
]).agg(
    payment_amount=('amount', 'sum'),
    payment_count=('amount', 'count')
).reset_index()

# Map date_key, staff_key, rent_key
fact_monthly_payment_agg['date_key'] = fact_monthly_payment_agg['payment_date_only'].map(dim_date_map)
fact_monthly_payment_agg['staff_key'] = fact_monthly_payment_agg['staff_id'].map(staff_surrogate_map)
fact_monthly_payment_agg['rent_key'] = fact_monthly_payment_agg['rental_id'].map(rent_surrogate_map)

# Select relevant columns for the fact table
fact_monthly_payment_final_df = fact_monthly_payment_agg[[
    'date_key', 'staff_key', 'rent_key', 'payment_amount', 'payment_count'
]]

# Handle potential missing keys if any (shouldn't happen with correct joins/maps)
fact_monthly_payment_final_df.dropna(inplace=True)
fact_monthly_payment_final_df = fact_monthly_payment_final_df.astype({
    'date_key': int,
    'staff_key': int,
    'rent_key': int
})

# Load into Fact_Monthly_Payment
try:
    fact_monthly_payment_final_df.to_sql('Fact_Monthly_Payment', engineETL, if_exists='append', index=False)
    print("Fact_Monthly_Payment populated successfully.")
except Exception as e:
    print(f"Error populating Fact_Monthly_Payment: {e}")

Populating Fact_Monthly_Payment...


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  fact_monthly_payment_final_df.dropna(inplace=True)


Fact_Monthly_Payment populated successfully.


  fact_monthly_payment_final_df.to_sql('Fact_Monthly_Payment', engineETL, if_exists='append', index=False)


In [None]:
# --- 7. Populate Fact_Daily_Inventory ---
print("Populating Fact_Daily_Inventory...")

# Get distinct inventory items with their film and store IDs
inventory_query = "SELECT inventory_id, film_id, store_id FROM inventory"
source_inventory_df = pd.read_sql(inventory_query, engineSource)

# Group by film_id and store_id to get inventory count for each film-store combination
daily_inventory_agg = source_inventory_df.groupby(['film_id', 'store_id']).agg(
    inventory_count=('inventory_id', 'count')
).reset_index()

# Get all unique dates from Dim_Date
all_dates_df = pd.read_sql("SELECT date, date_key FROM Dim_Date", engineETL)

# Create a Cartesian product of daily_inventory_agg and all_dates_df
fact_daily_inventory_df = pd.merge(daily_inventory_agg.assign(key=1), all_dates_df.assign(key=1), on='key').drop('key', axis=1)

# Map surrogate keys
fact_daily_inventory_df['film_key'] = fact_daily_inventory_df['film_id'].map(film_surrogate_map)
fact_daily_inventory_df['store_key'] = fact_daily_inventory_df['store_id'].map(store_surrogate_map)

# Select and rename columns for the fact table
fact_daily_inventory_final_df = fact_daily_inventory_df[[
    'date_key', 'film_key', 'store_key', 'inventory_count'
]]

# Handle potential missing keys
fact_daily_inventory_final_df.dropna(inplace=True)
fact_daily_inventory_final_df = fact_daily_inventory_final_df.astype({
    'date_key': int,
    'film_key': int,
    'store_key': int
})

# Load into Fact_Daily_Inventory
try:
    fact_daily_inventory_final_df.to_sql('Fact_Daily_Inventory', engineETL, if_exists='append', index=False)
    print("Fact_Daily_Inventory populated successfully.")
except Exception as e:
    print(f"Error populating Fact_Daily_Inventory: {e}")

print("ETL process completed.")

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  fact_daily_inventory_final_df.dropna(inplace=True)


Populating Fact_Daily_Inventory...
Fact_Daily_Inventory populated successfully.
ETL process completed.


  fact_daily_inventory_final_df.to_sql('Fact_Daily_Inventory', engineETL, if_exists='append', index=False)
