## Midterm Project
#### Using Sakila Database

#### Necessary imports

In [8]:
import os
import numpy
import pandas as pd
import json
import datetime
import pymongo
from sqlalchemy import create_engine

#### Connecting to MySQL Server and Databases

In [9]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "film_rental_dw2"

In [10]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [11]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2ecc51fc0a0>

### Dimension Table 1 from MySQL: Inventory
This code is calling the inventory table from the Sakila database.

In [5]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


This code transforms the original inventory table by dropping an unnecessary column.

In [6]:
drop_cols = ['last_update']
df_inventory.drop(drop_cols, axis=1, inplace=True)

df_inventory.rename(columns={"inventory_id":"inventory_key"}, inplace=True)

df_inventory.head(2)

Unnamed: 0,inventory_key,film_id,store_id
0,1,1,1
1,2,1,1


Then creates the inventory dimension table in the new data warehouse named film_rental_dw2.

In [7]:
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_inventory, "dim_inventory", "inventory_key", db_operation)

### Dimension Table 2 from MongoDB: Customer
This code connects to my cluster in MongoDB and Source Databases.

In [13]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "dsproject"
atlas_user_name = "aps6cuq"
atlas_password = "JpVjKZssyc3PYxhi"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : "mongodb://aps6cuq:JpVjKZssyc3PYxhi@ac-miov3sw-shard-00-00.yysm8yq.mongodb.net:27017,ac-miov3sw-shard-00-01.yysm8yq.mongodb.net:27017,ac-miov3sw-shard-00-02.yysm8yq.mongodb.net:27017/?ssl=true&replicaSet=atlas-ctv53l-shard-0&authSource=admin&retryWrites=true&w=majority"
}

src_dbname = "dsproject"
dst_dbname = "film_rental_dw2"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb://aps6cuq:JpVjKZssyc3PYxhi@ac-miov3sw-shard-00-00.yysm8yq.mongodb.net:27017,ac-miov3sw-shard-00-01.yysm8yq.mongodb.net:27017,ac-miov3sw-shard-00-02.yysm8yq.mongodb.net:27017/?ssl=true&replicaSet=atlas-ctv53l-shard-0&authSource=admin&retryWrites=true&w=majority


In [14]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

The following code uploads the customer table (a json file) to MongoDB.

In [15]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), '')

json_files = {"ds_customer" : 'sakila_customer.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

#### Extracting Data from MongoDB
This code retrieves the customer table from MongoDB.

In [16]:
query = {}
collection = "ds_customer"

df_customer = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Transformations
I decided to drop unnecessary columns.

In [17]:
df_customer.rename(columns={"customer_id":"customer_key"}, inplace=True)

drop_cols = ['email','last_update','active']
df_customer.drop(drop_cols, axis=1, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,address_id,create_date
0,1,1,MARY,SMITH,5,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36


This code creates the customer dimension table in the new data warehouse.

In [18]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

This code checks if the new dataframe was created.

In [19]:
sql_suppliers = "SELECT * FROM film_rental_dw2.dim_customer;"
df_dim_suppliers = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_suppliers)
df_dim_suppliers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,address_id,create_date
0,1,1,MARY,SMITH,5,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36


### Dimension Table 3 from CSV File: Staff
This code retrieves the staff table CSV file (originally from the Sakila database on MySQL) from my local file system.

In [15]:
data_dir = os.path.join(os.getcwd(), '')
data_file = os.path.join(data_dir, 'sakila_staff.csv')

df = pd.read_csv(data_file)
df.head()

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


### Transformations
I decided to drop unnecessary columns.

In [16]:
drop_cols = ["username","password"]
df.drop(drop_cols, axis=1, inplace=True)
df.rename(columns={"inventory_id":"inventory_key"}, inplace=True)

df.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,2006-02-15 03:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,2006-02-15 03:57:16


This code creates the staff dimension table in the new data warehouse.

In [17]:
dataframe = df
table_name = 'dim_staff'
primary_key = 'staff_id'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Date Dimension Table
This code retrieves the date dimension table I already created through running the script in MySQL.

In [24]:
sql_dim_date = "SELECT * FROM film_rental_dw2.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, "film_rental_dw2", sql_dim_date)
df_dim_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20200101,2020-01-01,2020/01/01,01/01/2020,01/01/2020,4,Wednesday,1,1,Weekday,...,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3
1,20200102,2020-01-02,2020/01/02,01/02/2020,02/01/2020,5,Thursday,2,2,Weekday,...,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3


### The Fact Table: Rental Payment Table
This code retrieves the "rental" table from the original Sakila database.

In [107]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_rental)
df_rental.rename(columns={"rental_id":"rental_key"}, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


This code retrieves the "payment" table from the original Sakila database in MySQL.

In [108]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_payment)
df_payment.rename(columns={"payment_id":"payment_key"}, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_key,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


This code merges the rental and payment tables together to create the fact rental payment table. I also dropped duplicate columns, otherwise there would be unnecessary extra columns.

In [132]:
df_rental_payment = pd.merge(df_rental, df_payment, on=['customer_id', 'staff_id'], how='inner', suffixes=('', '_remove'))
df_rental_payment.drop([i for i in df_rental_payment.columns if 'remove' in i], axis=1, inplace=True)

df_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_key,rental_id,amount,payment_date
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,2005-05-24 22:53:30
1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,2005-05-29 09:25:10


This code retrieves the customer dimension table that I created earlier in this notebook to include the customer key in the fact table.

In [133]:
sql_dim_customer = "SELECT * FROM film_rental_dw2.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, "film_rental_dw2", sql_dim_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,address_id,create_date
0,1,1,MARY,SMITH,5,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,6,2006-02-14 22:04:36


This code then merges these two tables to include the customer key.

In [134]:
df_rental_payment = df_rental_payment.rename(columns={"customer_id" : "customer_key"})
df_rental_payment = pd.merge(df_rental_payment, df_dim_customer, on='customer_key', how='left',  suffixes=('', '_remove'))
df_rental_payment.drop([i for i in df_rental_payment.columns if 'remove' in i], axis=1, inplace=True)

df_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_key,return_date,staff_id,last_update,payment_key,rental_id,amount,payment_date,store_id,first_name,last_name,address_id,create_date
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,2005-05-24 22:53:30,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36
1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,2005-05-29 09:25:10,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36


This process is then repeated for the staff dimension table and the inventory dimension table. 

In [135]:
sql_dim_staff = "SELECT * FROM film_rental_dw2.dim_staff;"
df_dim_staff = get_dataframe(user_id, pwd, host_name, "film_rental_dw2", sql_dim_staff)
df_dim_staff = df_dim_staff.rename(columns={"staff_id" : "staff_key"})

df_dim_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,address_id,email,store_id,active,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,2006-02-15 03:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,2006-02-15 03:57:16


In [136]:
df_rental_payment = df_rental_payment.rename(columns={"staff_id" : "staff_key"})
df_rental_payment = pd.merge(df_rental_payment, df_dim_staff, on='staff_key', how='inner', suffixes=('', '_remove'))
df_rental_payment.drop([i for i in df_rental_payment.columns if 'remove' in i], axis=1, inplace=True)

df_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_key,return_date,staff_key,last_update,payment_key,rental_id,amount,payment_date,store_id,first_name,last_name,address_id,create_date,email,active
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,2005-05-24 22:53:30,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1
1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,2005-05-29 09:25:10,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1


In [137]:
sql_dim_inventory = "SELECT * FROM film_rental_dw2.dim_inventory;"
df_dim_inventory = get_dataframe(user_id, pwd, host_name, "film_rental_dw2", sql_dim_inventory)

df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_id,store_id
0,1,1,1
1,2,1,1


In [138]:
df_rental_payment = df_rental_payment.rename(columns={"inventory_id" : "inventory_key"})
df_rental_payment = pd.merge(df_rental_payment, df_dim_inventory, on='inventory_key', how='inner', suffixes=('', '_remove'))
df_rental_payment.drop([i for i in df_rental_payment.columns if 'remove' in i], axis=1, inplace=True)

df_rental_payment.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key,last_update,payment_key,rental_id,amount,payment_date,store_id,first_name,last_name,address_id,create_date,email,active,film_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,2005-05-24 22:53:30,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80
1,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,2005-05-29 09:25:10,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80


This code selects specific columns from the date dimension table.

In [139]:
sql_dim_date = "SELECT date_key, full_date FROM film_rental_dw2.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20200101,2020-01-01
1,20200102,2020-01-02


The following code looks up the Surrogate Primary Key column for each date in the fact table.

In [140]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_dim_rental_date.head(2)
df_rental_payment = pd.merge(df_rental_payment, df_dim_rental_date, on='rental_date', how='left')
df_rental_payment.drop(['rental_date'], axis=1, inplace=True)
df_rental_payment.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,return_date,staff_key,last_update,payment_key,rental_id,amount,payment_date,store_id,first_name,last_name,address_id,create_date,email,active,film_id,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,2005-05-24 22:53:30,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80,
1,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,2005-05-29 09:25:10,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80,


This process is repeated for the payment_date.

In [141]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_rental_payment = pd.merge(df_rental_payment, df_dim_payment_date, on='payment_date', how='left')
df_rental_payment.drop(['payment_date'], axis=1, inplace=True)
df_rental_payment.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,return_date,staff_key,last_update,payment_key,rental_id,amount,store_id,first_name,last_name,address_id,create_date,email,active,film_id,rental_date_key,payment_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,1,2.99,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80,,
1,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3505,746,2.99,1,CHARLOTTE,HUNTER,134,2006-02-14 22:04:36,Mike.Hillyer@sakilastaff.com,1,80,,


### Transformations
I dropped columns that were included when joining the dimension tables. These columns are in the dimension tables and are not needed in the fact table. I also reordered the columns and created a primary key for the fact table.

In [142]:
# Drop columns
drop_columns = ['last_name','email','active','last_update','rental_id','first_name',
                'last_name','address_id']
df_rental_payment.drop(drop_columns, axis=1, inplace=True)

# Reorder columns
ordered_columns = ['rental_key','inventory_key','customer_key','staff_key','payment_key',
                   'rental_date_key','payment_date_key', 'return_date', 'amount']
df_rental_payment = df_rental_payment[ordered_columns]

# Create a primary key
df_rental_payment.insert(0, "rental_payment_key", range(1, df_rental_payment.shape[0]+1))
df_rental_payment.head(2)

Unnamed: 0,rental_payment_key,rental_key,inventory_key,customer_key,staff_key,payment_key,rental_date_key,payment_date_key,return_date,amount
0,1,1,367,130,1,3504,,,2005-05-26 22:04:30,2.99
1,2,1,367,130,1,3505,,,2005-05-26 22:04:30,2.99


I then wrote the fact_rental_payment table into the film_rental_dw2 data warehouse that I created in MySQL.

In [148]:
table_name = "fact_rental_payment"
primary_key = "rental_payment_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, "film_rental_dw2", df_rental_payment, table_name, primary_key, db_operation)

Finally, I implemented a SELECT statement to test if the data warehouse works!

In [20]:
sql_test = """
    SELECT last_name AS customer_name
    FROM dim_customer
    GROUP BY customer_name;
"""

df_test = get_dataframe(user_id, pwd, host_name, "film_rental_dw2", sql_test)
df_test.head(10)

Unnamed: 0,customer_name
0,SMITH
1,JOHNSON
2,WILLIAMS
3,JONES
4,BROWN
5,DAVIS
6,MILLER
7,WILSON
8,MOORE
9,TAYLOR
