# Sakila ETL Data Pipeline

In this Extract-Transform-Load (ETL) pipeline, data is taken from the Sakila sample database and used to build a data warehouse that stores information about movie rentals.

The data come from three sources: a MongoDB cluster, a JSON API endpoint, and a MySQL server. All of the data originated in the Sakila database. The code that extracted and prepared the relevant data is in the `prepare_data.ipynb` notebook.

## Define code for interacting with data sources
The following code blocks import the required libraries and define the variables and functions used to interact with the data sources.

In [1]:
import pandas as pd
import os

import pymongo
from sqlalchemy import create_engine
from sqlalchemy import text

In [2]:

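# Secrets are read from environment variables instead of being hard-coded.
# The variable names below are placeholders; use whatever your environment defines.
mysql_uid = os.environ.get("MYSQL_UID", "gab8un")
mysql_pwd = os.environ["MYSQL_PWD"]
mysql_url = os.environ.get("MYSQL_URL", "mysql-gab8un.mysql.database.azure.com")

atlas_cluster_name = os.environ.get("ATLAS_CLUSTER_NAME", "cluster0.q0atcnd")
atlas_user_name = os.environ.get("ATLAS_USER_NAME", "gab8un")
atlas_password = os.environ["ATLAS_PASSWORD"]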

mongo_connections = {
    "local": f"mongodb://localhost:27017/",
    "atlas": f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

conn_str = mongo_connections['atlas']
source_db = 'sakila'
dest_db = 'sakila_dw'

In [3]:

def get_sql_dataframe(user_id, pwd, db_url, db_name, sql_query):
    '''Run a SQL query against MySQL and return the result as a pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{db_url}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # The context manager closes the connection even if the query fails
    with sqlEngine.connect() as conn:
        dframe = pd.read_sql(sql_query, conn)

    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a pandas DataFrame.'''
    client = pymongo.MongoClient(connect_str)

    # Materialize the cursor into a list of documents before building the DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # Drop MongoDB's internal document ID, which the warehouse does not need
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def execute_sql(user_id, pwd, db_url, sql):
    '''Execute a single SQL statement (for example, DDL) on the MySQL server.'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{db_url}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    # SQLAlchemy 2.x requires text() for raw SQL; begin() commits when the block exits
    with sqlEngine.begin() as connection:
        connection.execute(text(sql))


def set_dataframe(user_id, pwd, db_url, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame to a MySQL table: "insert" replaces the table, "update" appends to it.'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{db_url}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    with sqlEngine.begin() as connection:
        if db_operation == "insert":
            # Recreate the table from the DataFrame, then declare its primary key
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))

        elif db_operation == "update":
            # Append rows to the existing table without touching its schema
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
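
The helper functions above build their connection strings with f-strings, which can break if a user name or password contains characters such as `@` or `/`. The following is a minimal sketch of one way to guard against that by percent-encoding the credentials with the standard library. The helper name `build_mysql_url` and the sample credentials are hypothetical and not part of the original notebook.

```python
from urllib.parse import quote_plus


def build_mysql_url(user_id, pwd, db_url, db_name=None):
    """Build a mysql+pymysql URL with percent-encoded credentials.

    Hypothetical helper for illustration; the notebook itself uses plain f-strings.
    """
    url = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{db_url}"
    if db_name:
        url += f"/{db_name}"
    return url


# Made-up credentials, chosen to contain characters that need escaping
example_url = build_mysql_url("etl_user", "p@ss/word", "localhost:3306", "sakila_dw")
print(example_url)
```

The resulting string could be passed to `create_engine` in place of the hand-built `conn_str`.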

## Load data from database

Most of the data comes from the MySQL database; the rest comes from an API endpoint and a MongoDB cluster. The following code blocks load the MySQL tables into pandas DataFrames for later use.


In [4]:
rentals_sql = "SELECT r.*, p.amount FROM sakila.rental AS r INNER JOIN sakila.payment AS p ON r.rental_id = p.rental_id;"
rentals_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, source_db, rentals_sql)
rentals_df.head(5)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,amount
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,2.99
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,2.99
2,3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53,3.99
3,4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53,4.99
4,5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53,6.99


In [5]:
staff_sql = "SELECT * FROM sakila.staff;"
staff_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, source_db, staff_sql)
staff_df.head(5)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [6]:
stores_sql = "SELECT store_id, manager_staff_id, address FROM sakila.store AS s INNER JOIN sakila.address as a ON s.address_id = a.address_id;"
stores_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, source_db, stores_sql)
stores_df.head(5)

Unnamed: 0,store_id,manager_staff_id,address
0,1,1,47 MySakila Drive
1,2,2,28 MySQL Boulevard


In [7]:
payment_sql = "SELECT * FROM sakila.payment;"
payment_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, source_db, payment_sql)
payment_df.head(5)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
2,3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
3,4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
4,5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


In [8]:
inventory_sql = "SELECT * FROM sakila.inventory;"
inventory_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, source_db, inventory_sql)
inventory_df.head(5)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17
2,3,1,1,2006-02-15 05:09:17
3,4,1,1,2006-02-15 05:09:17
4,5,1,2,2006-02-15 05:09:17


## Retrieve data from remote sources
The following code blocks load data from the API endpoint and the MongoDB cluster.

In [9]:
film_api_endpoint = "https://api.npoint.io/04c66e9121c9572e6442"
film_df = pd.read_json(film_api_endpoint)
film_df.head(5)

Unnamed: 0,title,length,rating,film_id,description,language_id,last_update,rental_rate,release_year,rental_duration,replacement_cost,special_features,original_language_id
0,ACADEMY DINOSAUR,86,PG,1,A Epic Drama of a Feminist And a Mad Scientist...,1,1139979822000,0.99,2006,6,20.99,"Deleted Scenes,Behind the Scenes",
1,ACE GOLDFINGER,48,G,2,A Astounding Epistle of a Database Administrat...,1,1139979822000,4.99,2006,3,12.99,"Trailers,Deleted Scenes",
2,ADAPTATION HOLES,50,NC-17,3,A Astounding Reflection of a Lumberjack And a ...,1,1139979822000,2.99,2006,7,18.99,"Trailers,Deleted Scenes",
3,AFFAIR PREJUDICE,117,G,4,A Fanciful Documentary of a Frisbee And a Lumb...,1,1139979822000,2.99,2006,5,26.99,"Commentaries,Behind the Scenes",
4,AFRICAN EGG,130,G,5,A Fast-Paced Documentary of a Pastry Chef And ...,1,1139979822000,2.99,2006,6,22.99,Deleted Scenes,


In [10]:
query = {}
collection = "customers"
customers_df= get_mongo_dataframe(conn_str, source_db, collection, query)
customers_df.head(5)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1139954676000,1139979440000
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,1139954676000,1139979440000
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,1139954676000,1139979440000
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,1139954676000,1139979440000
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,1139954676000,1139979440000
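
The `last_update` and `create_date` columns in the two outputs above hold integers rather than timestamps. They appear to be Unix epoch values in milliseconds. The sketch below, which uses only the standard library, shows how such a value could be decoded; the function name `epoch_ms_to_utc` is an assumption made for illustration.

```python
from datetime import datetime, timezone


def epoch_ms_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


# Value copied from the film `last_update` column shown above
film_last_update = epoch_ms_to_utc(1139979822000)
print(film_last_update.isoformat())  # 2006-02-15T05:03:42+00:00
```

For a whole DataFrame column, `pd.to_datetime(series, unit="ms")` performs the same conversion. This notebook drops these columns during transformation, so the conversion is only needed if they are kept.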


## Perform Transformations

To be stored in the data warehouse, the data must be transformed into a star schema. The star schema used here has rentals as its fact table and dimension tables for staff, stores, customers, and films. The following code blocks perform the transformations needed to fit this schema: renaming and dropping columns, and joining data from other tables to denormalize it.
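
One risk when joining tables to build a fact table is that a join silently duplicates fact rows. The sketch below uses small made-up stand-ins for the rental and inventory tables to show how the `validate` argument of `DataFrame.merge` could guard against that. The toy values are illustrative, not taken from the database.

```python
import pandas as pd

# Toy stand-ins for the source tables (values are made up)
rentals = pd.DataFrame({
    "rental_id": [1, 2, 3],
    "inventory_id": [10, 11, 10],
    "amount": [2.99, 0.99, 4.99],
})
inventory = pd.DataFrame({
    "inventory_id": [10, 11],
    "film_id": [80, 333],
    "store_id": [1, 2],
})

# validate="many_to_one" raises MergeError if inventory_id is not unique on the right side
fact = rentals.merge(inventory, on="inventory_id", how="inner", validate="many_to_one")

# Drop the natural join key and rename the remaining keys to warehouse names
fact = fact.drop(columns=["inventory_id"]).rename(columns={
    "rental_id": "fact_rental_key",
    "film_id": "film_key",
    "store_id": "store_key",
})

# The fact table keeps one row per rental
print(len(fact) == len(rentals))
print(fact)
```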

In [11]:
dim_staff_df = staff_df.drop(['address_id', 'picture', 'store_id', 'active', 'username', 'password', 'last_update'], axis=1)
dim_staff_df.rename(columns={"staff_id": "staff_key"}, inplace=True)
dim_staff_df.head(2)

Unnamed: 0,staff_key,first_name,last_name,email
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com


In [12]:
dim_stores_df = stores_df.rename(columns={"store_id": "store_key", "manager_staff_id": "staff_key"})
# Denormalize: attach each store manager's details from the staff dimension
dim_stores_df = pd.merge(dim_stores_df, dim_staff_df, on="staff_key", how='inner')
dim_stores_df.drop(['staff_key'], axis=1, inplace=True)
dim_stores_df.rename(columns={
    "first_name": "manager_first_name", 
    "last_name": "manager_last_name",
    "email": "manager_email"
}, inplace=True)
dim_stores_df.head()

Unnamed: 0,store_key,address,manager_first_name,manager_last_name,manager_email
0,1,47 MySakila Drive,Mike,Hillyer,Mike.Hillyer@sakilastaff.com
1,2,28 MySQL Boulevard,Jon,Stephens,Jon.Stephens@sakilastaff.com


In [13]:
dim_customers_df = customers_df.rename(columns={"customer_id":"customer_key"})
dim_customers_df.drop([
    "store_id",
    "address_id", 
    "active",
    "create_date", 
    "last_update",
], axis=1, inplace=True)
dim_customers_df.head(2)

Unnamed: 0,customer_key,first_name,last_name,email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [14]:
dim_films_df = film_df.rename(columns={"film_id":"film_key"})
dim_films_df.drop([
    "description", 
    "last_update",
    "rental_rate", 
    "special_features", 
    "original_language_id",
    "language_id"
], axis=1, inplace=True)
dim_films_df.head(2)

Unnamed: 0,title,length,rating,film_key,release_year,rental_duration,replacement_cost
0,ACADEMY DINOSAUR,86,PG,1,2006,6,20.99
1,ACE GOLDFINGER,48,G,2,2006,3,12.99


In [71]:
fact_rentals_df = pd.merge(rentals_df, inventory_df, on='inventory_id', how='inner')
fact_rentals_df.drop(['inventory_id', 'last_update_x', 'last_update_y'], axis=1, inplace=True)
fact_rentals_df.rename(columns={
    "rental_id": "fact_rental_key", 
    "film_id": "film_key",
    "staff_id": "staff_key",
    "store_id": "store_key",
    "customer_id": "customer_key",
}, inplace=True)
order=['fact_rental_key', 'film_key', 'staff_key', 'store_key', 'customer_key', 'rental_date', 'return_date', 'amount']
fact_rentals_df = fact_rentals_df[order]
fact_rentals_df.sort_values(by=['fact_rental_key'], inplace=True)
fact_rentals_df.reset_index(drop=True, inplace=True)
fact_rentals_df.head(10)

Unnamed: 0,fact_rental_key,film_key,staff_key,store_key,customer_key,rental_date,return_date,amount
0,1,80,1,1,130,2005-05-24 22:53:30,2005-05-26 22:04:30,2.99
1,2,333,1,2,459,2005-05-24 22:54:33,2005-05-28 19:40:33,2.99
2,3,373,1,2,408,2005-05-24 23:03:39,2005-06-01 22:12:39,3.99
3,4,535,2,1,333,2005-05-24 23:04:41,2005-06-03 01:43:41,4.99
4,5,450,1,2,222,2005-05-24 23:05:21,2005-06-02 04:33:21,6.99
5,6,613,1,1,549,2005-05-24 23:08:07,2005-05-27 01:32:07,0.99
6,7,870,2,2,269,2005-05-24 23:11:53,2005-05-29 20:34:53,1.99
7,8,510,2,1,239,2005-05-24 23:31:46,2005-05-27 23:33:46,4.99
8,9,565,1,1,126,2005-05-25 00:00:40,2005-05-28 00:22:40,4.99
9,10,396,2,2,399,2005-05-25 00:02:21,2005-05-31 22:44:21,5.99


In [17]:
# Create the database if it doesn't exist
create_db_sql = f"CREATE SCHEMA IF NOT EXISTS {dest_db}"
execute_sql(mysql_uid, mysql_pwd, mysql_url, create_db_sql)

## Add Date Dimension
The following code blocks set up the date dimension. Before running them, run `create_date_dim.sql`, which can be found in the base directory.
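
The contents of `create_date_dim.sql` are not shown in this notebook. As a rough approximation of what a date dimension contains, the sketch below builds a three-day slice in pandas. The column names are borrowed from the `dim_date` output in this section, and the `yyyymmdd` integer key is inferred from that output rather than from the script.

```python
import pandas as pd

# Three consecutive days; a real date dimension would span many years
dates = pd.date_range(start="2005-05-24", end="2005-05-26", freq="D")

date_dim = pd.DataFrame({
    # Integer key in yyyymmdd form, e.g. 20050524
    "date_key": dates.strftime("%Y%m%d").astype(int),
    "full_date": dates,
    "day_name_of_week": dates.day_name(),
    "calendar_year": dates.year,
})
print(date_dim)
```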


In [27]:
# Load the date dimension (this also confirms that the table exists)
date_dim_sql = "SELECT * FROM sakila_dw.dim_date;"
date_dim_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, date_dim_sql)
date_dim_df.head(5)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2,20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
3,20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
4,20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [56]:
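# Build a lookup table from calendar date to date key
date_key_df = date_dim_df[['full_date', 'date_key']]
# Recent pandas versions reject the unitless 'datetime64' dtype, so the unit is given explicitly
date_key_df = date_key_df.astype({'full_date': 'datetime64[ns]'})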
date_key_df.head(10)



Unnamed: 0,full_date,date_key
0,2000-01-01,20000101
1,2000-01-02,20000102
2,2000-01-03,20000103
3,2000-01-04,20000104
4,2000-01-05,20000105
5,2000-01-06,20000106
6,2000-01-07,20000107
7,2000-01-08,20000108
8,2000-01-09,20000109
9,2000-01-10,20000110


In [72]:

# Get rid of the hours minutes and seconds
fact_rentals_df['rental_date'] = fact_rentals_df['rental_date'].dt.floor("D") 
# Join the date key
fact_rentals_df = pd.merge(
    fact_rentals_df, 
    date_key_df,
    left_on="rental_date", 
    right_on="full_date", 
    how="left"
)
fact_rentals_df.rename(columns={"date_key":"rental_date_key"}, inplace=True)
fact_rentals_df.drop(["full_date", "rental_date"], axis=1, inplace=True)
fact_rentals_df.head(10)

Unnamed: 0,fact_rental_key,film_key,staff_key,store_key,customer_key,return_date,amount,rental_date_key
0,1,80,1,1,130,2005-05-26 22:04:30,2.99,20050524
1,2,333,1,2,459,2005-05-28 19:40:33,2.99,20050524
2,3,373,1,2,408,2005-06-01 22:12:39,3.99,20050524
3,4,535,2,1,333,2005-06-03 01:43:41,4.99,20050524
4,5,450,1,2,222,2005-06-02 04:33:21,6.99,20050524
5,6,613,1,1,549,2005-05-27 01:32:07,0.99,20050524
6,7,870,2,2,269,2005-05-29 20:34:53,1.99,20050524
7,8,510,2,1,239,2005-05-27 23:33:46,4.99,20050524
8,9,565,1,1,126,2005-05-28 00:22:40,4.99,20050525
9,10,396,2,2,399,2005-05-31 22:44:21,5.99,20050525


In [73]:
# Get rid of the hours minutes and seconds
fact_rentals_df['return_date'] = fact_rentals_df['return_date'].dt.floor("D") 
# Join the date key
fact_rentals_df = pd.merge(
    fact_rentals_df, 
    date_key_df,
    left_on="return_date", 
    right_on="full_date", 
    how="left"
)
fact_rentals_df.rename(columns={"date_key":"return_date_key"}, inplace=True)
fact_rentals_df.drop(["full_date", "return_date"], axis=1, inplace=True)
fact_rentals_df.head(10)

Unnamed: 0,fact_rental_key,film_key,staff_key,store_key,customer_key,amount,rental_date_key,return_date_key
0,1,80,1,1,130,2.99,20050524,20050526.0
1,2,333,1,2,459,2.99,20050524,20050528.0
2,3,373,1,2,408,3.99,20050524,20050601.0
3,4,535,2,1,333,4.99,20050524,20050603.0
4,5,450,1,2,222,6.99,20050524,20050602.0
5,6,613,1,1,549,0.99,20050524,20050527.0
6,7,870,2,2,269,1.99,20050524,20050529.0
7,8,510,2,1,239,4.99,20050524,20050527.0
8,9,565,1,1,126,4.99,20050525,20050528.0
9,10,396,2,2,399,5.99,20050525,20050531.0
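
In the output above, `return_date_key` is displayed as a float (`20050526.0`) while `rental_date_key` is an integer. The most likely cause is that some rentals have no return date: the left join then produces missing keys, and pandas stores an integer column with missing values as `float64`. The sketch below reproduces the effect on made-up rows and shows one possible fix using the nullable `Int64` dtype.

```python
import pandas as pd

# Made-up rows: the second rental has not been returned yet
fact = pd.DataFrame({
    "fact_rental_key": [1, 2],
    "return_date": pd.to_datetime(["2005-05-26", None]),
})
date_keys = pd.DataFrame({
    "full_date": pd.to_datetime(["2005-05-26"]),
    "date_key": [20050526],
})

merged = fact.merge(date_keys, left_on="return_date", right_on="full_date", how="left")
print(merged["date_key"].dtype)  # float64, because of the missing key

# Nullable integer dtype keeps whole-number keys and represents the gap as <NA>
merged["return_date_key"] = merged["date_key"].astype("Int64")
print(merged["return_date_key"].tolist())
```

Whether the warehouse should store a missing key as `NULL` or point it at a dedicated "unknown date" row is a design choice that the notebook leaves open.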


## Load the data into the data warehouse
Once all transformations have been made in memory, the data can be stored in the destination database.

In [20]:
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = 'insert'
table = dim_customers_df
set_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, table, table_name, primary_key, db_operation )

In [21]:

table_name = 'dim_films'
primary_key = 'film_key'
db_operation = 'insert'
table = dim_films_df 
set_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, table, table_name, primary_key, db_operation )

In [23]:

table_name = 'dim_staff'
primary_key = 'staff_key'
db_operation = 'insert'
table = dim_staff_df 
set_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, table, table_name, primary_key, db_operation )

In [24]:
table_name = 'dim_stores'
primary_key = 'store_key'
db_operation = 'insert'
table = dim_stores_df 
set_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, table, table_name, primary_key, db_operation )

In [25]:
table_name = 'fact_rentals'
primary_key = 'fact_rental_key'
db_operation = 'insert'
table = fact_rentals_df 
set_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, table, table_name, primary_key, db_operation )
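
Every call above passes `db_operation = 'insert'`, which `set_dataframe` maps to `if_exists='replace'`, so rerunning the notebook rebuilds each table rather than duplicating rows. The sketch below illustrates the difference between `replace` and `append`. It uses an in-memory SQLite database as a stand-in for MySQL, and the sample rows are made up.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for the MySQL warehouse in this sketch
conn = sqlite3.connect(":memory:")

first_batch = pd.DataFrame({"customer_key": [1, 2], "first_name": ["MARY", "PATRICIA"]})
second_batch = pd.DataFrame({"customer_key": [3], "first_name": ["LINDA"]})

# 'replace' drops and recreates the table; 'append' adds rows to it
first_batch.to_sql("dim_customers", conn, index=False, if_exists="replace")
second_batch.to_sql("dim_customers", conn, index=False, if_exists="append")
rows_after_append = conn.execute("SELECT COUNT(*) FROM dim_customers").fetchone()[0]

# Replacing again discards the appended row
first_batch.to_sql("dim_customers", conn, index=False, if_exists="replace")
rows_after_replace = conn.execute("SELECT COUNT(*) FROM dim_customers").fetchone()[0]
conn.close()

print(rows_after_append, rows_after_replace)
```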

## Ensure the data has been loaded into the database
The following code blocks demonstrate the data warehouse by querying the rental fact table and the dimension tables that this notebook loaded.

The three example queries to showcase the data warehouse are:
- Checking how many rentals each employee has handled
- Getting the top rented movies
- Getting the total rental revenue
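
All three queries follow the same star-schema pattern: aggregate the fact table, optionally joined to a dimension for descriptive labels. The sketch below shows that pattern on a tiny made-up dataset. It uses an in-memory SQLite database as a stand-in for the MySQL warehouse, and its table contents are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Minimal fact and dimension tables with made-up rows
conn.executescript("""
CREATE TABLE dim_staff (staff_key INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT);
CREATE TABLE fact_rentals (fact_rental_key INTEGER PRIMARY KEY, staff_key INTEGER, amount REAL);
INSERT INTO dim_staff VALUES (1, 'Mike', 'Hillyer'), (2, 'Jon', 'Stephens');
INSERT INTO fact_rentals VALUES (1, 1, 2.99), (2, 1, 2.99), (3, 2, 4.99);
""")

# Aggregate the fact table and use the dimension to label each group
rows = conn.execute("""
SELECT s.first_name,
       COUNT(f.fact_rental_key) AS rental_count,
       ROUND(SUM(f.amount), 2) AS revenue
FROM fact_rentals AS f
INNER JOIN dim_staff AS s ON f.staff_key = s.staff_key
GROUP BY s.staff_key
ORDER BY s.staff_key
""").fetchall()
conn.close()

print(rows)
```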

In [None]:
# Check how many rentals each employee has handled 
rental_count_sql = """
SELECT staff.first_name, staff.last_name, COUNT(rentals.fact_rental_key) AS rental_count FROM sakila_dw.fact_rentals AS rentals
INNER JOIN sakila_dw.dim_staff AS staff
ON rentals.staff_key = staff.staff_key
GROUP BY staff.first_name, staff.last_name;
"""
rental_count_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, rental_count_sql)
rental_count_df.head()

Unnamed: 0,first_name,last_name,rental_count
0,Jon,Stephens,8004
1,Mike,Hillyer,8040


In [None]:
# Get the top 100 most rented movies
top_movies_sql = """ 
SELECT film.title, COUNT(rental.fact_rental_key) AS rental_count FROM sakila_dw.fact_rentals AS rental
INNER JOIN sakila_dw.dim_films AS film
ON rental.film_key = film.film_key
GROUP BY film.title
ORDER BY COUNT(rental.fact_rental_key) DESC
LIMIT 100;
"""
top_movies_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, top_movies_sql)
top_movies_df


Unnamed: 0,title,rental_count
0,BUCKET BROTHERHOOD,34
1,ROCKETEER MOTHER,33
2,GRIT CLOCKWORK,32
3,RIDGEMONT SUBMARINE,32
4,JUGGLER HARDLY,32
...,...,...
95,STAGECOACH ARMAGEDDON,26
96,MALKOVICH PET,26
97,BROTHERHOOD BLANKET,26
98,FELLOWSHIP AUTUMN,26


In [74]:
# Get total rental revenue (the sum of payment amounts; costs are not tracked, so this is not profit)
total_revenue_sql = "SELECT SUM(amount) AS total_revenue FROM sakila_dw.fact_rentals;"
total_revenue_df = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_url, dest_db, total_revenue_sql)
total_revenue_df.head()

Unnamed: 0,total_revenue
0,67406.56
