### Importing Libraries

#### Using MongoDB and MySQL

In [13]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [15]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


### Declare and Assign Connection Variables

#### Taking these from Lab 3 and 4 and Combining Them

In [19]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "tyler123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

MySQL_args = {
    "uid" : "root",
    "pwd" : "tyler123",
    "hostname" : "localhost",
   "dbname" : "sakila"
}
dw_args = {
    "uid" : "root",
    "pwd" : "tyler123",
    "hostname" : "localhost",
   "dbname" : "sakila_dw"
}

mongodb_args = {
    "user_name" : "hcabele",
    "password" : "Claire123",
    "cluster_name" : "hannahacluster",
    "cluster_subnet" : "gjgoj",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_dw2"
}

### Define Functions for Getting Data from and Setting Databases

#### Combining Lab 3 and Lab 4 Defining Function Statements and Renaming Some of the "def" Equations to Separate the SQL and Mongo Statements

In [23]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_sql_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

### Create New Datawarehouse database

#### From Lab 3 in Order to Establish Connection to SQL

In [27]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

### Extract Data From Source Database

#### Taking Data from the Customer Table in Sakila

In [31]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


### Transform the Data

#### Dropping Columns that are Unnecessary and Creating a Customer Key

In [35]:
drop_cols = ['last_update','active'] #unnecessary columns
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1)) #inserting primary key
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,create_date
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,2006-02-14 22:04:36
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,2006-02-14 22:04:36


## Extract Data from Source Database Tables MongoDB

#### Establishing a Connection to Mongo and Getting the json File for the Sakila Payment Table

In [39]:
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"payment" : 'sakila_payment.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)   

In [41]:
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "payment"

df_payment = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Transforming the Data

#### Deleting Unnecessary columns and Inserting a Primary Key

In [45]:
df_payment.insert(0, "payment_key", range(1, df_payment.shape[0]+1)) #inserting primary key
df_payment.drop(['last_update'], axis=1, inplace=True) #unnecessary column
df_payment.head(2)

Unnamed: 0,payment_key,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,2,1,1,573,0.99,2005-05-28 10:35:23


### Extract Data from CSV File

#### Collecting the Store Table from my Data Folder in Jupyter After Downloading it as a csv File and Importing it into Jupyter

In [49]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_store.csv')

df_store = pd.read_csv(data_file, header=0)
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


#### Inserting Store Key into sakila_store and Transforming the Data

In [52]:
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1)) #inserting primary key
df_store.drop(['last_update'], axis=1, inplace=True) #unnecessary column
df_store.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id
0,1,1,1,1
1,2,2,2,2


### Inserting the Dimension Tables

#### Creating sakila_dw and Creaing the Dimension Tables for it

In [56]:
db_operation = 'insert'
tables = [('dim_customer', df_customer, 'customer_key'),
          ('dim_payment', df_payment, 'payment_key'),
          ('dim_store', df_store, 'store_key')]

In [58]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(dataframe, table_name, primary_key, db_operation, **dw_args)

### Run Lab 2c using sakila_dw instead of using northwind_dw2 to get Date Dimension Table

#### Getting Primary Key from Tables and Dropping ID Column

In [62]:
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_dim_customer.drop(['customer_id'], axis=1, inplace=True)
df_dim_customer.head(2)

Unnamed: 0,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [64]:
sql_dim_payment = "SELECT payment_key, payment_id FROM sakila_dw.dim_payment;"
df_dim_payment = get_sql_dataframe(sql_dim_payment, **dw_args)
df_dim_payment.drop(['payment_id'], axis=1, inplace=True)
df_dim_payment.head(2) 

Unnamed: 0,payment_key
0,1
1,2


In [66]:
sql_dim_store = "SELECT store_key FROM sakila_dw.dim_store;"
df_dim_store = get_sql_dataframe(sql_dim_store, **dw_args)
df_dim_store.head(2)

Unnamed: 0,store_key
0,1
1,2


### Create Fact Orders Table: Goal is to Find Top Spending Customers per Store

In [69]:
sql_fact_orders = """
    SELECT s.store_id,
        s.manager_staff_id AS store_manager,
        c.customer_id,
        c.first_name AS customer_first_name,
        c.last_name AS customer_last_name,
        p.amount AS payment_amount,
        p.payment_date AS payment_date
   FROM sakila.payment AS p
   INNER JOIN sakila.customer AS c
   ON p.customer_id = c.customer_id
   INNER JOIN sakila.store AS s
   ON c.store_id = s.store_id
"""

df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)

Unnamed: 0,store_id,store_manager,customer_id,customer_first_name,customer_last_name,payment_amount,payment_date
0,1,1,1,MARY,SMITH,2.99,2005-05-25 11:30:37
1,1,1,1,MARY,SMITH,0.99,2005-05-28 10:35:23


### Get Data From the Tables Involved in the Fact Orders Table Using get_dataframe Function

#### Customer Table

In [73]:
sql_customer= "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.rename(columns={"id":"customer_id"}, inplace=True) #renaming the id column
df_customer.drop(['last_update'], axis=1, inplace=True) #dropping unnecessary column
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36


#### Payment Table

In [76]:
sql_payment= "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.rename(columns={"id":"payment_id"}, inplace=True) #renaming the id column
df_payment.drop(['last_update'], axis=1, inplace=True) #dropping unnecessary column
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23


#### Store Table

In [79]:
sql_store= "SELECT * FROM sakila.store;"
df_store = get_dataframe(user_id, pwd, host_name, src_dbname, sql_store)
df_store.rename(columns={"id":"store_id"}, inplace=True) #renaming the id column
df_store.drop(['last_update'], axis=1, inplace=True) #dropping unnecessary column
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id
0,1,1,1
1,2,2,2


### Merging the Customer Table with the Payment Table on 'customer_id'

In [82]:
df_customer = pd.merge(df_customer, df_payment, on='customer_id', how='inner')
df_customer.drop(['address_id'], axis=1, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,active,create_date,payment_id,staff_id,rental_id,amount,payment_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,1,76,2.99,2005-05-25 11:30:37
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2,1,573,0.99,2005-05-28 10:35:23


### Mergining Customer Table with Store Table on 'store_id'

In [85]:
df_fact_orders = pd.merge(df_customer, df_store, on='store_id', how='inner')
df_fact_orders.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,active,create_date,payment_id,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,1,76,2.99,2005-05-25 11:30:37,1,1
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2,1,573,0.99,2005-05-28 10:35:23,1,1


In [87]:
df_fact_orders.shape

(16044, 14)

### Fetch Surrogate Primary Key and Business Key From Dimension Tables with Get Dataframe Code

#### Customer Dimension Table

In [91]:
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


#### Payment Dimension Table

In [94]:
sql_dim_payment = "SELECT payment_key, payment_id FROM sakila_dw.dim_payment;"
df_dim_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_payment)
df_dim_payment.head(2)

Unnamed: 0,payment_key,payment_id
0,1,1
1,2,2


#### Store Dimension Table

In [97]:
sql_dim_store = "SELECT store_key, store_id FROM sakila_dw.dim_store;"
df_dim_store = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_store)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


### Use Business Key to Merge Dimension Tables with Fact Orders Table on ID Columns

#### Mergining Cusomter Dimension Table with Fact Orders Table

In [101]:
df_fact_orders = pd.merge(df_fact_orders, df_dim_customer, on='customer_id', how='inner')
df_fact_orders.drop(['customer_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,store_id,first_name,last_name,email,active,create_date,payment_id,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id,customer_key
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,1,76,2.99,2005-05-25 11:30:37,1,1,1
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,2,1,573,0.99,2005-05-28 10:35:23,1,1,1


#### Mergining Payment Dimension Table with Fact Orders Table

In [104]:
df_fact_orders = pd.merge(df_fact_orders, df_dim_payment, on='payment_id', how='inner')
df_fact_orders.drop(['payment_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,store_id,first_name,last_name,email,active,create_date,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id,customer_key,payment_key
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,76,2.99,2005-05-25 11:30:37,1,1,1,1
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,573,0.99,2005-05-28 10:35:23,1,1,1,2


#### Merging Store Dimension Table with Fact Orders Table

In [107]:
df_fact_orders = pd.merge(df_fact_orders, df_dim_store, on='store_id', how='inner')
df_fact_orders.drop(['store_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,first_name,last_name,email,active,create_date,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id,customer_key,payment_key,store_key
0,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,76,2.99,2005-05-25 11:30:37,1,1,1,1,1
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,573,0.99,2005-05-28 10:35:23,1,1,1,2,1


### Lookup DateKeys From Date Dimension Table

#### Getting Surrogate Primary Key and Business Key from Date Dimension

In [111]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date


### Lookup Corresponding Primary Key for Each Date Column in Fact Orders

#### Payment was the Only Table with a Date Dimension, so Utilizing Payment Date to Connect to Dim Date and then Merging it to Fact Oders

In [117]:
df_dim_payment = df_dim_date.rename(columns={"date_key" : "payment", "full_date" : "payment_date"})
df_fact_orders.payment_date = df_fact_orders.payment_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_payment, on='payment_date', how='left')
df_fact_orders.head(2)

Unnamed: 0,first_name,last_name,email,active,create_date,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id,customer_key,payment_key,store_key,payment_x,payment_y
0,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,76,2.99,2005-05-25,1,1,1,1,1,,
1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,573,0.99,2005-05-28,1,1,1,2,1,,


#### Inserting 'fact_order_key' into the Fact Order Table

In [120]:
df_fact_orders.insert(0, 'fact_order_key', range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,first_name,last_name,email,active,create_date,staff_id,rental_id,amount,payment_date,manager_staff_id,address_id,customer_key,payment_key,store_key,payment_x,payment_y
0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,76,2.99,2005-05-25,1,1,1,1,1,,
1,2,MARY,SMITH,MARY.SMITH@sakilacustomer.org,1,2006-02-14 22:04:36,1,573,0.99,2005-05-28,1,1,1,2,1,,


### Setting the Fact Order Dataframe so it Appears as a Table in sakila_dw

#### Ensuring Fact Orders is now a Table

In [124]:
table_name = "fact_orders"
pk_column = "fact_order_key"
db_operation = "insert"
db_name = "sakila_dw"
df = df_fact_orders


set_sql_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation)

### Demonstrate the the New Data Warehouse Exists

#### Goal is to Have the Top Spending Custoemers Listed in Descending Order with the Amount they Spent and at What Store

In [128]:
sql_test = """
    SELECT store.store_key AS `Store`,
        customer.first_name AS `Customer First Name`,
        customer.last_name AS `Customer Last Name`,
        SUM(orders.amount) AS `Total Spent`
    FROM `{0}`.fact_orders AS orders
    INNER JOIN `{0}`.dim_customer AS customer
    ON orders.customer_key = customer.customer_key
    INNER JOIN `{0}`.dim_store AS store
    ON orders.store_key = store.store_key
    GROUP BY store.store_key, customer.first_name, customer.last_name
    ORDER BY `Total Spent` DESC;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, db_name, sql_test)
df_test.head()

Unnamed: 0,Store,Customer First Name,Customer Last Name,Total Spent
0,2,KARL,SEAL,221.55
1,1,ELEANOR,HUNT,216.54
2,1,CLARA,SHAW,195.58
3,2,RHONDA,KENNEDY,194.61
4,2,MARION,SNYDER,194.61
