# DS 2002 Midterm Project Spring 2023 - Cheryl Bai

This project was done using the Sakila sample dataset from Microsoft.

#### Import the necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "DS2002"
atlas_user_name = "eqk9vb"
atlas_password = "RtDwy7sYyhTm1pFB"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://eqk9vb:RtDwy7sYyhTm1pFB@DS2002.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse Database and Switch the Connection Context 

In [4]:
conn_str_sql = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{host_name}"
sqlEngine = create_engine(conn_str_sql, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x27b624c7370>

#### Populate MongoDB with Source Data

In [5]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film" : 'sakila_film.json',
              "store" : 'sakila_store.json',
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        print(f"{file} was successfully loaded.")

        
client.close()        

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'sakila'), 'film') was successfully loaded.
Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'sakila'), 'store') was successfully loaded.


### 1.0 Create and Populate the Dimension Tables
#### 1.1 Extract Data from the Source MySQL Database Tables

In [6]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


#### 1.2 Extract Data from the Source MongoDB Collections into Dataframes

In [7]:
query = {}
collection = "film"

df_film = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


#### 1.3 Extract Data from the Local Source

In [8]:
df_customer = pd.read_csv("sakila_customer.csv")
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### 1.4 Perform Any Necessary Transformations

In [9]:
drop_cols = ['address_id', 'picture', 'email', 'active', 'username', 'password', 'last_update', 'store_id']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name
0,1,Mike,Hillyer
1,2,Jon,Stephens


In [10]:
drop_cols = ['description', 'language_id', 'original_language_id', 'length', 'rating', 'special_features', 'last_update']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.rename(columns={"film_id":"film_key"}, inplace=True)

df_film.head(2)

Unnamed: 0,film_key,title,release_year,rental_duration,rental_rate,replacement_cost
0,1,ACADEMY DINOSAUR,2006,6,0.99,20.99
1,2,ACE GOLDFINGER,2006,3,4.99,12.99


In [11]:
drop_cols = ['email', 'address_id', 'active', 'create_date', 'last_update', 'store_id']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,first_name,last_name
0,1,MARY,SMITH
1,2,PATRICIA,JOHNSON


#### 1.5 Load the Transformed Dataframes into the New Data Warehouse by Creating New Tables

In [12]:
db_operation = "insert"

tables = [('dim_staff', df_staff, 'staff_key'),
          ('dim_film', df_film, 'film_key'),
          ('dim_customer', df_customer, 'customer_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### 1.6 Validate that the New Dimension Tables were Created

In [13]:
sql_staff = "SELECT * FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_staff)
df_dim_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name
0,1,Mike,Hillyer
1,2,Jon,Stephens


In [14]:
sql_film = "SELECT * FROM sakila_dw.dim_film;"
df_dim_film = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_film)
df_dim_film.head(2)

Unnamed: 0,film_key,title,release_year,rental_duration,rental_rate,replacement_cost
0,1,ACADEMY DINOSAUR,2006,6,0.99,20.99
1,2,ACE GOLDFINGER,2006,3,4.99,12.99


In [15]:
sql_customer = "SELECT * FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,first_name,last_name
0,1,MARY,SMITH
1,2,PATRICIA,JOHNSON


### 2.0 Create and Populate the Fact Table

#### 2.1 Get All the Data from Each of the Tables Involved

In [16]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_rental)
df_rental.rental_date = df_rental.rental_date.astype('datetime64').dt.date
df_rental.return_date = df_rental.return_date.astype('datetime64').dt.date
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24,367,130,2005-05-26,1,2006-02-15 21:30:53
1,2,2005-05-24,1525,459,2005-05-28,1,2006-02-15 21:30:53


In [17]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### 2.2 Join the Rental and Inventory Tables

In [18]:
df_fact_rental = pd.merge(df_rental, df_inventory, on='inventory_id', how='left')
df_fact_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update_x,film_id,store_id,last_update_y
0,1,2005-05-24,367,130,2005-05-26,1,2006-02-15 21:30:53,80,1,2006-02-15 05:09:17
1,2,2005-05-24,1525,459,2005-05-28,1,2006-02-15 21:30:53,333,2,2006-02-15 05:09:17


#### 2.3 Get the Data from the Date Dimension Table

In [19]:
sql_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### 2.4 Lookup the DateKeys from the Date Dimension Table

In [20]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update_x,film_id,store_id,last_update_y,rental_date_key
0,1,367,130,2005-05-26,1,2006-02-15 21:30:53,80,1,2006-02-15 05:09:17,
1,2,1525,459,2005-05-28,1,2006-02-15 21:30:53,333,2,2006-02-15 05:09:17,


In [21]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update_x,film_id,store_id,last_update_y,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,80,1,2006-02-15 05:09:17,,
1,2,1525,459,1,2006-02-15 21:30:53,333,2,2006-02-15 05:09:17,,


#### 2.5 Perform Any Additional Transformations

In [22]:
# Drop some columns of no particular interest
drop_columns = ['inventory_id', 'last_update_x', 'store_id', 'last_update_y']
df_fact_rental.drop(drop_columns, axis=1, inplace=True)

# Rename Foreign Key Columns
df_fact_rental.rename(columns={'rental_id':'rental_key', 'customer_id': 'customer_key', 'staff_id':'staff_key',
                              'film_id':'film_key'}, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_rental.insert(0, "fact_rental_key", range(1, df_fact_rental.shape[0]+1))
df_fact_rental.head(2)

Unnamed: 0,fact_rental_key,rental_key,customer_key,staff_key,film_key,rental_date_key,return_date_key
0,1,1,130,1,80,,
1,2,2,459,1,333,,


#### 2.6 Write the Dataframe Back to the Database

In [23]:
table_name = "fact_rental"
primary_key = "fact_rental_key"
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, df_fact_rental, table_name, primary_key, db_operation)

### 3.0 Validate that the New Fact Table was Created

In [37]:
sql_test = """
    SELECT films.`title` AS `film`,
        AVG(films.`rental_rate`) AS `avg_rental_rate`
    FROM `{0}`.`fact_rental` AS rentals
    INNER JOIN `{0}`.`dim_film` AS films
    ON rentals.`film_key` = films.`film_key`
    GROUP BY films.`title`;
""".format(dst_dbname)

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_test)

In [38]:
df_test.head(5)

Unnamed: 0,film,avg_rental_rate
0,BLANKET BEVERLY,2.99
1,FREAKY POCUS,2.99
2,GRADUATE LORD,2.99
3,LOVE SUICIDES,0.99
4,IDOLS SNATCHERS,2.99


In [39]:
sql_test = """
    SELECT customers.`last_name` AS `customer_name`,
        COUNT(customers.`customer_key`) AS `total_customer_count`
    FROM `{0}`.`fact_rental` AS rentals
    INNER JOIN `{0}`.`dim_customer` AS customers
    ON rentals.customer_key = customers.customer_key
    GROUP BY customers.`last_name`;
""".format(dst_dbname)

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_test)

In [40]:
df_test.head(5)

Unnamed: 0,customer_name,total_customer_count
0,HUNTER,24
1,PURDY,27
2,HANSEN,21
3,WALTERS,30
4,ROMERO,34
