# Midterm Assignment

Emma Singer (ecs9ne)

In this project I will use the sakila database from MYSQL to create a datawarehouse with a fact table and dimension tables that represent the process of renting dvds from the store.  I will create a staff dimension table using data pulled directly from MYSQL, a customers dimension table using data from a csv file, and an inventory dimension table using data from a MongoDB (that was uploaded via a json downloaded from MYSQL).  I will also create a date dimension table in MYSQL that will create date keys in my fact table which are validated in this script.  Finally, I will demonstrate the functionality of the database by authoring a SQL query to answer questions about the rental business process using the dimension and fact tables I created.

In [1]:
# Import necessary libraries for the project

In [2]:
import os
import json
import pprint
import datetime
import pymongo
import requests
import requests.exceptions
import numpy
from sqlalchemy import create_engine
import pandas as pd

In [3]:
# Connect to MYSQL server and identify source and destination databases

In [4]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

# Creating sakila datawarehouse

In [5]:
# Defining functions for getting data from and setting data into dataframes (lab 3)

In [6]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [7]:
# Drop if exists then create sakila_dw (lab 3)

In [8]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x271e739bc70>

# Creating staff dimension table using data pulled directly from MYSQL

In [9]:
# Extract staff data from MYSQL

In [10]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [11]:
# Perform necessary transformations to data

In [12]:
drop_cols = ['picture','password','last_update']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key", "address_id":"address_key", "store_id":"store_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,address_key,email,store_key,active,username
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon


In [13]:
# Insert data into `dim_staff` table

In [14]:
db_operation = "insert"

tables = [('dim_staff', df_staff, 'staff_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

# Creating customers dimension table using data from a csv file

SQL query used to join and download customer and address tables into one csv file:

SELECT * FROM sakila.customer
INNER JOIN sakila.address
ON customer.address_id = address.address_id;

In [15]:
# Reading data from csv file

In [16]:
data_dir = os.path.join(os.getcwd(), 'Midterm_data')
customers = os.path.join(data_dir, 'sakila_customer_address.csv')
df_customers = pd.read_csv(customers, header=0)

df_customers.head()

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update,address_id.1,address,address2,district,city_id,postal_code,phone,location,last_update.1
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20,5,1913 Hanoi Way,,Nagasaki,463,35200,28303384290,...,2014-09-25 22:31:53
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20,6,1121 Loja Avenue,,California,449,17886,838635286649,...,2014-09-25 22:34:01
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20,7,692 Joliet Street,,Attika,38,83579,448477190408,...,2014-09-25 22:31:07
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20,8,1566 Inegl Manor,,Mandalay,349,53561,705814003527,...,2014-09-25 22:32:18
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20,9,53 Idfu Parkway,,Nantou,361,42399,10655648674,...,2014-09-25 22:33:16


In [17]:
# Performing necessary transformations

In [18]:
drop_cols = ['active','create_date','last_update','address_id.1','address2','location', 'last_update.1']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key", "store_id":"store_key", "address_id":"address_key"
                             , "city_id":"city_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,address,district,city_key,postal_code,phone
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1913 Hanoi Way,Nagasaki,463,35200,28303384290
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1121 Loja Avenue,California,449,17886,838635286649


In [19]:
# load `dim_customers` into MYSQL

In [20]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

# Creating inventory dimension table using data uploaded from a json file into MongoDB

SQL query used to join inventory and film tables and download into one json file:

SELECT * FROM sakila.inventory
INNER JOIN sakila.film
ON inventory.film_id = film.film_id;

In [21]:
# Connecting to MongoDB locally

In [22]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "sandbox.zibbf"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://m001-student:m001-mongodb-basics@sandbox.zibbf.mongodb.net


In [23]:
# Defining functions for getting data into databases via MongoDB (lab 4)

In [24]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [25]:
# Uploading json files (downloaded from MYSQL) into MongoDB

In [26]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'Midterm_data')

json_files = {"inventory" : 'sakila_inventory_film.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()  

In [27]:
# Extract data from MongoDB into inventory data frame

In [28]:
query = {}
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,1,2006-02-15 05:03:42,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,1,1,2006-02-15 05:03:42,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"


In [29]:
# Making necessary transformations to data

In [30]:
drop_cols = ['last_update','description','language_id','original_language_id','special_features']
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.rename(columns={"inventory_id":"inventory_key", "film_id":"film_key", "store_id":"store_key"}, inplace=True)

df_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_key,title,release_year,rental_duration,rental_rate,length,replacement_cost,rating
0,1,1,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG
1,2,1,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG


In [31]:
# Creating `dim_inventory` table and inserting data into it

In [32]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)


In [33]:
# Validate `dim_inventory` table

In [34]:
sql_inventory = "SELECT * FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_inventory)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_key,title,release_year,rental_duration,rental_rate,length,replacement_cost,rating
0,1,1,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG
1,2,1,1,ACADEMY DINOSAUR,2006,6,0.99,86,20.99,PG


# Creating fact table using data from MYSQL

In [35]:
# Get data from rental table in MYSQL

In [36]:
sql_rentals = "SELECT * FROM sakila.rental;"
df_rentals = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rentals)
df_rentals.rename(columns={"rental_id":"rental_key"}, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [37]:
# Get data from payment table

In [38]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.rename(columns={"payment_id":"payment_key", "rental_id":"rental_key"}, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_key,customer_id,staff_id,rental_key,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [39]:
# Get data from customers table

In [40]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [41]:
# Using merge to join rental table with payment table 

In [42]:
df_rentals = pd.merge(df_rentals, df_payment, on='rental_key', how='inner')
df_rentals.drop(['customer_id_y', 'staff_id_y', 'last_update_x', 'last_update_y','payment_date'], axis=1, inplace=True)
df_rentals.rename(columns={"inventory_id":"inventory_key", "customer_id_x":"customer_key", "staff_id_x":"staff_key"}, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,payment_key,amount
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,3504,2.99
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,12377,2.99


In [43]:
# Using merge to join rental table with inventory table

In [44]:
df_fact_rental = pd.merge(df_rentals, df_inventory, on='inventory_key', how='inner')
df_fact_rental.drop(['release_year', 'length', 'rating'], axis=1, inplace=True)
df_fact_rental.rename(columns={"film_id":"film_key", "store_id":"store_key", "amount":"payment_amount"}, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,payment_key,payment_amount,film_key,store_key,title,rental_duration,rental_rate,replacement_cost
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,3504,2.99,80,1,BLANKET BEVERLY,7,2.99,21.99
1,1577,2005-06-16 04:03:28,367,327,2005-06-24 22:40:28,2,8828,3.99,80,1,BLANKET BEVERLY,7,2.99,21.99


In [45]:
# Using merge to join fact rental table with customers table

In [46]:
df_fact_rental = pd.merge(df_fact_rental, df_customers, on='customer_key', how='inner')
df_fact_rental.drop(['store_id','email','address_id','active','create_date','last_update'], axis=1, inplace=True)
df_fact_rental.rename(columns={"first_name":"customer_first_name","last_name":"customer_last_name"}, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,payment_key,payment_amount,film_key,store_key,title,rental_duration,rental_rate,replacement_cost,customer_first_name,customer_last_name
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,3504,2.99,80,1,BLANKET BEVERLY,7,2.99,21.99,CHARLOTTE,HUNTER
1,2535,2005-06-19 01:39:04,901,130,2005-06-28 01:33:04,2,3510,2.99,200,2,CURTAIN VIDEOTAPE,7,0.99,27.99,CHARLOTTE,HUNTER


In [47]:
# Additional transformation to fact_rental table

In [48]:
df_fact_rental.rename(columns={"amount":"payment_amount"}, inplace=True)
ordered_columns = ['rental_key','inventory_key','customer_key','staff_key','payment_key','film_key','store_key'
                   ,'title','payment_amount','rental_rate','replacement_cost','customer_first_name'
                   ,'customer_last_name','rental_date','return_date','rental_duration']

df_fact_rental = df_fact_rental[ordered_columns]

df_fact_rental.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,film_key,store_key,title,payment_amount,rental_rate,replacement_cost,customer_first_name,customer_last_name,rental_date,return_date,rental_duration
0,1,367,130,1,3504,80,1,BLANKET BEVERLY,2.99,2.99,21.99,CHARLOTTE,HUNTER,2005-05-24 22:53:30,2005-05-26 22:04:30,7
1,2535,901,130,2,3510,200,2,CURTAIN VIDEOTAPE,2.99,0.99,27.99,CHARLOTTE,HUNTER,2005-06-19 01:39:04,2005-06-28 01:33:04,7


# Creating and Integrating Date Dimension

In [49]:
# Turn rental and return date columns into datetime date columns for merge

In [50]:
df_fact_rental['rental_date'] = pd.to_datetime(df_fact_rental['rental_date']).dt.date
df_fact_rental['return_date'] = pd.to_datetime(df_fact_rental['return_date']).dt.date

In [51]:
# Get data from date dimension table

In [52]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date']).dt.date
df_dim_date.head()

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05


In [53]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" column

In [54]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='inner')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,film_key,store_key,title,payment_amount,rental_rate,replacement_cost,customer_first_name,customer_last_name,return_date,rental_duration,rental_date_key
0,1,367,130,1,3504,80,1,BLANKET BEVERLY,2.99,2.99,21.99,CHARLOTTE,HUNTER,2005-05-26,7,20050524
1,2535,901,130,2,3510,200,2,CURTAIN VIDEOTAPE,2.99,0.99,27.99,CHARLOTTE,HUNTER,2005-06-28,7,20050619


In [55]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='inner')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,film_key,store_key,title,payment_amount,rental_rate,replacement_cost,customer_first_name,customer_last_name,rental_duration,rental_date_key,return_date_key
0,1,367,130,1,3504,80,1,BLANKET BEVERLY,2.99,2.99,21.99,CHARLOTTE,HUNTER,7,20050524,20050526
1,16,389,316,2,8554,86,1,BOOGIE AMELIE,4.99,4.99,11.99,STEVEN,CURLEY,6,20050525,20050526


In [56]:
# Write dataframe back to database

In [57]:
table_name = "fact_rental"
primary_key = "rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, dst_dbname, df_fact_rental, table_name, primary_key, db_operation)

# SQL statements to demonstrate functionality

In [58]:
# Return last names of customers with their average amount paid renting dvds when helped by employee Mike

In [59]:
sql_test = """
    SELECT customers.`last_name`
        , ROUND(AVG(rentals.`payment_amount`), 3) AS `avg_payment_amount`
    FROM sakila_dw.`fact_rental` AS rentals
    INNER JOIN sakila_dw.`dim_customers` AS customers
    ON rentals.`customer_key` = customers.`customer_key`
    INNER JOIN sakila_dw.`dim_staff` AS staff
    ON rentals.`staff_key` = staff.`staff_key`
    WHERE staff.`staff_key` = '1'
    GROUP BY rentals.`customer_key`
    ORDER BY customers.`last_name`;
    """.format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)

In [60]:
df_test.head()

Unnamed: 0,last_name,avg_payment_amount
0,ABNEY,4.99
1,ADAM,3.323
2,ADAMS,4.99
3,ALEXANDER,4.99
4,ALLARD,3.99
