## ETL Data Processor - Film Rental Information

### Author: Sabrina Hendricks

#### Import the Necessary Libraries

In [2]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [3]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases

In [4]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "cluster_name.xxxxx"
atlas_user_name = ""
atlas_password = "password"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://:password@cluster_name.xxxxx.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [5]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populate MongoDB with Source Data

##### Importing data for customers, films, and rentals

In [6]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'customers.json',
              "films" : 'films.json',
              "rentals" : 'rentals.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()  

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data for Customes and Films from the Source MongoDB Collections Into DataFrames

In [20]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [15]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "films"

df_films = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_films.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


#### 1.2 Load Data from CSV file for Inventories

In [16]:
df_inventories = pd.read_csv ('./data/inventories.csv')
df_inventories.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### 1.3. Load Data to Lookup the Invoice Date Keys from the Date Dimension Table in  MySQL 

##### 1.3.1. Get the Data from the Date Dimension Table.

In [17]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### 1.4. Perform Any Necessary Transformations to the DataFrames

In [21]:
# 1. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 2. Drop date, address_id, active, create_date, last_update columns as they will not be used
df_customers.drop(['address_id', 'active', 'create_date', 'last_update'], axis=1, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [22]:
# 1. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_films.insert(0, "film_key", range(1, df_films.shape[0]+1))

# 2. Drop columns that won't be used
df_films.drop(['last_update'], axis=1, inplace=True)

df_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [23]:
# 1. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_inventories.insert(0, "inventory_key", range(1, df_inventories.shape[0]+1))

# 2. Drop columns that won't be used
df_inventories.drop(['last_update'], axis=1, inplace=True)

df_inventories.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


#### 1.5. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [24]:
#Customers dimension table
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [25]:
#Films dimension table
dataframe = df_films
table_name = 'dim_films'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [26]:
#Inventories dimension table
dataframe = df_inventories
table_name = 'dim_inventories'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### 1.6. Validate that the New Dimension Tables were Created.

In [27]:
#Customers
sql_customers = "SELECT * FROM sakila_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [29]:
#Films
sql_films = "SELECT * FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_films)
df_dim_films.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [30]:
#Inventories
sql_inventories = "SELECT * FROM sakila_dw.dim_inventories;"
df_dim_inventories = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_inventories)
df_dim_inventories.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


### 2.0. Create and Populate the New Fact Table
#### 2.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [129]:
query = {} # Select all elements (columns), and all documents (rows).

collection = "rentals"

df_fact_rentals = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### 2.2. Lookup the DateKeys from the Date Dimension Table

In [130]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [131]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "return_date" Column.
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,20050524,20050526
1,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528


In [132]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "last_update" Column.
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_rentals.last_update = df_fact_rentals.last_update.astype('datetime64[ns]').dt.date
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_last_update, on='last_update', how='left')
df_fact_rentals.drop(['last_update'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key
0,1,367,130,1,20050524,20050526,20060215
1,2,1525,459,1,20050524,20050528,20060215


#### 2.3. Lookup the Primary Keys from the Dimension Tables

##### 2.3.1. First, fetch the Surrogate Primary Key and the Business Key from each Dimension table.

In [133]:
sql_dim_customers = "SELECT customer_key, customer_id FROM sakila_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [134]:
sql_dim_films = "SELECT film_key, film_id FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_films)
df_dim_films.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [135]:
#Including the film_id column to get a reference to film_key later on for rentals fact table. 
#The rentals fact table originally does not reference film directly, only through inventory. 
sql_dim_inventories = "SELECT inventory_key, inventory_id, film_id FROM sakila_dw.dim_inventories;"
df_dim_inventories = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_inventories)
df_dim_inventories.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id
0,1,1,1
1,2,2,1


##### 2.3.2. Next, use Business Keys to lookup corresponding Surrogate Primary Keys in the Dimension tables

The rentals fact table contains a reference to the *dim_customers* table by way of the *customer_id* column that enables us to lookup the corresponding *customer_key*.

In [136]:
#Merge the "df_fact_rentals" and "dim_customers" dataframes on the 'customer_id' to
# get the 'customer_key'. Then drop the 'customer_id' column and display the results.
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_customers, on='customer_id', how='inner')
df_fact_rentals.drop(['customer_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,staff_id,rental_date_key,return_date_key,last_update_key,customer_key
0,1,367,1,20050524,20050526,20060215,130
1,746,4272,2,20050529,20050602,20060215,130


I want to add a column for *film_key* to the rentals fact table. The *dim_inventories* table contains a reference to the *dim_films* table by way of the *film_id* column that enables us to look up the corresponding *film_key*. 

In [138]:
#Merge the "dim_inventories" and "dim_films" dataframes on the "film_id" to get the 'film_key'. Then drop 'film_id' column. 
df_dim_inventories = pd.merge(df_dim_inventories, df_dim_films, on='film_id', how='inner')
df_dim_inventories.drop(['film_id'], axis=1, inplace=True)
df_dim_inventories.head(2)

Unnamed: 0,inventory_key,inventory_id,film_key
0,1,1,1
1,2,2,1


The rentals fact table contains a reference to the *dim_inventories* table by way of the *inventory_id* column that enables us to lookup the corresponding *inventory_key*. The *film_key* is also now included in the fact table. 

In [139]:
#Merge the "df_fact_rentals" and "dim_inventories" dataframes on the 'inventory_id' to
# get the 'inventory_key'. Then drop the 'inventory_id' column and display the results.
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_inventories, on='inventory_id', how='inner')
df_fact_rentals.drop(['inventory_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,staff_id,rental_date_key,return_date_key,last_update_key,customer_key,inventory_key,film_key
0,1,1,20050524,20050526,20060215,130,367,80
1,750,1,20050529,20050530,20060215,269,730,159


In [142]:
# Insert a new "fact_rental_key" column, with an ever-incrementing numeric value, to serve as the surrogate primary key. 
df_fact_rentals.insert(0, 'fact_rental_key', range(1, df_fact_rentals.shape[0]+1))
df_fact_rentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,staff_id,rental_date_key,return_date_key,last_update_key,customer_key,inventory_key,film_key
0,1,1,1,20050524,20050526,20060215,130,367,80
1,2,750,1,20050529,20050530,20060215,269,730,159


#### 2.4. Perform Necessary Transformations to the DataFrames

In [143]:
# Drop staff_id column since not being used
df_fact_rentals.drop(['staff_id'], axis=1, inplace=True)

# Reorder the Column
ordered_columns = ['fact_rental_key','rental_id'
                   ,'customer_key','inventory_key'
                   ,'film_key','rental_date_key','return_date_key', 'last_update_key']

df_fact_rentals = df_fact_rentals[ordered_columns]
df_fact_rentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,customer_key,inventory_key,film_key,rental_date_key,return_date_key,last_update_key
0,1,1,130,367,80,20050524,20050526,20060215
1,2,750,269,730,159,20050529,20050530,20060215


#### 2.5. Load New Fact Table into the sakila_dw Data Warehouse

In [144]:
dataframe = df_fact_rentals
table_name = 'fact_rentals'
primary_key = 'fact_rental_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### 2.6. Validate that the New Fact Tables were Created

In [148]:
sql_rentals = "SELECT * FROM sakila_dw.fact_rentals;"
df_fact_rentals = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_rentals)
df_fact_rentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,customer_key,inventory_key,film_key,rental_date_key,return_date_key,last_update_key
0,1,1,130,367,80,20050524,20050526,20060215
1,2,750,269,730,159,20050529,20050530,20060215


### 3.0. Demonstrate that the New Data Warehouse Exists and Functions Properly

This query displays a list of films along with the total number of times each film has been rented. For each film, it also shows the name of the customer who most recently rented that film. The results are sorted in descending order based on the total number of rentals for each film.

In [163]:
sql_rental_summary = """
    SELECT
        df.title AS film_title,
        COUNT(fr.rental_id) AS total_rentals,
        dc.first_name AS customer_name,
        MAX(fr.rental_date_key) AS latest_rental_date
    FROM
        `{0}`.`fact_rentals` AS fr
    JOIN
        `{0}`.`dim_films` AS df ON fr.film_key = df.film_key
    JOIN
        `{0}`.`dim_customers` AS dc ON fr.customer_key = dc.customer_key
    GROUP BY
        df.title, dc.first_name
    ORDER BY
        total_rentals DESC;
""".format(dst_dbname)

In [164]:
df_fact_rental_summary = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_rental_summary)
df_fact_rental_summary

Unnamed: 0,film_title,total_rentals,customer_name,latest_rental_date
0,BLANKET BEVERLY,1,CHARLOTTE,20050524
1,CLOSER BANG,1,CASSANDRA,20050529
2,CLONES PINOCCHIO,1,MINNIE,20050527
3,CAUSE DATE,1,DANNY,20050529
4,AFFAIR PREJUDICE,1,DEANNA,20050527
...,...,...,...,...
207,BLADE POLISH,1,JULIE,20050530
208,CHEAPER CLYDE,1,JOAN,20050530
209,BOOGIE AMELIE,1,MARJORIE,20050530
210,ALASKA PHANTOM,1,WENDY,20050530
