# Midterm Project - Miranda Khoury (mrk6xcb)

## Overview

For this project, I use the Sakila sample database. The fact table, **fact_rentals**, contains information about movie rentals at the fictional Sakila movie rental store. I combine information from Sakila's **rental**, **inventory**, and **payment** tables into **fact_rentals** to streamline the dataset and reduce the number of dimension tables in the final database. The data warehouse also includes four dimension tables: **dim_customers**, **dim_films**, **dim_staff**, and **dim_date**.

## Prerequisites


#### Import the necessary libraries

In [1]:
import os
import numpy
import mysql.connector
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
import pymongo

#### Declare & Assign Connection Variables for the MySQL Server, MongoDB Server, & Databases with which You'll be Working 

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"

# my credentials to log into my MySQL Workbench
mysql_uid = "root"
mysql_pwd = 'Passw0rd123'

# my credentials to log into my personal MongoDB cluster
atlas_cluster_name = "testCluster"
atlas_user_name = "main_user"
atlas_password = "butterfly"

# connection string to connect to my MongoDB cluster
my_conn_str = "mongodb+srv://main_user:butterfly@testcluster.gymn7h2.mongodb.net/?retryWrites=true&w=majority"

mysql_src_db = "sakila"
mongo_src_db = "sakila_mongo"
dst_db = "sakila_dw_final74"
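The credentials above are written directly into the notebook. As a minimal sketch of one alternative, the values could be read from environment variables and escaped before being placed in a connection URL. The helper `build_mysql_url` and the environment variable names `MYSQL_UID` and `MYSQL_PWD` are assumptions made for illustration; they are not part of the original notebook.

```python
import os
from urllib.parse import quote_plus


def build_mysql_url(user, password, host="localhost", db_name=""):
    """Hypothetical helper: build a SQLAlchemy-style MySQL URL."""
    # quote_plus keeps characters such as '@' or ':' in a password from
    # being misread as URL delimiters.
    url = f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}"
    return f"{url}/{db_name}" if db_name else url


# Hypothetical environment variable names; the fallbacks are placeholders.
mysql_uid_env = os.environ.get("MYSQL_UID", "root")
mysql_pwd_env = os.environ.get("MYSQL_PWD", "change-me")

# The password below is a made-up value that shows the escaping.
print(build_mysql_url("root", "p@ss:word", db_name="sakila"))
```

With this approach the notebook can be shared without exposing the real passwords.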

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Run a SQL query against a MySQL database and return the result as a DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the pd.read_sql() function to query the database and fill a Pandas DataFrame
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn)
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a DataFrame.'''
    # Create a connection to MongoDB
    client = pymongo.MongoClient(connect_str)
    
    # Query MongoDB, and fill a Python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # Drop MongoDB's internal document identifier
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame to a MySQL table, either creating the table or appending to it.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Invoke the Pandas DataFrame .to_sql() function to either create, or append to, a table
    if db_operation == "insert":
        # 'replace' drops any existing table, so the primary key must be added again
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
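The difference between the "insert" and "update" paths of **set_dataframe()** comes down to the `if_exists` argument of `to_sql()`. The following is a small sketch of that behavior, using an in-memory SQLite database as a stand-in for the MySQL warehouse so that it runs without a server. The third staff row is made up for the example.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse

df_first = pd.DataFrame({"staff_key": [1, 2], "last_name": ["Hillyer", "Stephens"]})
df_more = pd.DataFrame({"staff_key": [3], "last_name": ["Example"]})  # made-up row

# "insert" path: if_exists='replace' drops and recreates the table.
df_first.to_sql("dim_staff", con=conn, index=False, if_exists="replace")
# "update" path: if_exists='append' adds rows to the existing table.
df_more.to_sql("dim_staff", con=conn, index=False, if_exists="append")

row_count = conn.execute("SELECT COUNT(*) FROM dim_staff").fetchone()[0]
print(row_count)  # 3
conn.close()
```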

#### Create the New Data Warehouse Database and Switch the Connection Context to Use It

In [4]:
conn_str = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_db}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_db}`;")
sqlEngine.execute(f"USE {dst_db};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x17aa3eb2b50>

## Sourcing Data from A SQL Server (MySQL)

First, I read in some of the data I'll be working with from MySQL: the **rental**, **staff**, and **customer** tables from the Sakila database loaded on my MySQL server.

#### Extract Data from the Source Database Tables

In [5]:
sakila_customers = "SELECT * FROM sakila.customer;"
df_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_src_db, sakila_customers)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [6]:
sakila_rentals = "SELECT * FROM sakila.rental;"
df_rentals = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_src_db, sakila_rentals)
df_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 16:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 16:30:53


In [7]:
sakila_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_src_db, sakila_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-14 22:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-14 22:57:16


## Sourcing Data from A NoSQL Server (MongoDB)

Next, I read in more of the data I'll be working with: the **inventory** and **film** tables from Sakila. First, I load the JSON versions of these tables into MongoDB. I then treat MongoDB as if it were the only source for this data and read it back into this Jupyter Notebook to transform it.

#### Populate MongoDB with Source Data

In [8]:
client = pymongo.MongoClient(my_conn_str)
db = client[mongo_src_db]

# Note that the data files are all in the same folder as this notebook, so the current
# working directory is the data path needed to access them
data_dir = os.getcwd()

json_files = {"inventory" : 'inventory.json',
              "film" : "sakila_film.json"
            }

for collection_name, file_name in json_files.items():
    # Start from an empty collection so that reruns do not duplicate documents
    db.drop_collection(collection_name)
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)

client.close()


#### Extract Data from the Source MongoDB Collections Into DataFrames

In [9]:
query = {}
collection = "inventory"

df_inventory = get_mongo_dataframe(my_conn_str, mongo_src_db, collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 00:09:17
1,2,1,1,2006-02-15 00:09:17


In [10]:
query = {}
collection = "film"

df_films = get_mongo_dataframe(my_conn_str, mongo_src_db, collection, query)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42
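**get_mongo_dataframe()** turns a list of documents into a DataFrame and drops MongoDB's `_id` field. The sketch below shows that step on plain Python dictionaries, so it runs without a MongoDB server. The helper name `docs_to_dataframe` and the `_id` values are made up for illustration. The `errors="ignore"` argument is a variation on the notebook's function that handles a query matching no documents.

```python
import pandas as pd

# Made-up documents shaped like the inventory collection
docs = [
    {"_id": "a1", "inventory_id": 1, "film_id": 1},
    {"_id": "a2", "inventory_id": 2, "film_id": 1},
]


def docs_to_dataframe(documents):
    """Hypothetical helper: documents -> DataFrame without the Mongo _id."""
    dframe = pd.DataFrame(list(documents))
    # errors='ignore' avoids a KeyError when the query matched no documents
    # and the resulting DataFrame has no '_id' column.
    return dframe.drop(columns=["_id"], errors="ignore")


df_docs = docs_to_dataframe(docs)
df_empty = docs_to_dataframe([])
print(list(df_docs.columns))
print(df_empty.empty)
```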


## Sourcing Data from A File System

For the last step of the Extract phase, I read in the final pieces of data from a local file system. The file I'll be reading in is in CSV format.

In [11]:
# Note that the data files are all in the same folder as this notebook, so the current
# working directory is the data path needed to access them
data_dir = os.getcwd()
data_file = os.path.join(data_dir, 'sakila_payment.csv')

df_payments = pd.read_csv(data_file, header=0, index_col=0)
df_payments.head(3)

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30
3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 17:12:30
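Because the file is read with `index_col=0`, **payment_id** becomes the DataFrame index rather than a regular column. A short sketch of this, using the first two rows shown above as in-memory CSV text; the `parse_dates` argument is an optional addition that is not used in the notebook:

```python
import io
import pandas as pd

csv_text = (
    "payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update\n"
    "1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30\n"
    "2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30\n"
)

# index_col=0 makes the first CSV column (payment_id) the index;
# parse_dates converts payment_date from text to datetime values.
df_demo = pd.read_csv(io.StringIO(csv_text), header=0, index_col=0,
                      parse_dates=["payment_date"])

print(df_demo.index.name)               # payment_id
print("payment_id" in df_demo.columns)  # False
print(df_demo["payment_date"].dtype)
```

One consequence is that a later `rename(columns=...)` call cannot affect **payment_id**, since it is no longer among the columns.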


## Transformation of Data Using Pandas Dataframes

Now that all the raw tables -- **rental**, **film**, **inventory**, **customer**, **staff**, and **payment** -- have been read in from their various sources, I can perform necessary transformations on the tables and combine some to form the fact table.

First, I perform some transformations on the dimension tables.

#### Transform the Customers Dimension Table

In [12]:
drop_cols = ['store_id','address_id','active','create_date','last_update']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,first_name,last_name,email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


#### Transform the Films Dimension Table

In [13]:
drop_cols = ['language_id','original_language_id', 'last_update']
df_films.drop(drop_cols, axis=1, inplace=True)
df_films.rename(columns={"film_id":"film_key"}, inplace=True)

df_films.head(2)

Unnamed: 0,film_key,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


#### Transform the Staff Dimension Table

In [14]:
drop_cols = ['address_id','picture','password', 'last_update', 'active', 'store_id']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,email,username
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,Mike
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,Jon
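The three dimension transformations above follow the same pattern: drop the unneeded columns, then rename the source id to a warehouse key. As a sketch, that pattern could be wrapped in one function. The name `to_dimension` is hypothetical, and unlike the cells above it returns a new DataFrame instead of modifying the original in place. The sample rows are a trimmed-down stand-in for the staff table.

```python
import pandas as pd


def to_dimension(df, drop_cols, key_map):
    """Hypothetical helper: return a copy with columns dropped and keys renamed."""
    return df.drop(columns=drop_cols).rename(columns=key_map)


# Trimmed-down stand-in for the staff table
df_raw = pd.DataFrame({
    "staff_id": [1, 2],
    "first_name": ["Mike", "Jon"],
    "password": ["x", None],
})

df_dim = to_dimension(df_raw, ["password"], {"staff_id": "staff_key"})
print(list(df_dim.columns))  # ['staff_key', 'first_name']
```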


Then, I prepare the inventory, payment, and rental tables so they can be joined to form the fact table, **fact_rentals**.

#### Pre-transforming the Tables that Will Become Fact Rentals

In [15]:
drop_cols = ['last_update', 'return_date']
df_rentals.drop(drop_cols, axis=1, inplace=True)
df_rentals.rename(columns={"rental_id":"rental_key", "inventory_id":"inventory_key", "customer_id":"customer_key","staff_id":"staff_key"}, inplace=True)

df_rentals.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,staff_key
0,1,2005-05-24 22:53:30,367,130,1
1,2,2005-05-24 22:54:33,1525,459,1


In [16]:
drop_cols = ['last_update', "store_id"]
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.rename(columns={"inventory_id":"inventory_key"}, inplace=True)

df_inventory.head(2)

Unnamed: 0,inventory_key,film_id
0,1,1
1,2,1


In [17]:
drop_cols = ['last_update', "customer_id", "staff_id"]
df_payments.drop(drop_cols, axis=1, inplace=True)
# payment_id is the DataFrame index (index_col=0 when the CSV was read), so only rental_id needs renaming
df_payments.rename(columns={"rental_id":"rental_key"}, inplace=True)

df_payments.head(2)

payment_id,rental_key,amount,payment_date
1,76,2.99,2005-05-25 11:30:37
2,573,0.99,2005-05-28 10:35:23


Now I can merge the tables together.

#### Merging Rentals, Payment, and Inventory into Fact Rentals

In [18]:
# First, join payments to rentals on the rental key.
# There is a one-to-one relationship between each rental and payment, i.e. there is a unique payment for each rental
# and a unique rental for each payment. We can therefore use an inner join.
df_fact_rentals1 = pd.merge(df_rentals, df_payments, on='rental_key', how='inner')
df_fact_rentals1.rename(columns={"amount":"payment_amount"}, inplace=True)

df_fact_rentals1.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,staff_key,payment_amount,payment_date
0,1,2005-05-24 22:53:30,367,130,1,2.99,2005-05-24 22:53:30
1,2,2005-05-24 22:54:33,1525,459,1,2.99,2005-05-24 22:54:33
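The one-to-one assumption behind the inner join can be checked by pandas itself. The sketch below uses small made-up frames, not the Sakila data, and shows two `pd.merge` options: `validate`, which raises an error if a key is duplicated, and `indicator`, which reveals the rows an inner join would silently drop.

```python
import pandas as pd

# Made-up frames: rental 3 has no matching payment
rentals = pd.DataFrame({"rental_key": [1, 2, 3], "customer_key": [130, 459, 408]})
payments = pd.DataFrame({"rental_key": [1, 2], "amount": [2.99, 2.99]})

# validate='one_to_one' raises pandas.errors.MergeError if rental_key is
# duplicated on either side.
merged = pd.merge(rentals, payments, on="rental_key", how="inner",
                  validate="one_to_one")

# An outer join with indicator=True shows which rows the inner join drops.
audit = pd.merge(rentals, payments, on="rental_key", how="outer", indicator=True)
unmatched = audit[audit["_merge"] != "both"]

print(len(merged), len(unmatched))  # 2 1
```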


In [19]:
# Then, join inventory to rentals on the inventory key. Drop the inventory key.
df_fact_rentals = pd.merge(df_fact_rentals1, df_inventory, on='inventory_key', how='left')
df_fact_rentals.drop(['inventory_key'], axis=1, inplace=True)
df_fact_rentals.rename(columns={"film_id":"film_key"}, inplace=True)

df_fact_rentals.sort_values(by=['rental_key']).head(3)

Unnamed: 0,rental_key,rental_date,customer_key,staff_key,payment_amount,payment_date,film_key
0,1,2005-05-24 22:53:30,130,1,2.99,2005-05-24 22:53:30,80
1,2,2005-05-24 22:54:33,459,1,2.99,2005-05-24 22:54:33,333
2,3,2005-05-24 23:03:39,408,1,3.99,2005-05-24 23:03:39,373


In [20]:
# Perform some final manipulations: drop the unnecessary hour/minute/second data from the fact table.
# pd.to_datetime() parses strings (payment_date came from the CSV); .dt.normalize() resets the time to midnight.

df_fact_rentals['rental_date'] = pd.to_datetime(df_fact_rentals['rental_date']).dt.normalize()
df_fact_rentals['payment_date'] = pd.to_datetime(df_fact_rentals['payment_date']).dt.normalize()

df_fact_rentals.head(2)

Unnamed: 0,rental_key,rental_date,customer_key,staff_key,payment_amount,payment_date,film_key
0,1,2005-05-24,130,1,2.99,2005-05-24,80
1,2,2005-05-24,459,1,2.99,2005-05-24,333


## Making A Date Dimension Table Using SQL

Now that the raw data tables have all been read in and mostly transformed, I create one table from scratch: the new date dimension for the data warehouse. I do this by running the SQL script that creates a **dim_date** table in MySQL inside my destination database. With that done, I integrate it into the data warehouse.
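The SQL script itself is not shown in this notebook. As a rough pandas stand-in, assuming only the two columns used below (**date_key** and **full_date**) and an integer key in YYYYMMDD form, a date dimension could be generated like this. The function name `make_date_dimension` is hypothetical.

```python
import pandas as pd


def make_date_dimension(start, end):
    """Hypothetical pandas stand-in for the SQL script: one row per calendar day."""
    full_date = pd.date_range(start=start, end=end, freq="D")
    return pd.DataFrame({
        # Integer key in YYYYMMDD form, e.g. 20000101 for 2000-01-01.
        "date_key": full_date.strftime("%Y%m%d").astype(int),
        "full_date": full_date,
    })


df_dates = make_date_dimension("2000-01-01", "2000-01-03")
print(df_dates["date_key"].tolist())  # [20000101, 20000102, 20000103]
```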

##### Get the Data from the Date Dimension Table.

First, I fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_sql_dataframe()** function. I also cast the **full_date** column to the **datetime64** data type using the **.astype()** function.

In [21]:
sql_dim_date = "SELECT date_key, full_date FROM dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, dst_db, sql_dim_date)
# Newer pandas versions require an explicit unit such as [ns] when casting to datetime64
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### Look Up the DateKeys from the Date Dimension Table.

Next, for each date-typed column in the fact table, I look up the corresponding Surrogate Primary Key column.

In [22]:
# Look up the Surrogate Primary Key (date_key) that corresponds to the "rental_date" column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True) 
df_fact_rentals.head(4)

Unnamed: 0,rental_key,customer_key,staff_key,payment_amount,payment_date,film_key,rental_date_key
0,1,130,1,2.99,2005-05-24,80,20050524
1,2,459,1,2.99,2005-05-24,333,20050524
2,3,408,1,3.99,2005-05-24,373,20050524
3,4,333,2,4.99,2005-05-24,535,20050524


In [23]:
# Look up the Surrogate Primary Key (date_key) that corresponds to the "payment_date" column.
df_dim_paid_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_paid_date, on='payment_date', how='left')
df_fact_rentals.drop(['payment_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_key,customer_key,staff_key,payment_amount,film_key,rental_date_key,payment_date_key
0,1,130,1,2.99,80,20050524,20050524
1,2,459,1,2.99,333,20050524,20050524
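Because the key lookups use a left join, any fact date that is missing from the date dimension ends up with an empty key instead of raising an error. A minimal sketch of how such rows could be detected, using small made-up frames in place of **dim_date** and **fact_rentals**:

```python
import pandas as pd

# Made-up date dimension covering only two days
dim_date = pd.DataFrame({
    "date_key": [20050524, 20050525],
    "full_date": pd.to_datetime(["2005-05-24", "2005-05-25"]),
})
# Made-up facts: rental 3 falls outside the dimension's range
facts = pd.DataFrame({
    "rental_key": [1, 2, 3],
    "rental_date": pd.to_datetime(["2005-05-24", "2005-05-25", "2005-05-26"]),
})

lookup = dim_date.rename(columns={"date_key": "rental_date_key",
                                  "full_date": "rental_date"})
facts = facts.merge(lookup, on="rental_date", how="left")

# A left join leaves NaN where the date has no row in the dimension.
missing = facts[facts["rental_date_key"].isna()]
print(missing["rental_key"].tolist())  # [3]
```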


## Loading the Tables Into the Destination Database

Then, I'll load the finalized tables into my destination database, and the data warehouse is done!

In [24]:
# Loading in the dimension tables
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_staff', df_staff, 'staff_key'),
          ('dim_films', df_films, 'film_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, dst_db, dataframe, table_name, primary_key, db_operation)

In [25]:
# Loading in the fact table

db_operation = "insert"

table_name = 'fact_rentals'
dataframe = df_fact_rentals
primary_key = 'rental_key'

set_dataframe(mysql_uid, mysql_pwd, dst_db, dataframe, table_name, primary_key, db_operation)

## Querying the Finalized Data Warehouse

Finally, I'll write some queries to verify that my data warehouse was implemented successfully.

In [26]:
# Let's see how many times each movie was rented.
# To do this, we join dim_films to fact_rentals, group by title, and count the rental transactions for each title.
# COUNT(r.rental_key) rather than COUNT(*) keeps films with no rentals at 0, since the LEFT OUTER JOIN
# still returns one row (with NULL rental columns) for them.
sql_query = '''
SELECT f.title AS 'Movie Title',
COUNT(r.rental_key) AS 'Number of Rentals'
FROM dim_films AS f
LEFT OUTER JOIN fact_rentals AS r
ON f.film_key = r.film_key
GROUP BY f.title;
'''

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, dst_db, sql_query)
df_test.head(5)

Unnamed: 0,Movie Title,Number of Rentals
0,ACADEMY DINOSAUR,23
1,ACE GOLDFINGER,7
2,ADAPTATION HOLES,12
3,AFFAIR PREJUDICE,23
4,AFRICAN EGG,12
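With a LEFT OUTER JOIN, `COUNT(*)` and `COUNT(column)` give different answers for films that were never rented. The sketch below demonstrates the difference on two made-up films, using an in-memory SQLite database in place of the MySQL warehouse; the table and column names mirror the ones used here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse
conn.executescript("""
CREATE TABLE dim_films (film_key INTEGER PRIMARY KEY, title TEXT);
CREATE TABLE fact_rentals (rental_key INTEGER PRIMARY KEY, film_key INTEGER);
INSERT INTO dim_films VALUES (1, 'RENTED TWICE'), (2, 'NEVER RENTED');
INSERT INTO fact_rentals VALUES (10, 1), (11, 1);
""")

# COUNT(*) counts joined rows, including the all-NULL row produced for an
# unmatched film; COUNT(r.rental_key) skips NULLs.
rows = conn.execute("""
SELECT f.title, COUNT(*) AS star_count, COUNT(r.rental_key) AS key_count
FROM dim_films AS f
LEFT OUTER JOIN fact_rentals AS r ON f.film_key = r.film_key
GROUP BY f.title
ORDER BY f.title;
""").fetchall()
conn.close()

print(rows)  # [('NEVER RENTED', 1, 0), ('RENTED TWICE', 2, 2)]
```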


In [27]:
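# Now let's figure out how many customer rentals each staff member has handled.
# We do this by joining dim_staff, dim_customers, and fact_rentals on the customer/staff keys and grouping by staff member.
# Note that COUNT(c.last_name) counts one row per rental, so repeat customers are counted once per rental.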
sql_query = '''
SELECT s.last_name AS 'Employee Surname',
COUNT(c.last_name) AS 'Customers Served'
FROM dim_staff AS s
INNER JOIN fact_rentals AS r
ON s.staff_key = r.staff_key
INNER JOIN dim_customers AS c
ON r.customer_key = c.customer_key
GROUP BY s.last_name;
'''

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, dst_db, sql_query)
df_test.head(5)

Unnamed: 0,Employee Surname,Customers Served
0,Hillyer,8040
1,Stephens,8004
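The totals above count one row per rental. If the number of unique customers per staff member were wanted instead, `COUNT(DISTINCT ...)` would be the usual tool. A small sketch of the difference on made-up rows, again using in-memory SQLite as a stand-in for the warehouse:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse
conn.executescript("""
CREATE TABLE fact_rentals (
    rental_key INTEGER PRIMARY KEY,
    staff_key INTEGER,
    customer_key INTEGER
);
-- Made-up rows: customer 130 rents twice from staff member 1
INSERT INTO fact_rentals VALUES (1, 1, 130), (2, 1, 130), (3, 1, 459), (4, 2, 130);
""")

counts = conn.execute("""
SELECT staff_key,
       COUNT(customer_key) AS rentals_handled,
       COUNT(DISTINCT customer_key) AS unique_customers
FROM fact_rentals
GROUP BY staff_key
ORDER BY staff_key;
""").fetchall()
conn.close()

print(counts)  # [(1, 3, 2), (2, 1, 1)]
```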


The project is complete! I successfully extracted data from 3 different sources, transformed it in Python, loaded it into a destination database, and queried from the final data warehouse to prove that it is operational.