### Midterm Project

In [14]:
import os
import numpy
import pandas as pd
import pymongo
import json
from sqlalchemy import create_engine

#### Connect to SQL

In [3]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id="root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dim_mart"

In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [5]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x23209b89bb0>

##### Create Dim Mart for Sakila Rentals:
1. Get relevant tables from Sakila database in SQL
* payment as part of fact table
* staff as a dimension table
* customer as a dimension table

In [32]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.rename(columns={"last_update":"payment_last_update"}, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,payment_last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [30]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [29]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
#df_customer.rename(columns={"last_update":"customer_last_update", "create_date":"customer_create_date"}, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


2. Get data from MongoDB

Populate MongoDB with JSON file of staff data from Sakila

In [22]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

#atlas_cluster_name = "sandbox.zibbf"
#atlas_user_name = "m001-student"
#atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://localhost:27017/",
#    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

print(f"Local Connection String: {conn_str['local']}")
#print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/


In [23]:
def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe

In [15]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"inventory" : 'sakila_inventory.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        
client.close()

In [26]:
query = {}
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_inventory.rename(columns={"last_update":"inventory_last_update"}, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,inventory_last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


3. Get data from file system

In [50]:
df_rental = pd.read_csv(os.path.join(data_dir, "sakila_rental.csv"), sep=';')
df_rental.rename(columns={"last_update":"rental_last_update"}, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,rental_last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


### Join Payment, Rental, and Inventory Tables to Create Fact Table

In [73]:
df_rental_inv = pd.merge(df_rental, df_inventory, on="inventory_id", how="left")
print(df_rental_inv.shape)
df_rental_inv.head(2)

(16044, 10)


Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,rental_last_update,film_id,store_id,inventory_last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,80,1,2006-02-15 05:09:17
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,333,2,2006-02-15 05:09:17


In [75]:
# we join on customer_id and staff_id as wlel because each rental_id corresponds to one customer/staff and so does each payment_id
# the staff and customer should match for matching rentals and payments
df_rental_inv_pay = pd.merge(df_rental_inv, df_payment, on=["rental_id", "customer_id", "staff_id"], how="left")
df_rental_inv_pay.drop(columns=["payment_id"], inplace=True)
print(df_rental_inv_pay.shape)
df_rental_inv_pay.head(2)

(16044, 13)


Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,rental_last_update,film_id,store_id,inventory_last_update,amount,payment_date,payment_last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,80,1,2006-02-15 05:09:17,2.99,2005-05-24 22:53:30,2006-02-15 22:13:16
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,333,2,2006-02-15 05:09:17,,NaT,NaT


In [76]:
ordered_columns = ["rental_id", "inventory_id", "customer_id", "staff_id", "film_id", 
                   "store_id", "amount", "rental_date", "payment_date", "return_date", "rental_last_update", "inventory_last_update",
                  "payment_last_update"]

df_fctrentals = df_rental_inv_pay[ordered_columns]
df_fctrentals.rename(columns={"customer_id":"customer_key",
                              "staff_id":"staff_key",
                              "film_id": "film_key",
                             "store_id":"store_key"}, inplace=True)
df_fctrentals.insert(0, "fact_rental_key", range(1, df_fctrentals.shape[0]+1))

print(df_fctrentals.shape)

(16044, 14)


In [77]:
df_fctrentals.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 16044 entries, 0 to 16043
Data columns (total 14 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   fact_rental_key        16044 non-null  int64         
 1   rental_id              16044 non-null  int64         
 2   inventory_id           16044 non-null  int64         
 3   customer_key           16044 non-null  int64         
 4   staff_key              16044 non-null  int64         
 5   film_key               16044 non-null  int64         
 6   store_key              16044 non-null  int64         
 7   amount                 7966 non-null   float64       
 8   rental_date            16044 non-null  object        
 9   payment_date           7966 non-null   datetime64[ns]
 10  return_date            15861 non-null  object        
 11  rental_last_update     16044 non-null  object        
 12  inventory_last_update  16044 non-null  object        
 13  p

In [78]:
df_fctrentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,inventory_id,customer_key,staff_key,film_key,store_key,amount,rental_date,payment_date,return_date,rental_last_update,inventory_last_update,payment_last_update
0,1,1,367,130,1,80,1,2.99,2005-05-24 22:53:30,2005-05-24 22:53:30,2005-05-26 22:04:30,2006-02-15 21:30:53,2006-02-15 05:09:17,2006-02-15 22:13:16
1,2,2,1525,459,1,333,2,,2005-05-24 22:54:33,NaT,2005-05-28 19:40:33,2006-02-15 21:30:53,2006-02-15 05:09:17,NaT


#### Write back fact table to SQL database

In [79]:
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, df_fctrentals, "fact_rentals", "fact_rental_key", db_operation)

In [80]:
sql_check = f"""SELECT * FROM {dst_dbname}.fact_rentals;"""
df_check = get_dataframe(user_id, pwd, host_name, src_dbname, sql_check)
print(df_check.shape)
df_check.head(2)

(16044, 14)


Unnamed: 0,fact_rental_key,rental_id,inventory_id,customer_key,staff_key,film_key,store_key,amount,rental_date,payment_date,return_date,rental_last_update,inventory_last_update,payment_last_update
0,1,1,367,130,1,80,1,2.99,2005-05-24 22:53:30,2005-05-24 22:53:30,2005-05-26 22:04:30,2006-02-15 21:30:53,2006-02-15 05:09:17,2006-02-15 22:13:16
1,2,2,1525,459,1,333,2,,2005-05-24 22:54:33,NaT,2005-05-28 19:40:33,2006-02-15 21:30:53,2006-02-15 05:09:17,NaT
