## DS2002-Midterm 
### Author: Emily McMahon
##### Overview: The process began by populating three dataframes from the Sakila database's data tables, which were later merged into a single fact table. Before merging, the time component was removed from the date columns. The `dim_date` table was then used to generate secondary keys for the rental and return dates in the fact table. Since the rental dataframe only included these two date columns, a separate rental dimension table was deemed unnecessary. The remaining two dataframes were transformed into dimension tables, and all three dimension tables, along with the fact table, were subsequently loaded back into MySQL. Finally, the average purchase amount was calculated to demonstrate proper functionality.  

#### Import the Necessary Libraries


In [3]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [4]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


In [7]:
host_name = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "lollypop3232"

src_dbname = "sakila"
dst_dbname = "sakila_dm"


mysql_args = {
    "uid" : "root",
    "pwd" : "lollypop3232",
    "hostname" : "127.0.0.1",
    "dbname" : "sakila"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "feu6sx",
    "password" : "Passw0rd1234",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "r1kyv",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_tables"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [10]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    #connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    #connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    #mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    #mongo_client.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [13]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}:{port}/{src_dbname}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

#connection.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
#connection.execute(f"CREATE DATABASE `{dst_dbname}`;")
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
#connection.execute(f"USE {dst_dbname};")
connection.execute(text(f"USE {dst_dbname};"))


#connection.close()

<sqlalchemy.engine.cursor.CursorResult at 0x14b793bd0>

#### Populate MongoDB with Source Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [16]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'Downloads','sakila-db')

json_files = {"customer" : 'sakila_customer.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

### Create and Populate the New Dimension Tables¶
#### Extract Customer Data from the Source MongoDB Collection Into DataFrames

In [18]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Extract Rental Data from the Source MongoDB Collection Into DataFrames

In [22]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental= get_sql_dataframe(sql_rental, **mysql_args)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Extract Date & Time Data from the Sourse MySQL Schema and put into DataFrames
##### Precurser: Lab 2c was run in MySQL using sakila_dm instead of northwind_dw in order to populate the destination database. This must be done before running the following cell.

In [27]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dm.dim_date;"
df_dim_date=get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### For the second extraction source, extract Payment Data from a Local Source (my personal computer) into DataFrames.

In [30]:
# First define the file path
pwd='/Users/emilymcmahon/Downloads/DS-2002-main/Projects/sakila_payment.json'
df_payment=pd.read_json(pwd)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### For the third extraction source, extract Staff Data from MySQL Schema and put into DataFrames.

In [33]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff=get_sql_dataframe(sql_staff, **mysql_args)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


### Perform Any Necessary Transformations to the DataFrames to ensure seamless merge

#### Customer Data Transformation

In [37]:
df_customer.rename(columns={"customer_id":"customer_key","store_id":"store_key","first_name":"customer_first_name",
                           "last_name":"customer_last_name", "email":"customer_email", 'address_id':'address_key'},
                   inplace=True)
df_customer.drop(['last_update', 'active', 'create_date'], axis=1, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_key,store_key,customer_first_name,customer_last_name,customer_email,address_key
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6


#### Rental Data Transformation

In [40]:
df_rental.rename(columns={"rental_id" : "rental_key", "inventory_id":"inventory_key", "customer_id":"customer_key",
                         "staff_id":"staff_key"}, inplace=True)

# Drop unnecessary columns: 'last_update', 'rental_time', 'return_time'
df_rental.drop(columns=['last_update', 'rental_time', 'return_time'], inplace=True, errors='ignore')

# Ensure 'rental_date' and 'return_date' are in the correct date format (YYYY-MM-DD)
df_rental['rental_date'] = pd.to_datetime(df_rental['rental_date']).dt.date
df_rental['return_date'] = pd.to_datetime(df_rental['return_date']).dt.date

df_rental.head(5)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key
0,1,2005-05-24,367,130,2005-05-26,1
1,2,2005-05-24,1525,459,2005-05-28,1
2,3,2005-05-24,1711,408,2005-06-01,1
3,4,2005-05-24,2452,333,2005-06-03,2
4,5,2005-05-24,2079,222,2005-06-02,1


#### Payment Data Transformation

In [43]:
df_payment.rename(columns={"payment_id": "payment_key", "rental_id": "rental_key"}, inplace=True)

# Split 'payment_date' into 'payment_date' and 'payment_time'
split_dates = df_payment['payment_date'].str.split(' ', n=1, expand=True)

df_payment['payment_date'] = split_dates[0]

df_payment['payment_time'] = split_dates[1] if len(split_dates.columns) > 1 else None

df_payment.drop(['customer_id', 'staff_id', 'last_update','payment_time'], axis=1, inplace=True,errors='ignore')

df_payment.head(5)


Unnamed: 0,payment_key,rental_key,amount,payment_date
0,1,76,2.99,2005-05-25
1,2,573,0.99,2005-05-28
2,3,1185,5.99,2005-06-15
3,4,1422,0.99,2005-06-15
4,5,1476,9.99,2005-06-15


#### Staff Data Transformation

In [55]:
df_staff.rename(columns={"staff_id" : "staff_key", "first_name" : "staff_first_name", "last_name":"staff_last_name"}, inplace=True)

df_staff.drop(['last_update', 'picture', 'email', 'active', 'username', 'password', 'address_id', 'store_id'], axis=1, inplace=True,errors='ignore')

df_staff.head(2)

Unnamed: 0,staff_key,staff_first_name,staff_last_name
0,1,Mike,Hillyer
1,2,Jon,Stephens


#### Using constructed Data Frames, Create and Transform Fact Table

In [49]:
df_frp = pd.merge(df_rental, df_payment, on = 'rental_key', how= 'right')

df_frp = pd.merge(df_frp, df_customer, on = 'customer_key', how = 'right')

df_frp = pd.merge(df_frp, df_staff, on = 'staff_key', how = 'right')

df_frp.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key,staff_first_name,staff_last_name
0,573,2005-05-28,4020,1,2005-06-03,1,2,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer
1,1476,2005-06-15,1407,1,2005-06-25,1,5,9.99,2005-06-15,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer


#### DateKeys Created from Date Dimention Table

In [59]:
# Create intermediate DataFrame and make necessary transformations
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})

df_frp['rental_date'] = df_frp['rental_date'].astype('datetime64[ns]')

# merge with fact table
df_frp = pd.merge(df_frp, df_dim_rental_date, on='rental_date', how='left')

# clean up
df_frp.drop(['rental_date'], axis=1, inplace=True)

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,return_date,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key,staff_first_name,staff_last_name,rental_date_key
0,573,4020,1,2005-06-03,1,2,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050528
1,1476,1407,1,2005-06-25,1,5,9.99,2005-06-15,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050615


###### Repeat this process for the 2 remaining date columns

In [62]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_frp['return_date'] = df_frp['return_date'].astype('datetime64[ns]')
df_frp = pd.merge(df_frp, df_dim_return_date, on='return_date', how='left')
df_frp.drop(['return_date'], axis=1, inplace=True)

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,amount,payment_date,store_key,customer_first_name,customer_last_name,customer_email,address_key,staff_first_name,staff_last_name,rental_date_key,return_date_key
0,573,4020,1,1,2,0.99,2005-05-28,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050528,20050603.0
1,1476,1407,1,1,5,9.99,2005-06-15,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050615,20050625.0


In [64]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_frp['payment_date'] = df_frp['payment_date'].astype('datetime64[ns]')
df_frp = pd.merge(df_frp, df_dim_payment_date, on='payment_date', how='left')
df_frp.drop(['payment_date'], axis=1, inplace=True)

df_frp.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,staff_key,payment_key,amount,store_key,customer_first_name,customer_last_name,customer_email,address_key,staff_first_name,staff_last_name,rental_date_key,return_date_key,payment_date_key
0,573,4020,1,1,2,0.99,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050528,20050603.0,20050528
1,1476,1407,1,1,5,9.99,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,Mike,Hillyer,20050615,20050625.0,20050615


#### Drop and Reorder Columns of Fact Table

In [73]:
# dropping all columns except for keys
df_frp.drop(['rental_key','amount', 'customer_first_name','customer_last_name', 'customer_email', 'staff_first_name', 
            'staff_last_name'], axis=1, inplace=True,errors='ignore')

# Reordering the columns in a way that makes sense to me
reordered_columns = [ 'rental_date_key', 'return_date_key', 'payment_key', 'payment_date_key',
                    'inventory_key', 'customer_key', 'staff_key', 'store_key', 'address_key']
df_frp = df_frp[reordered_columns]
df_frp.head(2)

Unnamed: 0,rental_date_key,return_date_key,payment_key,payment_date_key,inventory_key,customer_key,staff_key,store_key,address_key
0,20050528,20050603.0,2,20050528,4020,1,1,1,5
1,20050615,20050625.0,5,20050615,1407,1,1,1,5


#### Add Primary Key

In [77]:
df_frp.insert(0, "fact_rental_purchase_key", range(1, df_frp.shape[0]+1))

df_frp.head(2)

Unnamed: 0,fact_rental_purchase_key,rental_date_key,return_date_key,payment_key,payment_date_key,inventory_key,customer_key,staff_key,store_key,address_key
0,1,20050528,20050603.0,2,20050528,4020,1,1,1,5
1,2,20050615,20050625.0,5,20050615,1407,1,1,1,5


#### Drop Unnessary Keys from Data Frames that will act as Dimension Tables

In [82]:
# This removes redundant or unneeded information
df_customer.drop(['store_key', 'address_key'], axis=1, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,customer_first_name,customer_last_name,customer_email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [87]:
df_payment.drop(['rental_key', 'payment_date'], axis=1, inplace=True)

df_payment.head(2)

Unnamed: 0,payment_key,amount
0,1,2.99
1,2,0.99


###### Not Necessary for Rental DF

#### Load Fact Table & Dimension Tables Back into Data Mart in MySQL

In [101]:
# fact table:
df=df_frp
table_name='fact_rental_payment'
pk_column='fact_rental_purchase_key'
db_operation='insert'

mysql_args = {
    "uid" : "root",
    "pwd" : "lollypop3232",
    "hostname" : "127.0.0.1",
    "dbname" : "sakila_dm"
}
set_dataframe(df, table_name, pk_column, db_operation, **mysql_args)

In [103]:
# customer dimension:
df=df_customer
table_name='dim_customer'
pk_column='customer_key'
db_operation='insert'

mysql_args = {
    "uid" : "root",
    "pwd" : "lollypop3232",
    "hostname" : "127.0.0.1",
    "dbname" : "sakila_dm"
}
set_dataframe(df, table_name, pk_column, db_operation, **mysql_args)

In [107]:
# payment dimension:

df=df_payment
table_name='dim_payment'
pk_column='payment_key'
db_operation='insert'

mysql_args = {
    "uid" : "root",
    "pwd" : "lollypop3232",
    "hostname" : "127.0.0.1",
    "dbname" : "sakila_dm"
}
set_dataframe(df, table_name, pk_column, db_operation, **mysql_args)

In [109]:
# staff dimension:

df=df_staff
table_name='dim_staff'
pk_column='staff_key'
db_operation='insert'

mysql_args = {
    "uid" : "root",
    "pwd" : "lollypop3232",
    "hostname" : "127.0.0.1",
    "dbname" : "sakila_dm"
}
set_dataframe(df, table_name, pk_column, db_operation, **mysql_args)

#### Validate the creation of Fact Table & New Dimension Tables in MySQL

In [116]:
sql_frp = "SELECT * FROM sakila_dm.fact_rental_payment;"
df_sql_frp=get_sql_dataframe(sql_frp, **mysql_args)
df_sql_frp.head(200)

Unnamed: 0,fact_rental_purchase_key,rental_date_key,return_date_key,payment_key,payment_date_key,inventory_key,customer_key,staff_key,store_key,address_key
0,1,20050528,20050603.0,2,20050528,4020,1,1,1,5
1,2,20050615,20050625.0,5,20050615,1407,1,1,1,5
2,3,20050616,20050617.0,6,20050616,726,1,1,1,5
3,4,20050618,20050619.0,8,20050618,3497,1,1,1,5
4,5,20050621,20050628.0,9,20050621,4566,1,1,1,5
...,...,...,...,...,...,...,...,...,...,...
195,196,20050618,20050622.0,386,20050618,268,15,1,1,19
196,197,20050620,20050627.0,387,20050620,1998,15,1,1,19
197,198,20050710,20050713.0,391,20050710,529,15,1,1,19
198,199,20050711,20050716.0,392,20050711,2997,15,1,1,19


In [118]:
sql_customer = "SELECT * FROM sakila_dm.dim_customer;"
df_sql_customer=get_sql_dataframe(sql_customer, **mysql_args)
df_sql_customer.head(200)

Unnamed: 0,customer_key,customer_first_name,customer_last_name,customer_email
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org
2,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org
3,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org
4,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org
...,...,...,...,...
195,196,ALMA,AUSTIN,ALMA.AUSTIN@sakilacustomer.org
196,197,SUE,PETERS,SUE.PETERS@sakilacustomer.org
197,198,ELSIE,KELLEY,ELSIE.KELLEY@sakilacustomer.org
198,199,BETH,FRANKLIN,BETH.FRANKLIN@sakilacustomer.org


In [120]:
sql_payment = "SELECT * FROM sakila_dm.dim_payment;"
df_sql_payment = get_sql_dataframe(sql_payment, **mysql_args)
df_sql_payment.head(200)

Unnamed: 0,payment_key,amount
0,1,2.99
1,2,0.99
2,3,5.99
3,4,0.99
4,5,9.99
...,...,...
195,196,5.99
196,197,4.99
197,198,4.99
198,199,3.99


In [126]:
sql_staff = "SELECT * FROM sakila_dm.dim_staff;"
df_sql_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_sql_staff.head(2)

Unnamed: 0,staff_key,staff_first_name,staff_last_name
0,1,Mike,Hillyer
1,2,Jon,Stephens


#### Aggregation Performance

In [129]:
# Determine the average purchace amount

sql_avg_payment = "SELECT AVG(amount) FROM sakila_dm.dim_payment;"
avg_payment_amount = get_sql_dataframe(sql_avg_payment, **mysql_args)
avg_payment_amount.head(1)

Unnamed: 0,AVG(amount)
0,4.201356


### That's All Folks!