## Create Sakila Fact Table Using File System, MongoDB, & MySQL 

#### import libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### declare and assign connection variables for MongoDB and MySQL

In [2]:
mysql_uid = "root"  # for localhost
mysql_pwd = "Passw0rd123"
mysql_host = "ds2002-mysql.mysql.database.azure.com"  # for non-localhost

atlas_cluster_name = "ds2002cluster.pohbzdx"
atlas_user_name = "cre3nue"
atlas_password = "LeafLetAndLetLive432"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_etl"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://cre3nue:LeafLetAndLetLive432@ds2002cluster.pohbzdx.mongodb.net


#### functions for getting and setting database data

In [3]:
# original function, for MySQL localhost
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe

def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### populate MongoDB w/ source data

export necessary MySQL dimensions to JSON (using SELECT * statements) and place in data folder of project folder containing this notebook, then run below cell

need to add 0.0.0.0/0 to list of accepted IP addresses in MongoDB so VM can talk to it

In [4]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# gets the path of the Current Working Directory for this Notebook, and then appends the 'data' directory
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film" : 'sakila_film.json',
              "film_payment" : 'sakila_payment.json',
              "film_rental" : 'sakila_rental.json',
              "film_inventory" : 'sakila_inventory.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        print(f"{file} was successfully loaded.")

        
client.close()   

Collection(Database(MongoClient(host=['ac-b8mgtxq-shard-00-02.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-00.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-01.pohbzdx.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-y104y6-shard-0', tls=True), 'sakila'), 'film') was successfully loaded.
Collection(Database(MongoClient(host=['ac-b8mgtxq-shard-00-02.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-00.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-01.pohbzdx.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-y104y6-shard-0', tls=True), 'sakila'), 'film_payment') was successfully loaded.
Collection(Database(MongoClient(host=['ac-b8mgtxq-shard-00-02.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-00.pohbzdx.mongodb.net:27017', 'ac-b8mgtxq-shard-00-01.pohbzdx.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='at

#### fact table part 1:  gather data from all source tables

this fact table will be on the film value:  film details versus the amount of money & interest it brings in

so data is needed from:  film, payment, and rental tables

In [5]:
query = {} # select all elements (columns), and all documents (rows)

collection = "film"

df_film = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [6]:
query = {} # select all elements (columns), and all documents (rows)

collection = "film_payment"

df_film_payment = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [7]:
query = {} # select all elements (columns), and all documents (rows)

collection = "film_rental"

df_film_rental = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [8]:
query = {} # select all elements (columns), and all documents (rows)

collection = "film_inventory"

df_film_inventory = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### fact table part 2:  create fact table using joins

In [9]:
# first, want to join film rental and payment using rental_id

# df_film.columns.values
df_film_rent_and_pay = pd.merge(df_film_rental, df_film_payment, on=['rental_id','customer_id','staff_id'], how='inner')

# keep last_update column from film_rental because it is slightly earlier -- cannot give impression data is more up to date than it is
df_film_rent_and_pay.rename(columns={"last_update_x":"last_update"}, inplace=True)
df_film_rent_and_pay.drop(['last_update_y'], axis=1, inplace=True)
df_film_rent_and_pay.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date
0,18,2005-05-25 01:10:47,3376,19,2005-05-31 06:35:47,2,2006-02-15 21:30:53,490,0.99,2005-05-25 01:10:47
1,46,2005-05-25 06:04:08,3318,7,2005-06-02 08:18:08,2,2006-02-15 21:30:53,174,5.99,2005-05-25 06:04:08


In [10]:
# then, want to join rental+payment w/ film info
# inventory has film id and inventory id

df_film_and_inv = pd.merge(df_film_rent_and_pay, df_film_inventory, on='inventory_id', how='inner')

# keep last_update column from rent_and_pay because it is slightly earlier -- cannot give impression data is more up to date than it is
df_film_and_inv.rename(columns={"last_update_x":"last_update"}, inplace=True)
df_film_and_inv.drop(['last_update_y'], axis=1, inplace=True)
df_film_and_inv.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date,film_id,store_id
0,337,2005-05-27 03:22:30,751,19,2005-06-02 03:27:30,1,2006-02-15 21:30:53,493,2.99,2005-05-27 03:22:30,164,2
1,591,2005-05-28 13:11:04,377,19,2005-05-29 17:20:04,2,2006-02-15 21:30:53,494,2.99,2005-05-28 13:11:04,83,2


In [11]:
# finally, join film info to rental+payment info

df_fact_value = pd.merge(df_film_and_inv, df_film, on='film_id', how='inner')

# keep last_update column from film_and_inv because it is slightly earlier -- cannot give impression data is more up to date than it is
df_fact_value.rename(columns={"last_update_x":"last_update"}, inplace=True)
df_fact_value.drop(['last_update_y'], axis=1, inplace=True)
df_fact_value.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date,...,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,337,2005-05-27 03:22:30,751,19,2005-06-02 03:27:30,1,2006-02-15 21:30:53,493,2.99,2005-05-27 03:22:30,...,A Astounding Documentary of a Mad Cow And a Pi...,2006,1,,4,0.99,55,20.99,PG,"Trailers,Commentaries,Deleted Scenes,Behind th..."
1,591,2005-05-28 13:11:04,377,19,2005-05-29 17:20:04,2,2006-02-15 21:30:53,494,2.99,2005-05-28 13:11:04,...,A Insightful Documentary of a Boat And a Compo...,2006,1,,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes"


#### fact table part 3:  adding date dimension

dim_date was created in SQL with lab 2 script beforehand

involves surrogate primary key (date_key) and business key (full_date)

In [12]:
# get date keys from date dimension table

sql_dim_date = "SELECT date_key, full_date FROM sakila_etl.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [13]:
# dates are fun to deal with, first need to get dates from datetime with hours:minutes:seconds in sakila to one without
# this way keeps them as datetime64[ns] by making times null:
# https://stackoverflow.com/questions/45858155/removing-the-timestamp-from-a-datetime-in-pandas-dataframe

df_fact_value['rental_date'] = pd.to_datetime(df_fact_value['rental_date'])
df_fact_value['rental_date'] = df_fact_value['rental_date'].dt.normalize()
df_fact_value['rental_date'] = df_fact_value['rental_date'].dt.floor('D')

df_fact_value['payment_date'] = pd.to_datetime(df_fact_value['payment_date'])
df_fact_value['payment_date'] = df_fact_value['payment_date'].dt.normalize()
df_fact_value['payment_date'] = df_fact_value['payment_date'].dt.floor('D')

df_fact_value['return_date'] = pd.to_datetime(df_fact_value['return_date'])
df_fact_value['return_date'] = df_fact_value['return_date'].dt.normalize()
df_fact_value['return_date'] = df_fact_value['return_date'].dt.floor('D')

# leaving last_update alone because it is for managing purposes, not data analysis, and they'll keep updating it in its format

df_fact_value.head(2)
# df_fact_value.dtypes

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date,...,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,337,2005-05-27,751,19,2005-06-02,1,2006-02-15 21:30:53,493,2.99,2005-05-27,...,A Astounding Documentary of a Mad Cow And a Pi...,2006,1,,4,0.99,55,20.99,PG,"Trailers,Commentaries,Deleted Scenes,Behind th..."
1,591,2005-05-28,377,19,2005-05-29,2,2006-02-15 21:30:53,494,2.99,2005-05-28,...,A Insightful Documentary of a Boat And a Compo...,2006,1,,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes"


In [14]:
# lookup the Surrogate Primary Key (date_key) that corresponds to the "rental_date" column
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_value = pd.merge(df_fact_value, df_dim_order_date, on='rental_date', how='inner')
df_fact_value.drop(['rental_date'], axis=1, inplace=True)
df_fact_value.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,payment_date,film_id,...,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,rental_date_key
0,337,751,19,2005-06-02,1,2006-02-15 21:30:53,493,2.99,2005-05-27,164,...,2006,1,,4,0.99,55,20.99,PG,"Trailers,Commentaries,Deleted Scenes,Behind th...",20050527
1,591,377,19,2005-05-29,2,2006-02-15 21:30:53,494,2.99,2005-05-28,83,...,2006,1,,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",20050528


In [15]:
# lookup the Surrogate Primary Key (date_key) that corresponds to the "payment_date" column
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_value = pd.merge(df_fact_value, df_dim_order_date, on='payment_date', how='inner')
df_fact_value.drop(['payment_date'], axis=1, inplace=True)
df_fact_value.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,payment_id,amount,film_id,store_id,...,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,rental_date_key,payment_date_key
0,337,751,19,2005-06-02,1,2006-02-15 21:30:53,493,2.99,164,2,...,1,,4,0.99,55,20.99,PG,"Trailers,Commentaries,Deleted Scenes,Behind th...",20050527,20050527
1,591,377,19,2005-05-29,2,2006-02-15 21:30:53,494,2.99,83,2,...,1,,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",20050528,20050528


In [16]:
# lookup the Surrogate Primary Key (date_key) that corresponds to the "return_date" column
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_value = pd.merge(df_fact_value, df_dim_order_date, on='return_date', how='inner')
df_fact_value.drop(['return_date'], axis=1, inplace=True)
df_fact_value.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,payment_id,amount,film_id,store_id,title,...,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,rental_date_key,payment_date_key,return_date_key
0,337,751,19,1,2006-02-15 21:30:53,493,2.99,164,2,COAST RAINBOW,...,,4,0.99,55,20.99,PG,"Trailers,Commentaries,Deleted Scenes,Behind th...",20050527,20050527,20050602
1,591,377,19,2,2006-02-15 21:30:53,494,2.99,83,2,BLUES INSTINCT,...,,5,2.99,50,18.99,G,"Trailers,Deleted Scenes,Behind the Scenes",20050528,20050528,20050529


#### fact table part 4:  additional transformations

drop unwanted columns, reorder columns, create primary key -- generally make it look as we want from database

In [17]:
# drop some columns of no particular interest
# df_fact_value.columns
drop_columns = ['customer_id','staff_id','payment_id','description','original_language_id','special_features','length','rental_duration']
df_fact_value.drop(drop_columns, axis=1, inplace=True)

# rename Foreign Key Columns
df_fact_value.rename(columns={"rental_id":"rental_key", "inventory_id":"inventory_key", "film_id":"film_key"
                               , "store_id":"store_key"}, inplace=True)

# reorder the columns
ordered_columns = ['film_key','inventory_key','rental_key','store_key','title','release_year','rating','language_id','rental_rate',
                   'amount','replacement_cost','rental_date_key','payment_date_key','return_date_key','last_update']
df_fact_value = df_fact_value[ordered_columns]

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_value.insert(0, "fact_value_key", range(1, df_fact_value.shape[0]+1))
df_fact_value.head(2)

Unnamed: 0,fact_value_key,film_key,inventory_key,rental_key,store_key,title,release_year,rating,language_id,rental_rate,amount,replacement_cost,rental_date_key,payment_date_key,return_date_key,last_update
0,1,164,751,337,2,COAST RAINBOW,2006,PG,1,0.99,2.99,20.99,20050527,20050527,20050602,2006-02-15 21:30:53
1,2,83,377,591,2,BLUES INSTINCT,2006,G,1,2.99,2.99,18.99,20050528,20050528,20050529,2006-02-15 21:30:53


#### fact table part 5:  write fact table back to database

In [18]:
dataframe = df_fact_value
table_name = 'fact_value'
primary_key = 'fact_value_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### final check:  demonstrate new data warehouse exists and contains correct data

1. must sample at least 3 tables:  2 dimensions and fact table
2. must perform some aggregation
3. must have some kind of grouping

In [19]:
# the {0} is the destination database name, sakila_etl
sql_test = """
    SELECT dim_film.`rating`,
        SUM(fact_value.`amount`) AS `total_rental_amount`,
        AVG(fact_value.`rental_rate`) AS `avg_rental_rate`
    FROM `sakila_etl`.`fact_value` AS fact_value
    INNER JOIN `sakila_etl`.`dim_film` AS dim_film
    ON fact_value.rating = dim_film.rating
    GROUP BY dim_film.`rating`
    ORDER BY total_rental_amount DESC;
"""

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_test)

df_test.head()

Unnamed: 0,rating,total_rental_amount,avg_rental_rate
0,R,1363.05,4.99
1,PG,580.06,0.99
2,G,532.22,2.99
