#### Import Necessary Libraries

In [2]:
import os
import numpy
import pandas as pd
import json
import datetime
import certifi
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [3]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working

In [5]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "MintChipIcecream54$"

src_dbname = "sakila"
dst_dbname = "sakila_2"

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context

In [7]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

#### Define Functions for Getting Data From and Setting Data Into SQL and Mongo Databases

In [9]:
mysql_args = {
    "uid" : "root",
    "pwd" : "MintChipIcecream54$",
    "hostname" : "localhost",
    "dbname" : "sakila_2"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "hqr7gc",
    "password" : "6TeV2LvCQhp246ZJ",
    "cluster_name" : "AlexLaplace",
    "cluster_subnet" : "v7k8y",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}

In [10]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Populate MongoDB with Source Data

In [12]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'JSON files')

json_files = {"film" : 'sakila.film.json',
              "inventory" : 'sakila.inventory.json',
              "store" : 'sakila.store.json',
              "rental" : 'sakila.rental.json',
              "customer" : 'sakila.customer.json',
              "payment" : 'sakila.payment.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)       

In [13]:
data_dir = os.path.join(os.getcwd(), 'JSON files')
print(data_dir)

C:\Users\alexl\DS Systems\Mid-term Project\JSON files


#### Load and Make Transformations to Customer Table from CSV

In [101]:
data_dir = os.path.join(os.getcwd(), 'CSV files')
data_file = os.path.join(data_dir, 'sakila.customer.csv')

df_customer = pd.read_csv(data_file, header=0, index_col=0)
df_customer.head()

Unnamed: 0_level_0,store_id,first_name,last_name,email,address_id,active,create_date,last_update
customer_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [103]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))

In [105]:
df_customer.reset_index()

Unnamed: 0,customer_id,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20
...,...,...,...,...,...,...,...,...,...,...
594,595,595,1,TERRENCE,GUNDERSON,TERRENCE.GUNDERSON@sakilacustomer.org,601,1,2006-02-14 22:04:37,2006-02-15 04:57:20
595,596,596,1,ENRIQUE,FORSYTHE,ENRIQUE.FORSYTHE@sakilacustomer.org,602,1,2006-02-14 22:04:37,2006-02-15 04:57:20
596,597,597,1,FREDDIE,DUGGAN,FREDDIE.DUGGAN@sakilacustomer.org,603,1,2006-02-14 22:04:37,2006-02-15 04:57:20
597,598,598,1,WADE,DELVALLE,WADE.DELVALLE@sakilacustomer.org,604,1,2006-02-14 22:04:37,2006-02-15 04:57:20


In [113]:
df_customer = df_customer.reset_index()

In [115]:
df_customer.head()

Unnamed: 0,customer_id,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Read Film Table from new MongoDB Collection "sakila" into Pandas DF

In [19]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

df_film = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [20]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [119]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [121]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id,last_update
0,1,1,1,1,2006-02-15 05:09:17
1,2,2,1,1,2006-02-15 05:09:17


In [125]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


sql_language = "SELECT * FROM sakila.language;"
df_language = get_sql_dataframe(sql_language, mysql_args)
df_language.head(6)

 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['last_update']
df_film.drop(drop_cols, axis=1, inplace=True)

 2. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))

 3. Display the first 2 rows of the dataframe to validate your work
df_film.head(2)

df_film2 = pd.merge(df_film, df_language, on="language_id")
column_to_move = df_film2.pop("name")
df_film2.insert(6, "language", column_to_move)
df_film2.head(2)

 2. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_actor.insert(0, "actor_key", range(1, df_actor.shape[0]+1))
df_actor.head(2)

df_film2.head(2)

 2. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))
df_store.head(2)

sql_actor = "SELECT * FROM sakila.actor;"
df_actor = get_dataframe(user_id, pwd, host_name, src_dbname, sql_actor)
df_actor.head(2)

df_actor.drop(['last_update'], axis=1, inplace=True)
df_actor.columns

#### Lookup the Invoice Date Keys from the Date Dimension Table.
#####  Get the Data from the Date Dimension Table.

In [54]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [129]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_return_date_key", "full_date" : "return_date"})
df_rental.return_date = df_rental.return_date.astype('datetime64[ns]').dt.date
df_rental = pd.merge(df_rental, df_dim_rental_date, on='return_date', how='left')
df_rental.drop(['return_date'], axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id,last_update,rental_return_date_key
0,1,2005-05-24 22:53:30,367,130,1,2006-02-15 21:30:53,20050526
1,2,2005-05-24 22:54:33,1525,459,1,2006-02-15 21:30:53,20050528


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [32]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [33]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [123]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [127]:
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

OperationalError: (pymysql.err.OperationalError) (1072, "Key column 'rental_key' doesn't exist in table")
[SQL: ALTER TABLE dim_rental ADD PRIMARY KEY (rental_key);]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

#### Validate that the New Dimension Tables were Created.

In [35]:
sql_customer = "SELECT * FROM sakila_2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [52]:
sql_film = "SELECT * FROM sakila_2.dim_film;"
df_dim_film = get_sql_dataframe(sql_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [None]:
sql_inventory = "SELECT * FROM sakila_2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_film, **mysql_args)
df_dim_film.head(2)

### Create and Populate the New Fact Tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [57]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_fact_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Lookup the DateKeys from the Date Dimension Table.

In [60]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "payment_date" Column.
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_payment.payment_date = df_fact_payment.payment_date.astype('datetime64[ns]').dt.date
df_fact_payment = pd.merge(df_fact_payment, df_dim_payment_date, on='payment_date', how='left')
df_fact_payment.drop(['payment_date'], axis=1, inplace=True)
df_fact_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,last_update,payment_date_key
0,1,1,1,76,2.99,2006-02-15 22:12:30,20050525
1,2,1,1,573,0.99,2006-02-15 22:12:30,20050528


In [62]:
df_dim_update_date = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_payment.last_update = df_fact_payment.last_update.astype('datetime64[ns]').dt.date
df_fact_payment = pd.merge(df_fact_payment, df_dim_update_date, on='last_update', how='left')
df_fact_payment.drop(['last_update'], axis=1, inplace=True)
df_fact_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date_key,last_update_key
0,1,1,1,76,2.99,20050525,20060215
1,2,1,1,573,0.99,20050528,20060215


In [64]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental.rental_date = df_fact_rental.rental_date.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,20050524
1,2,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,20050524


In [66]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental.return_date = df_fact_rental.return_date.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,2006-02-15 21:30:53,20050524,20050526
1,2,1525,459,1,2006-02-15 21:30:53,20050524,20050528


In [68]:
df_dim_update_date = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_fact_rental.last_update = df_fact_rental.last_update.astype('datetime64[ns]').dt.date
df_fact_rental = pd.merge(df_fact_rental, df_dim_update_date, on='last_update', how='left')
df_fact_rental.drop(['last_update'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,rental_date_key,return_date_key,last_update_key
0,1,367,130,1,20050524,20050526,20060215
1,2,1525,459,1,20050524,20050528,20060215


#### Lookup the Primary Keys from the Dimension Tables
**Foreign key relationships** must be established between each newly-crafted **Fact table** and each related **Dimension table**.

#####  First, fetch the Surrogate Primary Key and the Business Key from each Dimension table.

In [117]:
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_2.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

OperationalError: (pymysql.err.OperationalError) (1054, "Unknown column 'customer_id' in 'field list'")
[SQL: SELECT customer_key, customer_id FROM sakila_2.dim_customer;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
sql_dim_film = "SELECT film_key, film_id FROM sakila_2.dim_film;"
df_dim_film = get_sql_dataframe(sql_dim_film, **mysql_args)
df_dim_film.head(2)

 #### Next, use Business Keys to lookup corresponding Surrogate Primary Keys in the Dimension tables

In [None]:
df_fact_payment = pd.merge(df_fact_payment, df_dim_customer, on='customer_main_id', how='inner')
df_fact_payment.drop(['customer_main_id'], axis=1, inplace=True)
df_fact_payment.head(2)

In [None]:
df_fact_rental = pd.merge(df_fact_rental, df_dim_customer, on='customer_main_id', how='inner')
df_fact_rental.drop(['customer_main_id'], axis=1, inplace=True)
df_fact_rental.head(2)