#### Import the Necessary Libraries for MySQL and MongoDB

In [2]:
#mysql
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine, text

#mongo
import json
import datetime
import certifi
import pymongo
import sqlalchemy

#### Set up MongoDB connection

In [3]:
mongodb_args = {
    "user_name" : "snehasmoothedan",
    "password" : "password101",
    "cluster_name" : "Sandbox",
    "cluster_subnet" : "eylzz",
    "cluster_location" : "atlas",
    "db_name" : "sakila"
}


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

#### Set up MySQL connection

In [4]:
host_name = "localhost"
port = "3306"
user_id = "sneham"
pwd = "password"

src_dbname = "sakila"
dst_dbname = "sakila_dw"


def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

#### **Date Dimension**
**Run the dim_date_sql file to create the date dimension. I tried to do this entirely in python, but was unable to.**

#### Extract data from source database tables

In [5]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [6]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [7]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "film"
df_film = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",1139979822000
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",1139979822000


In [8]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [9]:
df_customer = pd.read_json('customer_data.json')
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1139954676000,1139979440000
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,1139954676000,1139979440000


#### Perform Any Necessary Transformations

In [10]:
drop_cols = ['last_update','last_update','store_id']
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id
0,1,1
1,2,1


In [11]:
drop_cols = ['active','create_date','last_update','store_id','address_id']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,first_name,last_name,email
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org
1,2,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org


In [12]:
drop_cols = ['language_id','original_language_id','last_update']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"


In [13]:
drop_cols = ['picture','email','last_update']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.insert(0, "staff_key", range(1, df_staff.shape[0]+1))
df_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,store_id,active,username,password
0,1,1,Mike,Hillyer,3,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,2,Jon,Stephens,4,2,1,Jon,


In [14]:
drop_cols = ['customer_id','staff_id','last_update']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,rental_id,amount,payment_date
0,1,76,2.99,2005-05-25 11:30:37
1,2,573,0.99,2005-05-28 10:35:23


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables


In [15]:
db_operation = "insert"

tables = [('dim_customers', df_customer, 'customer_key'),
          ('dim_film', df_film, 'film_key'),
          ('dim_staff', df_staff, 'staff_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Create & clean the fact table

In [16]:
sql_rentals = "SELECT * FROM sakila.rental;"
df_fact_rentals = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rentals)
drop_cols = ['last_update']
df_fact_rentals.drop(drop_cols, axis=1, inplace=True)

df_fact_rentals = pd.merge(df_fact_rentals, df_inventory, on='inventory_id', how='left')
df_fact_rentals.drop(['inventory_id'], axis=1, inplace=True)

df_fact_rentals = pd.merge(df_fact_rentals, df_payment, on='rental_id', how='left')
df_fact_rentals.drop(['payment_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,customer_id,return_date,staff_id,film_id,amount,payment_date
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30,1,80,2.99,2005-05-24 22:53:30
1,2,2005-05-24 22:54:33,459,2005-05-28 19:40:33,1,333,2.99,2005-05-24 22:54:33


#### Connect date keys to fact table

In [17]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [18]:
df_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,customer_id,return_date,staff_id,film_id,amount,payment_date,rental_date_key
0,1,130,2005-05-26 22:04:30,1,80,2.99,2005-05-24 22:53:30,20050524
1,2,459,2005-05-28 19:40:33,1,333,2.99,2005-05-24 22:54:33,20050524


In [19]:
df_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,customer_id,staff_id,film_id,amount,payment_date,rental_date_key,return_date_key
0,1,130,1,80,2.99,2005-05-24 22:53:30,20050524,20050526.0
1,2,459,1,333,2.99,2005-05-24 22:54:33,20050524,20050528.0


In [20]:
df_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rentals.payment_date = df_fact_rentals.payment_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_payment_date, on='payment_date', how='left')
df_fact_rentals.drop(['payment_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,customer_id,staff_id,film_id,amount,rental_date_key,return_date_key,payment_date_key
0,1,130,1,80,2.99,20050524,20050526.0,20050524
1,2,459,1,333,2.99,20050524,20050528.0,20050524


#### Connect dimension tables to fact_rentals

In [21]:
sql_customers = "SELECT customer_key, customer_id FROM sakila_dw.dim_customers;"
df_dim_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_customers, on='customer_id', how='inner')
df_fact_rentals.drop(['customer_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,staff_id,film_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key
0,1,1,80,2.99,20050524,20050526.0,20050524,130
1,2,1,333,2.99,20050524,20050528.0,20050524,459


In [22]:
sql_staff = "SELECT staff_key, staff_id FROM sakila_dw.dim_staff;"
df_dim_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_staff, on='staff_id', how='inner')
df_fact_rentals.drop(['staff_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,film_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key,staff_key
0,1,80,2.99,20050524,20050526.0,20050524,130,1
1,2,333,2.99,20050524,20050528.0,20050524,459,1


In [23]:
sql_film = "SELECT film_key, film_id FROM sakila_dw.dim_film;"
df_dim_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_film, on='film_id', how='inner')
df_fact_rentals.drop(['film_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,amount,rental_date_key,return_date_key,payment_date_key,customer_key,staff_key,film_key
0,1,2.99,20050524,20050526.0,20050524,130,1,80
1,2,2.99,20050524,20050528.0,20050524,459,1,333


#### Execute last transformations on fact_rentals

In [24]:
df_fact_rentals = df_fact_rentals[['rental_id','amount','film_key','customer_key','staff_key','rental_date_key','return_date_key','payment_date_key',]]

df_fact_rentals.insert(0, 'fact_rental_key', range(1, len(df_fact_rentals) + 1))

df_fact_rentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,amount,film_key,customer_key,staff_key,rental_date_key,return_date_key,payment_date_key
0,1,1,2.99,80,130,1,20050524,20050526.0,20050524
1,2,2,2.99,333,459,1,20050524,20050528.0,20050524


#### Write Dataframe back to database

In [25]:
table_name = "fact_rentals"
primary_key = "fact_rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rentals, table_name, primary_key, db_operation)

## SQL Statements

In [26]:
## Returns the total rental payment for each film

sql_test = """
SELECT
    `dim_film`.`title`,
    SUM(`fact_rentals`.`amount`) AS total_rental_payment
FROM {0}.`fact_rentals`
INNER JOIN {0}.`dim_film`
ON `dim_film`.`film_key` = `fact_rentals`.`film_key`
GROUP BY `fact_rentals`.`film_key`
ORDER BY `total_rental_payment` DESC
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,title,total_rental_payment
0,TELEGRAPH VOYAGE,231.73
1,WIFE TURN,223.69
2,ZORRO ARK,214.69
3,GOODFELLAS SALUTE,209.69
4,SATURDAY LAMBS,204.72


In [27]:
## Returns the total number of times a customer rented films

sql_test = """
SELECT
    `dim_customers`.`last_name`,
    COUNT(`fact_rentals`.`rental_id`) AS rental_count
FROM {0}.`fact_rentals`
INNER JOIN {0}.`dim_customers`
ON `dim_customers`.`customer_key` = `fact_rentals`.`customer_key`
GROUP BY dim_customers.customer_id, `dim_customers`.`last_name`
ORDER BY `rental_count` DESC
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,last_name,rental_count
0,HUNT,46
1,SEAL,45
2,DEAN,42
3,SHAW,42
4,SANDERS,41
